diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index 9da592635d229..1ff257553094e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -40,10 +40,13 @@ import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyRequest; import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse; import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted; +import org.apache.ignite.internal.managers.encryption.KnownEncryptionKeys; import org.apache.ignite.internal.managers.encryption.MasterKeyChangeRequest; import org.apache.ignite.internal.managers.encryption.NodeEncryptionKeys; +import org.apache.ignite.internal.managers.eventstorage.EnabledEvents; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage; import org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider; +import org.apache.ignite.internal.processors.authentication.InitialUsersData; import org.apache.ignite.internal.processors.authentication.User; import org.apache.ignite.internal.processors.authentication.UserAcceptedMessage; import org.apache.ignite.internal.processors.authentication.UserAuthenticateRequestMessage; @@ -181,6 +184,8 @@ import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; +import org.apache.ignite.internal.processors.cluster.ClusterFlags; +import org.apache.ignite.internal.processors.cluster.ClusterIdAndTag; import org.apache.ignite.internal.processors.cluster.ClusterMetricsUpdateMessage; import org.apache.ignite.internal.processors.cluster.NodeFullMetricsMessage; import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage; @@ -205,7 +210,7 @@ import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasMessage; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateAckMessage; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateMessage; -import org.apache.ignite.internal.processors.query.InlineSizesData; +import org.apache.ignite.internal.processors.plugin.PluginsData; import org.apache.ignite.internal.processors.query.QueryField; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse; @@ -213,6 +218,10 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse; import org.apache.ignite.internal.processors.query.messages.GridQueryKillRequest; import org.apache.ignite.internal.processors.query.messages.GridQueryKillResponse; +import org.apache.ignite.internal.processors.query.schema.message.ActiveProposals; +import org.apache.ignite.internal.processors.query.schema.message.InlineSizesData; +import org.apache.ignite.internal.processors.query.schema.message.QueryEntityExMessage; +import org.apache.ignite.internal.processors.query.schema.message.QueryEntityMessage; import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage; import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage; import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage; @@ -253,7 +262,7 @@ import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage; import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage; import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage; -import org.apache.ignite.spi.discovery.ObjectData; +import org.apache.ignite.spi.discovery.DataBagItem; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage; @@ -353,7 +362,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(GridCacheVersion.class); withNoSchema(GridCacheVersionEx.class); withNoSchema(WALPointer.class); - withNoSchemaResolvedClassLoader(ObjectData.class); + withNoSchemaResolvedClassLoader(DataBagItem.class); withSchemaResolvedClassLoader(GridTopicMessage.class); // [5700 - 5900]: Discovery originated messages. @@ -446,6 +455,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(CacheStatisticsClearMessage.class); withNoSchema(ClientCacheChangeDummyDiscoveryMessage.class); withNoSchema(DynamicCacheChangeBatch.class); + withNoSchema(InitialUsersData.class); // [10000 - 10200]: Transaction and lock related messages. Most of them originally comes from Communication. msgIdx = 10000; @@ -575,6 +585,9 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(CacheContinuousQueryBatchAck.class); withSchema(CacheContinuousQueryEntry.class); withNoSchema(InlineSizesData.class); + withNoSchema(ActiveProposals.class); + withNoSchema(QueryEntityMessage.class); + withNoSchema(QueryEntityExMessage.class); // [11200 - 11300]: Compute, distributed process messages. msgIdx = 11200; @@ -652,6 +665,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(MasterKeyChangeRequest.class); withNoSchema(GroupKeyEncrypted.class); withNoSchema(NodeEncryptionKeys.class); + withNoSchema(KnownEncryptionKeys.class); // [13000 - 13300]: Control, configuration, diagnostincs and other messages. msgIdx = 13000; @@ -665,6 +679,10 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchemaResolvedClassLoader(DynamicCacheChangeRequest.class); withNoSchema(PartitionHashRecord.class); withNoSchema(TransactionsHashRecord.class); + withNoSchema(ClusterIdAndTag.class); + withNoSchema(ClusterFlags.class); + withNoSchemaResolvedClassLoader(PluginsData.class); + withSchema(EnabledEvents.class); assert msgIdx <= MAX_MESSAGE_ID; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java index f0e1a7c7b627a..68ce157be8b5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java @@ -17,10 +17,10 @@ package org.apache.ignite.internal; -import java.io.Serializable; -import java.util.Map; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.plugin.PluginsData; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.plugin.PluginProvider; import org.apache.ignite.plugin.PluginValidationException; @@ -115,10 +115,10 @@ public PluginProvider plugin() { @Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node, JoiningNodeDiscoveryData discoData) { try { - Map map = discoData.joiningNodeData(); + PluginsData pluginsData = discoData.joiningNodeData(); - if (map != null) - plugin.validateNewNode(node, map.get(plugin.name())); + if (pluginsData != null && !F.isEmpty(pluginsData.data())) + plugin.validateNewNode(node, pluginsData.data().get(plugin.name())); else plugin.validateNewNode(node, null); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/QueryIndexMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/QueryIndexMessage.java index c42415be029b4..1bdb38fc3f0a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/QueryIndexMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/QueryIndexMessage.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.cache.query; -import java.io.Serializable; import java.util.LinkedHashMap; import org.apache.ignite.cache.QueryIndex; import org.apache.ignite.cache.QueryIndexType; @@ -28,10 +27,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageFactory; /** Message for {@link QueryIndex}. */ -public class QueryIndexMessage implements Serializable, Message { - /** */ - private static final long serialVersionUID = 0L; - +public class QueryIndexMessage implements Message { /** Index name. */ @Order(0) public String name; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java index 19c58d5d7b8cf..58ae3f3a65201 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java @@ -563,7 +563,7 @@ public void onLocalJoin() { } } - dataBag.addGridCommonData(ENCRYPTION_MGR.ordinal(), knownEncKeys); + dataBag.addGridCommonData(ENCRYPTION_MGR.ordinal(), new KnownEncryptionKeys(knownEncKeys)); } /** {@inheritDoc} */ @@ -571,20 +571,15 @@ public void onLocalJoin() { if (ctx.clientNode()) return; - Map encKeysFromCluster = (Map)data.commonData(); + KnownEncryptionKeys encKeysFromCluster = data.commonData(); - if (F.isEmpty(encKeysFromCluster)) + if (encKeysFromCluster == null || F.isEmpty(encKeysFromCluster.keys)) return; - for (Map.Entry entry : encKeysFromCluster.entrySet()) { + for (Map.Entry entry : encKeysFromCluster.keys.entrySet()) { int grpId = entry.getKey(); - GroupKeyEncrypted rmtKey; - - if (entry.getValue() instanceof GroupKeyEncrypted) - rmtKey = (GroupKeyEncrypted)entry.getValue(); - else - rmtKey = new GroupKeyEncrypted(INITIAL_KEY_ID, (byte[])entry.getValue()); + GroupKeyEncrypted rmtKey = entry.getValue(); GroupKey locGrpKey = getActiveKey(grpId); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/KnownEncryptionKeys.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/KnownEncryptionKeys.java new file mode 100644 index 0000000000000..05e4e8d34e23e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/KnownEncryptionKeys.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.encryption; + +import java.util.Map; +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** */ +public class KnownEncryptionKeys implements Message { + /** */ + @Order(0) + Map keys; + + /** */ + public KnownEncryptionKeys() {} + + /** */ + KnownEncryptionKeys(Map keys) { + this.keys = keys; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/EnabledEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/EnabledEvents.java new file mode 100644 index 0000000000000..17035c8416726 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/EnabledEvents.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.eventstorage; + +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** + * + */ +public class EnabledEvents implements Message { + /** */ + @Order(0) + int[] evts; + + /** */ + public EnabledEvents() { } + + /** + * @param events Enabled events. + */ + public EnabledEvents(int[] events) { + this.evts = events; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java index 1f96aa0a316d9..d19f913aee999 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java @@ -1173,13 +1173,15 @@ private int[] copy(int[] arr) { /** {@inheritDoc} */ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { - if (data.commonData() == null) + EnabledEvents enabled = data.commonData(); + + if (enabled == null) return; if (ctx.clientNode()) return; - GridIntList clusterData = new GridIntList((int[])data.commonData()); + GridIntList clusterData = new GridIntList(enabled.evts); GridIntList nodeData = new GridIntList(enabledEvents()); GridIntList toEnable = new GridIntList(clusterData.size()); @@ -1207,9 +1209,7 @@ private int[] copy(int[] arr) { if (dataBag.isJoiningNodeClient() && dataBag.commonDataCollectedFor(EVENT_MGR.ordinal())) return; - int[] clusterData = enabledEvents(); - - dataBag.addGridCommonData(EVENT_MGR.ordinal(), clusterData); + dataBag.addGridCommonData(EVENT_MGR.ordinal(), new EnabledEvents(enabledEvents())); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java index af9baf67bf34c..7254437f997b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java @@ -60,7 +60,6 @@ import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -430,7 +429,7 @@ private boolean isLocalNodeCoordinator() { /** {@inheritDoc} */ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { - initUsrs = (InitialUsersData)data.commonData(); + initUsrs = data.commonData(); } /** {@inheritDoc} */ @@ -999,36 +998,6 @@ public void checkUserOperation(UserManagementOperation op) throws IgniteAccessCo } /** - * Initial data is collected on coordinator to send to join node. - */ - private static final class InitialUsersData implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** Users. */ - @GridToStringInclude - private final ArrayList usrs; - - /** Active user operations. */ - @GridToStringInclude - private final ArrayList activeOps; - - /** - * @param usrs Users. - * @param ops Active operations on cluster. - */ - InitialUsersData(Collection usrs, Collection ops) { - this.usrs = new ArrayList<>(usrs); - activeOps = new ArrayList<>(ops); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(InitialUsersData.class, this); - } - } - - /**i * */ private final class UserProposedListener implements CustomEventListener { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/InitialUsersData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/InitialUsersData.java new file mode 100644 index 0000000000000..261bf09293caf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/InitialUsersData.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.authentication; + +import java.util.ArrayList; +import java.util.Collection; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** Initial data is collected on coordinator to send to join node. */ +public class InitialUsersData implements Message { + /** Users. */ + @GridToStringInclude + @Order(0) + ArrayList usrs; + + /** Active user operations. */ + @GridToStringInclude + @Order(1) + ArrayList activeOps; + + /** */ + public InitialUsersData() { } + + /** + * @param usrs Users. + * @param ops Active operations on cluster. + */ + InitialUsersData(Collection usrs, Collection ops) { + this.usrs = new ArrayList<>(usrs); + activeOps = new ArrayList<>(ops); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(InitialUsersData.class, this); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserManagementOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserManagementOperation.java index 82d3b939bf58f..da6efb8ad9ee4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserManagementOperation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserManagementOperation.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.authentication; -import java.io.Serializable; import java.util.Objects; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.internal.S; @@ -27,10 +26,7 @@ /** * The operation with users. Used to deliver the information about requested operation to all server nodes. */ -public class UserManagementOperation implements Serializable, Message { - /** */ - private static final long serialVersionUID = 0L; - +public class UserManagementOperation implements Message { /** User. */ @Order(0) User usr; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 987be04ea2fe8..d5c7d8dce2536 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -1504,10 +1504,9 @@ public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { if (data.commonData() == null) return; - assert joinDiscoData != null || disconnectedState(); - assert data.commonData() instanceof CacheNodeCommonDiscoveryData : data; + CacheNodeCommonDiscoveryData cachesData = data.commonData(); - CacheNodeCommonDiscoveryData cachesData = (CacheNodeCommonDiscoveryData)data.commonData(); + assert joinDiscoData != null || disconnectedState(); // CacheGroup configurations that were created from local node configuration. Map locCacheGrps = new HashMap<>(registeredCacheGroups()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 8867e576ec8af..7affd7b7b5744 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.binary; import java.io.File; -import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; @@ -1464,7 +1463,7 @@ private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId, Binary res.put(e.getKey(), e.getValue()); } - dataBag.addGridCommonData(BINARY_PROC.ordinal(), (Serializable)res); + dataBag.addGridCommonData(BINARY_PROC.ordinal(), new BinaryMetadataVersionsData(res)); } } @@ -1530,10 +1529,10 @@ private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId, Binary /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { - Map receivedData = (Map)data.commonData(); + BinaryMetadataVersionsData receivedData = data.commonData(); - if (receivedData != null) { - for (Map.Entry e : receivedData.entrySet()) { + if (receivedData != null && !F.isEmpty(receivedData.data)) { + for (Map.Entry e : receivedData.data.entrySet()) { BinaryMetadataVersionInfo metaVerInfo = e.getValue(); BinaryMetadataVersionInfo locMetaVerInfo = new BinaryMetadataVersionInfo(metaVerInfo.metadata(), diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterFlags.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterFlags.java new file mode 100644 index 0000000000000..ccca3822389a5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterFlags.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cluster; + +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** */ +public class ClusterFlags implements Message { + /** Update notifier enabled flag. */ + @Order(0) + boolean updateNotifierEnabled; + + /** */ + public ClusterFlags() { } + + /** + * @param updateNotifierEnabled Update notifier enabled flag. + */ + public ClusterFlags(boolean updateNotifierEnabled) { + this.updateNotifierEnabled = updateNotifierEnabled; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterIdAndTag.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterIdAndTag.java index 2b03f36377d60..17c49f2f07e85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterIdAndTag.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterIdAndTag.java @@ -20,20 +20,27 @@ import java.io.Serializable; import java.util.Objects; import java.util.UUID; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Container class to send cluster ID and tag in disco data and to write them atomically to metastorage. */ -public class ClusterIdAndTag implements Serializable { +public class ClusterIdAndTag implements Serializable, Message { /** */ private static final long serialVersionUID = 0L; /** */ - private final UUID id; + @Order(0) + UUID id; /** */ - private final String tag; + @Order(1) + String tag; + + /** */ + public ClusterIdAndTag() { } /** * @param id Cluster ID. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java index b7d78a906a5b7..2a777f0feb627 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java @@ -19,7 +19,6 @@ import java.io.Serializable; import java.util.Collection; -import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.Timer; @@ -76,6 +75,7 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.metric.MetricRegistry; import org.apache.ignite.mxbean.IgniteClusterMXBean; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; @@ -465,42 +465,31 @@ public IgniteFuture clientReconnectFuture() { /** {@inheritDoc} */ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) { - dataBag.addJoiningNodeData(CLUSTER_PROC.ordinal(), getDiscoveryData()); + dataBag.addJoiningNodeData(CLUSTER_PROC.ordinal(), new ClusterFlags(notifyEnabled.get())); } /** {@inheritDoc} */ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { - dataBag.addNodeSpecificData(CLUSTER_PROC.ordinal(), getDiscoveryData()); + dataBag.addNodeSpecificData(CLUSTER_PROC.ordinal(), new ClusterFlags(notifyEnabled.get())); - dataBag.addGridCommonData(CLUSTER_PROC.ordinal(), new ClusterIdAndTag(cluster.id(), cluster.tag())); - } - - /** - * @return Discovery data. - */ - private Serializable getDiscoveryData() { - HashMap map = new HashMap<>(2); - - map.put(ATTR_UPDATE_NOTIFIER_STATUS, notifyEnabled.get()); - - return map; + dataBag.addGridCommonData(CLUSTER_PROC.ordinal(), (Message)new ClusterIdAndTag(cluster.id(), cluster.tag())); } /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { - Map nodeSpecData = data.nodeSpecificData(); + Map nodeSpecData = data.nodeSpecificData(); if (nodeSpecData != null) { - Boolean lstFlag = findLastFlag(nodeSpecData.values()); + Boolean lstFlag = findLastUpdateNotifierFlag(nodeSpecData.values()); if (lstFlag != null) notifyEnabled.set(lstFlag); } - ClusterIdAndTag commonData = (ClusterIdAndTag)data.commonData(); + ClusterIdAndTag commonData = data.commonData(); if (commonData != null) { - Serializable remoteClusterId = commonData.id(); + UUID remoteClusterId = commonData.id(); if (remoteClusterId != null) { if (locClusterId != null && !locClusterId.equals(remoteClusterId)) { @@ -510,7 +499,7 @@ private Serializable getDiscoveryData() { ", local cluster ID: " + locClusterId); } - locClusterId = (UUID)remoteClusterId; + locClusterId = remoteClusterId; } String remoteClusterTag = commonData.tag(); @@ -521,21 +510,17 @@ private Serializable getDiscoveryData() { } /** - * @param vals collection to seek through. + * @param flags Flags collection to seek through. */ - private Boolean findLastFlag(Collection vals) { - Boolean flag = null; + private Boolean findLastUpdateNotifierFlag(Collection flags) { + Boolean notifierFlag = null; - for (Serializable ser : vals) { - if (ser != null) { - Map map = (Map)ser; - - if (map.containsKey(ATTR_UPDATE_NOTIFIER_STATUS)) - flag = (Boolean)map.get(ATTR_UPDATE_NOTIFIER_STATUS); - } + for (ClusterFlags flag : flags) { + if (flag != null) + notifierFlag = flag.updateNotifierEnabled; } - return flag; + return notifierFlag; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index 337b4f51a70c3..3d9d20faf535e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -965,14 +965,16 @@ protected IgniteCheckedException concurrentStateChangeError(ClusterState state, /** {@inheritDoc} */ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { - if (data.commonData() instanceof DiscoveryDataClusterState) { + Serializable commonData = data.commonData(); + + if (commonData instanceof DiscoveryDataClusterState) { if (globalState != null && globalState.baselineTopology() != null) //node with BaselineTopology is not allowed to join mixed cluster // (where some nodes don't support BaselineTopology) throw new IgniteException("Node with BaselineTopology cannot join" + " mixed cluster running in compatibility mode"); - globalState = (DiscoveryDataClusterState)data.commonData(); + globalState = (DiscoveryDataClusterState)commonData; compatibilityMode = true; @@ -981,7 +983,7 @@ protected IgniteCheckedException concurrentStateChangeError(ClusterState state, return; } - BaselineStateAndHistoryData stateDiscoData = (BaselineStateAndHistoryData)data.commonData(); + BaselineStateAndHistoryData stateDiscoData = (BaselineStateAndHistoryData)commonData; if (stateDiscoData != null) { DiscoveryDataClusterState state = stateDiscoData.globalState; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 489f13e71fee8..7291e631c2f31 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -528,8 +528,7 @@ private Map copyLocalInfos(Map l @Override public void onGridDataReceived(GridDiscoveryData data) { if (immutableDiscoCustomMsg) { if (data.commonData() != null) { - ContinuousRoutinesCommonDiscoveryData commonData = - (ContinuousRoutinesCommonDiscoveryData)data.commonData(); + ContinuousRoutinesCommonDiscoveryData commonData = data.commonData(); for (ContinuousRoutineInfo routineInfo : commonData.startedRoutines) { if (routinesInfo.routineExists(routineInfo.routineId)) @@ -542,11 +541,11 @@ private Map copyLocalInfos(Map l } } else { - Map nodeSpecData = data.nodeSpecificData(); + Map nodeSpecData = data.nodeSpecificData(); if (nodeSpecData != null) { - for (Map.Entry e : nodeSpecData.entrySet()) - onDiscoveryDataReceivedMutable((DiscoveryData)e.getValue()); + for (DiscoveryData val : nodeSpecData.values()) + onDiscoveryDataReceivedMutable(val); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java index 8946672364edb..a39d17b926ce8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java @@ -332,28 +332,26 @@ private final class MappingAcceptedListener implements CustomEventListener> mappings = (List>)data.commonData(); - - processIncomingMappings(mappings); + processIncomingMappings(data.commonData()); } /** * @param mappings Incoming marshaller mappings. */ - private void processIncomingMappings(List> mappings) { - marshallerCtx.onMappingDataReceived(log, mappings); + private void processIncomingMappings(@Nullable MarshallerMappingsData mappings) { + if (mappings != null) + marshallerCtx.onMappingDataReceived(log, mappings.mappings()); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java index 2207b1c21f47c..37bd14d511e81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java @@ -37,4 +37,11 @@ public MarshallerMappingsData() {} public MarshallerMappingsData(List> mappings) { this.mappings = mappings; } + + /** + * @return Mappings. + */ + public List> mappings() { + return mappings; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java index 1362fec2e7f5b..ad9022f2e9747 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java @@ -958,7 +958,7 @@ private DistributedMetaStorageKeyValuePair[] localFullData() { lock.writeLock().lock(); try { - DistributedMetaStorageClusterNodeData nodeData = (DistributedMetaStorageClusterNodeData)data.commonData(); + DistributedMetaStorageClusterNodeData nodeData = data.commonData(); if (nodeData != null) { if (nodeData.fullData != null) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java index 59bcabd2bfa6f..00e8e968da14f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java @@ -157,24 +157,24 @@ public T createComponent(Class cls) { /** {@inheritDoc} */ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) { - Serializable pluginsData = getDiscoveryData(dataBag.joiningNodeId()); + PluginsData discoData = getDiscoveryData(dataBag.joiningNodeId()); - if (pluginsData != null) - dataBag.addJoiningNodeData(PLUGIN.ordinal(), pluginsData); + if (!F.isEmpty(discoData.data)) + dataBag.addJoiningNodeData(PLUGIN.ordinal(), discoData); } /** {@inheritDoc} */ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { - Serializable pluginsData = getDiscoveryData(dataBag.joiningNodeId()); + PluginsData discoData = getDiscoveryData(dataBag.joiningNodeId()); - if (pluginsData != null) - dataBag.addNodeSpecificData(PLUGIN.ordinal(), pluginsData); + if (!F.isEmpty(discoData.data)) + dataBag.addNodeSpecificData(PLUGIN.ordinal(), discoData); } /** * @param joiningNodeId Joining node id. */ - private Serializable getDiscoveryData(UUID joiningNodeId) { + private PluginsData getDiscoveryData(UUID joiningNodeId) { HashMap pluginsData = null; for (Map.Entry e : plugins.entrySet()) { @@ -188,31 +188,27 @@ private Serializable getDiscoveryData(UUID joiningNodeId) { } } - return pluginsData; + return new PluginsData(pluginsData); } /** {@inheritDoc} */ @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) { - if (data.hasJoiningNodeData()) { - Map pluginsData = data.joiningNodeData(); + PluginsData discoData = data.joiningNodeData(); - applyPluginsData(data.joiningNodeId(), pluginsData); - } + if (discoData != null && !F.isEmpty(discoData.data)) + applyPluginsData(data.joiningNodeId(), discoData.data); } /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { - Map nodeSpecificData = data.nodeSpecificData(); + Map nodeSpecificData = data.nodeSpecificData(); if (nodeSpecificData != null) { UUID joiningNodeId = data.joiningNodeId(); - for (Serializable v : nodeSpecificData.values()) { - if (v != null) { - Map pluginsData = (Map)v; - - applyPluginsData(joiningNodeId, pluginsData); - } + for (PluginsData pluginsData : nodeSpecificData.values()) { + if (pluginsData != null && !F.isEmpty(pluginsData.data)) + applyPluginsData(joiningNodeId, pluginsData.data); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/PluginsData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/PluginsData.java new file mode 100644 index 0000000000000..f422e35b74a82 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/PluginsData.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.plugin; + +import java.io.Serializable; +import java.util.Map; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.MarshallableMessage; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; +import org.jetbrains.annotations.Nullable; + +/** */ +public class PluginsData implements MarshallableMessage { + /** Original plugins data. */ + @Nullable Map data; + + /** Serialized plugins data. */ + @Order(0) + @Nullable byte[] dataBytes; + + /** */ + public PluginsData() { } + + /** + * @param data Plugins data. + */ + public PluginsData(@Nullable Map data) { + this.data = data; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + if (!F.isEmpty(data)) + dataBytes = U.marshal(marsh, data); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + if (dataBytes != null) + data = U.unmarshal(marsh, dataBytes, clsLdr); + } + + /** + * @return Original plugins data. + */ + public @Nullable Map data() { + return data; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 854e8a17e5880..8fa969740cbb5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.query; -import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -118,6 +117,8 @@ import org.apache.ignite.internal.processors.query.schema.SchemaOperationWorker; import org.apache.ignite.internal.processors.query.schema.SchemaSqlViewManager; import org.apache.ignite.internal.processors.query.schema.management.SchemaManager; +import org.apache.ignite.internal.processors.query.schema.message.ActiveProposals; +import org.apache.ignite.internal.processors.query.schema.message.InlineSizesData; import org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage; import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage; import org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage; @@ -181,9 +182,6 @@ */ @SuppressWarnings("rawtypes") public class GridQueryProcessor extends GridProcessorAdapter { - /** */ - private static final String INLINE_SIZES_DISCO_BAG_KEY = "inline_sizes"; - /** Warn message if some indexes have different inline sizes on the nodes. */ public static final String INLINE_SIZES_DIFFER_WARN_MSG_FORMAT = "Inline sizes on local node and node %s are different. " + "Please drop and create again these indexes to avoid performance problems with SQL queries. Problem indexes: %s"; @@ -473,36 +471,28 @@ public void onCacheReconnect() throws IgniteCheckedException { /** {@inheritDoc} */ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { - LinkedHashMap proposals; + ActiveProposals proposals; // Collect active proposals. synchronized (stateMux) { - proposals = new LinkedHashMap<>(activeProposals); + proposals = new ActiveProposals(activeProposals); } dataBag.addGridCommonData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), proposals); // We should send inline index sizes information only to server nodes. if (!dataBag.isJoiningNodeClient()) { - HashMap nodeSpecificMap = new HashMap<>(); - - Serializable oldVal = nodeSpecificMap.put(INLINE_SIZES_DISCO_BAG_KEY, collectSecondaryIndexesInlineSize()); - - assert oldVal == null : oldVal; - - dataBag.addNodeSpecificData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), nodeSpecificMap); + dataBag.addNodeSpecificData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(), + new InlineSizesData(secondaryIndexesInlineSize())); } } /** {@inheritDoc} */ @Override public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) { - Object joiningNodeData = data.joiningNodeData(); - - if (joiningNodeData instanceof InlineSizesData) { - Map joiningNodeIndexesInlineSize = ((InlineSizesData)joiningNodeData).sizes; + InlineSizesData joiningNodeData = data.joiningNodeData(); - checkInlineSizes(secondaryIndexesInlineSize(), joiningNodeIndexesInlineSize, data.joiningNodeId()); - } + if (joiningNodeData != null) + checkInlineSizes(secondaryIndexesInlineSize(), joiningNodeData.sizes(), data.joiningNodeId()); } /** {@inheritDoc} */ @@ -514,30 +504,27 @@ public void onCacheReconnect() throws IgniteCheckedException { /** {@inheritDoc} */ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { // Preserve proposals. - LinkedHashMap activeProposals = - (LinkedHashMap)data.commonData(); + ActiveProposals discoData = data.commonData(); // Process proposals as if they were received as regular discovery messages. - if (!F.isEmpty(activeProposals)) { + if (discoData != null && !F.isEmpty(discoData.activeProposals())) { synchronized (stateMux) { - for (SchemaProposeDiscoveryMessage activeProposal : activeProposals.values()) + for (SchemaProposeDiscoveryMessage activeProposal : discoData.activeProposals().values()) onSchemaProposeDiscovery0(activeProposal); } } - if (!F.isEmpty(data.nodeSpecificData())) { + Map nodedSpecificData = data.nodeSpecificData(); + + if (!F.isEmpty(nodedSpecificData)) { Map indexesInlineSize = secondaryIndexesInlineSize(); if (!F.isEmpty(indexesInlineSize)) { - for (UUID nodeId : data.nodeSpecificData().keySet()) { - Serializable serializable = data.nodeSpecificData().get(nodeId); + for (UUID nodeId : nodedSpecificData.keySet()) { + InlineSizesData inlineSizesData = nodedSpecificData.get(nodeId); - assert serializable instanceof Map : serializable; - - Map nodeSpecificData = (Map)serializable; - - if (nodeSpecificData.containsKey(INLINE_SIZES_DISCO_BAG_KEY)) - checkInlineSizes(indexesInlineSize, (Map)nodeSpecificData.get(INLINE_SIZES_DISCO_BAG_KEY), nodeId); + if (inlineSizesData != null) + checkInlineSizes(indexesInlineSize, inlineSizesData.sizes(), nodeId); } } } @@ -685,16 +672,6 @@ private void checkInlineSizes(Map local, Map r } } - /** - * @return Serializable information about secondary indexes inline size. - * @see #secondaryIndexesInlineSize() - */ - private Serializable collectSecondaryIndexesInlineSize() { - Map map = secondaryIndexesInlineSize(); - - return map instanceof Serializable ? (Serializable)map : new HashMap<>(map); - } - /** * Process schema propose message from discovery thread. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryField.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryField.java index b24e523a2b785..29659e60da36f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryField.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryField.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.query; -import java.io.Serializable; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; @@ -25,10 +24,7 @@ /** * Query field metadata. */ -public class QueryField implements Serializable, Message { - /** */ - private static final long serialVersionUID = 0L; - +public class QueryField implements Message { /** Field name. */ @Order(0) String name; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/ActiveProposals.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/ActiveProposals.java new file mode 100644 index 0000000000000..ea46ee533fb90 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/ActiveProposals.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema.message; + +import java.util.LinkedHashMap; +import java.util.UUID; +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** + * Holder for active schema change propose discovery messages. + */ +public class ActiveProposals implements Message { + /** Active proposals. */ + @Order(0) + LinkedHashMap activeProposals; + + /** */ + public ActiveProposals() { + // No-op. + } + + /** + * @param activeProposals Active proposals. + */ + public ActiveProposals(LinkedHashMap activeProposals) { + this.activeProposals = activeProposals; + } + + /** + * @return Active proposals. + */ + public LinkedHashMap activeProposals() { + return activeProposals; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/InlineSizesData.java similarity index 87% rename from modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java rename to modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/InlineSizesData.java index eb3813501f670..f0d0f69708cb8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/InlineSizesData.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query; +package org.apache.ignite.internal.processors.query.schema.message; import java.util.Map; import org.apache.ignite.internal.Order; @@ -36,4 +36,11 @@ public InlineSizesData() {} public InlineSizesData(Map sizes) { this.sizes = sizes; } + + /** + * @return Inline sizes. + */ + public Map sizes() { + return sizes; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/QueryEntityExMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/QueryEntityExMessage.java new file mode 100644 index 0000000000000..42eca7b2d122f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/QueryEntityExMessage.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema.message; + +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.processors.query.QueryEntityEx; + +/** + * Message for {@link QueryEntityEx} transfer. + */ +public class QueryEntityExMessage extends QueryEntityMessage { + /** Whether to preserve order specified by 'keyFields' or not. */ + @Order(0) + boolean preserveKeysOrder; + + /** Whether a primary key should be autocreated or not. */ + @Order(1) + boolean implicitPk; + + /** Whether absent PK parts should be filled with defaults or not. */ + @Order(2) + boolean fillAbsentPKsWithDefaults; + + /** INLINE_SIZE for PK index. */ + @Order(3) + int pkInlineSize; + + /** INLINE_SIZE for affinity field index. */ + @Order(4) + int affKeyInlineSize; + + /** Whether query entity was created by SQL. */ + @Order(5) + boolean sql; + + /** */ + public QueryEntityExMessage() { } + + /** + * @param qryEntity Original {@link QueryEntity}. + */ + public QueryEntityExMessage(QueryEntityEx qryEntity) { + super(qryEntity); + + preserveKeysOrder = qryEntity.isPreserveKeysOrder(); + implicitPk = qryEntity.implicitPk(); + fillAbsentPKsWithDefaults = qryEntity.fillAbsentPKsWithDefaults(); + + pkInlineSize = qryEntity.getPrimaryKeyInlineSize() != null ? qryEntity.getPrimaryKeyInlineSize() : -1; + + affKeyInlineSize = qryEntity.getAffinityKeyInlineSize() != null ? qryEntity.getAffinityKeyInlineSize() : -1; + + sql = qryEntity.sql(); + } + + /** {@inheritDoc} */ + @Override public QueryEntity toEntity() { + QueryEntityEx qryEntity = new QueryEntityEx(super.toEntity()); + + qryEntity.setNotNullFields(notNullFields); + qryEntity.setPreserveKeysOrder(preserveKeysOrder); + qryEntity.implicitPk(implicitPk); + qryEntity.fillAbsentPKsWithDefaults(fillAbsentPKsWithDefaults); + qryEntity.sql(sql); + + if (pkInlineSize != -1) + qryEntity.setPrimaryKeyInlineSize(pkInlineSize); + + if (affKeyInlineSize != -1) + qryEntity.setAffinityKeyInlineSize(affKeyInlineSize); + + return qryEntity; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/QueryEntityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/QueryEntityMessage.java new file mode 100644 index 0000000000000..255bfb1cd34ce --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/QueryEntityMessage.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema.message; + +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.internal.MarshallableMessage; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.cache.query.QueryIndexMessage; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; + +/** + * Message for {@link QueryEntity} transfer. + */ +public class QueryEntityMessage implements MarshallableMessage { + /** Key type. */ + @Order(0) + String keyType; + + /** Value type. */ + @Order(1) + String valType; + + /** Key name. Can be used in field list to denote the key as a whole. */ + @Order(2) + String keyFieldName; + + /** Value name. Can be used in field list to denote the entire value. */ + @Order(3) + String valFieldName; + + /** Fields available for query. A map from field name to type name. */ + @Order(4) + LinkedHashMap fields; + + /** Set of field names that belong to the key. */ + @Order(5) + String[] keyFields; + + /** Aliases. */ + @Order(6) + Map aliases; + + /** Collection of query indexes. */ + @Order(7) + Collection idxs; + + /** Table name. */ + @Order(8) + String tableName; + + /** Fields that must have non-null value. NB: DO NOT remove underscore to avoid clashes with QueryEntityEx. */ + @Order(9) + Set notNullFields; + + /** Fields default values. */ + Map dfltFieldValues; + + /** Serialized form of {@link #dfltFieldValues}. */ + @Order(10) + byte[] dfltFieldValuesBytes; + + /** Precision(Maximum length) for fields. */ + @Order(11) + Map fieldsPrecision; + + /** Scale for fields. */ + @Order(12) + Map fieldsScale; + + /** */ + public QueryEntityMessage() { } + + /** + * @param qryEntity Original {@link QueryEntity}. + */ + public QueryEntityMessage(QueryEntity qryEntity) { + keyType = qryEntity.getKeyType(); + valType = qryEntity.getValueType(); + + keyFieldName = qryEntity.getKeyFieldName(); + valFieldName = qryEntity.getValueFieldName(); + + fields = qryEntity.getFields(); + aliases = qryEntity.getAliases(); + + if (!F.isEmpty(qryEntity.getKeyFields())) + keyFields = qryEntity.getKeyFields().toArray(U.EMPTY_STRS); + + if (!F.isEmpty(qryEntity.getIndexes())) + idxs = F.viewReadOnly(qryEntity.getIndexes(), QueryIndexMessage::new); + + tableName = qryEntity.getTableName(); + + notNullFields = qryEntity.getNotNullFields(); + dfltFieldValues = qryEntity.getDefaultFieldValues(); + fieldsPrecision = qryEntity.getFieldsPrecision(); + fieldsScale = qryEntity.getFieldsScale(); + } + + /** + * @return Original {@link QueryEntity}. + */ + public QueryEntity toEntity() { + return new QueryEntity() + .setKeyType(keyType) + .setValueType(valType) + .setKeyFieldName(keyFieldName) + .setValueFieldName(valFieldName) + .setFields(fields) + .setKeyFields(!F.isEmpty(keyFields) ? new LinkedHashSet<>(List.of(keyFields)) : null) + .setAliases(aliases) + .setIndexes(!F.isEmpty(idxs) ? F.viewReadOnly(idxs, QueryIndexMessage::queryIndex) : null) + .setTableName(tableName) + .setNotNullFields(notNullFields) + .setDefaultFieldValues(dfltFieldValues) + .setFieldsPrecision(fieldsPrecision) + .setFieldsScale(fieldsScale); + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + if (!F.isEmpty(dfltFieldValues)) + dfltFieldValuesBytes = U.marshal(marsh, dfltFieldValues); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + if (!F.isEmpty(dfltFieldValuesBytes)) + dfltFieldValues = U.unmarshal(marsh, dfltFieldValuesBytes, clsLdr); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java index 1b832abab0fe3..a7914327d60fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.query.schema.message; -import java.io.Serializable; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; @@ -30,10 +29,7 @@ /** * Abstract discovery message for schema operations. */ -public abstract class SchemaAbstractDiscoveryMessage extends DiscoveryCustomMessage implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - +public abstract class SchemaAbstractDiscoveryMessage extends DiscoveryCustomMessage { /** Operation. */ @GridToStringInclude @Order(0) @@ -41,11 +37,11 @@ public abstract class SchemaAbstractDiscoveryMessage extends DiscoveryCustomMess /** Error message. */ @Order(1) - transient String errMsg; + String errMsg; /** Error code. */ @Order(2) - transient int errCode; + int errCode; /** Error. */ SchemaOperationException err; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java index 9ff555b729a49..fc95da0c2a115 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java @@ -27,9 +27,6 @@ * Schema change finish discovery message. */ public class SchemaFinishDiscoveryMessage extends SchemaAbstractDiscoveryMessage { - /** */ - private static final long serialVersionUID = 0L; - /** No-op flag. */ @Order(0) boolean nop; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java index 1f761c1e81ed2..bef0b3b2e94d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java @@ -28,9 +28,6 @@ * Schema change propose discovery message. */ public class SchemaProposeDiscoveryMessage extends SchemaAbstractDiscoveryMessage { - /** */ - private static final long serialVersionUID = 0L; - /** Cache deployment ID. */ @Order(0) IgniteUuid depId; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAbstractOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAbstractOperation.java index 5264fbe2c6a2a..c3a83e31888f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAbstractOperation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAbstractOperation.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.query.schema.operation; -import java.io.Serializable; import java.util.UUID; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.internal.S; @@ -26,10 +25,7 @@ /** * Abstract operation on schema. */ -public abstract class SchemaAbstractOperation implements Serializable, Message { - /** */ - private static final long serialVersionUID = 0L; - +public abstract class SchemaAbstractOperation implements Message { /** Operation ID. */ @Order(0) UUID opId; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAddQueryEntityOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAddQueryEntityOperation.java index ebb23aff57ea9..04e0c7fd6a50f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAddQueryEntityOperation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAddQueryEntityOperation.java @@ -19,26 +19,20 @@ import java.util.Collection; import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.QueryEntity; -import org.apache.ignite.internal.MarshallableMessage; import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.internal.processors.query.QueryEntityEx; +import org.apache.ignite.internal.processors.query.schema.message.QueryEntityExMessage; +import org.apache.ignite.internal.processors.query.schema.message.QueryEntityMessage; +import org.apache.ignite.internal.util.typedef.F; /** * Enabling indexing on cache operation. */ -public class SchemaAddQueryEntityOperation extends SchemaAbstractOperation implements MarshallableMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private Collection entities; - - /** Serialized form of query entities. */ +public class SchemaAddQueryEntityOperation extends SchemaAbstractOperation { + /** Query entities. */ @Order(0) - transient byte[] qryEntitiesBytes; + Collection entities; /** */ @Order(1) @@ -68,7 +62,7 @@ public SchemaAddQueryEntityOperation( boolean sqlEscape ) { super(opId, cacheName, schemaName); - this.entities = entities; + this.entities = F.viewReadOnly(entities, this::makeEntityMessage); this.qryParallelism = qryParallelism; this.sqlEscape = sqlEscape; } @@ -77,7 +71,7 @@ public SchemaAddQueryEntityOperation( * @return Collection of query entities. */ public Collection entities() { - return entities; + return F.viewReadOnly(entities, QueryEntityMessage::toEntity); } /** @@ -94,19 +88,12 @@ public boolean isSqlEscape() { return sqlEscape; } - /** {@inheritDoc} */ - @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { - if (entities != null) - qryEntitiesBytes = U.marshal(marsh, entities); - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { - if (qryEntitiesBytes != null) { - entities = U.unmarshal(marsh, qryEntitiesBytes, clsLdr); - - qryEntitiesBytes = null; - } + /** + * @param qryEntity Query entity. + * @return The appropriate query entity message. + */ + private QueryEntityMessage makeEntityMessage(QueryEntity qryEntity) { + return qryEntity instanceof QueryEntityEx ? new QueryEntityExMessage((QueryEntityEx)qryEntity) + : new QueryEntityMessage(qryEntity); } - } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAlterTableAddColumnOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAlterTableAddColumnOperation.java index 11d77d31c5b1d..481e8f26fdd4b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAlterTableAddColumnOperation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAlterTableAddColumnOperation.java @@ -27,9 +27,6 @@ * Schema index drop operation. */ public class SchemaAlterTableAddColumnOperation extends SchemaAbstractOperation { - /** */ - private static final long serialVersionUID = 0L; - /** Target table name. */ @Order(0) String tblName; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAlterTableDropColumnOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAlterTableDropColumnOperation.java index cf48709e921ad..1aca33c57178d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAlterTableDropColumnOperation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAlterTableDropColumnOperation.java @@ -26,9 +26,6 @@ * Schema alter table drop column operation. */ public class SchemaAlterTableDropColumnOperation extends SchemaAbstractOperation { - /** */ - private static final long serialVersionUID = 0L; - /** Target table name. */ @Order(0) String tblName; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexCreateOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexCreateOperation.java index 056ccaae028ea..7dcde0c297e9c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexCreateOperation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexCreateOperation.java @@ -29,9 +29,6 @@ * Schema index create operation. */ public class SchemaIndexCreateOperation extends SchemaIndexAbstractOperation { - /** */ - private static final long serialVersionUID = 0L; - /** Table name. */ @Order(0) String tblName; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexDropOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexDropOperation.java index 6962fbb0ee5b2..4d0db89a638d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexDropOperation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexDropOperation.java @@ -25,9 +25,6 @@ * Schema index drop operation. */ public class SchemaIndexDropOperation extends SchemaIndexAbstractOperation { - /** */ - private static final long serialVersionUID = 0L; - /** Index name. */ @Order(0) String idxName; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java index e7f59549ea4ac..34118541ece6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java @@ -389,7 +389,7 @@ private void cancelDeployedServices() { if (data.commonData() == null) return; - ServiceProcessorCommonDiscoveryData clusterData = (ServiceProcessorCommonDiscoveryData)data.commonData(); + ServiceProcessorCommonDiscoveryData clusterData = data.commonData(); for (ServiceInfo desc : clusterData.registeredServices()) { try { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DataBagItem.java similarity index 68% rename from modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java rename to modules/core/src/main/java/org/apache/ignite/spi/discovery/DataBagItem.java index f9da59bffe415..d60f900c9e083 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DataBagItem.java @@ -29,8 +29,8 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; -/** Wrapper message for serializable data. */ -public class ObjectData implements MarshallableMessage { +/** Wrapper message for serializable data in a {@link DiscoveryDataBag}. */ +public class DataBagItem implements MarshallableMessage { /** */ @GridToStringInclude private Serializable data; @@ -40,16 +40,41 @@ public class ObjectData implements MarshallableMessage { @Order(0) byte[] dataBytes; + /** Unmarshalling error. */ + IgniteCheckedException unmarshallError; + /** */ - public ObjectData() {} + public DataBagItem() {} /** * @param data Original data. */ - public ObjectData(Serializable data) { + public DataBagItem(Serializable data) { this.data = data; } + /** + * @param msg Message. + * @param Type of data. + * + * @return Original message or data unwrapped from an DataBagItem wrapper. + */ + static @Nullable T unwrapIfNecessary(@Nullable Message msg) { + if (msg == null) + return null; + + return msg instanceof DataBagItem ? ((DataBagItem)msg).unwrap() : (T)msg; + } + + /** + * @param Type of data. + * + * @return Original data unwrapped from a message. + */ + private T unwrap() { + return (T)(data); + } + /** {@inheritDoc} */ @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { if (data != null) @@ -59,24 +84,24 @@ public ObjectData(Serializable data) { /** {@inheritDoc} */ @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { if (dataBytes != null) { - data = U.unmarshal(marsh, dataBytes, clsLdr); - - dataBytes = null; + try { + data = U.unmarshal(marsh, dataBytes, clsLdr); + } + catch (IgniteCheckedException e) { + unmarshallError = e; + } } } /** - * @param msg Message. - * @param Type of data. - * - * @return Original data unwrapped from a message. + * @return Unmarshalling error. */ - public static T unwrap(@Nullable Message msg) { - return msg != null ? (T)(((ObjectData)msg).data) : null; + public IgniteCheckedException unmarshallError() { + return unmarshallError; } /** {@inheritDoc} */ @Override public String toString() { - return S.toString(ObjectData.class, this); + return S.toString(DataBagItem.class, this); } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java index 58e41738265d6..f1f0fad7c81da 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.UUID; import org.apache.ignite.internal.GridComponent; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; @@ -59,11 +60,17 @@ public interface GridDiscoveryData { /** @return ID fo the joining node. */ UUID joiningNodeId(); - /** @return Common for all cluster nodes discovery data that is sent to the joining node. */ - Serializable commonData(); + /** + * @param Data type. + * @return Common for all cluster nodes discovery data that is sent to the joining node. + */ + T commonData(); - /** @return Discovery data that is mapped to the particular cluster node and sent to the joining node. */ - Map nodeSpecificData(); + /** + * @param Data type. + * @return Discovery data that is mapped to the particular cluster node and sent to the joining node. + */ + Map nodeSpecificData(); } /** @@ -87,7 +94,7 @@ private final class JoiningNodeDiscoveryDataImpl implements JoiningNodeDiscovery @Override @Nullable public T joiningNodeData() { Message dataMsg = joiningNodeData.get(cmpId); - return dataMsg instanceof ObjectData ? ObjectData.unwrap(dataMsg) : (T)dataMsg; + return DataBagItem.unwrapIfNecessary(dataMsg); } /** @@ -106,7 +113,7 @@ private final class GridDiscoveryDataImpl implements GridDiscoveryData { private int cmpId; /** */ - private Map nodeSpecificData + private Map nodeSpecificData = new LinkedHashMap<>(DiscoveryDataBag.this.nodeSpecificData.size()); /** {@inheritDoc} */ @@ -115,16 +122,16 @@ private final class GridDiscoveryDataImpl implements GridDiscoveryData { } /** {@inheritDoc} */ - @Override @Nullable public Serializable commonData() { + @Override @Nullable public T commonData() { if (commonData != null) - return commonData.get(cmpId); + return DataBagItem.unwrapIfNecessary(commonData.get(cmpId)); return null; } /** {@inheritDoc} */ - @Override public Map nodeSpecificData() { - return nodeSpecificData; + @Override public Map nodeSpecificData() { + return F.viewReadOnly(nodeSpecificData, DataBagItem::unwrapIfNecessary); } /** @@ -142,7 +149,7 @@ private void componentId(int cmpId) { private void reinitNodeSpecData(int cmpId) { nodeSpecificData.clear(); - for (Map.Entry> e : DiscoveryDataBag.this.nodeSpecificData.entrySet()) { + for (Map.Entry> e : DiscoveryDataBag.this.nodeSpecificData.entrySet()) { if (e.getValue() != null && e.getValue().containsKey(cmpId)) nodeSpecificData.put(e.getKey(), e.getValue().get(cmpId)); } @@ -156,7 +163,7 @@ private void reinitNodeSpecData(int cmpId) { private static final UUID DEFAULT_KEY = null; /** */ - private UUID joiningNodeId; + private final UUID joiningNodeId; /** * Component IDs with already initialized common discovery data. @@ -164,13 +171,13 @@ private void reinitNodeSpecData(int cmpId) { private Set cmnDataInitializedCmps; /** */ - private Map joiningNodeData = new HashMap<>(); + private final Map joiningNodeData = new HashMap<>(); /** */ - private Map commonData = new HashMap<>(); + private final Map commonData = new HashMap<>(); /** */ - private Map> nodeSpecificData = new LinkedHashMap<>(); + private final Map> nodeSpecificData = new LinkedHashMap<>(); /** */ private JoiningNodeDiscoveryDataImpl newJoinerData; @@ -246,7 +253,7 @@ public JoiningNodeDiscoveryData newJoinerDiscoveryData(int cmpId) { * @param data Serializable data. */ public void addJoiningNodeData(Integer cmpId, Serializable data) { - joiningNodeData.put(cmpId, new ObjectData(data)); + joiningNodeData.put(cmpId, new DataBagItem(data)); } /** @@ -259,19 +266,35 @@ public void addJoiningNodeData(Integer cmpId, Message data) { /** * @param cmpId Component ID. - * @param data Data. + * @param data Serializable data. */ public void addGridCommonData(Integer cmpId, Serializable data) { + commonData.put(cmpId, new DataBagItem(data)); + } + + /** + * @param cmpId Component ID. + * @param data Message data. + */ + public void addGridCommonData(Integer cmpId, Message data) { commonData.put(cmpId, data); } /** * @param cmpId Component ID. - * @param data Data. + * @param data Serializable data. */ public void addNodeSpecificData(Integer cmpId, Serializable data) { + addNodeSpecificData(cmpId, new DataBagItem(data)); + } + + /** + * @param cmpId Component ID. + * @param data Message data. + */ + public void addNodeSpecificData(Integer cmpId, Message data) { if (!nodeSpecificData.containsKey(DEFAULT_KEY)) - nodeSpecificData.put(DEFAULT_KEY, new HashMap()); + nodeSpecificData.put(DEFAULT_KEY, new HashMap<>()); nodeSpecificData.get(DEFAULT_KEY).put(cmpId, data); } @@ -296,14 +319,14 @@ public void joiningNodeData(Map joinNodeData) { /** * @param cmnData Cmn data. */ - public void commonData(Map cmnData) { + public void commonData(Map cmnData) { commonData.putAll(cmnData); } /** * @param nodeSpecData Node specific data. */ - public void nodeSpecificData(Map> nodeSpecData) { + public void nodeSpecificData(Map> nodeSpecData) { nodeSpecificData.putAll(nodeSpecData); } @@ -316,12 +339,12 @@ public Map joiningNodeData() { * @return Discovery data for each Ignite component that is aggregated from the cluster nodes and sent to the * joining node. */ - public Map commonData() { + public Map commonData() { return commonData; } /** @return Discovery data that belongs to the current cluster node and is sent to the joining node. */ - @Nullable public Map localNodeSpecificData() { + @Nullable public Map localNodeSpecificData() { return nodeSpecificData.get(DEFAULT_KEY); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 82c012c2a1a94..8c97d91c587b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -124,7 +124,6 @@ import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; import org.apache.ignite.spi.IgniteSpiThread; -import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryNotification; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiListener; @@ -2581,36 +2580,6 @@ void add(TcpDiscoveryAbstractMessage msg) { if (addedMsg.gridDiscoveryData() != null) addedMsg.clearDiscoveryData(); } - else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { - TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg; - - if (addFinishMsg.clientDiscoData() != null) { - addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(addFinishMsg); - - msg = addFinishMsg; - - DiscoveryDataPacket discoData = addFinishMsg.clientDiscoData(); - - Set mrgdCmnData = new HashSet<>(); - Set mrgdSpecData = new HashSet<>(); - - boolean allMerged = false; - - for (TcpDiscoveryAbstractMessage msg0 : msgs) { - - if (msg0 instanceof TcpDiscoveryNodeAddFinishedMessage) { - DiscoveryDataPacket existingDiscoData = - ((TcpDiscoveryNodeAddFinishedMessage)msg0).clientDiscoData(); - - if (existingDiscoData != null) - allMerged = discoData.mergeDataFrom(existingDiscoData, mrgdCmnData, mrgdSpecData); - } - - if (allMerged) - break; - } - } - } else if (msg instanceof TcpDiscoveryNodeLeftMessage) clearClientAddFinished(msg.creatorNodeId()); else if (msg instanceof TcpDiscoveryNodeFailedMessage) @@ -4768,9 +4737,18 @@ private IgniteNodeValidationResult validateByIgniteComponents(TcpDiscoveryJoinRe /** */ private IgniteNodeValidationResult validateByIgniteComponentsWithJoiningNodeData(TcpDiscoveryJoinRequestMessage req) { - DiscoveryDataBag data = req.gridDiscoveryData().bagWithJoiningNodeData(); + DiscoveryDataPacket packet = req.gridDiscoveryData(); + + List errs = packet.checkUnmarshallingErrors(spi.ignite()); + + IgniteNodeValidationResult err = F.isEmpty(errs) ? + spi.getSpiContext().validateNode(req.node(), packet.bagWithJoiningNodeData()) : + new IgniteNodeValidationResult( + req.node().id(), + errs.get(0).getMessage() // We need only first error. + ); - return spi.getSpiContext().validateNode(req.node(), data); + return err; } /** */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index a7f68fb1cf542..7345f4ff7fed3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -2068,21 +2068,14 @@ DiscoveryDataPacket collectExchangeData(DiscoveryDataPacket dataPacket) { assert dataPacket != null; assert dataPacket.joiningNodeId() != null; - //create data bag, pass it to exchange.collect DiscoveryDataBag dataBag = dataPacket.bagForDataCollection(); exchange.collect(dataBag); - //marshall collected bag into packet, return packet if (dataPacket.joiningNodeId().equals(locNode.id())) dataPacket.addJoiningNodeData(dataBag); else - dataPacket.marshalGridNodeData( - dataBag, - locNode.id(), - marshaller(), - ignite.configuration().getNetworkCompressionLevel(), - log); + dataPacket.addNodeData(dataBag, locNode.id()); return dataPacket; } @@ -2097,19 +2090,20 @@ protected void onExchange(DiscoveryDataPacket dataPacket, ClassLoader clsLdr) { DiscoveryDataBag dataBag; + List errs = dataPacket.checkUnmarshallingErrors(ignite()); + if (dataPacket.joiningNodeId().equals(locNode.id())) { - try { - dataBag = dataPacket.unmarshalGridData(marshaller(), clsLdr, locNode.clientRouterNodeId() != null, log); - } - catch (IgniteCheckedException e) { + if (!F.isEmpty(errs)) { if (ignite() instanceof IgniteEx) { FailureProcessor failure = ((IgniteEx)ignite()).context().failure(); - failure.process(new FailureContext(CRITICAL_ERROR, e)); + failure.process(new FailureContext(CRITICAL_ERROR, errs.get(0))); } - throw new IgniteException(e); + throw new IgniteException(errs.get(0)); } + + dataBag = dataPacket.bagWithNodeData(); } else dataBag = dataPacket.bagWithJoiningNodeData(); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java index 4ded165df7082..2ebf7e09d1ee3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java @@ -16,38 +16,32 @@ */ package org.apache.ignite.spi.discovery.tcp.internal; -import java.io.Serializable; -import java.util.Arrays; -import java.util.Collection; +import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.Compress; import org.apache.ignite.internal.GridComponent; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.discovery.DataBagItem; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC; /** - * Carries discovery data in marshalled form + * Carries discovery data in form of {@link Message} * and allows convenient way of converting it to and from {@link DiscoveryDataBag} objects. */ -public class DiscoveryDataPacket implements Serializable, Message { - /** Local file header signature (read as a little-endian number). */ - private static final int ZIP_HEADER_SIGNATURE = 0x04034b50; - - /** */ - private static final long serialVersionUID = 0L; - +public class DiscoveryDataPacket implements Message { /** */ @Order(0) UUID joiningNodeId; @@ -59,11 +53,13 @@ public class DiscoveryDataPacket implements Serializable, Message { /** */ @Order(2) - Map commonData = new HashMap<>(); + @Compress + Map commonData = new HashMap<>(); /** */ @Order(3) - Map> nodeSpecificData = new HashMap<>(); + @Compress + Map> nodeSpecificData = new HashMap<>(); /** */ private transient boolean joiningNodeClient; @@ -90,24 +86,16 @@ public UUID joiningNodeId() { /** * @param bag Bag. * @param nodeId Node id. - * @param marsh Marsh. - * @param log Logger. */ - public void marshalGridNodeData(DiscoveryDataBag bag, UUID nodeId, Marshaller marsh, - int compressionLevel, IgniteLogger log) { - marshalData(bag.commonData(), commonData, marsh, compressionLevel, log); + public void addNodeData(DiscoveryDataBag bag, UUID nodeId) { + if (bag.commonData() != null) + commonData.putAll(bag.commonData()); - Map locNodeSpecificData = bag.localNodeSpecificData(); + Map locNodeSpecificData = bag.localNodeSpecificData(); if (locNodeSpecificData != null) { - Map marshLocNodeSpecificData = U.newHashMap(locNodeSpecificData.size()); - - marshalData(locNodeSpecificData, marshLocNodeSpecificData, marsh, compressionLevel, log); - - filterDuplicatedData(marshLocNodeSpecificData); - - if (!marshLocNodeSpecificData.isEmpty()) - nodeSpecificData.put(nodeId, marshLocNodeSpecificData); + if (!locNodeSpecificData.isEmpty()) + nodeSpecificData.put(nodeId, locNodeSpecificData); } } @@ -119,40 +107,15 @@ public void addJoiningNodeData(DiscoveryDataBag bag) { joiningNodeData.putAll(bag.joiningNodeData()); } - /** - * @param marsh Marsh. - * @param clsLdr Class loader. - * @param clientNode Client node. - * @param log Logger. - */ - public DiscoveryDataBag unmarshalGridData( - Marshaller marsh, - ClassLoader clsLdr, - boolean clientNode, - IgniteLogger log - ) throws IgniteCheckedException { + /** */ + public DiscoveryDataBag bagWithNodeData() { DiscoveryDataBag dataBag = new DiscoveryDataBag(joiningNodeId, joiningNodeClient); - if (commonData != null && !commonData.isEmpty()) - dataBag.commonData(unmarshalData(commonData, marsh, clsLdr, clientNode, log, true)); - - if (nodeSpecificData != null && !nodeSpecificData.isEmpty()) { - Map> unmarshNodeSpecData = U.newLinkedHashMap(nodeSpecificData.size()); - - for (Map.Entry> nodeBinEntry : nodeSpecificData.entrySet()) { - Map nodeBinData = nodeBinEntry.getValue(); - - if (nodeBinData == null || nodeBinData.isEmpty()) - continue; - - unmarshNodeSpecData.put( - nodeBinEntry.getKey(), - unmarshalData(nodeBinData, marsh, clsLdr, clientNode, log, true) - ); - } + if (!F.isEmpty(commonData)) + dataBag.commonData(commonData); - dataBag.nodeSpecificData(unmarshNodeSpecData); - } + if (!F.isEmpty(nodeSpecificData)) + dataBag.nodeSpecificData(F.view(nodeSpecificData, uuid -> !F.isEmpty(nodeSpecificData.get(uuid)))); return dataBag; } @@ -184,199 +147,49 @@ public boolean hasDataFromNode(UUID nodeId) { } /** - * @param existingDataPacket Existing data packet. - * @param mrgdCmnDataKeys Mrgd cmn data keys. - * @param mrgdSpecifDataKeys Mrgd specif data keys. - */ - public boolean mergeDataFrom( - DiscoveryDataPacket existingDataPacket, - Collection mrgdCmnDataKeys, - Collection mrgdSpecifDataKeys - ) { - if (commonData.size() != mrgdCmnDataKeys.size()) { - for (Map.Entry e : commonData.entrySet()) { - if (!mrgdCmnDataKeys.contains(e.getKey())) { - byte[] data = existingDataPacket.commonData.get(e.getKey()); - - if (data != null && Arrays.equals(e.getValue(), data)) { - e.setValue(data); - - boolean add = mrgdCmnDataKeys.add(e.getKey()); - - assert add; - - if (mrgdCmnDataKeys.size() == commonData.size()) - break; - } - } - } - } - - if (nodeSpecificData.size() != mrgdSpecifDataKeys.size()) { - for (Map.Entry> e : nodeSpecificData.entrySet()) { - if (!mrgdSpecifDataKeys.contains(e.getKey())) { - Map data = existingDataPacket.nodeSpecificData.get(e.getKey()); - - if (data != null && mapsEqual(e.getValue(), data)) { - e.setValue(data); - - boolean add = mrgdSpecifDataKeys.add(e.getKey()); - - assert add; - - if (mrgdSpecifDataKeys.size() == nodeSpecificData.size()) - break; - } - } - } - } - - return (mrgdCmnDataKeys.size() == commonData.size()) && (mrgdSpecifDataKeys.size() == nodeSpecificData.size()); - } - - /** - * @param m1 first map to compare. - * @param m2 second map to compare. + * Collects and dumps catched unmarshalling errors. + * + * @param ignite Ignite. */ - private boolean mapsEqual(Map m1, Map m2) { - if (m1 == m2) - return true; + public List checkUnmarshallingErrors(Ignite ignite) { + List> items = Stream.concat( + Stream.concat(joiningNodeData.entrySet().stream(), commonData.entrySet().stream()), + nodeSpecificData.values() + .stream() + .flatMap(m -> m.entrySet().stream())) + .filter(e -> e.getValue() instanceof DataBagItem) + .collect(Collectors.toList()); - if (m1.size() == m2.size()) { - for (Map.Entry e : m1.entrySet()) { - byte[] data = m2.get(e.getKey()); + List errs = new ArrayList<>(items.size()); - if (!Arrays.equals(e.getValue(), data)) - return false; - } + for (Map.Entry item : items) { + int cmpId = item.getKey(); + DataBagItem dataBagItem = (DataBagItem)item.getValue(); - return true; - } + IgniteCheckedException e = dataBagItem.unmarshallError(); - return false; - } - - /** - * @param src Source. - * @param marsh Marsh. - * @param clsLdr Class loader. - * @param clientNode Client node. - * @param log Logger. - * @param panic Throw unmarshalling if {@code true}. - * @throws IgniteCheckedException If {@code panic} is {@code True} and unmarshalling failed. - */ - private Map unmarshalData( - Map src, - Marshaller marsh, - ClassLoader clsLdr, - boolean clientNode, - IgniteLogger log, - boolean panic - ) throws IgniteCheckedException { - Map res = U.newHashMap(src.size()); - - for (Map.Entry binEntry : src.entrySet()) { - try { - Serializable compData = isZipped(binEntry.getValue()) ? - U.unmarshalZip(marsh, binEntry.getValue(), clsLdr) : - U.unmarshal(marsh, binEntry.getValue(), clsLdr); - res.put(binEntry.getKey(), compData); - } - catch (IgniteCheckedException e) { - if (CONTINUOUS_PROC.ordinal() == binEntry.getKey() && - X.hasCause(e, ClassNotFoundException.class) && clientNode - ) { - U.warn(log, "Failed to unmarshal continuous query remote filter on client node. Can be ignored."); + if (e != null) { + if (CONTINUOUS_PROC.ordinal() == cmpId && X.hasCause(e, ClassNotFoundException.class) && + ignite.configuration().isClientMode()) { + U.warn(ignite.log(), "Failed to unmarshal continuous query remote filter on client node. " + + "Can be ignored."); continue; } - else if (binEntry.getKey() < GridComponent.DiscoveryDataExchangeType.VALUES.length) { - U.error(log, - "Failed to unmarshal discovery data for component: " + - GridComponent.DiscoveryDataExchangeType.VALUES[binEntry.getKey()], - e - ); + else if (cmpId < GridComponent.DiscoveryDataExchangeType.VALUES.length) { + U.error(ignite.log(), "Failed to unmarshal discovery data for component: " + + GridComponent.DiscoveryDataExchangeType.VALUES[cmpId], e); } else { - U.warn(log, "Failed to unmarshal discovery data." + - " Component " + binEntry.getKey() + " is not found."); + U.warn(ignite.log(), "Failed to unmarshal discovery data." + + " Component " + cmpId + " is not found.", e); } - if (panic) - throw e; + errs.add(e); } } - return res; - } - - /** - * @param val Value to check. - * @return {@code true} if value is zipped. - */ - private boolean isZipped(byte[] val) { - return val != null && val.length > 3 && makeInt(val) == ZIP_HEADER_SIGNATURE; - } - - /** - * Make int from first 4 bytes in little-endian byte order. - * - * @param b Source of bytes. - * @return Made int. - */ - private static int makeInt(byte[] b) { - return (((b[3]) << 24) | - ((b[2] & 0xff) << 16) | - ((b[1] & 0xff) << 8) | - ((b[0] & 0xff))); - } - - /** - * @param src Source. - * @param target Target. - * @param marsh Marsh. - * @param log Logger. - */ - private void marshalData( - Map src, - Map target, - Marshaller marsh, - int compressionLevel, - IgniteLogger log - ) { - // may happen if nothing was collected from components, - // corresponding map (for common data or for node specific data) left null - if (src == null) - return; - - for (Map.Entry entry : src.entrySet()) { - try { - target.put(entry.getKey(), U.zip(U.marshal(marsh, entry.getValue()), compressionLevel)); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to marshal discovery data " + - "[comp=" + entry.getKey() + ", data=" + entry.getValue() + ']', e); - } - } - } - - /** */ - private void filterDuplicatedData(Map discoData) { - for (Map existingData : nodeSpecificData.values()) { - Iterator> it = discoData.entrySet().iterator(); - - while (it.hasNext()) { - Map.Entry discoDataEntry = it.next(); - - byte[] curData = existingData.get(discoDataEntry.getKey()); - - if (Arrays.equals(curData, discoDataEntry.getValue())) - it.remove(); - } - - if (discoData.isEmpty()) - break; - } + return errs; } /** diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java index 30a7ec8029837..49a9ccc63c113 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.IgniteSpiContext; import org.apache.ignite.spi.IgniteSpiException; @@ -226,6 +227,11 @@ private class DelegatedDiscoverySpi extends IgniteSpiAdapter implements IgniteDi delegate.resolveCommunicationFailure(node, err); } + /** {@inheritDoc} */ + @Override public MessageFactoryProvider messageFactoryProvider() { + return delegate.messageFactoryProvider(); + } + /** Delegated discovery data exchange. */ private class DelegatedDiscoverySpiDataExchange implements DiscoverySpiDataExchange { /** Discovery data exchange delegate. */ diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index ad98a2936b98f..1f2080e270691 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -58,6 +58,8 @@ import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage; +import org.apache.ignite.internal.processors.marshaller.MappedName; +import org.apache.ignite.internal.processors.marshaller.MarshallerMappingsData; import org.apache.ignite.internal.processors.port.GridPortRecord; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.lang.GridAbsPredicate; @@ -2440,12 +2442,12 @@ private static class TestTcpDiscoveryMarshallerDataSpi extends TcpDiscoverySpi { exchange.onExchange(dataBag); } - private List getAllMappings(DiscoveryDataBag bag) { - return (List)bag.commonData().get(MARSHALLER_PROC.ordinal()); + private MarshallerMappingsData getAllMappings(DiscoveryDataBag bag) { + return (MarshallerMappingsData)bag.commonData().get(MARSHALLER_PROC.ordinal()); } - private Map getJavaMappings(List allMappings) { - return (Map)allMappings.get(JAVA_ID); + private Map getJavaMappings(MarshallerMappingsData allMappings) { + return allMappings.mappings().get(JAVA_ID); } }); } diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java index a186aed526567..b551506c8ad32 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java @@ -17,28 +17,28 @@ package org.apache.ignite.spi.discovery.zk.internal; -import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.plugin.extensions.communication.Message; /** * */ class ZkBulkJoinContext { /** */ - List>> nodes; + List> nodes; /** * @param nodeEvtData Node event data. * @param discoData Discovery data for node. */ - void addJoinedNode(ZkJoinedNodeEvtData nodeEvtData, Map discoData) { + void addJoinedNode(ZkJoinedNodeEvtData nodeEvtData, Map discoData) { if (nodes == null) nodes = new ArrayList<>(); - nodes.add(new T2<>(nodeEvtData, discoData)); + nodes.add(new T2<>(nodeEvtData, new ZkDiscoData(discoData))); } /** diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoData.java new file mode 100644 index 0000000000000..c679790dca3ad --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoData.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.util.Map; +import org.apache.ignite.internal.Order; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; + +/** Data bag data holder. */ +public class ZkDiscoData implements Message { + /** */ + @Order(0) + Map data; + + /** + * Default constructor for {@link MessageFactory}. + */ + public ZkDiscoData() { + // No-op. + } + + /** + * @param data Discovery data. + */ + public ZkDiscoData(Map data) { + this.data = data; + } + + /** + * @return Data. + */ + public Map data() { + return data; + } +} diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java index 39d89b32af878..fd1937c0ffedd 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java @@ -28,5 +28,6 @@ public class ZkMessageFactory implements MessageFactoryProvider { factory.register(401, ZkCommunicationErrorResolveStartMessage::new, new ZkCommunicationErrorResolveStartMessageSerializer()); factory.register(402, ZkForceNodeFailMessage::new, new ZkForceNodeFailMessageSerializer()); factory.register(403, ZkNoServersMessage::new, new ZkNoServersMessageSerializer()); + factory.register(404, ZkDiscoData::new, new ZkDiscoDataSerializer()); } } diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 0d0531d1a9a2c..c6be3e5dcc8d3 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -78,6 +78,7 @@ import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.security.SecurityCredentials; import org.apache.ignite.spi.IgniteNodeValidationResult; @@ -1781,7 +1782,7 @@ private void generateBulkJoinEvent(TreeMap curTop, Z long evtId = rtState.evtsData.evtIdGen; - List>> nodes = joinCtx.nodes; + List> nodes = joinCtx.nodes; assert nodes != null && !nodes.isEmpty(); @@ -1793,11 +1794,9 @@ private void generateBulkJoinEvent(TreeMap curTop, Z Map dupDiscoData = null; for (int i = 0; i < nodeCnt; i++) { - T2> nodeEvtData = nodes.get(i); + T2 nodeEvtData = nodes.get(i); - Map discoData = nodeEvtData.get2(); - - byte[] discoDataBytes = U.marshal(marsh, discoData); + byte[] discoDataBytes = msgParser.marshalZip(nodeEvtData.get2()); Long dupDataNode = null; @@ -2251,7 +2250,7 @@ private void addJoinedNode( exchange.collect(collectBag); - Map commonData = collectBag.commonData(); + Map commonData = collectBag.commonData(); Object old = curTop.put(joinedNode.order(), joinedNode); @@ -3021,12 +3020,11 @@ private void processLocalJoin(ZkDiscoveryEventsData evtsData, byte[] discoDataBytes = dataForJoined.discoveryDataForNode(locNode.order()); - Map commonDiscoData = - marsh.unmarshal(discoDataBytes, U.resolveClassLoader(spi.ignite().configuration())); + ZkDiscoData commonDiscoData = msgParser.unmarshalZip(discoDataBytes); DiscoveryDataBag dataBag = new DiscoveryDataBag(locNode.id(), locNode.isClient()); - dataBag.commonData(commonDiscoData); + dataBag.commonData(commonDiscoData.data()); exchange.onExchange(dataBag);