From a30b628ee3d5c559092d4e3141c7d79c1391107d Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Tue, 12 May 2026 10:06:04 +0300 Subject: [PATCH 1/2] IGNITE-28607 Use Message DTO for DiscoveryDataBag#GridDiscoveryData --- .../ignite/internal/CoreMessagesProvider.java | 20 +- .../ignite/internal/GridPluginComponent.java | 10 +- .../encryption/GridEncryptionManager.java | 15 +- .../encryption/KnownEncryptionKeys.java | 37 +++ .../managers/eventstorage/EnabledEvents.java | 40 +++ .../eventstorage/GridEventStorageManager.java | 10 +- .../IgniteAuthenticationProcessor.java | 33 +- .../authentication/InitialUsersData.java | 55 ++++ .../UserManagementOperation.java | 6 +- .../processors/cache/ClusterCachesInfo.java | 5 +- .../CacheObjectBinaryProcessorImpl.java | 9 +- .../processors/cluster/ClusterFlags.java | 38 +++ .../processors/cluster/ClusterIdAndTag.java | 13 +- .../processors/cluster/ClusterProcessor.java | 47 +-- .../cluster/GridClusterStateProcessor.java | 8 +- .../continuous/GridContinuousProcessor.java | 9 +- .../GridMarshallerMappingProcessor.java | 16 +- .../marshaller/MarshallerMappingsData.java | 7 + .../DistributedMetaStorageImpl.java | 2 +- .../plugin/IgnitePluginProcessor.java | 34 +- .../processors/plugin/PluginsData.java | 67 ++++ .../processors/query/GridQueryProcessor.java | 61 ++-- .../query/schema/message/ActiveProposals.java | 51 +++ .../{ => schema/message}/InlineSizesData.java | 9 +- .../SchemaAbstractDiscoveryMessage.java | 10 +- .../message/SchemaFinishDiscoveryMessage.java | 3 - .../SchemaProposeDiscoveryMessage.java | 3 - .../service/IgniteServiceProcessor.java | 2 +- .../{ObjectData.java => DataBagItem.java} | 53 +++- .../spi/discovery/DiscoveryDataBag.java | 69 +++-- .../ignite/spi/discovery/tcp/ServerImpl.java | 44 +-- .../spi/discovery/tcp/TcpDiscoverySpi.java | 22 +- .../tcp/internal/DiscoveryDataPacket.java | 291 ++++-------------- .../DiscoverySpiDataExchangeTest.java | 6 + .../discovery/tcp/TcpDiscoverySelfTest.java | 10 +- .../zk/internal/ZkBulkJoinContext.java | 8 +- .../discovery/zk/internal/ZkDiscoData.java | 51 +++ .../zk/internal/ZkMessageFactory.java | 1 + .../zk/internal/ZookeeperDiscoveryImpl.java | 16 +- 39 files changed, 655 insertions(+), 536 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/KnownEncryptionKeys.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/EnabledEvents.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/InitialUsersData.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterFlags.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/PluginsData.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/ActiveProposals.java rename modules/core/src/main/java/org/apache/ignite/internal/processors/query/{ => schema/message}/InlineSizesData.java (87%) rename modules/core/src/main/java/org/apache/ignite/spi/discovery/{ObjectData.java => DataBagItem.java} (68%) create mode 100644 modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoData.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index 9da592635d229..2695f623b2d17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -40,10 +40,13 @@ import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyRequest; import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse; import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted; +import org.apache.ignite.internal.managers.encryption.KnownEncryptionKeys; import org.apache.ignite.internal.managers.encryption.MasterKeyChangeRequest; import org.apache.ignite.internal.managers.encryption.NodeEncryptionKeys; +import org.apache.ignite.internal.managers.eventstorage.EnabledEvents; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage; import org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider; +import org.apache.ignite.internal.processors.authentication.InitialUsersData; import org.apache.ignite.internal.processors.authentication.User; import org.apache.ignite.internal.processors.authentication.UserAcceptedMessage; import org.apache.ignite.internal.processors.authentication.UserAuthenticateRequestMessage; @@ -181,6 +184,8 @@ import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; +import org.apache.ignite.internal.processors.cluster.ClusterFlags; +import org.apache.ignite.internal.processors.cluster.ClusterIdAndTag; import org.apache.ignite.internal.processors.cluster.ClusterMetricsUpdateMessage; import org.apache.ignite.internal.processors.cluster.NodeFullMetricsMessage; import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage; @@ -205,7 +210,7 @@ import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasMessage; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateAckMessage; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateMessage; -import org.apache.ignite.internal.processors.query.InlineSizesData; +import org.apache.ignite.internal.processors.plugin.PluginsData; import org.apache.ignite.internal.processors.query.QueryField; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse; @@ -213,6 +218,8 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse; import org.apache.ignite.internal.processors.query.messages.GridQueryKillRequest; import org.apache.ignite.internal.processors.query.messages.GridQueryKillResponse; +import org.apache.ignite.internal.processors.query.schema.message.ActiveProposals; +import org.apache.ignite.internal.processors.query.schema.message.InlineSizesData; import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage; import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage; import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage; @@ -253,7 +260,7 @@ import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage; import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage; import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage; -import org.apache.ignite.spi.discovery.ObjectData; +import org.apache.ignite.spi.discovery.DataBagItem; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage; @@ -353,7 +360,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(GridCacheVersion.class); withNoSchema(GridCacheVersionEx.class); withNoSchema(WALPointer.class); - withNoSchemaResolvedClassLoader(ObjectData.class); + withNoSchemaResolvedClassLoader(DataBagItem.class); withSchemaResolvedClassLoader(GridTopicMessage.class); // [5700 - 5900]: Discovery originated messages. @@ -446,6 +453,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(CacheStatisticsClearMessage.class); withNoSchema(ClientCacheChangeDummyDiscoveryMessage.class); withNoSchema(DynamicCacheChangeBatch.class); + withSchema(InitialUsersData.class); // [10000 - 10200]: Transaction and lock related messages. Most of them originally comes from Communication. msgIdx = 10000; @@ -575,6 +583,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(CacheContinuousQueryBatchAck.class); withSchema(CacheContinuousQueryEntry.class); withNoSchema(InlineSizesData.class); + withSchema(ActiveProposals.class); // [11200 - 11300]: Compute, distributed process messages. msgIdx = 11200; @@ -652,6 +661,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(MasterKeyChangeRequest.class); withNoSchema(GroupKeyEncrypted.class); withNoSchema(NodeEncryptionKeys.class); + withNoSchema(KnownEncryptionKeys.class); // [13000 - 13300]: Control, configuration, diagnostincs and other messages. msgIdx = 13000; @@ -665,6 +675,10 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchemaResolvedClassLoader(DynamicCacheChangeRequest.class); withNoSchema(PartitionHashRecord.class); withNoSchema(TransactionsHashRecord.class); + withNoSchema(ClusterIdAndTag.class); + withNoSchema(ClusterFlags.class); + withNoSchemaResolvedClassLoader(PluginsData.class); + withSchema(EnabledEvents.class); assert msgIdx <= MAX_MESSAGE_ID; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java index f0e1a7c7b627a..68ce157be8b5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java @@ -17,10 +17,10 @@ package org.apache.ignite.internal; -import java.io.Serializable; -import java.util.Map; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.plugin.PluginsData; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.plugin.PluginProvider; import org.apache.ignite.plugin.PluginValidationException; @@ -115,10 +115,10 @@ public PluginProvider plugin() { @Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node, JoiningNodeDiscoveryData discoData) { try { - Map map = discoData.joiningNodeData(); + PluginsData pluginsData = discoData.joiningNodeData(); - if (map != null) - plugin.validateNewNode(node, map.get(plugin.name())); + if (pluginsData != null && !F.isEmpty(pluginsData.data())) + plugin.validateNewNode(node, pluginsData.data().get(plugin.name())); else plugin.validateNewNode(node, null); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java index 19c58d5d7b8cf..58ae3f3a65201 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java @@ -563,7 +563,7 @@ public void onLocalJoin() { } } - dataBag.addGridCommonData(ENCRYPTION_MGR.ordinal(), knownEncKeys); + dataBag.addGridCommonData(ENCRYPTION_MGR.ordinal(), new KnownEncryptionKeys(knownEncKeys)); } /** {@inheritDoc} */ @@ -571,20 +571,15 @@ public void onLocalJoin() { if (ctx.clientNode()) return; - Map encKeysFromCluster = (Map)data.commonData(); + KnownEncryptionKeys encKeysFromCluster = data.commonData(); - if (F.isEmpty(encKeysFromCluster)) + if (encKeysFromCluster == null || F.isEmpty(encKeysFromCluster.keys)) return; - for (Map.Entry entry : encKeysFromCluster.entrySet()) { + for (Map.Entry entry : encKeysFromCluster.keys.entrySet()) { int grpId = entry.getKey(); - GroupKeyEncrypted rmtKey; - - if (entry.getValue() instanceof GroupKeyEncrypted) - rmtKey = (GroupKeyEncrypted)entry.getValue(); - else - rmtKey = new GroupKeyEncrypted(INITIAL_KEY_ID, (byte[])entry.getValue()); + GroupKeyEncrypted rmtKey = entry.getValue(); GroupKey locGrpKey = getActiveKey(grpId); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/KnownEncryptionKeys.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/KnownEncryptionKeys.java new file mode 100644 index 0000000000000..05e4e8d34e23e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/KnownEncryptionKeys.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.encryption; + +import java.util.Map; +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** */ +public class KnownEncryptionKeys implements Message { + /** */ + @Order(0) + Map keys; + + /** */ + public KnownEncryptionKeys() {} + + /** */ + KnownEncryptionKeys(Map keys) { + this.keys = keys; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/EnabledEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/EnabledEvents.java new file mode 100644 index 0000000000000..17035c8416726 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/EnabledEvents.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.eventstorage; + +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** + * + */ +public class EnabledEvents implements Message { + /** */ + @Order(0) + int[] evts; + + /** */ + public EnabledEvents() { } + + /** + * @param events Enabled events. + */ + public EnabledEvents(int[] events) { + this.evts = events; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java index 1f96aa0a316d9..d19f913aee999 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java @@ -1173,13 +1173,15 @@ private int[] copy(int[] arr) { /** {@inheritDoc} */ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { - if (data.commonData() == null) + EnabledEvents enabled = data.commonData(); + + if (enabled == null) return; if (ctx.clientNode()) return; - GridIntList clusterData = new GridIntList((int[])data.commonData()); + GridIntList clusterData = new GridIntList(enabled.evts); GridIntList nodeData = new GridIntList(enabledEvents()); GridIntList toEnable = new GridIntList(clusterData.size()); @@ -1207,9 +1209,7 @@ private int[] copy(int[] arr) { if (dataBag.isJoiningNodeClient() && dataBag.commonDataCollectedFor(EVENT_MGR.ordinal())) return; - int[] clusterData = enabledEvents(); - - dataBag.addGridCommonData(EVENT_MGR.ordinal(), clusterData); + dataBag.addGridCommonData(EVENT_MGR.ordinal(), new EnabledEvents(enabledEvents())); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java index af9baf67bf34c..7254437f997b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java @@ -60,7 +60,6 @@ import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -430,7 +429,7 @@ private boolean isLocalNodeCoordinator() { /** {@inheritDoc} */ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { - initUsrs = (InitialUsersData)data.commonData(); + initUsrs = data.commonData(); } /** {@inheritDoc} */ @@ -999,36 +998,6 @@ public void checkUserOperation(UserManagementOperation op) throws IgniteAccessCo } /** - * Initial data is collected on coordinator to send to join node. - */ - private static final class InitialUsersData implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** Users. */ - @GridToStringInclude - private final ArrayList usrs; - - /** Active user operations. */ - @GridToStringInclude - private final ArrayList activeOps; - - /** - * @param usrs Users. - * @param ops Active operations on cluster. - */ - InitialUsersData(Collection usrs, Collection ops) { - this.usrs = new ArrayList<>(usrs); - activeOps = new ArrayList<>(ops); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(InitialUsersData.class, this); - } - } - - /**i * */ private final class UserProposedListener implements CustomEventListener { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/InitialUsersData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/InitialUsersData.java new file mode 100644 index 0000000000000..261bf09293caf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/InitialUsersData.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.authentication; + +import java.util.ArrayList; +import java.util.Collection; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** Initial data is collected on coordinator to send to join node. */ +public class InitialUsersData implements Message { + /** Users. */ + @GridToStringInclude + @Order(0) + ArrayList usrs; + + /** Active user operations. */ + @GridToStringInclude + @Order(1) + ArrayList activeOps; + + /** */ + public InitialUsersData() { } + + /** + * @param usrs Users. + * @param ops Active operations on cluster. + */ + InitialUsersData(Collection usrs, Collection ops) { + this.usrs = new ArrayList<>(usrs); + activeOps = new ArrayList<>(ops); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(InitialUsersData.class, this); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserManagementOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserManagementOperation.java index 82d3b939bf58f..da6efb8ad9ee4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserManagementOperation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserManagementOperation.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.authentication; -import java.io.Serializable; import java.util.Objects; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.internal.S; @@ -27,10 +26,7 @@ /** * The operation with users. Used to deliver the information about requested operation to all server nodes. */ -public class UserManagementOperation implements Serializable, Message { - /** */ - private static final long serialVersionUID = 0L; - +public class UserManagementOperation implements Message { /** User. */ @Order(0) User usr; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 987be04ea2fe8..d5c7d8dce2536 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -1504,10 +1504,9 @@ public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { if (data.commonData() == null) return; - assert joinDiscoData != null || disconnectedState(); - assert data.commonData() instanceof CacheNodeCommonDiscoveryData : data; + CacheNodeCommonDiscoveryData cachesData = data.commonData(); - CacheNodeCommonDiscoveryData cachesData = (CacheNodeCommonDiscoveryData)data.commonData(); + assert joinDiscoData != null || disconnectedState(); // CacheGroup configurations that were created from local node configuration. Map locCacheGrps = new HashMap<>(registeredCacheGroups()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 8867e576ec8af..7affd7b7b5744 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.binary; import java.io.File; -import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; @@ -1464,7 +1463,7 @@ private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId, Binary res.put(e.getKey(), e.getValue()); } - dataBag.addGridCommonData(BINARY_PROC.ordinal(), (Serializable)res); + dataBag.addGridCommonData(BINARY_PROC.ordinal(), new BinaryMetadataVersionsData(res)); } } @@ -1530,10 +1529,10 @@ private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId, Binary /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { - Map receivedData = (Map)data.commonData(); + BinaryMetadataVersionsData receivedData = data.commonData(); - if (receivedData != null) { - for (Map.Entry e : receivedData.entrySet()) { + if (receivedData != null && !F.isEmpty(receivedData.data)) { + for (Map.Entry e : receivedData.data.entrySet()) { BinaryMetadataVersionInfo metaVerInfo = e.getValue(); BinaryMetadataVersionInfo locMetaVerInfo = new BinaryMetadataVersionInfo(metaVerInfo.metadata(), diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterFlags.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterFlags.java new file mode 100644 index 0000000000000..ccca3822389a5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterFlags.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cluster; + +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** */ +public class ClusterFlags implements Message { + /** Update notifier enabled flag. */ + @Order(0) + boolean updateNotifierEnabled; + + /** */ + public ClusterFlags() { } + + /** + * @param updateNotifierEnabled Update notifier enabled flag. + */ + public ClusterFlags(boolean updateNotifierEnabled) { + this.updateNotifierEnabled = updateNotifierEnabled; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterIdAndTag.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterIdAndTag.java index 2b03f36377d60..17c49f2f07e85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterIdAndTag.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterIdAndTag.java @@ -20,20 +20,27 @@ import java.io.Serializable; import java.util.Objects; import java.util.UUID; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Container class to send cluster ID and tag in disco data and to write them atomically to metastorage. */ -public class ClusterIdAndTag implements Serializable { +public class ClusterIdAndTag implements Serializable, Message { /** */ private static final long serialVersionUID = 0L; /** */ - private final UUID id; + @Order(0) + UUID id; /** */ - private final String tag; + @Order(1) + String tag; + + /** */ + public ClusterIdAndTag() { } /** * @param id Cluster ID. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java index b7d78a906a5b7..2a777f0feb627 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java @@ -19,7 +19,6 @@ import java.io.Serializable; import java.util.Collection; -import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.Timer; @@ -76,6 +75,7 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.metric.MetricRegistry; import org.apache.ignite.mxbean.IgniteClusterMXBean; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; @@ -465,42 +465,31 @@ public IgniteFuture clientReconnectFuture() { /** {@inheritDoc} */ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) { - dataBag.addJoiningNodeData(CLUSTER_PROC.ordinal(), getDiscoveryData()); + dataBag.addJoiningNodeData(CLUSTER_PROC.ordinal(), new ClusterFlags(notifyEnabled.get())); } /** {@inheritDoc} */ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { - dataBag.addNodeSpecificData(CLUSTER_PROC.ordinal(), getDiscoveryData()); + dataBag.addNodeSpecificData(CLUSTER_PROC.ordinal(), new ClusterFlags(notifyEnabled.get())); - dataBag.addGridCommonData(CLUSTER_PROC.ordinal(), new ClusterIdAndTag(cluster.id(), cluster.tag())); - } - - /** - * @return Discovery data. - */ - private Serializable getDiscoveryData() { - HashMap map = new HashMap<>(2); - - map.put(ATTR_UPDATE_NOTIFIER_STATUS, notifyEnabled.get()); - - return map; + dataBag.addGridCommonData(CLUSTER_PROC.ordinal(), (Message)new ClusterIdAndTag(cluster.id(), cluster.tag())); } /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { - Map nodeSpecData = data.nodeSpecificData(); + Map nodeSpecData = data.nodeSpecificData(); if (nodeSpecData != null) { - Boolean lstFlag = findLastFlag(nodeSpecData.values()); + Boolean lstFlag = findLastUpdateNotifierFlag(nodeSpecData.values()); if (lstFlag != null) notifyEnabled.set(lstFlag); } - ClusterIdAndTag commonData = (ClusterIdAndTag)data.commonData(); + ClusterIdAndTag commonData = data.commonData(); if (commonData != null) { - Serializable remoteClusterId = commonData.id(); + UUID remoteClusterId = commonData.id(); if (remoteClusterId != null) { if (locClusterId != null && !locClusterId.equals(remoteClusterId)) { @@ -510,7 +499,7 @@ private Serializable getDiscoveryData() { ", local cluster ID: " + locClusterId); } - locClusterId = (UUID)remoteClusterId; + locClusterId = remoteClusterId; } String remoteClusterTag = commonData.tag(); @@ -521,21 +510,17 @@ private Serializable getDiscoveryData() { } /** - * @param vals collection to seek through. + * @param flags Flags collection to seek through. */ - private Boolean findLastFlag(Collection vals) { - Boolean flag = null; + private Boolean findLastUpdateNotifierFlag(Collection flags) { + Boolean notifierFlag = null; - for (Serializable ser : vals) { - if (ser != null) { - Map map = (Map)ser; - - if (map.containsKey(ATTR_UPDATE_NOTIFIER_STATUS)) - flag = (Boolean)map.get(ATTR_UPDATE_NOTIFIER_STATUS); - } + for (ClusterFlags flag : flags) { + if (flag != null) + notifierFlag = flag.updateNotifierEnabled; } - return flag; + return notifierFlag; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index 337b4f51a70c3..3d9d20faf535e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -965,14 +965,16 @@ protected IgniteCheckedException concurrentStateChangeError(ClusterState state, /** {@inheritDoc} */ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { - if (data.commonData() instanceof DiscoveryDataClusterState) { + Serializable commonData = data.commonData(); + + if (commonData instanceof DiscoveryDataClusterState) { if (globalState != null && globalState.baselineTopology() != null) //node with BaselineTopology is not allowed to join mixed cluster // (where some nodes don't support BaselineTopology) throw new IgniteException("Node with BaselineTopology cannot join" + " mixed cluster running in compatibility mode"); - globalState = (DiscoveryDataClusterState)data.commonData(); + globalState = (DiscoveryDataClusterState)commonData; compatibilityMode = true; @@ -981,7 +983,7 @@ protected IgniteCheckedException concurrentStateChangeError(ClusterState state, return; } - BaselineStateAndHistoryData stateDiscoData = (BaselineStateAndHistoryData)data.commonData(); + BaselineStateAndHistoryData stateDiscoData = (BaselineStateAndHistoryData)commonData; if (stateDiscoData != null) { DiscoveryDataClusterState state = stateDiscoData.globalState; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 489f13e71fee8..7291e631c2f31 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -528,8 +528,7 @@ private Map copyLocalInfos(Map l @Override public void onGridDataReceived(GridDiscoveryData data) { if (immutableDiscoCustomMsg) { if (data.commonData() != null) { - ContinuousRoutinesCommonDiscoveryData commonData = - (ContinuousRoutinesCommonDiscoveryData)data.commonData(); + ContinuousRoutinesCommonDiscoveryData commonData = data.commonData(); for (ContinuousRoutineInfo routineInfo : commonData.startedRoutines) { if (routinesInfo.routineExists(routineInfo.routineId)) @@ -542,11 +541,11 @@ private Map copyLocalInfos(Map l } } else { - Map nodeSpecData = data.nodeSpecificData(); + Map nodeSpecData = data.nodeSpecificData(); if (nodeSpecData != null) { - for (Map.Entry e : nodeSpecData.entrySet()) - onDiscoveryDataReceivedMutable((DiscoveryData)e.getValue()); + for (DiscoveryData val : nodeSpecData.values()) + onDiscoveryDataReceivedMutable(val); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java index 8946672364edb..a39d17b926ce8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java @@ -332,28 +332,26 @@ private final class MappingAcceptedListener implements CustomEventListener> mappings = (List>)data.commonData(); - - processIncomingMappings(mappings); + processIncomingMappings(data.commonData()); } /** * @param mappings Incoming marshaller mappings. */ - private void processIncomingMappings(List> mappings) { - marshallerCtx.onMappingDataReceived(log, mappings); + private void processIncomingMappings(@Nullable MarshallerMappingsData mappings) { + if (mappings != null) + marshallerCtx.onMappingDataReceived(log, mappings.mappings()); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java index 2207b1c21f47c..37bd14d511e81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java @@ -37,4 +37,11 @@ public MarshallerMappingsData() {} public MarshallerMappingsData(List> mappings) { this.mappings = mappings; } + + /** + * @return Mappings. + */ + public List> mappings() { + return mappings; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java index 1362fec2e7f5b..ad9022f2e9747 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java @@ -958,7 +958,7 @@ private DistributedMetaStorageKeyValuePair[] localFullData() { lock.writeLock().lock(); try { - DistributedMetaStorageClusterNodeData nodeData = (DistributedMetaStorageClusterNodeData)data.commonData(); + DistributedMetaStorageClusterNodeData nodeData = data.commonData(); if (nodeData != null) { if (nodeData.fullData != null) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java index 59bcabd2bfa6f..00e8e968da14f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java @@ -157,24 +157,24 @@ public T createComponent(Class cls) { /** {@inheritDoc} */ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) { - Serializable pluginsData = getDiscoveryData(dataBag.joiningNodeId()); + PluginsData discoData = getDiscoveryData(dataBag.joiningNodeId()); - if (pluginsData != null) - dataBag.addJoiningNodeData(PLUGIN.ordinal(), pluginsData); + if (!F.isEmpty(discoData.data)) + dataBag.addJoiningNodeData(PLUGIN.ordinal(), discoData); } /** {@inheritDoc} */ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { - Serializable pluginsData = getDiscoveryData(dataBag.joiningNodeId()); + PluginsData discoData = getDiscoveryData(dataBag.joiningNodeId()); - if (pluginsData != null) - dataBag.addNodeSpecificData(PLUGIN.ordinal(), pluginsData); + if (!F.isEmpty(discoData.data)) + dataBag.addNodeSpecificData(PLUGIN.ordinal(), discoData); } /** * @param joiningNodeId Joining node id. */ - private Serializable getDiscoveryData(UUID joiningNodeId) { + private PluginsData getDiscoveryData(UUID joiningNodeId) { HashMap pluginsData = null; for (Map.Entry e : plugins.entrySet()) { @@ -188,31 +188,27 @@ private Serializable getDiscoveryData(UUID joiningNodeId) { } } - return pluginsData; + return new PluginsData(pluginsData); } /** {@inheritDoc} */ @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) { - if (data.hasJoiningNodeData()) { - Map pluginsData = data.joiningNodeData(); + PluginsData discoData = data.joiningNodeData(); - applyPluginsData(data.joiningNodeId(), pluginsData); - } + if (discoData != null && !F.isEmpty(discoData.data)) + applyPluginsData(data.joiningNodeId(), discoData.data); } /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { - Map nodeSpecificData = data.nodeSpecificData(); + Map nodeSpecificData = data.nodeSpecificData(); if (nodeSpecificData != null) { UUID joiningNodeId = data.joiningNodeId(); - for (Serializable v : nodeSpecificData.values()) { - if (v != null) { - Map pluginsData = (Map)v; - - applyPluginsData(joiningNodeId, pluginsData); - } + for (PluginsData pluginsData : nodeSpecificData.values()) { + if (pluginsData != null && !F.isEmpty(pluginsData.data)) + applyPluginsData(joiningNodeId, pluginsData.data); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/PluginsData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/PluginsData.java new file mode 100644 index 0000000000000..f422e35b74a82 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/PluginsData.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.plugin; + +import java.io.Serializable; +import java.util.Map; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.MarshallableMessage; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; +import org.jetbrains.annotations.Nullable; + +/** */ +public class PluginsData implements MarshallableMessage { + /** Original plugins data. */ + @Nullable Map data; + + /** Serialized plugins data. */ + @Order(0) + @Nullable byte[] dataBytes; + + /** */ + public PluginsData() { } + + /** + * @param data Plugins data. + */ + public PluginsData(@Nullable Map data) { + this.data = data; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + if (!F.isEmpty(data)) + dataBytes = U.marshal(marsh, data); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + if (dataBytes != null) + data = U.unmarshal(marsh, dataBytes, clsLdr); + } + + /** + * @return Original plugins data. + */ + public @Nullable Map data() { + return data; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 854e8a17e5880..8fa969740cbb5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.query; -import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -118,6 +117,8 @@ import org.apache.ignite.internal.processors.query.schema.SchemaOperationWorker; import org.apache.ignite.internal.processors.query.schema.SchemaSqlViewManager; import org.apache.ignite.internal.processors.query.schema.management.SchemaManager; +import org.apache.ignite.internal.processors.query.schema.message.ActiveProposals; +import org.apache.ignite.internal.processors.query.schema.message.InlineSizesData; import org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage; import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage; import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage; @@ -181,9 +182,6 @@ */ @SuppressWarnings("rawtypes") public class GridQueryProcessor extends GridProcessorAdapter { - /** */ - private static final String INLINE_SIZES_DISCO_BAG_KEY = "inline_sizes"; - /** Warn message if some indexes have different inline sizes on the nodes. */ public static final String INLINE_SIZES_DIFFER_WARN_MSG_FORMAT = "Inline sizes on local node and node %s are different. " + "Please drop and create again these indexes to avoid performance problems with SQL queries. Problem indexes: %s"; @@ -473,36 +471,28 @@ public void onCacheReconnect() throws IgniteCheckedException { /** {@inheritDoc} */ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { - LinkedHashMap proposals; + ActiveProposals proposals; // Collect active proposals. synchronized (stateMux) { - proposals = new LinkedHashMap<>(activeProposals); + proposals = new ActiveProposals(activeProposals); } dataBag.addGridCommonData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), proposals); // We should send inline index sizes information only to server nodes. if (!dataBag.isJoiningNodeClient()) { - HashMap nodeSpecificMap = new HashMap<>(); - - Serializable oldVal = nodeSpecificMap.put(INLINE_SIZES_DISCO_BAG_KEY, collectSecondaryIndexesInlineSize()); - - assert oldVal == null : oldVal; - - dataBag.addNodeSpecificData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), nodeSpecificMap); + dataBag.addNodeSpecificData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), + new InlineSizesData(secondaryIndexesInlineSize())); } } /** {@inheritDoc} */ @Override public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) { - Object joiningNodeData = data.joiningNodeData(); - - if (joiningNodeData instanceof InlineSizesData) { - Map joiningNodeIndexesInlineSize = ((InlineSizesData)joiningNodeData).sizes; + InlineSizesData joiningNodeData = data.joiningNodeData(); - checkInlineSizes(secondaryIndexesInlineSize(), joiningNodeIndexesInlineSize, data.joiningNodeId()); - } + if (joiningNodeData != null) + checkInlineSizes(secondaryIndexesInlineSize(), joiningNodeData.sizes(), data.joiningNodeId()); } /** {@inheritDoc} */ @@ -514,30 +504,27 @@ public void onCacheReconnect() throws IgniteCheckedException { /** {@inheritDoc} */ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { // Preserve proposals. - LinkedHashMap activeProposals = - (LinkedHashMap)data.commonData(); + ActiveProposals discoData = data.commonData(); // Process proposals as if they were received as regular discovery messages. - if (!F.isEmpty(activeProposals)) { + if (discoData != null && !F.isEmpty(discoData.activeProposals())) { synchronized (stateMux) { - for (SchemaProposeDiscoveryMessage activeProposal : activeProposals.values()) + for (SchemaProposeDiscoveryMessage activeProposal : discoData.activeProposals().values()) onSchemaProposeDiscovery0(activeProposal); } } - if (!F.isEmpty(data.nodeSpecificData())) { + Map nodedSpecificData = data.nodeSpecificData(); + + if (!F.isEmpty(nodedSpecificData)) { Map indexesInlineSize = secondaryIndexesInlineSize(); if (!F.isEmpty(indexesInlineSize)) { - for (UUID nodeId : data.nodeSpecificData().keySet()) { - Serializable serializable = data.nodeSpecificData().get(nodeId); + for (UUID nodeId : nodedSpecificData.keySet()) { + InlineSizesData inlineSizesData = nodedSpecificData.get(nodeId); - assert serializable instanceof Map : serializable; - - Map nodeSpecificData = (Map)serializable; - - if (nodeSpecificData.containsKey(INLINE_SIZES_DISCO_BAG_KEY)) - checkInlineSizes(indexesInlineSize, (Map)nodeSpecificData.get(INLINE_SIZES_DISCO_BAG_KEY), nodeId); + if (inlineSizesData != null) + checkInlineSizes(indexesInlineSize, inlineSizesData.sizes(), nodeId); } } } @@ -685,16 +672,6 @@ private void checkInlineSizes(Map local, Map r } } - /** - * @return Serializable information about secondary indexes inline size. - * @see #secondaryIndexesInlineSize() - */ - private Serializable collectSecondaryIndexesInlineSize() { - Map map = secondaryIndexesInlineSize(); - - return map instanceof Serializable ? (Serializable)map : new HashMap<>(map); - } - /** * Process schema propose message from discovery thread. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/ActiveProposals.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/ActiveProposals.java new file mode 100644 index 0000000000000..ea46ee533fb90 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/ActiveProposals.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema.message; + +import java.util.LinkedHashMap; +import java.util.UUID; +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** + * Holder for active schema change propose discovery messages. + */ +public class ActiveProposals implements Message { + /** Active proposals. */ + @Order(0) + LinkedHashMap activeProposals; + + /** */ + public ActiveProposals() { + // No-op. + } + + /** + * @param activeProposals Active proposals. + */ + public ActiveProposals(LinkedHashMap activeProposals) { + this.activeProposals = activeProposals; + } + + /** + * @return Active proposals. + */ + public LinkedHashMap activeProposals() { + return activeProposals; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/InlineSizesData.java similarity index 87% rename from modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java rename to modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/InlineSizesData.java index eb3813501f670..f0d0f69708cb8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/InlineSizesData.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query; +package org.apache.ignite.internal.processors.query.schema.message; import java.util.Map; import org.apache.ignite.internal.Order; @@ -36,4 +36,11 @@ public InlineSizesData() {} public InlineSizesData(Map sizes) { this.sizes = sizes; } + + /** + * @return Inline sizes. + */ + public Map sizes() { + return sizes; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java index 1b832abab0fe3..a7914327d60fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.query.schema.message; -import java.io.Serializable; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; @@ -30,10 +29,7 @@ /** * Abstract discovery message for schema operations. */ -public abstract class SchemaAbstractDiscoveryMessage extends DiscoveryCustomMessage implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - +public abstract class SchemaAbstractDiscoveryMessage extends DiscoveryCustomMessage { /** Operation. */ @GridToStringInclude @Order(0) @@ -41,11 +37,11 @@ public abstract class SchemaAbstractDiscoveryMessage extends DiscoveryCustomMess /** Error message. */ @Order(1) - transient String errMsg; + String errMsg; /** Error code. */ @Order(2) - transient int errCode; + int errCode; /** Error. */ SchemaOperationException err; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java index 9ff555b729a49..fc95da0c2a115 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java @@ -27,9 +27,6 @@ * Schema change finish discovery message. */ public class SchemaFinishDiscoveryMessage extends SchemaAbstractDiscoveryMessage { - /** */ - private static final long serialVersionUID = 0L; - /** No-op flag. */ @Order(0) boolean nop; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java index 1f761c1e81ed2..bef0b3b2e94d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java @@ -28,9 +28,6 @@ * Schema change propose discovery message. */ public class SchemaProposeDiscoveryMessage extends SchemaAbstractDiscoveryMessage { - /** */ - private static final long serialVersionUID = 0L; - /** Cache deployment ID. */ @Order(0) IgniteUuid depId; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java index e7f59549ea4ac..34118541ece6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java @@ -389,7 +389,7 @@ private void cancelDeployedServices() { if (data.commonData() == null) return; - ServiceProcessorCommonDiscoveryData clusterData = (ServiceProcessorCommonDiscoveryData)data.commonData(); + ServiceProcessorCommonDiscoveryData clusterData = data.commonData(); for (ServiceInfo desc : clusterData.registeredServices()) { try { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DataBagItem.java similarity index 68% rename from modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java rename to modules/core/src/main/java/org/apache/ignite/spi/discovery/DataBagItem.java index f9da59bffe415..d60f900c9e083 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DataBagItem.java @@ -29,8 +29,8 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; -/** Wrapper message for serializable data. */ -public class ObjectData implements MarshallableMessage { +/** Wrapper message for serializable data in a {@link DiscoveryDataBag}. */ +public class DataBagItem implements MarshallableMessage { /** */ @GridToStringInclude private Serializable data; @@ -40,16 +40,41 @@ public class ObjectData implements MarshallableMessage { @Order(0) byte[] dataBytes; + /** Unmarshalling error. */ + IgniteCheckedException unmarshallError; + /** */ - public ObjectData() {} + public DataBagItem() {} /** * @param data Original data. */ - public ObjectData(Serializable data) { + public DataBagItem(Serializable data) { this.data = data; } + /** + * @param msg Message. + * @param Type of data. + * + * @return Original message or data unwrapped from an DataBagItem wrapper. + */ + static @Nullable T unwrapIfNecessary(@Nullable Message msg) { + if (msg == null) + return null; + + return msg instanceof DataBagItem ? ((DataBagItem)msg).unwrap() : (T)msg; + } + + /** + * @param Type of data. + * + * @return Original data unwrapped from a message. + */ + private T unwrap() { + return (T)(data); + } + /** {@inheritDoc} */ @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { if (data != null) @@ -59,24 +84,24 @@ public ObjectData(Serializable data) { /** {@inheritDoc} */ @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { if (dataBytes != null) { - data = U.unmarshal(marsh, dataBytes, clsLdr); - - dataBytes = null; + try { + data = U.unmarshal(marsh, dataBytes, clsLdr); + } + catch (IgniteCheckedException e) { + unmarshallError = e; + } } } /** - * @param msg Message. - * @param Type of data. - * - * @return Original data unwrapped from a message. + * @return Unmarshalling error. */ - public static T unwrap(@Nullable Message msg) { - return msg != null ? (T)(((ObjectData)msg).data) : null; + public IgniteCheckedException unmarshallError() { + return unmarshallError; } /** {@inheritDoc} */ @Override public String toString() { - return S.toString(ObjectData.class, this); + return S.toString(DataBagItem.class, this); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java index 58e41738265d6..f1f0fad7c81da 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.UUID; import org.apache.ignite.internal.GridComponent; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; @@ -59,11 +60,17 @@ public interface GridDiscoveryData { /** @return ID fo the joining node. */ UUID joiningNodeId(); - /** @return Common for all cluster nodes discovery data that is sent to the joining node. */ - Serializable commonData(); + /** + * @param Data type. + * @return Common for all cluster nodes discovery data that is sent to the joining node. + */ + T commonData(); - /** @return Discovery data that is mapped to the particular cluster node and sent to the joining node. */ - Map nodeSpecificData(); + /** + * @param Data type. + * @return Discovery data that is mapped to the particular cluster node and sent to the joining node. + */ + Map nodeSpecificData(); } /** @@ -87,7 +94,7 @@ private final class JoiningNodeDiscoveryDataImpl implements JoiningNodeDiscovery @Override @Nullable public T joiningNodeData() { Message dataMsg = joiningNodeData.get(cmpId); - return dataMsg instanceof ObjectData ? ObjectData.unwrap(dataMsg) : (T)dataMsg; + return DataBagItem.unwrapIfNecessary(dataMsg); } /** @@ -106,7 +113,7 @@ private final class GridDiscoveryDataImpl implements GridDiscoveryData { private int cmpId; /** */ - private Map nodeSpecificData + private Map nodeSpecificData = new LinkedHashMap<>(DiscoveryDataBag.this.nodeSpecificData.size()); /** {@inheritDoc} */ @@ -115,16 +122,16 @@ private final class GridDiscoveryDataImpl implements GridDiscoveryData { } /** {@inheritDoc} */ - @Override @Nullable public Serializable commonData() { + @Override @Nullable public T commonData() { if (commonData != null) - return commonData.get(cmpId); + return DataBagItem.unwrapIfNecessary(commonData.get(cmpId)); return null; } /** {@inheritDoc} */ - @Override public Map nodeSpecificData() { - return nodeSpecificData; + @Override public Map nodeSpecificData() { + return F.viewReadOnly(nodeSpecificData, DataBagItem::unwrapIfNecessary); } /** @@ -142,7 +149,7 @@ private void componentId(int cmpId) { private void reinitNodeSpecData(int cmpId) { nodeSpecificData.clear(); - for (Map.Entry> e : DiscoveryDataBag.this.nodeSpecificData.entrySet()) { + for (Map.Entry> e : DiscoveryDataBag.this.nodeSpecificData.entrySet()) { if (e.getValue() != null && e.getValue().containsKey(cmpId)) nodeSpecificData.put(e.getKey(), e.getValue().get(cmpId)); } @@ -156,7 +163,7 @@ private void reinitNodeSpecData(int cmpId) { private static final UUID DEFAULT_KEY = null; /** */ - private UUID joiningNodeId; + private final UUID joiningNodeId; /** * Component IDs with already initialized common discovery data. @@ -164,13 +171,13 @@ private void reinitNodeSpecData(int cmpId) { private Set cmnDataInitializedCmps; /** */ - private Map joiningNodeData = new HashMap<>(); + private final Map joiningNodeData = new HashMap<>(); /** */ - private Map commonData = new HashMap<>(); + private final Map commonData = new HashMap<>(); /** */ - private Map> nodeSpecificData = new LinkedHashMap<>(); + private final Map> nodeSpecificData = new LinkedHashMap<>(); /** */ private JoiningNodeDiscoveryDataImpl newJoinerData; @@ -246,7 +253,7 @@ public JoiningNodeDiscoveryData newJoinerDiscoveryData(int cmpId) { * @param data Serializable data. */ public void addJoiningNodeData(Integer cmpId, Serializable data) { - joiningNodeData.put(cmpId, new ObjectData(data)); + joiningNodeData.put(cmpId, new DataBagItem(data)); } /** @@ -259,19 +266,35 @@ public void addJoiningNodeData(Integer cmpId, Message data) { /** * @param cmpId Component ID. - * @param data Data. + * @param data Serializable data. */ public void addGridCommonData(Integer cmpId, Serializable data) { + commonData.put(cmpId, new DataBagItem(data)); + } + + /** + * @param cmpId Component ID. + * @param data Message data. + */ + public void addGridCommonData(Integer cmpId, Message data) { commonData.put(cmpId, data); } /** * @param cmpId Component ID. - * @param data Data. + * @param data Serializable data. */ public void addNodeSpecificData(Integer cmpId, Serializable data) { + addNodeSpecificData(cmpId, new DataBagItem(data)); + } + + /** + * @param cmpId Component ID. + * @param data Message data. + */ + public void addNodeSpecificData(Integer cmpId, Message data) { if (!nodeSpecificData.containsKey(DEFAULT_KEY)) - nodeSpecificData.put(DEFAULT_KEY, new HashMap()); + nodeSpecificData.put(DEFAULT_KEY, new HashMap<>()); nodeSpecificData.get(DEFAULT_KEY).put(cmpId, data); } @@ -296,14 +319,14 @@ public void joiningNodeData(Map joinNodeData) { /** * @param cmnData Cmn data. */ - public void commonData(Map cmnData) { + public void commonData(Map cmnData) { commonData.putAll(cmnData); } /** * @param nodeSpecData Node specific data. */ - public void nodeSpecificData(Map> nodeSpecData) { + public void nodeSpecificData(Map> nodeSpecData) { nodeSpecificData.putAll(nodeSpecData); } @@ -316,12 +339,12 @@ public Map joiningNodeData() { * @return Discovery data for each Ignite component that is aggregated from the cluster nodes and sent to the * joining node. */ - public Map commonData() { + public Map commonData() { return commonData; } /** @return Discovery data that belongs to the current cluster node and is sent to the joining node. */ - @Nullable public Map localNodeSpecificData() { + @Nullable public Map localNodeSpecificData() { return nodeSpecificData.get(DEFAULT_KEY); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 82c012c2a1a94..8c97d91c587b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -124,7 +124,6 @@ import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; import org.apache.ignite.spi.IgniteSpiThread; -import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryNotification; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiListener; @@ -2581,36 +2580,6 @@ void add(TcpDiscoveryAbstractMessage msg) { if (addedMsg.gridDiscoveryData() != null) addedMsg.clearDiscoveryData(); } - else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { - TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg; - - if (addFinishMsg.clientDiscoData() != null) { - addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(addFinishMsg); - - msg = addFinishMsg; - - DiscoveryDataPacket discoData = addFinishMsg.clientDiscoData(); - - Set mrgdCmnData = new HashSet<>(); - Set mrgdSpecData = new HashSet<>(); - - boolean allMerged = false; - - for (TcpDiscoveryAbstractMessage msg0 : msgs) { - - if (msg0 instanceof TcpDiscoveryNodeAddFinishedMessage) { - DiscoveryDataPacket existingDiscoData = - ((TcpDiscoveryNodeAddFinishedMessage)msg0).clientDiscoData(); - - if (existingDiscoData != null) - allMerged = discoData.mergeDataFrom(existingDiscoData, mrgdCmnData, mrgdSpecData); - } - - if (allMerged) - break; - } - } - } else if (msg instanceof TcpDiscoveryNodeLeftMessage) clearClientAddFinished(msg.creatorNodeId()); else if (msg instanceof TcpDiscoveryNodeFailedMessage) @@ -4768,9 +4737,18 @@ private IgniteNodeValidationResult validateByIgniteComponents(TcpDiscoveryJoinRe /** */ private IgniteNodeValidationResult validateByIgniteComponentsWithJoiningNodeData(TcpDiscoveryJoinRequestMessage req) { - DiscoveryDataBag data = req.gridDiscoveryData().bagWithJoiningNodeData(); + DiscoveryDataPacket packet = req.gridDiscoveryData(); + + List errs = packet.checkUnmarshallingErrors(spi.ignite()); + + IgniteNodeValidationResult err = F.isEmpty(errs) ? + spi.getSpiContext().validateNode(req.node(), packet.bagWithJoiningNodeData()) : + new IgniteNodeValidationResult( + req.node().id(), + errs.get(0).getMessage() // We need only first error. + ); - return spi.getSpiContext().validateNode(req.node(), data); + return err; } /** */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index a7f68fb1cf542..7345f4ff7fed3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -2068,21 +2068,14 @@ DiscoveryDataPacket collectExchangeData(DiscoveryDataPacket dataPacket) { assert dataPacket != null; assert dataPacket.joiningNodeId() != null; - //create data bag, pass it to exchange.collect DiscoveryDataBag dataBag = dataPacket.bagForDataCollection(); exchange.collect(dataBag); - //marshall collected bag into packet, return packet if (dataPacket.joiningNodeId().equals(locNode.id())) dataPacket.addJoiningNodeData(dataBag); else - dataPacket.marshalGridNodeData( - dataBag, - locNode.id(), - marshaller(), - ignite.configuration().getNetworkCompressionLevel(), - log); + dataPacket.addNodeData(dataBag, locNode.id()); return dataPacket; } @@ -2097,19 +2090,20 @@ protected void onExchange(DiscoveryDataPacket dataPacket, ClassLoader clsLdr) { DiscoveryDataBag dataBag; + List errs = dataPacket.checkUnmarshallingErrors(ignite()); + if (dataPacket.joiningNodeId().equals(locNode.id())) { - try { - dataBag = dataPacket.unmarshalGridData(marshaller(), clsLdr, locNode.clientRouterNodeId() != null, log); - } - catch (IgniteCheckedException e) { + if (!F.isEmpty(errs)) { if (ignite() instanceof IgniteEx) { FailureProcessor failure = ((IgniteEx)ignite()).context().failure(); - failure.process(new FailureContext(CRITICAL_ERROR, e)); + failure.process(new FailureContext(CRITICAL_ERROR, errs.get(0))); } - throw new IgniteException(e); + throw new IgniteException(errs.get(0)); } + + dataBag = dataPacket.bagWithNodeData(); } else dataBag = dataPacket.bagWithJoiningNodeData(); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java index 4ded165df7082..2ebf7e09d1ee3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java @@ -16,38 +16,32 @@ */ package org.apache.ignite.spi.discovery.tcp.internal; -import java.io.Serializable; -import java.util.Arrays; -import java.util.Collection; +import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.Compress; import org.apache.ignite.internal.GridComponent; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.discovery.DataBagItem; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC; /** - * Carries discovery data in marshalled form + * Carries discovery data in form of {@link Message} * and allows convenient way of converting it to and from {@link DiscoveryDataBag} objects. */ -public class DiscoveryDataPacket implements Serializable, Message { - /** Local file header signature (read as a little-endian number). */ - private static final int ZIP_HEADER_SIGNATURE = 0x04034b50; - - /** */ - private static final long serialVersionUID = 0L; - +public class DiscoveryDataPacket implements Message { /** */ @Order(0) UUID joiningNodeId; @@ -59,11 +53,13 @@ public class DiscoveryDataPacket implements Serializable, Message { /** */ @Order(2) - Map commonData = new HashMap<>(); + @Compress + Map commonData = new HashMap<>(); /** */ @Order(3) - Map> nodeSpecificData = new HashMap<>(); + @Compress + Map> nodeSpecificData = new HashMap<>(); /** */ private transient boolean joiningNodeClient; @@ -90,24 +86,16 @@ public UUID joiningNodeId() { /** * @param bag Bag. * @param nodeId Node id. - * @param marsh Marsh. - * @param log Logger. */ - public void marshalGridNodeData(DiscoveryDataBag bag, UUID nodeId, Marshaller marsh, - int compressionLevel, IgniteLogger log) { - marshalData(bag.commonData(), commonData, marsh, compressionLevel, log); + public void addNodeData(DiscoveryDataBag bag, UUID nodeId) { + if (bag.commonData() != null) + commonData.putAll(bag.commonData()); - Map locNodeSpecificData = bag.localNodeSpecificData(); + Map locNodeSpecificData = bag.localNodeSpecificData(); if (locNodeSpecificData != null) { - Map marshLocNodeSpecificData = U.newHashMap(locNodeSpecificData.size()); - - marshalData(locNodeSpecificData, marshLocNodeSpecificData, marsh, compressionLevel, log); - - filterDuplicatedData(marshLocNodeSpecificData); - - if (!marshLocNodeSpecificData.isEmpty()) - nodeSpecificData.put(nodeId, marshLocNodeSpecificData); + if (!locNodeSpecificData.isEmpty()) + nodeSpecificData.put(nodeId, locNodeSpecificData); } } @@ -119,40 +107,15 @@ public void addJoiningNodeData(DiscoveryDataBag bag) { joiningNodeData.putAll(bag.joiningNodeData()); } - /** - * @param marsh Marsh. - * @param clsLdr Class loader. - * @param clientNode Client node. - * @param log Logger. - */ - public DiscoveryDataBag unmarshalGridData( - Marshaller marsh, - ClassLoader clsLdr, - boolean clientNode, - IgniteLogger log - ) throws IgniteCheckedException { + /** */ + public DiscoveryDataBag bagWithNodeData() { DiscoveryDataBag dataBag = new DiscoveryDataBag(joiningNodeId, joiningNodeClient); - if (commonData != null && !commonData.isEmpty()) - dataBag.commonData(unmarshalData(commonData, marsh, clsLdr, clientNode, log, true)); - - if (nodeSpecificData != null && !nodeSpecificData.isEmpty()) { - Map> unmarshNodeSpecData = U.newLinkedHashMap(nodeSpecificData.size()); - - for (Map.Entry> nodeBinEntry : nodeSpecificData.entrySet()) { - Map nodeBinData = nodeBinEntry.getValue(); - - if (nodeBinData == null || nodeBinData.isEmpty()) - continue; - - unmarshNodeSpecData.put( - nodeBinEntry.getKey(), - unmarshalData(nodeBinData, marsh, clsLdr, clientNode, log, true) - ); - } + if (!F.isEmpty(commonData)) + dataBag.commonData(commonData); - dataBag.nodeSpecificData(unmarshNodeSpecData); - } + if (!F.isEmpty(nodeSpecificData)) + dataBag.nodeSpecificData(F.view(nodeSpecificData, uuid -> !F.isEmpty(nodeSpecificData.get(uuid)))); return dataBag; } @@ -184,199 +147,49 @@ public boolean hasDataFromNode(UUID nodeId) { } /** - * @param existingDataPacket Existing data packet. - * @param mrgdCmnDataKeys Mrgd cmn data keys. - * @param mrgdSpecifDataKeys Mrgd specif data keys. - */ - public boolean mergeDataFrom( - DiscoveryDataPacket existingDataPacket, - Collection mrgdCmnDataKeys, - Collection mrgdSpecifDataKeys - ) { - if (commonData.size() != mrgdCmnDataKeys.size()) { - for (Map.Entry e : commonData.entrySet()) { - if (!mrgdCmnDataKeys.contains(e.getKey())) { - byte[] data = existingDataPacket.commonData.get(e.getKey()); - - if (data != null && Arrays.equals(e.getValue(), data)) { - e.setValue(data); - - boolean add = mrgdCmnDataKeys.add(e.getKey()); - - assert add; - - if (mrgdCmnDataKeys.size() == commonData.size()) - break; - } - } - } - } - - if (nodeSpecificData.size() != mrgdSpecifDataKeys.size()) { - for (Map.Entry> e : nodeSpecificData.entrySet()) { - if (!mrgdSpecifDataKeys.contains(e.getKey())) { - Map data = existingDataPacket.nodeSpecificData.get(e.getKey()); - - if (data != null && mapsEqual(e.getValue(), data)) { - e.setValue(data); - - boolean add = mrgdSpecifDataKeys.add(e.getKey()); - - assert add; - - if (mrgdSpecifDataKeys.size() == nodeSpecificData.size()) - break; - } - } - } - } - - return (mrgdCmnDataKeys.size() == commonData.size()) && (mrgdSpecifDataKeys.size() == nodeSpecificData.size()); - } - - /** - * @param m1 first map to compare. - * @param m2 second map to compare. + * Collects and dumps catched unmarshalling errors. + * + * @param ignite Ignite. */ - private boolean mapsEqual(Map m1, Map m2) { - if (m1 == m2) - return true; + public List checkUnmarshallingErrors(Ignite ignite) { + List> items = Stream.concat( + Stream.concat(joiningNodeData.entrySet().stream(), commonData.entrySet().stream()), + nodeSpecificData.values() + .stream() + .flatMap(m -> m.entrySet().stream())) + .filter(e -> e.getValue() instanceof DataBagItem) + .collect(Collectors.toList()); - if (m1.size() == m2.size()) { - for (Map.Entry e : m1.entrySet()) { - byte[] data = m2.get(e.getKey()); + List errs = new ArrayList<>(items.size()); - if (!Arrays.equals(e.getValue(), data)) - return false; - } + for (Map.Entry item : items) { + int cmpId = item.getKey(); + DataBagItem dataBagItem = (DataBagItem)item.getValue(); - return true; - } + IgniteCheckedException e = dataBagItem.unmarshallError(); - return false; - } - - /** - * @param src Source. - * @param marsh Marsh. - * @param clsLdr Class loader. - * @param clientNode Client node. - * @param log Logger. - * @param panic Throw unmarshalling if {@code true}. - * @throws IgniteCheckedException If {@code panic} is {@code True} and unmarshalling failed. - */ - private Map unmarshalData( - Map src, - Marshaller marsh, - ClassLoader clsLdr, - boolean clientNode, - IgniteLogger log, - boolean panic - ) throws IgniteCheckedException { - Map res = U.newHashMap(src.size()); - - for (Map.Entry binEntry : src.entrySet()) { - try { - Serializable compData = isZipped(binEntry.getValue()) ? - U.unmarshalZip(marsh, binEntry.getValue(), clsLdr) : - U.unmarshal(marsh, binEntry.getValue(), clsLdr); - res.put(binEntry.getKey(), compData); - } - catch (IgniteCheckedException e) { - if (CONTINUOUS_PROC.ordinal() == binEntry.getKey() && - X.hasCause(e, ClassNotFoundException.class) && clientNode - ) { - U.warn(log, "Failed to unmarshal continuous query remote filter on client node. Can be ignored."); + if (e != null) { + if (CONTINUOUS_PROC.ordinal() == cmpId && X.hasCause(e, ClassNotFoundException.class) && + ignite.configuration().isClientMode()) { + U.warn(ignite.log(), "Failed to unmarshal continuous query remote filter on client node. " + + "Can be ignored."); continue; } - else if (binEntry.getKey() < GridComponent.DiscoveryDataExchangeType.VALUES.length) { - U.error(log, - "Failed to unmarshal discovery data for component: " + - GridComponent.DiscoveryDataExchangeType.VALUES[binEntry.getKey()], - e - ); + else if (cmpId < GridComponent.DiscoveryDataExchangeType.VALUES.length) { + U.error(ignite.log(), "Failed to unmarshal discovery data for component: " + + GridComponent.DiscoveryDataExchangeType.VALUES[cmpId], e); } else { - U.warn(log, "Failed to unmarshal discovery data." + - " Component " + binEntry.getKey() + " is not found."); + U.warn(ignite.log(), "Failed to unmarshal discovery data." + + " Component " + cmpId + " is not found.", e); } - if (panic) - throw e; + errs.add(e); } } - return res; - } - - /** - * @param val Value to check. - * @return {@code true} if value is zipped. - */ - private boolean isZipped(byte[] val) { - return val != null && val.length > 3 && makeInt(val) == ZIP_HEADER_SIGNATURE; - } - - /** - * Make int from first 4 bytes in little-endian byte order. - * - * @param b Source of bytes. - * @return Made int. - */ - private static int makeInt(byte[] b) { - return (((b[3]) << 24) | - ((b[2] & 0xff) << 16) | - ((b[1] & 0xff) << 8) | - ((b[0] & 0xff))); - } - - /** - * @param src Source. - * @param target Target. - * @param marsh Marsh. - * @param log Logger. - */ - private void marshalData( - Map src, - Map target, - Marshaller marsh, - int compressionLevel, - IgniteLogger log - ) { - // may happen if nothing was collected from components, - // corresponding map (for common data or for node specific data) left null - if (src == null) - return; - - for (Map.Entry entry : src.entrySet()) { - try { - target.put(entry.getKey(), U.zip(U.marshal(marsh, entry.getValue()), compressionLevel)); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to marshal discovery data " + - "[comp=" + entry.getKey() + ", data=" + entry.getValue() + ']', e); - } - } - } - - /** */ - private void filterDuplicatedData(Map discoData) { - for (Map existingData : nodeSpecificData.values()) { - Iterator> it = discoData.entrySet().iterator(); - - while (it.hasNext()) { - Map.Entry discoDataEntry = it.next(); - - byte[] curData = existingData.get(discoDataEntry.getKey()); - - if (Arrays.equals(curData, discoDataEntry.getValue())) - it.remove(); - } - - if (discoData.isEmpty()) - break; - } + return errs; } /** diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java index 30a7ec8029837..49a9ccc63c113 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.IgniteSpiContext; import org.apache.ignite.spi.IgniteSpiException; @@ -226,6 +227,11 @@ private class DelegatedDiscoverySpi extends IgniteSpiAdapter implements IgniteDi delegate.resolveCommunicationFailure(node, err); } + /** {@inheritDoc} */ + @Override public MessageFactoryProvider messageFactoryProvider() { + return delegate.messageFactoryProvider(); + } + /** Delegated discovery data exchange. */ private class DelegatedDiscoverySpiDataExchange implements DiscoverySpiDataExchange { /** Discovery data exchange delegate. */ diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index ad98a2936b98f..1f2080e270691 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -58,6 +58,8 @@ import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage; +import org.apache.ignite.internal.processors.marshaller.MappedName; +import org.apache.ignite.internal.processors.marshaller.MarshallerMappingsData; import org.apache.ignite.internal.processors.port.GridPortRecord; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.lang.GridAbsPredicate; @@ -2440,12 +2442,12 @@ private static class TestTcpDiscoveryMarshallerDataSpi extends TcpDiscoverySpi { exchange.onExchange(dataBag); } - private List getAllMappings(DiscoveryDataBag bag) { - return (List)bag.commonData().get(MARSHALLER_PROC.ordinal()); + private MarshallerMappingsData getAllMappings(DiscoveryDataBag bag) { + return (MarshallerMappingsData)bag.commonData().get(MARSHALLER_PROC.ordinal()); } - private Map getJavaMappings(List allMappings) { - return (Map)allMappings.get(JAVA_ID); + private Map getJavaMappings(MarshallerMappingsData allMappings) { + return allMappings.mappings().get(JAVA_ID); } }); } diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java index a186aed526567..b551506c8ad32 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java @@ -17,28 +17,28 @@ package org.apache.ignite.spi.discovery.zk.internal; -import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.plugin.extensions.communication.Message; /** * */ class ZkBulkJoinContext { /** */ - List>> nodes; + List> nodes; /** * @param nodeEvtData Node event data. * @param discoData Discovery data for node. */ - void addJoinedNode(ZkJoinedNodeEvtData nodeEvtData, Map discoData) { + void addJoinedNode(ZkJoinedNodeEvtData nodeEvtData, Map discoData) { if (nodes == null) nodes = new ArrayList<>(); - nodes.add(new T2<>(nodeEvtData, discoData)); + nodes.add(new T2<>(nodeEvtData, new ZkDiscoData(discoData))); } /** diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoData.java new file mode 100644 index 0000000000000..c679790dca3ad --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoData.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.Map; +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; + +/** Data bag data holder. */ +public class ZkDiscoData implements Message { + /** */ + @Order(0) + Map data; + + /** + * Default constructor for {@link MessageFactory}. + */ + public ZkDiscoData() { + // No-op. + } + + /** + * @param data Discovery data. + */ + public ZkDiscoData(Map data) { + this.data = data; + } + + /** + * @return Data. + */ + public Map data() { + return data; + } +} diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java index 39d89b32af878..fd1937c0ffedd 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java @@ -28,5 +28,6 @@ public class ZkMessageFactory implements MessageFactoryProvider { factory.register(401, ZkCommunicationErrorResolveStartMessage::new, new ZkCommunicationErrorResolveStartMessageSerializer()); factory.register(402, ZkForceNodeFailMessage::new, new ZkForceNodeFailMessageSerializer()); factory.register(403, ZkNoServersMessage::new, new ZkNoServersMessageSerializer()); + factory.register(404, ZkDiscoData::new, new ZkDiscoDataSerializer()); } } diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 0d0531d1a9a2c..c6be3e5dcc8d3 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -78,6 +78,7 @@ import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.security.SecurityCredentials; import org.apache.ignite.spi.IgniteNodeValidationResult; @@ -1781,7 +1782,7 @@ private void generateBulkJoinEvent(TreeMap curTop, Z long evtId = rtState.evtsData.evtIdGen; - List>> nodes = joinCtx.nodes; + List> nodes = joinCtx.nodes; assert nodes != null && !nodes.isEmpty(); @@ -1793,11 +1794,9 @@ private void generateBulkJoinEvent(TreeMap curTop, Z Map dupDiscoData = null; for (int i = 0; i < nodeCnt; i++) { - T2> nodeEvtData = nodes.get(i); + T2 nodeEvtData = nodes.get(i); - Map discoData = nodeEvtData.get2(); - - byte[] discoDataBytes = U.marshal(marsh, discoData); + byte[] discoDataBytes = msgParser.marshalZip(nodeEvtData.get2()); Long dupDataNode = null; @@ -2251,7 +2250,7 @@ private void addJoinedNode( exchange.collect(collectBag); - Map commonData = collectBag.commonData(); + Map commonData = collectBag.commonData(); Object old = curTop.put(joinedNode.order(), joinedNode); @@ -3021,12 +3020,11 @@ private void processLocalJoin(ZkDiscoveryEventsData evtsData, byte[] discoDataBytes = dataForJoined.discoveryDataForNode(locNode.order()); - Map commonDiscoData = - marsh.unmarshal(discoDataBytes, U.resolveClassLoader(spi.ignite().configuration())); + ZkDiscoData commonDiscoData = msgParser.unmarshalZip(discoDataBytes); DiscoveryDataBag dataBag = new DiscoveryDataBag(locNode.id(), locNode.isClient()); - dataBag.commonData(commonDiscoData); + dataBag.commonData(commonDiscoData.data()); exchange.onExchange(dataBag); From 64161296ac3c66eb51ebd3d6ea6c4b72c44382a5 Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Thu, 18 Jun 2026 09:06:22 +0300 Subject: [PATCH 2/2] IGNITE-28787 Use Message DTO to transfer CacheClientReconnectDiscoveryData & CacheJoinNodeDiscoveryData --- .../ignite/internal/CoreMessagesProvider.java | 12 ++ .../CacheClientReconnectDiscoveryData.java | 155 +----------------- .../cache/CacheGroupRecoveryState.java | 10 +- .../processors/cache/CacheJoinInfo.java | 128 +++++++++++++++ .../cache/CacheJoinNodeDiscoveryData.java | 135 +++------------ .../processors/cache/CacheReconnectInfo.java | 83 ++++++++++ .../cache/ClusterCacheGroupRecoveryData.java | 22 ++- .../processors/cache/ClusterCachesInfo.java | 75 +++------ .../cache/GridLocalConfigManager.java | 20 +-- .../cache/ValidationOnNodeJoinUtils.java | 2 +- 10 files changed, 310 insertions(+), 332 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinInfo.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheReconnectInfo.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index 2695f623b2d17..ce47ef9f7510a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -55,14 +55,20 @@ import org.apache.ignite.internal.processors.authentication.UserManagementOperationFinishedMessage; import org.apache.ignite.internal.processors.authentication.UserProposedMessage; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; +import org.apache.ignite.internal.processors.cache.CacheClientReconnectDiscoveryData; import org.apache.ignite.internal.processors.cache.CacheConfigurationEnrichment; import org.apache.ignite.internal.processors.cache.CacheEntryPredicateAdapter; import org.apache.ignite.internal.processors.cache.CacheEvictionEntry; +import org.apache.ignite.internal.processors.cache.CacheGroupRecoveryState; import org.apache.ignite.internal.processors.cache.CacheInvokeDirectResult; +import org.apache.ignite.internal.processors.cache.CacheJoinInfo; +import org.apache.ignite.internal.processors.cache.CacheJoinNodeDiscoveryData; +import org.apache.ignite.internal.processors.cache.CacheReconnectInfo; import org.apache.ignite.internal.processors.cache.CacheStatisticsClearMessage; import org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessage; import org.apache.ignite.internal.processors.cache.ClientCacheChangeDiscoveryMessage; import org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscoveryMessage; +import org.apache.ignite.internal.processors.cache.ClusterCacheGroupRecoveryData; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; import org.apache.ignite.internal.processors.cache.ExchangeFailureMessage; @@ -454,6 +460,12 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(ClientCacheChangeDummyDiscoveryMessage.class); withNoSchema(DynamicCacheChangeBatch.class); withSchema(InitialUsersData.class); + withNoSchema(CacheClientReconnectDiscoveryData.class); + withNoSchema(CacheGroupRecoveryState.class); + withNoSchema(CacheJoinInfo.class); + withNoSchema(CacheJoinNodeDiscoveryData.class); + withNoSchema(CacheReconnectInfo.class); + withNoSchema(ClusterCacheGroupRecoveryData.class); // [10000 - 10200]: Transaction and lock related messages. Most of them originally comes from Communication. msgIdx = 10000; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java index 83692fb75201f..492f60d88b5c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java @@ -17,175 +17,36 @@ package org.apache.ignite.internal.processors.cache; -import java.io.Serializable; import java.util.Map; -import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Discovery data sent from client reconnecting to cluster. */ -public class CacheClientReconnectDiscoveryData implements Serializable { +public class CacheClientReconnectDiscoveryData implements Message { /** */ - private static final long serialVersionUID = 0L; + @Order(0) + Map clientCaches; /** */ - private final Map clientCacheGrps; - - /** */ - private final Map clientCaches; + public CacheClientReconnectDiscoveryData() { } /** * @param clientCaches Information about caches started on re-joining client node. - * @param clientCacheGrps Information about cach groups started on re-joining client node. */ - CacheClientReconnectDiscoveryData(Map clientCacheGrps, - Map clientCaches) { - this.clientCacheGrps = clientCacheGrps; + CacheClientReconnectDiscoveryData(Map clientCaches) { this.clientCaches = clientCaches; } /** * @return Information about caches started on re-joining client node. */ - Map clientCacheGroups() { - return clientCacheGrps; - } - - /** - * @return Information about caches started on re-joining client node. - */ - public Map clientCaches() { + public Map clientCaches() { return clientCaches; } - /** - * - */ - static class CacheGroupInfo implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final CacheConfiguration ccfg; - - /** */ - private final IgniteUuid deploymentId; - - /** Flags added for future usage. */ - private final long flags; - - /** - * @param ccfg Cache group configuration. - * @param deploymentId Cache group deployment ID. - * @param flags Flags (for future usage). - */ - CacheGroupInfo(CacheConfiguration ccfg, - IgniteUuid deploymentId, - long flags) { - assert ccfg != null; - assert deploymentId != null; - - this.ccfg = ccfg; - this.deploymentId = deploymentId; - this.flags = flags; - } - - /** - * @return Cache group configuration. - */ - CacheConfiguration config() { - return ccfg; - } - - /** - * @return Cache group deployment ID. - */ - IgniteUuid deploymentId() { - return deploymentId; - } - } - - /** - * - */ - static class CacheInfo implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final CacheConfiguration ccfg; - - /** */ - private final CacheType cacheType; - - /** */ - private final IgniteUuid deploymentId; - - /** */ - private final boolean nearCache; - - /** Flags added for future usage. */ - private final long flags; - - /** - * @param ccfg Cache configuration. - * @param cacheType Cache type. - * @param deploymentId Cache deployment ID. - * @param nearCache Near cache flag. - * @param flags Flags (for future usage). - */ - CacheInfo(CacheConfiguration ccfg, - CacheType cacheType, - IgniteUuid deploymentId, - boolean nearCache, - long flags) { - assert ccfg != null; - assert cacheType != null; - assert deploymentId != null; - - this.ccfg = ccfg; - this.cacheType = cacheType; - this.deploymentId = deploymentId; - this.nearCache = nearCache; - this.flags = flags; - } - - /** - * @return Cache configuration. - */ - CacheConfiguration config() { - return ccfg; - } - - /** - * @return Cache type. - */ - CacheType cacheType() { - return cacheType; - } - - /** - * @return Cache deployment ID. - */ - IgniteUuid deploymentId() { - return deploymentId; - } - - /** - * @return Near cache flag. - */ - boolean nearCache() { - return nearCache; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(CacheInfo.class, this); - } - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheClientReconnectDiscoveryData.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupRecoveryState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupRecoveryState.java index 880fa09b21c62..da288d580cae9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupRecoveryState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupRecoveryState.java @@ -25,19 +25,23 @@ import java.util.Collections; import java.util.Set; import java.util.stream.Collectors; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; /** */ -public class CacheGroupRecoveryState implements Externalizable { +public class CacheGroupRecoveryState implements Externalizable, Message { /** */ private static final long serialVersionUID = 0L; /** */ - private Set lostParts; + @Order(0) + Set lostParts; /** */ - private Set zeroParts; + @Order(1) + Set zeroParts; /** */ public CacheGroupRecoveryState() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinInfo.java new file mode 100644 index 0000000000000..8af1d529dcb41 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinInfo.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.MarshallableMessage; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; + +/** */ +public class CacheJoinInfo implements MarshallableMessage { + /** */ + @GridToStringInclude + private StoredCacheData cacheData; + + /** */ + @Order(0) + byte[] cacheDataBytes; + + /** */ + @Order(1) + @GridToStringInclude + CacheType cacheType; + + /** */ + @Order(2) + @GridToStringInclude + boolean sql; + + /** Flags added for future usage. */ + @Order(3) + long flags; + + /** Statically configured flag. */ + @Order(4) + boolean staticallyConfigured; + + /** */ + public CacheJoinInfo() { } + + /** + * @param cacheData Cache data. + * @param cacheType Cache type. + * @param sql SQL flag - {@code true} if cache was created with {@code CREATE TABLE}. + * @param flags Flags (for future usage). + * @param staticallyConfigured {@code true} if it was configured by static config and {@code false} otherwise. + */ + public CacheJoinInfo(StoredCacheData cacheData, CacheType cacheType, boolean sql, long flags, + boolean staticallyConfigured) { + this.cacheData = cacheData; + this.cacheType = cacheType; + this.sql = sql; + this.flags = flags; + this.staticallyConfigured = staticallyConfigured; + } + + /** + * @return Cache data. + */ + public StoredCacheData cacheData() { + return cacheData; + } + + /** + * @return Cache type. + */ + public CacheType cacheType() { + return cacheType; + } + + /** + * @return SQL flag - {@code true} if cache was created with {@code CREATE TABLE}. + */ + public boolean sql() { + return sql; + } + + /** + * @return {@code true} if it was configured by static config and {@code false} otherwise. + */ + public boolean isStaticallyConfigured() { + return staticallyConfigured; + } + + /** + * @return Long which bits represent some flags. + */ + public long getFlags() { + return flags; + } + + /** + * {@inheritDoc} + */ + @Override public String toString() { + return S.toString(CacheJoinInfo.class, this); + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + if (cacheData != null) + cacheDataBytes = U.marshal(marsh, cacheData); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + if (cacheDataBytes != null) + cacheData = U.unmarshal(marsh, cacheDataBytes, clsLdr); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java index 700d5166608e7..08f7f2b51acfc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java @@ -17,39 +17,42 @@ package org.apache.ignite.internal.processors.cache; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.Serializable; import java.util.Map; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** * Information about configured caches sent from joining node. */ -public class CacheJoinNodeDiscoveryData implements Serializable { +public class CacheJoinNodeDiscoveryData implements Message { /** */ - private static final long serialVersionUID = 0L; + @Order(0) + @GridToStringInclude + Map caches; /** */ + @Order(1) @GridToStringInclude - private final Map caches; + Map templates; /** */ @GridToStringInclude - private final Map templates; + @Order(2) + IgniteUuid cacheDeploymentId; /** */ - @GridToStringInclude - private final IgniteUuid cacheDeploymentId; + @Order(3) + boolean startCaches; /** */ - private final boolean startCaches; + @Nullable ClusterCacheGroupRecoveryData clusterCacheGrpRecoveryData; /** */ - @Nullable private ClusterCacheGroupRecoveryData clusterCacheGroupRecoveryData; + public CacheJoinNodeDiscoveryData() { } /** * @param cacheDeploymentId Deployment ID for started caches. @@ -59,8 +62,8 @@ public class CacheJoinNodeDiscoveryData implements Serializable { */ public CacheJoinNodeDiscoveryData( IgniteUuid cacheDeploymentId, - Map caches, - Map templates, + Map caches, + Map templates, boolean startCaches ) { this.cacheDeploymentId = cacheDeploymentId; @@ -86,121 +89,25 @@ public IgniteUuid cacheDeploymentId() { /** * @return Templates configured on joining node. */ - public Map templates() { + public Map templates() { return templates; } /** * @return Caches configured on joining node. */ - public Map caches() { + public Map caches() { return caches; } /** */ - public void clusterCacheGroupRecoveryData(@Nullable ClusterCacheGroupRecoveryData clusterCacheGroupRecoveryData) { - this.clusterCacheGroupRecoveryData = clusterCacheGroupRecoveryData; + public void clusterCacheGroupRecoveryData(@Nullable ClusterCacheGroupRecoveryData clusterCacheGrpRecoveryData) { + this.clusterCacheGrpRecoveryData = clusterCacheGrpRecoveryData; } /** */ @Nullable public ClusterCacheGroupRecoveryData clusterCacheGroupRecoveryData() { - return clusterCacheGroupRecoveryData; - } - - /** - * - */ - public static class CacheInfo implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - @GridToStringInclude - private StoredCacheData cacheData; - - /** */ - @GridToStringInclude - private CacheType cacheType; - - /** */ - @GridToStringInclude - private boolean sql; - - /** Flags added for future usage. */ - private long flags; - - /** Statically configured flag */ - private boolean staticallyConfigured; - - /** - * @param cacheData Cache data. - * @param cacheType Cache type. - * @param sql SQL flag - {@code true} if cache was created with {@code CREATE TABLE}. - * @param flags Flags (for future usage). - * @param staticallyConfigured {@code true} if it was configured by static config and {@code false} otherwise. - */ - public CacheInfo(StoredCacheData cacheData, CacheType cacheType, boolean sql, long flags, - boolean staticallyConfigured) { - this.cacheData = cacheData; - this.cacheType = cacheType; - this.sql = sql; - this.flags = flags; - this.staticallyConfigured = staticallyConfigured; - } - - /** - * @return Cache data. - */ - public StoredCacheData cacheData() { - return cacheData; - } - - /** - * @return Cache type. - */ - public CacheType cacheType() { - return cacheType; - } - - /** - * @return SQL flag - {@code true} if cache was created with {@code CREATE TABLE}. - */ - public boolean sql() { - return sql; - } - - /** - * @return {@code true} if it was configured by static config and {@code false} otherwise. - */ - public boolean isStaticallyConfigured() { - return staticallyConfigured; - } - - /** - * @return Long which bits represent some flags. - */ - public long getFlags() { - return flags; - } - - /** - * @param ois ObjectInputStream. - */ - private void readObject(ObjectInputStream ois) - throws IOException, ClassNotFoundException { - ObjectInputStream.GetField gf = ois.readFields(); - - cacheData = (StoredCacheData)gf.get("cacheData", null); - cacheType = (CacheType)gf.get("cacheType", null); - sql = gf.get("sql", false); - flags = gf.get("flags", 0L); - staticallyConfigured = gf.get("staticallyConfigured", true); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(CacheInfo.class, this); - } + return clusterCacheGrpRecoveryData; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheReconnectInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheReconnectInfo.java new file mode 100644 index 0000000000000..760496716ee41 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheReconnectInfo.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** */ +public class CacheReconnectInfo implements Message { + /** */ + @Order(0) + String cacheName; + + /** */ + @Order(1) + IgniteUuid deploymentId; + + /** */ + @Order(2) + boolean nearCache; + + /** */ + public CacheReconnectInfo() { } + + /** + * @param cacheName Cache name. + * @param deploymentId Cache deployment ID. + * @param nearCache Near cache flag. + */ + public CacheReconnectInfo(String cacheName, IgniteUuid deploymentId, boolean nearCache) { + assert cacheName != null; + assert deploymentId != null; + + this.cacheName = cacheName; + this.deploymentId = deploymentId; + this.nearCache = nearCache; + } + + /** + * @return Cache configuration. + */ + String cacheName() { + return cacheName; + } + + /** + * @return Cache deployment ID. + */ + IgniteUuid deploymentId() { + return deploymentId; + } + + /** + * @return Near cache flag. + */ + boolean nearCache() { + return nearCache; + } + + /** + * {@inheritDoc} + */ + @Override public String toString() { + return S.toString(CacheReconnectInfo.class, this); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCacheGroupRecoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCacheGroupRecoveryData.java index 0d71593378a49..6cf7f44053edc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCacheGroupRecoveryData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCacheGroupRecoveryData.java @@ -24,19 +24,23 @@ import java.util.Collection; import java.util.Map; import java.util.stream.Collectors; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** */ -public class ClusterCacheGroupRecoveryData implements Externalizable { +public class ClusterCacheGroupRecoveryData implements Externalizable, Message { /** */ private static final long serialVersionUID = 0L; /** */ - private long clusterBaselineTopologyVersion; + @Order(0) + long clusterBaselineTopVer; /** */ - private Map grpStates; + @Order(1) + Map grpStates; /** */ public ClusterCacheGroupRecoveryData() { @@ -44,14 +48,14 @@ public ClusterCacheGroupRecoveryData() { } /** */ - public ClusterCacheGroupRecoveryData(long clusterBaselineTopologyVersion, Collection grps) { - this.clusterBaselineTopologyVersion = clusterBaselineTopologyVersion; - this.grpStates = grps.stream().collect(Collectors.toMap(CacheGroupContext::groupId, CacheGroupRecoveryState::new)); + public ClusterCacheGroupRecoveryData(long clusterBaselineTopVer, Collection grps) { + this.clusterBaselineTopVer = clusterBaselineTopVer; + grpStates = grps.stream().collect(Collectors.toMap(CacheGroupContext::groupId, CacheGroupRecoveryState::new)); } /** */ public boolean isMoreRelevantThan(ClusterCacheGroupRecoveryData data) { - return clusterBaselineTopologyVersion > data.clusterBaselineTopologyVersion; + return clusterBaselineTopVer > data.clusterBaselineTopVer; } /** */ @@ -61,13 +65,13 @@ public boolean isMoreRelevantThan(ClusterCacheGroupRecoveryData data) { /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(clusterBaselineTopologyVersion); + out.writeLong(clusterBaselineTopVer); U.writeMap(out, grpStates); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - clusterBaselineTopologyVersion = in.readLong(); + clusterBaselineTopVer = in.readLong(); grpStates = U.readHashMap(in); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index d5c7d8dce2536..a9101902b22c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache; import java.io.File; -import java.io.Serializable; import java.nio.file.InvalidPathException; import java.util.ArrayList; import java.util.Arrays; @@ -82,6 +81,7 @@ import org.apache.ignite.plugin.CachePluginContext; import org.apache.ignite.plugin.CachePluginProvider; import org.apache.ignite.plugin.PluginProvider; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.systemview.view.CacheGroupView; import org.apache.ignite.spi.systemview.view.CacheView; @@ -301,7 +301,7 @@ public void onStart(CacheJoinNodeDiscoveryData joinDiscoData) throws IgniteCheck Map grpCfgs = new HashMap<>(); - for (CacheJoinNodeDiscoveryData.CacheInfo info : joinDiscoData.caches().values()) { + for (CacheJoinInfo info : joinDiscoData.caches().values()) { if (info.cacheData().config().getGroupName() == null) continue; @@ -330,7 +330,7 @@ public void onKernalStart(boolean checkConsistency) throws IgniteCheckedExceptio if (gridData != null && gridData.joinDiscoData != null) { CacheJoinNodeDiscoveryData joinDiscoData = gridData.joinDiscoData; - for (CacheJoinNodeDiscoveryData.CacheInfo locCacheInfo : joinDiscoData.caches().values()) { + for (CacheJoinInfo locCacheInfo : joinDiscoData.caches().values()) { CacheConfiguration locCfg = locCacheInfo.cacheData().config(); CacheData cacheData = gridData.gridData.caches().get(locCfg.getName()); @@ -388,7 +388,7 @@ public void onKernalStart(boolean checkConsistency) throws IgniteCheckedExceptio * @throws IgniteCheckedException If check failed. */ @SuppressWarnings("unchecked") - private void checkCache(CacheJoinNodeDiscoveryData.CacheInfo locInfo, CacheData rmtData, UUID rmt) + private void checkCache(CacheJoinInfo locInfo, CacheData rmtData, UUID rmt) throws IgniteCheckedException { GridCacheAttributes rmtAttr = new GridCacheAttributes(rmtData.cacheConfiguration(), rmtData.cacheConfigurationEnrichment()); GridCacheAttributes locAttr = @@ -1277,37 +1277,22 @@ Collection restartingCaches() { /** * @return Discovery date sent on local node join. */ - private Serializable joinDiscoveryData() { + private Message joinDiscoveryData() { if (cachesOnDisconnect != null) { - Map cacheGrpsInfo = new HashMap<>(); - Map cachesInfo = new HashMap<>(); + Map cachesInfo = new HashMap<>(); - Map grps = cachesOnDisconnect.cacheGrps; Map caches = cachesOnDisconnect.caches; - for (CacheGroupContext grp : ctx.cache().cacheGroups()) { - CacheGroupDescriptor desc = grps.get(grp.groupId()); - - assert desc != null : grp.cacheOrGroupName(); - - cacheGrpsInfo.put(grp.groupId(), new CacheClientReconnectDiscoveryData.CacheGroupInfo(desc.config(), - desc.deploymentId(), - 0)); - } - - for (IgniteInternalCache cache : ctx.cache().caches()) { + for (IgniteInternalCache cache : ctx.cache().caches()) { DynamicCacheDescriptor desc = caches.get(cache.name()); assert desc != null : cache.name(); - cachesInfo.put(cache.name(), new CacheClientReconnectDiscoveryData.CacheInfo(desc.cacheConfiguration(), - desc.cacheType(), - desc.deploymentId(), - cache.context().isNear(), - 0)); + cachesInfo.put(cache.name(), new CacheReconnectInfo(desc.cacheConfiguration().getName(), + desc.deploymentId(), cache.context().isNear())); } - return new CacheClientReconnectDiscoveryData(cacheGrpsInfo, cachesInfo); + return new CacheClientReconnectDiscoveryData(cachesInfo); } else { assert joinDiscoData != null; @@ -1539,7 +1524,7 @@ public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { String conflictErr = null; if (joinDiscoData != null) { - for (Map.Entry e : joinDiscoData.caches().entrySet()) { + for (Map.Entry e : joinDiscoData.caches().entrySet()) { if (!registeredCaches.containsKey(e.getKey())) { conflictErr = checkCacheConflict(e.getValue().cacheData().config(), true); @@ -1811,7 +1796,7 @@ private Collection getLocalQueryEntities(String cacheName) { if (joinDiscoData == null) return Collections.emptyList(); - CacheJoinNodeDiscoveryData.CacheInfo cacheInfo = joinDiscoData.caches().get(cacheName); + CacheJoinInfo cacheInfo = joinDiscoData.caches().get(cacheName); if (cacheInfo == null) return Collections.emptyList(); @@ -1848,7 +1833,7 @@ private void initStartCachesForLocalJoin(boolean firstNode, boolean reconnect) { if (reconnect && surviveReconnect(cfg.getName()) && cachesOnDisconnect.state.active() && active) continue; - CacheJoinNodeDiscoveryData.CacheInfo locCfg = joinDiscoData.caches().get(cfg.getName()); + CacheJoinInfo locCfg = joinDiscoData.caches().get(cfg.getName()); NearCacheConfiguration nearCfg = null; @@ -2037,20 +2022,18 @@ public ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, Affini */ public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) { if (data.hasJoiningNodeData()) { - Serializable joiningNodeData = data.joiningNodeData(); - - if (joiningNodeData instanceof CacheClientReconnectDiscoveryData) { + if (data.joiningNodeData() instanceof CacheClientReconnectDiscoveryData reconData) { if (disconnectedState()) { if (clientReconnectReqs == null) clientReconnectReqs = new LinkedHashMap<>(); - clientReconnectReqs.put(data.joiningNodeId(), (CacheClientReconnectDiscoveryData)joiningNodeData); + clientReconnectReqs.put(data.joiningNodeId(), reconData); } else - processClientReconnectData((CacheClientReconnectDiscoveryData)joiningNodeData, data.joiningNodeId()); + processClientReconnectData(reconData, data.joiningNodeId()); } - else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) - processJoiningNode((CacheJoinNodeDiscoveryData)joiningNodeData, data.joiningNodeId(), false); + else if (data.joiningNodeData() instanceof CacheJoinNodeDiscoveryData joinData) + processJoiningNode(joinData, data.joiningNodeId(), false); } } @@ -2061,15 +2044,11 @@ else if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) */ public String validateJoiningNodeData(DiscoveryDataBag.JoiningNodeDiscoveryData data, boolean joiningNodeClient) { if (data.hasJoiningNodeData()) { - Serializable joiningNodeData = data.joiningNodeData(); - - if (joiningNodeData instanceof CacheJoinNodeDiscoveryData) { - CacheJoinNodeDiscoveryData joinData = (CacheJoinNodeDiscoveryData)joiningNodeData; - + if (data.joiningNodeData() instanceof CacheJoinNodeDiscoveryData joinData) { Set problemCaches = null; Set encClientCaches = null; - for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.caches().values()) { + for (CacheJoinInfo cacheInfo : joinData.caches().values()) { CacheConfiguration cfg = cacheInfo.cacheData().config(); if (!registeredCaches.containsKey(cfg.getName())) { @@ -2125,16 +2104,16 @@ private void processClientReconnectData(CacheClientReconnectDiscoveryData client DiscoveryDataClusterState state = ctx.state().clusterState(); if (state.state().active() && !state.transition()) { - for (CacheClientReconnectDiscoveryData.CacheInfo cacheInfo : clientData.clientCaches().values()) { - String cacheName = cacheInfo.config().getName(); + for (CacheReconnectInfo cacheReconnectInfo : clientData.clientCaches().values()) { + String cacheName = cacheReconnectInfo.cacheName(); if (surviveReconnect(cacheName)) ctx.discovery().addClientNode(cacheName, clientNodeId, false); else { DynamicCacheDescriptor desc = registeredCaches.get(cacheName); - if (desc != null && desc.deploymentId().equals(cacheInfo.deploymentId())) - ctx.discovery().addClientNode(cacheName, clientNodeId, cacheInfo.nearCache()); + if (desc != null && desc.deploymentId().equals(cacheReconnectInfo.deploymentId())) + ctx.discovery().addClientNode(cacheName, clientNodeId, cacheReconnectInfo.nearCache()); } } } @@ -2216,7 +2195,7 @@ private String processJoiningNode(CacheJoinNodeDiscoveryData joinData, UUID node boolean hasSchemaPatchConflict = false; boolean active = ctx.state().clusterState().active(); - for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.caches().values()) { + for (CacheJoinInfo cacheInfo : joinData.caches().values()) { CacheConfiguration cfg = cacheInfo.cacheData().config(); if (!registeredCaches.containsKey(cfg.getName())) { @@ -2278,7 +2257,7 @@ else if (!schemaPatch.isEmpty() && !hasSchemaPatchConflict) private void registerNewCache( CacheJoinNodeDiscoveryData joinData, UUID nodeId, - CacheJoinNodeDiscoveryData.CacheInfo cacheInfo + CacheJoinInfo cacheInfo ) { CacheConfiguration cfg = cacheInfo.cacheData().config(); @@ -2345,7 +2324,7 @@ private void registerNewCache( * @param nodeId Joining node id. */ private void registerNewCacheTemplates(CacheJoinNodeDiscoveryData joinData, UUID nodeId) { - for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : joinData.templates().values()) { + for (CacheJoinInfo cacheInfo : joinData.templates().values()) { CacheConfiguration cfg = cacheInfo.cacheData().config(); if (!registeredTemplates.containsKey(cfg.getName())) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java index 24ec163584f0c..324849ec7af79 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java @@ -354,9 +354,9 @@ public Set localCachesOnStart() { * @throws IgniteCheckedException If failed. */ public CacheJoinNodeDiscoveryData restoreCacheConfigurations() throws IgniteCheckedException { - Map caches = new HashMap<>(); + Map caches = new HashMap<>(); - Map templates = new HashMap<>(); + Map templates = new HashMap<>(); restoreCaches(caches, templates, ctx.config()); @@ -488,8 +488,8 @@ private boolean startAllCachesOnClientStart() { * @param igniteCfg Ignite configuration. */ private void restoreCaches( - Map caches, - Map templates, + Map caches, + Map templates, IgniteConfiguration igniteCfg ) throws IgniteCheckedException { CacheConfiguration[] cfgs = igniteCfg.getCacheConfiguration(); @@ -567,7 +567,7 @@ private void restoreCaches( * @param isStaticallyConfigured Statically configured flag. */ private void addStoredCache( - Map caches, + Map caches, StoredCacheData cacheData, String cacheName, CacheType cacheType, @@ -581,7 +581,7 @@ private void addStoredCache( stopSeq.addFirst(cacheName); } - caches.put(cacheName, new CacheJoinNodeDiscoveryData.CacheInfo(cacheData, cacheType, cacheData.sql(), + caches.put(cacheName, new CacheJoinInfo(cacheData, cacheType, cacheData.sql(), persistedBefore ? 1 : 0, isStaticallyConfigured)); } @@ -595,8 +595,8 @@ private void addStoredCache( private void addCacheFromConfiguration( CacheConfiguration cfg, boolean sql, - Map caches, - Map templates + Map caches, + Map templates ) throws IgniteCheckedException { String cacheName = cfg.getName(); @@ -604,7 +604,7 @@ private void addCacheFromConfiguration( Collection> ccfgs = new ArrayList<>(caches.size()); - for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : caches.values()) + for (CacheJoinInfo cacheInfo : caches.values()) ccfgs.add(cacheInfo.cacheData().config()); String err = validateIncomingConfiguration(ccfgs, cfg); @@ -631,7 +631,7 @@ private void addCacheFromConfiguration( cfg = splitCfg.get1(); if (GridCacheUtils.isCacheTemplateName(cacheName)) - templates.put(cacheName, new CacheJoinNodeDiscoveryData.CacheInfo(cacheData, CacheType.USER, false, 0, true)); + templates.put(cacheName, new CacheJoinInfo(cacheData, CacheType.USER, false, 0, true)); else { if (caches.containsKey(cacheName)) { throw new IgniteCheckedException("Duplicate cache name found (check configuration and " + diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java index ac8d97b2a127c..04bcfbb78827d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ValidationOnNodeJoinUtils.java @@ -150,7 +150,7 @@ public class ValidationOnNodeJoinUtils { } } - for (CacheJoinNodeDiscoveryData.CacheInfo cacheInfo : nodeData.caches().values()) { + for (CacheJoinInfo cacheInfo : nodeData.caches().values()) { if (secCtx != null && cacheInfo.cacheType() == CacheType.USER) { try (Scope ignored = ctx.security().withContext(secCtx)) { GridCacheProcessor.authorizeCacheCreate(ctx.security(), cacheInfo.cacheData().config());