diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java index 6953d8b853891..4a8f556781cf7 100644 --- a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/OperationContext.java @@ -322,7 +322,7 @@ private static class AttributeValueHolder { } /** Allows to change multiple attribute values in a single update operation and skip updates that changes nothing. */ - private static class ContextUpdater { + static class ContextUpdater { /** */ private static final int INIT_UPDATES_CAPACITY = 3; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index 9da592635d229..cc0ab3e112b7d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -666,6 +666,10 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(PartitionHashRecord.class); withNoSchema(TransactionsHashRecord.class); + // [13400 - 13600]: Operation context messages. + msgIdx = 13400; + withNoSchema(DistributedOperationContextMessage.class); + assert msgIdx <= MAX_MESSAGE_ID; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java new file mode 100644 index 0000000000000..42d5c7eda859a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/DistributedOperationContextMessage.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.internal.thread.context.DistributedOperationContextManager; +import org.apache.ignite.internal.thread.context.OperationContext; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** + * Transport for {@link OperationContext} distributed attributes. + * + * @see DistributedOperationContextManager + */ +public class DistributedOperationContextMessage implements Message { + /** Values of operation context attributes. */ + @Order(0) + public Message[] vals; + + /** Bitmap of effective attributes ids. */ + @Order(1) + public byte idBitmap; + + /** Empty constructor for serialization purposes. */ + public DistributedOperationContextMessage() { + // No-op. + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java new file mode 100644 index 0000000000000..cc39aff8e05d9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/context/DistributedOperationContextManager.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.thread.context; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListMap; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.DistributedOperationContextMessage; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.jetbrains.annotations.Nullable; + +/** + * A manager of {@link OperationContextAttribute} which are required to be propagated through a cluster. + * Has own attributes ids compared to a local node's {@link OperationContext}. + * + * @see OperationContext + * @see DistributedOperationContextMessage + */ +public class DistributedOperationContextManager { + /** */ + private static final DistributedOperationContextManager INSTANCE = new DistributedOperationContextManager(); + + /** Maximal number of supported distributed attributes. */ + static final byte MAX_DISTRIBUTED_ATTR_CNT = Byte.SIZE; + + /** Registered distributed attributes by their cluster-wide id. */ + private final Map> attrs = new ConcurrentSkipListMap<>(); + + /** */ + public static DistributedOperationContextManager instance() { + return INSTANCE; + } + + /** + * Creates and registers a distributable {@link OperationContextAttribute}. + * + * @param id Cluster-wide id of a distributed operation context attribute. + * @param initVal The attribute's unitial value. + */ + public OperationContextAttribute createDistributedAttribute(byte id, @Nullable T initVal) { + assert id >= 0 && id < MAX_DISTRIBUTED_ATTR_CNT : "Invalid distributed attributed id [id=" + id + ']'; + + return (OperationContextAttribute)attrs.compute(id, (id0, attr0) -> { + if (attr0 != null) + throw new IgniteException("Duplicated distributed attribute id [id=" + id + ']'); + + return OperationContextAttribute.newInstance(initVal); + }); + } + + /** + * Requests current {@link OperationContext} for its effective attributes and collects ones which are also registered + * as distbibued attributes. + * + * @return A message to send current effective distributed attributes. {@code null}, if there are no + * effective attributes in {@link OperationContext} or none of them is a distributed attribute. + */ + public @Nullable DistributedOperationContextMessage collectDistributedAttributes() { + DistributedOperationContextMessage res = null; + List vals = null; + + for (Map.Entry> e : attrs.entrySet()) { + OperationContextAttribute attr = e.getValue(); + + Message curVal = OperationContext.get(attr); + + if (curVal != attr.initialValue()) { + if (res == null) { + res = new DistributedOperationContextMessage(); + + vals = new ArrayList<>(MAX_DISTRIBUTED_ATTR_CNT / 2); + } + + byte mask = (byte)(1 << e.getKey()); + + assert (res.idBitmap & mask) == 0; + + vals.add(curVal); + res.idBitmap |= mask; + } + } + + if (res != null) + res.vals = vals.toArray(vals.toArray(new Message[vals.size()])); + + return res; + } + + /** Sets the received distributed operation context attributes (if any) into current {@link OperationContext}. */ + public Scope restoreDistributedAttributes(@Nullable DistributedOperationContextMessage msg) { + if (msg == null) + return Scope.NOOP_SCOPE; + + assert msg.idBitmap != 0; + assert !F.isEmpty(msg.vals); + assert msg.vals.length <= MAX_DISTRIBUTED_ATTR_CNT; + + OperationContext.ContextUpdater updater = OperationContext.ContextUpdater.create(); + + for (byte valIdx = 0, maskIdx = 0; valIdx < msg.vals.length; ++valIdx) { + Message curVal = msg.vals[valIdx]; + + while ((msg.idBitmap & (1 << maskIdx)) == 0) + ++maskIdx; + + updater.set((OperationContextAttribute)attrs.get(maskIdx++), curVal); + } + + return updater.apply(); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index a0e1a20048786..81c0bf9c26f5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -70,6 +70,8 @@ import org.apache.ignite.internal.processors.tracing.messages.SpanContainer; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable; +import org.apache.ignite.internal.thread.context.DistributedOperationContextManager; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; @@ -1310,6 +1312,8 @@ private class SocketWriter extends IgniteSpiThread { * @param msg Message. */ private void sendMessage(TcpDiscoveryAbstractMessage msg) { + msg.opCtxMsg = DistributedOperationContextManager.instance().collectDistributedAttributes(); + synchronized (mux) { queue.add(msg); @@ -1757,276 +1761,290 @@ private MessageWorker(IgniteLogger log) { blockingSectionEnd(); } - if (msg instanceof JoinTimeout) { - int joinCnt0 = ((JoinTimeout)msg).joinCnt; - - if (joinCnt == joinCnt0) { - if (state == STARTING) { - joinError(new IgniteSpiException("Join process timed out, did not receive response for " + - "join request (consider increasing 'joinTimeout' configuration property) " + - "[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']')); - + if (msg instanceof TcpDiscoveryAbstractMessage msg0 && msg0.opCtxMsg != null) { + try (Scope ignored = DistributedOperationContextManager.instance().restoreDistributedAttributes(msg0.opCtxMsg)) { + if (processRawMessage(msg)) break; - } - else if (state == DISCONNECTED) { - if (log.isDebugEnabled()) - log.debug("Failed to reconnect, local node segmented " + - "[joinTimeout=" + spi.joinTimeout + ']'); - - state = SEGMENTED; - - notifyDiscovery( - EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null); - } } } - else if (msg == SPI_STOP) { - boolean connected = state == CONNECTED; - - state = STOPPED; + else if (processRawMessage(msg)) + break; + } + } + catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + catch (Throwable t) { + if (spi.ignite() instanceof IgniteEx) + ((IgniteEx)spi.ignite()).context().failure().process(new FailureContext(CRITICAL_ERROR, t)); + } + finally { + SocketStream currSock = this.currSock; - assert spi.getSpiContext().isStopping(); + if (currSock != null) + U.closeQuiet(currSock.socket()); - if (connected && currSock != null) { - TcpDiscoveryNodeLeftMessage leftMsg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId()); + if (joinLatch.getCount() > 0) + joinError(new IgniteSpiException("Some error in join process.")); // This should not occur. - leftMsg.client(true); + if (reconnector != null) { + reconnector.cancel(); - Span rootSpan = tracing.create(TraceableMessagesTable.traceName(leftMsg.getClass())) - .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> locNode.id().toString()) - .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID), - () -> locNode.consistentId().toString()) - .addLog(() -> "Created"); + reconnector.join(); + } + } + } - leftMsg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan)); + /** @return {@code True} if the cycle stop is required. */ + private boolean processRawMessage(Object msg) throws InterruptedException { + if (msg instanceof JoinTimeout) { + int joinCnt0 = ((JoinTimeout)msg).joinCnt; - sockWriter.sendMessage(leftMsg); + if (joinCnt == joinCnt0) { + if (state == STARTING) { + joinError(new IgniteSpiException("Join process timed out, did not receive response for " + + "join request (consider increasing 'joinTimeout' configuration property) " + + "[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']')); - rootSpan.addLog(() -> "Sent").end(); - } - else - leaveLatch.countDown(); + return true; } - else if (msg == SPI_RECONNECT) { - if (state == CONNECTED) { - if (reconnector != null) { - reconnector.cancel(); - reconnector.join(); + else if (state == DISCONNECTED) { + if (log.isDebugEnabled()) + log.debug("Failed to reconnect, local node segmented " + + "[joinTimeout=" + spi.joinTimeout + ']'); - reconnector = null; - } + state = SEGMENTED; - sockWriter.forceLeave(); - sockReader.forceStopRead(); + notifyDiscovery( + EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null); + } + } + } + else if (msg == SPI_STOP) { + boolean connected = state == CONNECTED; - currSock = null; + state = STOPPED; - queue.clear(); + assert spi.getSpiContext().isStopping(); - onDisconnected(); + if (connected && currSock != null) { + TcpDiscoveryNodeLeftMessage leftMsg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId()); - UUID newId = UUID.randomUUID(); + leftMsg.client(true); - U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due " + - "to network problems [newId=" + newId + - ", prevId=" + locNode.id() + - ", locNode=" + locNode + ']'); + Span rootSpan = tracing.create(TraceableMessagesTable.traceName(leftMsg.getClass())) + .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> locNode.id().toString()) + .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID), + () -> locNode.consistentId().toString()) + .addLog(() -> "Created"); - locNode.onClientDisconnected(newId); + leftMsg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan)); - throttleClientReconnect(); + sockWriter.sendMessage(leftMsg); - tryJoin(); - } + rootSpan.addLog(() -> "Sent").end(); + } + else + leaveLatch.countDown(); + } + else if (msg == SPI_RECONNECT) { + if (state == CONNECTED) { + if (reconnector != null) { + reconnector.cancel(); + reconnector.join(); + + reconnector = null; } - else if (msg instanceof TcpDiscoveryNodeFailedMessage && - ((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) { - TcpDiscoveryNodeFailedMessage msg0 = (TcpDiscoveryNodeFailedMessage)msg; - assert msg0.force() : msg0; + sockWriter.forceLeave(); + sockReader.forceStopRead(); - forceFailMsg = msg0; - } - else if (msg instanceof SocketClosedMessage) { - if (((SocketClosedMessage)msg).sock == currSock) { - Socket sock = currSock.sock; + currSock = null; - InetSocketAddress prevAddr = new InetSocketAddress(sock.getInetAddress(), sock.getPort()); + queue.clear(); - currSock = null; + onDisconnected(); - boolean join = joinLatch.getCount() > 0; + UUID newId = UUID.randomUUID(); - if (spi.getSpiContext().isStopping() || state == SEGMENTED) { - leaveLatch.countDown(); + U.quietAndWarn(log, "Local node will try to reconnect to cluster with new id due " + + "to network problems [newId=" + newId + + ", prevId=" + locNode.id() + + ", locNode=" + locNode + ']'); - if (join) { - joinError(new IgniteSpiException("Failed to connect to cluster: socket closed.")); + locNode.onClientDisconnected(newId); - break; - } - } - else { - if (forceFailMsg != null) { - if (log.isDebugEnabled()) { - log.debug("Connection closed, local node received force fail message, " + - "will not try to restore connection"); - } + throttleClientReconnect(); - queue.addFirst(SPI_RECONNECT_FAILED); - } - else { - if (log.isDebugEnabled()) - log.debug("Connection closed, will try to restore connection."); + tryJoin(); + } + } + else if (msg instanceof TcpDiscoveryNodeFailedMessage && + ((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) { + TcpDiscoveryNodeFailedMessage msg0 = (TcpDiscoveryNodeFailedMessage)msg; - assert reconnector == null; + assert msg0.force() : msg0; - reconnector = new Reconnector(join, prevAddr); - reconnector.start(); - } - } + forceFailMsg = msg0; + } + else if (msg instanceof SocketClosedMessage) { + if (((SocketClosedMessage)msg).sock == currSock) { + Socket sock = currSock.sock; + + InetSocketAddress prevAddr = new InetSocketAddress(sock.getInetAddress(), sock.getPort()); + + currSock = null; + + boolean join = joinLatch.getCount() > 0; + + if (spi.getSpiContext().isStopping() || state == SEGMENTED) { + leaveLatch.countDown(); + + if (join) { + joinError(new IgniteSpiException("Failed to connect to cluster: socket closed.")); + + return true; } } - else if (msg == SPI_RECONNECT_FAILED) { - if (reconnector != null) { - reconnector.cancel(); - reconnector.join(); + else { + if (forceFailMsg != null) { + if (log.isDebugEnabled()) { + log.debug("Connection closed, local node received force fail message, " + + "will not try to restore connection"); + } - reconnector = null; + queue.addFirst(SPI_RECONNECT_FAILED); } - else - assert forceFailMsg != null; - - if (spi.isClientReconnectDisabled()) { - if (state != SEGMENTED && state != STOPPED) { - if (forceFailMsg != null) { - U.quietAndWarn(log, "Local node was dropped from cluster due to network problems " + - "[nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + - ", msg=" + forceFailMsg.warning() + ']'); - } + else { + if (log.isDebugEnabled()) + log.debug("Connection closed, will try to restore connection."); - if (log.isDebugEnabled()) { - log.debug("Failed to restore closed connection, reconnect disabled, " + - "local node segmented [networkTimeout=" + spi.netTimeout + ']'); - } + assert reconnector == null; - state = SEGMENTED; + reconnector = new Reconnector(join, prevAddr); + reconnector.start(); + } + } + } + } + else if (msg == SPI_RECONNECT_FAILED) { + if (reconnector != null) { + reconnector.cancel(); + reconnector.join(); - notifyDiscovery( - EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null); - } + reconnector = null; + } + else + assert forceFailMsg != null; + + if (spi.isClientReconnectDisabled()) { + if (state != SEGMENTED && state != STOPPED) { + if (forceFailMsg != null) { + U.quietAndWarn(log, "Local node was dropped from cluster due to network problems " + + "[nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + + ", msg=" + forceFailMsg.warning() + ']'); } - else { - if (state == STARTING || state == CONNECTED) { - if (log.isDebugEnabled()) { - log.debug("Failed to restore closed connection, will try to reconnect " + - "[networkTimeout=" + spi.netTimeout + - ", joinTimeout=" + spi.joinTimeout + - ", failMsg=" + forceFailMsg + ']'); - } - onDisconnected(); - } + if (log.isDebugEnabled()) { + log.debug("Failed to restore closed connection, reconnect disabled, " + + "local node segmented [networkTimeout=" + spi.netTimeout + ']'); + } - UUID newId = UUID.randomUUID(); + state = SEGMENTED; - if (forceFailMsg != null) { - long delay = IgniteSystemProperties.getLong(IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY, - DFLT_DISCO_FAILED_CLIENT_RECONNECT_DELAY); + notifyDiscovery( + EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null); + } + } + else { + if (state == STARTING || state == CONNECTED) { + if (log.isDebugEnabled()) { + log.debug("Failed to restore closed connection, will try to reconnect " + + "[networkTimeout=" + spi.netTimeout + + ", joinTimeout=" + spi.joinTimeout + + ", failMsg=" + forceFailMsg + ']'); + } - if (delay > 0) { - U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " + - "will try to reconnect with new id after " + delay + "ms (reconnect delay " + - "can be changed using IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY system " + - "property) [" + - "newId=" + newId + - ", prevId=" + locNode.id() + - ", locNode=" + locNode + - ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + - ", msg=" + forceFailMsg.warning() + ']'); + onDisconnected(); + } - Thread.sleep(delay); - } - else { - U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " + - "will try to reconnect with new id [" + - "newId=" + newId + - ", prevId=" + locNode.id() + - ", locNode=" + locNode + - ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + - ", msg=" + forceFailMsg.warning() + ']'); - } + UUID newId = UUID.randomUUID(); - forceFailMsg = null; - } - else if (log.isInfoEnabled()) { - log.info("Client node disconnected from cluster, will try to reconnect with new id " + - "[newId=" + newId + ", prevId=" + locNode.id() + ", locNode=" + locNode + ']'); - } + if (forceFailMsg != null) { + long delay = IgniteSystemProperties.getLong(IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY, + DFLT_DISCO_FAILED_CLIENT_RECONNECT_DELAY); - locNode.onClientDisconnected(newId); + if (delay > 0) { + U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " + + "will try to reconnect with new id after " + delay + "ms (reconnect delay " + + "can be changed using IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY system " + + "property) [" + + "newId=" + newId + + ", prevId=" + locNode.id() + + ", locNode=" + locNode + + ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + + ", msg=" + forceFailMsg.warning() + ']'); - tryJoin(); + Thread.sleep(delay); } + else { + U.quietAndWarn(log, "Local node was dropped from cluster due to network problems, " + + "will try to reconnect with new id [" + + "newId=" + newId + + ", prevId=" + locNode.id() + + ", locNode=" + locNode + + ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId() + + ", msg=" + forceFailMsg.warning() + ']'); + } + + forceFailMsg = null; + } + else if (log.isInfoEnabled()) { + log.info("Client node disconnected from cluster, will try to reconnect with new id " + + "[newId=" + newId + ", prevId=" + locNode.id() + ", locNode=" + locNode + ']'); } - else { - TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg; - if (joining()) { - IgniteSpiException err = null; + locNode.onClientDisconnected(newId); - if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage) - err = spi.duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg); - else if (discoMsg instanceof TcpDiscoveryAuthFailedMessage) - err = spi.authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg); - //TODO: https://issues.apache.org/jira/browse/IGNITE-9829 - else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage) - err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg); + tryJoin(); + } + } + else { + TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg; - if (err != null) { - if (state == DISCONNECTED) { - U.error(log, "Failed to reconnect, segment local node.", err); + if (joining()) { + IgniteSpiException err = null; - state = SEGMENTED; + if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage) + err = spi.duplicateIdError((TcpDiscoveryDuplicateIdMessage)msg); + else if (discoMsg instanceof TcpDiscoveryAuthFailedMessage) + err = spi.authenticationFailedError((TcpDiscoveryAuthFailedMessage)msg); + //TODO: https://issues.apache.org/jira/browse/IGNITE-9829 + else if (discoMsg instanceof TcpDiscoveryCheckFailedMessage) + err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg); - notifyDiscovery( - EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null); - } - else - joinError(err); + if (err != null) { + if (state == DISCONNECTED) { + U.error(log, "Failed to reconnect, segment local node.", err); - cancel(); + state = SEGMENTED; - break; - } + notifyDiscovery( + EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes(), null); } + else + joinError(err); + + cancel(); - processDiscoveryMessage((TcpDiscoveryAbstractMessage)msg); + return true; } } - } - catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - } - catch (Throwable t) { - if (spi.ignite() instanceof IgniteEx) - ((IgniteEx)spi.ignite()).context().failure().process(new FailureContext(CRITICAL_ERROR, t)); - } - finally { - SocketStream currSock = this.currSock; - if (currSock != null) - U.closeQuiet(currSock.socket()); - - if (joinLatch.getCount() > 0) - joinError(new IgniteSpiException("Some error in join process.")); // This should not occur. - - if (reconnector != null) { - reconnector.cancel(); - - reconnector.join(); - } + processDiscoveryMessage((TcpDiscoveryAbstractMessage)msg); } + + return false; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 82c012c2a1a94..a8a4f2f0a473e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -95,6 +95,8 @@ import org.apache.ignite.internal.processors.tracing.messages.SpanContainer; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessagesTable; +import org.apache.ignite.internal.thread.context.DistributedOperationContextManager; +import org.apache.ignite.internal.thread.context.Scope; import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor; import org.apache.ignite.internal.util.GridBoundedLinkedHashSet; import org.apache.ignite.internal.util.GridConcurrentHashSet; @@ -3046,8 +3048,10 @@ void addMessage(TcpDiscoveryAbstractMessage msg, boolean ignoreHighPriority, boo return; } - if (msg instanceof TraceableMessage) { - TraceableMessage tMsg = (TraceableMessage)msg; + if (!fromSocket) + msg.opCtxMsg = DistributedOperationContextManager.instance().collectDistributedAttributes(); + + if (msg instanceof TraceableMessage tMsg) { // If we read this message from socket. if (fromSocket) @@ -3173,11 +3177,8 @@ protected void runTasks() { task.run(); } - /** {@inheritDoc} */ - @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { - if (msg == WAKEUP) - return; - + /** */ + private void processMessage0(TcpDiscoveryAbstractMessage msg) { notifiedDiscovery.set(false); if (msg instanceof TraceableMessage) { @@ -3315,6 +3316,16 @@ else if (msg instanceof TcpDiscoveryAuthFailedMessage) } } + /** {@inheritDoc} */ + @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { + if (msg == WAKEUP) + return; + + try (Scope ignored = DistributedOperationContextManager.instance().restoreDistributedAttributes(msg.opCtxMsg)) { + processMessage0(msg); + } + } + /** * Processes authentication failed message. * diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java index f23e36f200d27..d76279fb28082 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/InetSocketAddressMessage.java @@ -52,7 +52,6 @@ public int port() { return port; } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(InetSocketAddressMessage.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index 7a97763c36b25..9dda990c7d020 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -21,6 +21,7 @@ import java.util.HashSet; import java.util.Set; import java.util.UUID; +import org.apache.ignite.internal.DistributedOperationContextMessage; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -76,6 +77,11 @@ public abstract class TcpDiscoveryAbstractMessage implements Message { @Order(4) Set failedNodes; + /** Operation context attributes message. */ + @GridToStringInclude + @Order(5) + public @Nullable DistributedOperationContextMessage opCtxMsg; + /** * Default no-arg constructor for {@link Externalizable} interface. */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java index 9de906b27290c..bdf84b743ad2d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.thread.context; +import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; @@ -36,8 +37,13 @@ import java.util.function.Function; import java.util.function.Supplier; import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.managers.discovery.CustomEventListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture; @@ -48,6 +54,7 @@ import org.apache.ignite.internal.thread.pool.IgniteStripedThreadPoolExecutor; import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.queue.IgniteAsyncObjectHandler; import org.apache.ignite.internal.util.worker.queue.IgniteDelayedObjectHandler; @@ -56,6 +63,7 @@ import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.discovery.tcp.messages.InetSocketAddressMessage; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.thread.IgniteThread; import org.junit.Test; @@ -64,6 +72,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause; import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; /** */ public class OperationContextAttributesTest extends GridCommonAbstractTest { @@ -98,6 +107,8 @@ public class OperationContextAttributesTest extends GridCommonAbstractTest { @Override protected void afterTest() throws Exception { super.afterTest(); + stopAllGrids(); + if (poolToShutdownAfterTest != null) poolToShutdownAfterTest.shutdownNow(); @@ -808,6 +819,98 @@ public void testContextAwareDelayQueue() throws Exception { } } + /** */ + @Test + public void testSendAttributesByDiscovery() throws Exception { + byte attrId1 = 0; + byte attrId2 = DistributedOperationContextManager.MAX_DISTRIBUTED_ATTR_CNT - 1; + + InetSocketAddressMessage dfltDistAttr1Val = new InetSocketAddressMessage(InetAddress.getLoopbackAddress(), 80); + GridCacheVersion dfltDistrAttr2Val = new GridCacheVersion(1, 1, 1); + + // Local attribute 1. + OperationContextAttribute.newInstance(1000); + + // Distributed attribute 1. + OperationContextAttribute dAttr1 = DistributedOperationContextManager.instance() + .createDistributedAttribute(attrId1, dfltDistAttr1Val); + + // Local attribute 2. + OperationContextAttribute.newInstance("locaAttr2"); + + // Distributed attribute 2. + OperationContextAttribute dAttr2 = DistributedOperationContextManager.instance() + .createDistributedAttribute(attrId2, dfltDistrAttr2Val); + + startGrids(2); + startClientGrid(2); + + CountDownLatch coordLatch = new CountDownLatch(3); + CountDownLatch srvrLatch = new CountDownLatch(3); + CountDownLatch clientLatch = new CountDownLatch(3); + + InetSocketAddressMessage valToSend1 = new InetSocketAddressMessage(dfltDistAttr1Val.address(), 443); + GridCacheVersion valToSend2 = new GridCacheVersion(2, 2, 2); + + for (int i = 0; i < G.allGrids().size(); ++i) { + int i0 = i; + + grid(i).context().discovery().setCustomEventListener( + DynamicCacheChangeBatch.class, new CustomEventListener<>() { + @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, + DynamicCacheChangeBatch msg) { + + InetSocketAddressMessage receivedVal1 = OperationContext.get(dAttr1); + GridCacheVersion receivedVal2 = OperationContext.get(dAttr2); + + assertNotNull(receivedVal1); + assertNotNull(receivedVal2); + + assertFalse(dfltDistAttr1Val.port() == receivedVal1.port()); + assertEquals(receivedVal1.port(), valToSend1.port()); + assertEquals(receivedVal1.address(), valToSend1.address()); + + assertFalse(dfltDistrAttr2Val.equals(receivedVal2)); + assertTrue(valToSend2.equals(receivedVal2)); + + if (grid(i0).localNode().isClient()) + clientLatch.countDown(); + else if (grid(i0).localNode().order() == 1) + coordLatch.countDown(); + else + srvrLatch.countDown(); + } + }); + } + + // Send from the coordinator. + try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) { + grid(0).createCache(defaultCacheConfiguration()); + } + + assertTrue(waitForCondition(() -> coordLatch.getCount() == 2, getTestTimeout())); + assertTrue(waitForCondition(() -> srvrLatch.getCount() == 2, getTestTimeout())); + assertTrue(waitForCondition(() -> clientLatch.getCount() == 2, getTestTimeout())); + + // Send from a server. + try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) { + grid(1).destroyCache(DEFAULT_CACHE_NAME); + } + + assertTrue(waitForCondition(() -> coordLatch.getCount() == 1, getTestTimeout())); + assertTrue(waitForCondition(() -> srvrLatch.getCount() == 1, getTestTimeout())); + assertTrue(waitForCondition(() -> clientLatch.getCount() == 1, getTestTimeout())); + + // Send from a client. + try (Scope ignored = OperationContext.set(dAttr1, valToSend1, dAttr2, valToSend2)) { + grid(2).createCache(defaultCacheConfiguration()); + } + + assertTrue(coordLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); + assertTrue(srvrLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); + assertTrue(clientLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); + } + /** */ private void doContextAwareExecutorServiceTest(ExecutorService pool) throws Exception { CountDownLatch poolUnblockedLatch = blockPool(pool); @@ -923,9 +1026,8 @@ public AttributeValueChecker(String expStrAttrVal, Integer expIntAttrVal) { /** */ static void assertAllCreatedChecksPassed() throws Exception { - for (AttributeValueChecker check : CHECKS) { + for (AttributeValueChecker check : CHECKS) check.get(5_000, MILLISECONDS); - } } /** */