diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java index 3c1eef933a38..4f23f86e92da 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java @@ -347,6 +347,15 @@ private void setAndClearChannel() { close(oldChannel); } + /** + * Force-closes the current cached channel so that the next subscriber receives a fresh one. + * This is used for connection-level recovery when the current connection is stale + * but the processor has not detected it (e.g., heartbeats echoed by intermediate infrastructure). + */ + public void forceCloseChannel() { + setAndClearChannel(); + } + /** * Checks the current state of the channel for this channel and returns true if the channel is null or if this * processor is disposed. diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java index d772cfdf0698..f153b4da2f46 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java @@ -48,6 +48,10 @@ public final class ReactorConnectionCache implement // any dependent type; instead, the dependent type must acquire Connection only through the cache route, // i.e., by subscribing to 'createOrGetCachedConnection' via 'get()' getter. private volatile T currentConnection; + // Holds the ID of the connection that invalidateConnection() asked to force-invalidate. 
+ // Only the connection whose getId() matches this value will be invalidated by cacheInvalidateIf; + // a freshly created connection with a different ID is never accidentally invalidated. + private final AtomicReference forceInvalidateConnectionId = new AtomicReference<>(null); private final State state = new State(); /** @@ -113,12 +117,25 @@ public ReactorConnectionCache(Supplier connectionSupplier, String fullyQualif } }).cacheInvalidateIf(c -> { if (c.isDisposed()) { + // Connection disposed for any reason. Clean up the force-invalidate marker if it + // was targeting this connection so it is not accidentally consumed by a future + // connection that happens to have the same ID. + forceInvalidateConnectionId.compareAndSet(c.getId(), null); withConnectionId(logger, c.getId()).log("The connection is closed, requesting a new connection."); return true; - } else { - // Emit cached connection. - return false; } + final String targetId = forceInvalidateConnectionId.get(); + if (targetId != null + && targetId.equals(c.getId()) + && forceInvalidateConnectionId.compareAndSet(targetId, null)) { + // invalidateConnection() marked this connection dirty. + // Close it here — the cache owns the connection lifecycle. + withConnectionId(logger, c.getId()).log("Force-invalidating and closing connection for recovery."); + closeConnection(c, logger, "Force-invalidated for recovery."); + return true; + } + // No forced invalidation targeted this connection — emit it from cache. + return false; }); } @@ -172,6 +189,32 @@ public boolean isCurrentConnectionClosed() { return (currentConnection != null && currentConnection.isDisposed()) || terminated; } + /** + * Marks the current cached connection for invalidation so that the next {@link #get()} call + * closes it and creates a fresh connection. 
This is used for connection-level recovery when + * the current connection is in a stale state that the cache's normal error detection (via + * endpoint state signals) has not detected — for example, when intermediate infrastructure + * (load balancers, NAT gateways) is echoing AMQP heartbeats on behalf of a dead connection. + * + *

<p>The actual close is deferred to the cache chain's {@code cacheInvalidateIf} predicate + * inside {@link #get()}, keeping the connection lifecycle local to the cache. This avoids + * the concurrency concern of two threads touching the cached connection simultaneously.</p>

+ * + *

<p>This is modeled after the Go SDK's {@code Namespace.Recover()}, which explicitly closes + * the old connection and increments the connection revision.</p>

+ * + *

<p>This method is safe to call concurrently. If no connection is cached, or the connection is + * already closed or being closed, this is a no-op.</p>

+ */ + public void invalidateConnection() { + final T connection = currentConnection; + if (connection != null && !connection.isDisposed()) { + withConnectionId(logger, connection.getId()) + .log("Marking connection for force-invalidation. Next get() will close and replace it."); + forceInvalidateConnectionId.set(connection.getId()); + } + } + /** * Terminate so that consumers will no longer be able to request connection. If there is a current (cached) * connection then it will be closed. diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java new file mode 100644 index 000000000000..df3dc4c1d651 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java @@ -0,0 +1,168 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.implementation; + +import com.azure.core.amqp.exception.AmqpErrorCondition; +import com.azure.core.amqp.exception.AmqpException; + +import java.util.Locale; +import java.util.concurrent.TimeoutException; + +/** + * Classifies errors into recovery tiers, determining what resources should be closed + * between retry attempts. This follows the tiered recovery pattern used by the Go, .NET, + * Python, and JS Azure SDKs. + * + *
<ul>
+ * <li>{@link #NONE} — Retry on the same link (server-busy, timeouts, resource-limit-exceeded).</li>
+ * <li>{@link #LINK} — Close the send/receive link; next retry creates a fresh link on the same connection.</li>
+ * <li>{@link #CONNECTION} — Close the entire connection; next retry creates a fresh connection and link.</li>
+ * <li>{@link #FATAL} — Do not retry (unauthorized, not-found, message too large).</li>
+ * </ul>
+ */ +public enum RecoveryKind { + /** + * No recovery needed — retry on the same link and connection. + * Applies to: server-busy, timeouts, resource-limit-exceeded. + */ + NONE, + + /** + * Close the link (and its session) before retrying. The next retry creates a fresh link + * on the same connection. + * Applies to: link:detach-forced, link:stolen, transient AMQP errors on the link. + */ + LINK, + + /** + * Close the entire connection before retrying. The next retry creates a fresh connection, + * session, and link. + * Applies to: connection:forced, connection:framing-error, proton:io, internal-error. + */ + CONNECTION, + + /** + * Do not retry — the error is permanent. + * Applies to: unauthorized-access, not-found, message-size-exceeded. + */ + FATAL; + + /** + * Classifies the given error into a {@link RecoveryKind} that determines what resources + * should be invalidated between retry attempts. + * + * @param error The error to classify. + * @return The recovery kind for the given error. + */ + public static RecoveryKind classify(Throwable error) { + if (error == null) { + return NONE; + } + + // Timeouts — retry on same link, the link may still be healthy. + if (error instanceof TimeoutException) { + return NONE; + } + + if (error instanceof AmqpException) { + final AmqpException amqpError = (AmqpException) error; + final AmqpErrorCondition condition = amqpError.getErrorCondition(); + + if (condition != null) { + switch (condition) { + // Connection-level errors — close the entire connection. + case CONNECTION_FORCED: + case CONNECTION_FRAMING_ERROR: + case CONNECTION_REDIRECT: + case PROTON_IO: + case INTERNAL_ERROR: + return CONNECTION; + + // Link-level errors — close the link, keep the connection. + case LINK_DETACH_FORCED: + case LINK_STOLEN: + case LINK_REDIRECT: + case PARTITION_NOT_OWNED_ERROR: + case TRANSFER_LIMIT_EXCEEDED: + // operation-cancelled can signal "AMQP layer unexpectedly aborted or disconnected" + // (e.g. 
ReceiverUnsettledDeliveries remote Released outcome), requiring link recovery. + case OPERATION_CANCELLED: + return LINK; + + // Fatal errors — do not retry. + case NOT_FOUND: + case UNAUTHORIZED_ACCESS: + case LINK_PAYLOAD_SIZE_EXCEEDED: + case NOT_ALLOWED: + case NOT_IMPLEMENTED: + case ENTITY_DISABLED_ERROR: + case ENTITY_ALREADY_EXISTS: + case PUBLISHER_REVOKED_ERROR: + case ARGUMENT_ERROR: + case ARGUMENT_OUT_OF_RANGE_ERROR: + case ILLEGAL_STATE: + case MESSAGE_LOCK_LOST: + case STORE_LOCK_LOST_ERROR: + return FATAL; + + // Server-busy, timeouts, and resource-limit errors — retry on same link. + // RESOURCE_LIMIT_EXCEEDED is treated as transient here because ReactorSender + // groups it alongside SERVER_BUSY and TIMEOUT in its send-error retry logic. + case SERVER_BUSY_ERROR: + case TIMEOUT_ERROR: + case RESOURCE_LIMIT_EXCEEDED: + return NONE; + + // Session/lock errors — link-level recovery. + // Session lock loss means the session link is invalid and + // a fresh link must be acquired for a new session. + case SESSION_LOCK_LOST: + case SESSION_CANNOT_BE_LOCKED: + case SESSION_NOT_FOUND: + case MESSAGE_NOT_FOUND: + return LINK; + + default: + break; + } + } + + // Transient AMQP errors without a specific condition — link recovery. + if (amqpError.isTransient()) { + return LINK; + } + + // Non-transient AMQP errors without a recognized condition — fatal. + return FATAL; + } + + // RequestResponseChannelClosedException — link-level (parent connection disposing). + if (error instanceof RequestResponseChannelClosedException) { + return LINK; + } + + // IllegalStateException thrown by a disposed ReactorSender (e.g., "Cannot publish + // message when disposed." or "Cannot publish data batch when disposed."). This is + // a link-staleness signal: the link was closed (possibly by a concurrent recovery + // path) before the in-flight send could complete. LINK recovery creates a fresh + // link on the next retry. 
+ // Match both "Cannot publish" and "disposed" to avoid misclassifying unrelated + // disposal signals (e.g., "Connection is disposed. Cannot get management instance."). + if (error instanceof IllegalStateException) { + final String msg = error.getMessage(); + if (msg != null) { + final String normalizedMsg = msg.toLowerCase(Locale.ROOT); + if (normalizedMsg.contains("cannot publish") && normalizedMsg.contains("disposed")) { + return LINK; + } + } + } + + // Unknown non-AMQP errors — treat as fatal (don't retry application or SDK bugs). + // The Go SDK defaults to CONNECTION for unknown errors, but those are AMQP-layer + // errors (io.EOF, net.Error). Java's non-AMQP exceptions (e.g., AzureException, + // RuntimeException) should fail fast rather than trigger connection recovery. + return FATAL; + } +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java index c22c8ac928b3..6b6c5bd57cf6 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java @@ -129,4 +129,5 @@ static Retry createRetry(AmqpRetryOptions options) { .filter(error -> error instanceof TimeoutException || (error instanceof AmqpException && ((AmqpException) error).isTransient())); } + } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionCacheTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionCacheTest.java index 4b51308f8e56..2016d3fe0260 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionCacheTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionCacheTest.java @@ -222,6 +222,129 @@ public void 
shouldRefreshCacheOnErrorInCachedConnection() { } } + @Test + public void invalidateConnectionReplacesCachedConnectionOnNextGet() { + final ConnectionSupplier connectionSupplier = new ConnectionSupplier(); + final ReactorConnectionCache connectionCache + = new ReactorConnectionCache<>(connectionSupplier, FQDN, ENTITY_PATH, retryPolicy, new HashMap<>()); + try { + final ReactorConnection[] c = new ReactorConnection[1]; + final Mono connectionMono = connectionCache.get(); + // Populate the cache with the first connection. + StepVerifier.create(connectionMono, 0) + .thenRequest(1) + .then(() -> connectionSupplier.emitEndpointState(EndpointState.ACTIVE)) + .expectNextMatches(con -> { + Assertions.assertFalse(con.isDisposed()); + c[0] = con; + return true; + }) + .expectComplete() + .verify(VERIFY_TIMEOUT); + connectionSupplier.assertInvocationCount(1); + + // Mark the current connection for force-invalidation. + connectionCache.invalidateConnection(); + + // Next subscription must close the marked connection and serve a brand-new one. + StepVerifier.create(connectionMono, 0) + .thenRequest(1) + .then(() -> connectionSupplier.emitEndpointState(EndpointState.ACTIVE)) + .expectNextMatches(con -> { + Assertions.assertFalse(con.isDisposed()); + Assertions.assertNotEquals(c[0], con); + return true; + }) + .expectComplete() + .verify(VERIFY_TIMEOUT); + + // The previously cached connection should be closed by the cacheInvalidateIf branch. + Assertions.assertTrue(c[0].isDisposed()); + // A second connection had to be supplied because the first was force-invalidated. 
+ connectionSupplier.assertInvocationCount(2); + } finally { + connectionSupplier.dispose(); + connectionCache.dispose(); + } + } + + @Test + public void invalidateConnectionDoesNotInvalidateNewlyCreatedConnection() { + // Asserts the AB-A safety property: invalidate() targets a specific connection-id, so a + // fresh connection created after the invalidation marker was consumed must NOT be evicted + // by a stale marker. + final ConnectionSupplier connectionSupplier = new ConnectionSupplier(); + final ReactorConnectionCache connectionCache + = new ReactorConnectionCache<>(connectionSupplier, FQDN, ENTITY_PATH, retryPolicy, new HashMap<>()); + try { + final ReactorConnection[] c = new ReactorConnection[2]; + final Mono connectionMono = connectionCache.get(); + StepVerifier.create(connectionMono, 0) + .thenRequest(1) + .then(() -> connectionSupplier.emitEndpointState(EndpointState.ACTIVE)) + .expectNextMatches(con -> { + c[0] = con; + return true; + }) + .expectComplete() + .verify(VERIFY_TIMEOUT); + + connectionCache.invalidateConnection(); + + // Consume the invalidation marker and obtain the replacement connection. + StepVerifier.create(connectionMono, 0) + .thenRequest(1) + .then(() -> connectionSupplier.emitEndpointState(EndpointState.ACTIVE)) + .expectNextMatches(con -> { + c[1] = con; + Assertions.assertNotEquals(c[0], c[1]); + return true; + }) + .expectComplete() + .verify(VERIFY_TIMEOUT); + + // The marker has been consumed; another get() must serve the same (cached) replacement + // connection without creating a third one. 
+ StepVerifier.create(connectionMono, 0).thenRequest(1).expectNextMatches(con -> { + Assertions.assertEquals(c[1], con); + Assertions.assertFalse(con.isDisposed()); + return true; + }).expectComplete().verify(VERIFY_TIMEOUT); + + connectionSupplier.assertInvocationCount(2); + } finally { + connectionSupplier.dispose(); + connectionCache.dispose(); + } + } + + @Test + public void invalidateConnectionWithoutCachedConnectionIsNoOp() { + final ConnectionSupplier connectionSupplier = new ConnectionSupplier(); + final ReactorConnectionCache connectionCache + = new ReactorConnectionCache<>(connectionSupplier, FQDN, ENTITY_PATH, retryPolicy, new HashMap<>()); + try { + // No connection has been supplied yet. + connectionCache.invalidateConnection(); + connectionSupplier.assertInvocationCount(0); + + // First get() after the no-op invalidate() must still produce a connection normally. + StepVerifier.create(connectionCache.get(), 0) + .thenRequest(1) + .then(() -> connectionSupplier.emitEndpointState(EndpointState.ACTIVE)) + .expectNextMatches(con -> { + Assertions.assertFalse(con.isDisposed()); + return true; + }) + .expectComplete() + .verify(VERIFY_TIMEOUT); + connectionSupplier.assertInvocationCount(1); + } finally { + connectionSupplier.dispose(); + connectionCache.dispose(); + } + } + @Test public void shouldBubbleUpNonRetriableError() { final ConnectionSupplier connectionSupplier = new ConnectionSupplier(); diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java new file mode 100644 index 000000000000..d123a2c8963c --- /dev/null +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java @@ -0,0 +1,215 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+ +package com.azure.core.amqp.implementation; + +import com.azure.core.amqp.exception.AmqpErrorCondition; +import com.azure.core.amqp.exception.AmqpException; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Tests for {@link RecoveryKind#classify(Throwable)}. + */ +class RecoveryKindTest { + + @Test + void nullErrorReturnsNone() { + assertEquals(RecoveryKind.NONE, RecoveryKind.classify(null)); + } + + @Test + void timeoutExceptionReturnsNone() { + assertEquals(RecoveryKind.NONE, RecoveryKind.classify(new TimeoutException("timed out"))); + } + + @Test + void serverBusyReturnsNone() { + final AmqpException error = new AmqpException(true, AmqpErrorCondition.SERVER_BUSY_ERROR, "server busy", null); + assertEquals(RecoveryKind.NONE, RecoveryKind.classify(error)); + } + + @Test + void timeoutErrorConditionReturnsNone() { + final AmqpException error = new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR, "timeout", null); + assertEquals(RecoveryKind.NONE, RecoveryKind.classify(error)); + } + + @Test + void linkDetachForcedReturnsLink() { + final AmqpException error + = new AmqpException(true, AmqpErrorCondition.LINK_DETACH_FORCED, "detach forced", null); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void linkStolenReturnsLink() { + final AmqpException error = new AmqpException(true, AmqpErrorCondition.LINK_STOLEN, "link stolen", null); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void transientAmqpErrorWithoutConditionReturnsLink() { + final AmqpException error = new AmqpException(true, "transient error", null, null); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void connectionForcedReturnsConnection() { + final AmqpException error + = new AmqpException(true, AmqpErrorCondition.CONNECTION_FORCED, "connection forced", null); + 
assertEquals(RecoveryKind.CONNECTION, RecoveryKind.classify(error)); + } + + @Test + void connectionFramingErrorReturnsConnection() { + final AmqpException error + = new AmqpException(true, AmqpErrorCondition.CONNECTION_FRAMING_ERROR, "framing error", null); + assertEquals(RecoveryKind.CONNECTION, RecoveryKind.classify(error)); + } + + @Test + void internalErrorReturnsConnection() { + final AmqpException error = new AmqpException(true, AmqpErrorCondition.INTERNAL_ERROR, "internal error", null); + assertEquals(RecoveryKind.CONNECTION, RecoveryKind.classify(error)); + } + + @Test + void protonIoReturnsConnection() { + final AmqpException error = new AmqpException(true, AmqpErrorCondition.PROTON_IO, "io error", null); + assertEquals(RecoveryKind.CONNECTION, RecoveryKind.classify(error)); + } + + @Test + void connectionRedirectReturnsConnection() { + final AmqpException error = new AmqpException(true, AmqpErrorCondition.CONNECTION_REDIRECT, "redirect", null); + assertEquals(RecoveryKind.CONNECTION, RecoveryKind.classify(error)); + } + + @Test + void linkRedirectReturnsLink() { + final AmqpException error = new AmqpException(true, AmqpErrorCondition.LINK_REDIRECT, "redirect", null); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void transferLimitExceededReturnsLink() { + final AmqpException error + = new AmqpException(true, AmqpErrorCondition.TRANSFER_LIMIT_EXCEEDED, "transfer limit", null); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void argumentErrorReturnsFatal() { + final AmqpException error = new AmqpException(false, AmqpErrorCondition.ARGUMENT_ERROR, "bad argument", null); + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void notFoundReturnsFatal() { + final AmqpException error = new AmqpException(false, AmqpErrorCondition.NOT_FOUND, "not found", null); + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void 
unauthorizedAccessReturnsFatal() { + final AmqpException error + = new AmqpException(false, AmqpErrorCondition.UNAUTHORIZED_ACCESS, "unauthorized", null); + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void payloadSizeExceededReturnsFatal() { + final AmqpException error + = new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, "too large", null); + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void notAllowedReturnsFatal() { + final AmqpException error = new AmqpException(false, AmqpErrorCondition.NOT_ALLOWED, "not allowed", null); + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void nonTransientAmqpErrorReturnsFatal() { + final AmqpException error = new AmqpException(false, "permanent error", null, null); + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void requestResponseChannelClosedReturnsLink() { + final RequestResponseChannelClosedException error = new RequestResponseChannelClosedException("channel closed"); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void unknownExceptionReturnsFatal() { + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(new RuntimeException("unknown"))); + } + + @Test + void sessionLockLostReturnsLink() { + final AmqpException error + = new AmqpException(true, AmqpErrorCondition.SESSION_LOCK_LOST, "session lock lost", null); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void messageLockLostReturnsFatal() { + final AmqpException error + = new AmqpException(false, AmqpErrorCondition.MESSAGE_LOCK_LOST, "message lock lost", null); + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void storeLockLostReturnsFatal() { + final AmqpException error + = new AmqpException(false, AmqpErrorCondition.STORE_LOCK_LOST_ERROR, "store lock lost", null); + assertEquals(RecoveryKind.FATAL, 
RecoveryKind.classify(error)); + } + + @Test + void operationCancelledReturnsLink() { + final AmqpException error = new AmqpException(true, AmqpErrorCondition.OPERATION_CANCELLED, "cancelled", null); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void resourceLimitExceededReturnsNone() { + final AmqpException error + = new AmqpException(true, AmqpErrorCondition.RESOURCE_LIMIT_EXCEEDED, "resource limit", null); + assertEquals(RecoveryKind.NONE, RecoveryKind.classify(error)); + } + + @Test + void illegalStateExceptionDisposedMessageReturnsLink() { + // Matches ReactorSender.send() message: "connectionId[%s] linkName[%s] Cannot publish message when disposed." + assertEquals(RecoveryKind.LINK, RecoveryKind.classify( + new IllegalStateException("connectionId[abc] linkName[xyz] Cannot publish message when disposed."))); + } + + @Test + void illegalStateExceptionDisposedDataBatchReturnsLink() { + // Matches ReactorSender.send(List) message: "connectionId[%s] linkName[%s] Cannot publish data batch when disposed." + assertEquals(RecoveryKind.LINK, RecoveryKind.classify( + new IllegalStateException("connectionId[abc] linkName[xyz] Cannot publish data batch when disposed."))); + } + + @Test + void illegalStateExceptionUnrelatedToDisposedReturnsFatal() { + // Non-disposed IllegalStateException must remain FATAL (genuine application or SDK bug). + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(new IllegalStateException("some unexpected state"))); + } + + @Test + void illegalStateExceptionConnectionDisposedReturnsFatal() { + // "Connection is disposed. Cannot get management instance." contains "disposed" but NOT + // "Cannot publish" — must remain FATAL to avoid misclassifying connection-level disposal. + assertEquals(RecoveryKind.FATAL, RecoveryKind + .classify(new IllegalStateException("Connection is disposed. 
Cannot get management instance."))); + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/pom.xml b/sdk/servicebus/azure-messaging-servicebus/pom.xml index 8d5c27f749ee..f5e60c55f37d 100644 --- a/sdk/servicebus/azure-messaging-servicebus/pom.xml +++ b/sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -69,7 +69,7 @@ com.azure azure-core-amqp - 2.11.4 + 2.12.0-beta.1 com.azure diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ConnectionCacheWrapper.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ConnectionCacheWrapper.java index 1d4aa44b4c42..282b846b4f42 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ConnectionCacheWrapper.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ConnectionCacheWrapper.java @@ -51,4 +51,18 @@ AmqpRetryOptions getRetryOptions() { boolean isChannelClosed() { return isV2 ? cache.isCurrentConnectionClosed() : processor.isChannelClosed(); } + + /** + * Invalidates the current cached connection for connection-level recovery when the + * connection is stale but the cache has not detected it via endpoint state signals. + * On v2, marks the connection for invalidation so the next get() closes it and creates + * a fresh one. On v1, force-closes the current channel immediately. 
+ */ + void invalidateConnection() { + if (isV2) { + cache.invalidateConnection(); + } else { + processor.forceCloseChannel(); + } + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index 0b3154ec33ef..8a8ee446d186 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -11,6 +11,7 @@ import com.azure.core.amqp.implementation.MessageFlux; import com.azure.core.amqp.implementation.MessageSerializer; import com.azure.core.amqp.implementation.RequestResponseChannelClosedException; +import com.azure.core.amqp.implementation.RecoveryKind; import com.azure.core.amqp.implementation.RetryUtil; import com.azure.core.amqp.implementation.StringUtil; import com.azure.core.amqp.implementation.handler.DeliveryNotOnLinkException; @@ -45,6 +46,7 @@ import java.util.Objects; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -1736,14 +1738,47 @@ private ServiceBusAsyncConsumer getOrCreateConsumer() { // final Mono retryableReceiveLinkMono = RetryUtil.withRetry(receiveLinkMono.onErrorMap(RequestResponseChannelClosedException.class, e -> { - // When the current connection is being disposed, the V1 ConnectionProcessor or V2 ReactorConnectionCache - // can produce a new connection if downstream request. 
In this context, treat - // RequestResponseChannelClosedException error from the following two sources as retry-able so that - // retry can obtain a new connection - - // 1. error from the RequestResponseChannel scoped to the current connection being disposed, - // 2. error from the V2 RequestResponseChannelCache scoped to the current connection being disposed. - // return new AmqpException(true, e.getMessage(), e, null); + }).onErrorResume(e -> { + final RecoveryKind recoveryKind = RecoveryKind.classify(e); + if (recoveryKind == RecoveryKind.LINK || recoveryKind == RecoveryKind.CONNECTION) { + LOGGER.atWarning() + .addKeyValue(LINK_NAME_KEY, linkName) + .addKeyValue("recoveryKind", recoveryKind) + .log("Receive link creation failed, performing {} recovery.", recoveryKind, e); + + // For both LINK and CONNECTION recovery, the session hosting the failed link may + // be stale. Ask the connection to remove it so the next retry creates a fresh + // session + link. CONNECTION recovery additionally invalidates the cached + // connection below so the next retry rebuilds connection, session, and link. + Mono recovery = connectionProcessor.flatMap(connection -> { + final boolean removed = connection.removeSession(entityPath); + LOGGER.atVerbose() + .addKeyValue(LINK_NAME_KEY, linkName) + .addKeyValue("sessionRemoved", removed) + .log("Stale session removal during {} recovery.", recoveryKind); + return Mono.empty(); + }).onErrorResume(error -> { + LOGGER.atWarning() + .addKeyValue(LINK_NAME_KEY, linkName) + .log("Error obtaining connection during {} recovery.", recoveryKind, error); + return Mono.empty(); + }); + + if (recoveryKind == RecoveryKind.CONNECTION) { + recovery + = recovery.then(Mono.fromRunnable(() -> connectionCacheWrapper.invalidateConnection())); + } + // Ensure the error passes the standard retry filter. 
Non-AMQP errors + // (e.g., IllegalStateException) classified as LINK/CONNECTION must be wrapped + // as transient AmqpException so RetryUtil.createRetry accepts them for retry. + final Throwable retriable = (e instanceof TimeoutException + || (e instanceof AmqpException && ((AmqpException) e).isTransient())) + ? e + : new AmqpException(true, e.getMessage(), e, null); + return recovery.then(Mono.error(retriable)); + } + return Mono.error(e); }), connectionCacheWrapper.getRetryOptions(), "Failed to create receive link " + linkName, true); // A Flux that produces a new AmqpReceiveLink each time it receives a request from the below diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java index 58db3b129e65..49c78ad0d96f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java @@ -10,7 +10,9 @@ import com.azure.core.amqp.implementation.AmqpSendLink; import com.azure.core.amqp.implementation.ErrorContextProvider; import com.azure.core.amqp.implementation.MessageSerializer; +import com.azure.core.amqp.implementation.RecoveryKind; import com.azure.core.amqp.implementation.RequestResponseChannelClosedException; +import com.azure.core.amqp.implementation.RetryUtil; import com.azure.core.annotation.ServiceClient; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; @@ -24,6 +26,8 @@ import org.apache.qpid.proton.amqp.transaction.TransactionalState; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; import java.time.OffsetDateTime; import 
java.util.ArrayList; @@ -35,6 +39,7 @@ import java.util.Locale; import java.util.Objects; import java.util.Set; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -44,7 +49,6 @@ import java.util.stream.Collector; import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY; -import static com.azure.core.amqp.implementation.RetryUtil.withRetry; import static com.azure.core.util.FluxUtil.fluxError; import static com.azure.core.util.FluxUtil.monoError; import static com.azure.messaging.servicebus.implementation.Messages.INVALID_OPERATION_DISPOSED_SENDER; @@ -234,6 +238,7 @@ public final class ServiceBusSenderAsyncClient implements AutoCloseable { private final MessagingEntityType entityType; private final Runnable onClientClose; private final String entityName; + private final ConnectionCacheWrapper connectionCacheWrapper; private final Mono connectionProcessor; private final String fullyQualifiedNamespace; private final String viaEntityName; @@ -254,6 +259,7 @@ public final class ServiceBusSenderAsyncClient implements AutoCloseable { this.retryOptions = Objects.requireNonNull(retryOptions, "'retryOptions' cannot be null."); this.entityName = Objects.requireNonNull(entityName, "'entityPath' cannot be null."); Objects.requireNonNull(connectionCacheWrapper, "'connectionCacheWrapper' cannot be null."); + this.connectionCacheWrapper = connectionCacheWrapper; this.connectionProcessor = connectionCacheWrapper.getConnection(); this.fullyQualifiedNamespace = connectionCacheWrapper.getFullyQualifiedNamespace(); this.instrumentation = Objects.requireNonNull(instrumentation, "'instrumentation' cannot be null."); @@ -462,7 +468,9 @@ public Mono createMessageBatch(CreateMessageBatchOptions final int maxSize = options.getMaximumSizeInBytes(); - return getSendLinkWithRetry("create-batch").flatMap(link -> 
link.getLinkSize().flatMap(size -> { + return getSendLinkAndSizeWithRetry("create-batch").flatMap(t -> { + final AmqpSendLink link = t.getT1(); + final int size = t.getT2(); final int maximumLinkSize = size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES; if (maxSize > maximumLinkSize) { return monoError(logger, @@ -474,7 +482,7 @@ public Mono createMessageBatch(CreateMessageBatchOptions final int batchSize = maxSize > 0 ? maxSize : maximumLinkSize; return Mono .just(new ServiceBusMessageBatch(isV2, batchSize, link::getErrorContext, tracer, messageSerializer)); - })).onErrorMap(this::mapError); + }).onErrorMap(this::mapError); } /** @@ -811,7 +819,9 @@ private Mono scheduleMessageInternal(ServiceBusMessage message, OffsetDate } return tracer.traceScheduleMono("ServiceBus.scheduleMessage", - getSendLinkWithRetry("schedule-message").flatMap(link -> link.getLinkSize().flatMap(size -> { + getSendLinkAndSizeWithRetry("schedule-message").flatMap(t -> { + final AmqpSendLink link = t.getT1(); + final int size = t.getT2(); final int maxSize = size > 0 ? 
size : MAX_MESSAGE_LENGTH_BYTES; return connectionProcessor.flatMap(connection -> connection.getManagementNode(entityName, entityType)) .flatMap( @@ -819,7 +829,7 @@ private Mono scheduleMessageInternal(ServiceBusMessage message, OffsetDate .schedule(Arrays.asList(message), scheduledEnqueueTime, maxSize, link.getLinkName(), transactionContext) .next()); - })), message, message.getContext()).onErrorMap(this::mapError); + }), message, message.getContext()).onErrorMap(this::mapError); } /** @@ -859,7 +869,9 @@ private Mono sendBatchInternal(ServiceBusMessageBatch batch, messages.add(message); }); + final AtomicReference operationLink = new AtomicReference<>(); final Mono sendMessage = getSendLink("send-batch").flatMap(link -> { + operationLink.set(link); if (transactionContext != null && transactionContext.getTransactionId() != null) { final TransactionalState deliveryState = new TransactionalState(); deliveryState.setTxnId(Binary.create(transactionContext.getTransactionId())); @@ -871,8 +883,11 @@ private Mono sendBatchInternal(ServiceBusMessageBatch batch, } }); - final String message = "Sending messages timed out. message-count:" + batch.getCount() + entityId(); - final Mono withRetry = withRetry(sendMessage, retryOptions, message).onErrorMap(this::mapError); + final String timeoutMessage = "Sending messages timed out. 
message-count:" + batch.getCount() + entityId(); + final Mono withRetry = RetryUtil.withRetry( + sendMessage + .onErrorResume(e -> recoverBeforeRetry(e, "sendBatch", operationLink).then(Mono.error(asRetriable(e)))), + retryOptions, timeoutMessage).onErrorMap(this::mapError); return instrumentation.instrumentSendBatch("ServiceBus.send", withRetry, batch.getMessages()); } @@ -883,19 +898,29 @@ private Mono sendFluxInternal(Flux messages, new IllegalStateException(String.format(INVALID_OPERATION_DISPOSED_SENDER, "sendMessage"))); } - final Mono> batchList - = getSendLinkWithRetry("send-batches").flatMap(link -> link.getLinkSize().flatMap(size -> { - final int batchSize = size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES; - final CreateMessageBatchOptions batchOptions - = new CreateMessageBatchOptions().setMaximumSizeInBytes(batchSize); - return messages.collect( - new AmqpMessageCollector(isV2, batchOptions, 1, link::getErrorContext, tracer, messageSerializer)); - })); + // Apply retry+recovery to BOTH link acquisition and link-size negotiation. Keeping + // messages.collect() outside the retry boundary avoids re-subscribing the user-provided + // Flux on each retry attempt, which could duplicate side-effects or re-consume a hot + // publisher. A getLinkSize() failure (link never ACTIVE, link disposed during negotiation) + // now triggers the same tiered recovery as a getSendLink() failure. + final Mono> linkAndSize = getSendLinkAndSizeWithRetry("send-batches"); + + final Mono> batchListMono = linkAndSize.flatMap(t -> { + final AmqpSendLink link = t.getT1(); + final int size = t.getT2(); + final int batchSize = size > 0 ? 
size : MAX_MESSAGE_LENGTH_BYTES; + final CreateMessageBatchOptions batchOptions + = new CreateMessageBatchOptions().setMaximumSizeInBytes(batchSize); + return messages.collect( + new AmqpMessageCollector(isV2, batchOptions, 1, link::getErrorContext, tracer, messageSerializer)); + }); - return batchList.flatMap(list -> Flux.fromIterable(list) + final Mono sendOperation = batchListMono.flatMap(list -> Flux.fromIterable(list) .flatMap(batch -> sendBatchInternal(batch, transactionContext)) .then() - .doOnError(error -> logger.error("Error sending batch.", error))).onErrorMap(this::mapError); + .doOnError(error -> logger.error("Error sending batch.", error))); + + return sendOperation.onErrorMap(this::mapError); } private Mono getSendLink(String callSite) { @@ -925,8 +950,24 @@ private Mono getSendLink(String callSite) { return getSendLink; } - private Mono getSendLinkWithRetry(String callSite) { - return withRetry(getSendLink(callSite), retryOptions, String.format(retryGetLinkErrorMessageFormat, callSite)); + /** + * Acquires a send link AND its negotiated maximum message size inside a single retry+recovery + * boundary. A failure during {@code getLinkSize()} (e.g., the link never reaches ACTIVE within + * the timeout, or the link becomes disposed mid-negotiation) triggers the same tiered recovery + * as a failure during {@code getSendLink()} — without it, transient size-negotiation faults + * would short-circuit the operation without LINK or CONNECTION recovery. + * + * @param callSite identifier used in recovery diagnostics. + * @return a Mono emitting (link, size) on success, retried with recovery on transient failures. 
+ */ + private Mono> getSendLinkAndSizeWithRetry(String callSite) { + final AtomicReference operationLink = new AtomicReference<>(); + return RetryUtil.withRetry( + getSendLink(callSite).doOnNext(operationLink::set) + .flatMap(link -> link.getLinkSize().map(size -> Tuples.of(link, size))) + .onErrorResume(e -> recoverBeforeRetry(e, "getSendLinkAndSize-" + callSite, operationLink) + .then(Mono.error(asRetriable(e)))), + retryOptions, String.format(retryGetLinkErrorMessageFormat, callSite)); } private Throwable mapError(Throwable throwable) { @@ -936,6 +977,73 @@ private Throwable mapError(Throwable throwable) { return throwable; } + /** + * Performs reactive tiered recovery based on the classified error. For LINK recovery, closes the + * send link so the next retry creates a fresh one. For CONNECTION recovery, closes the link and + * invalidates the connection so the cache creates everything fresh. + * + *

<p>Called from {@code onErrorResume} at each send call site so that recovery completes before + * the error is re-emitted and the standard retry logic re-subscribes.</p>

+ */ + private Mono recoverBeforeRetry(Throwable error, String callSite, AtomicReference linkRef) { + final RecoveryKind kind = RecoveryKind.classify(error); + if (kind != RecoveryKind.LINK && kind != RecoveryKind.CONNECTION) { + return Mono.empty(); + } + + logger.atWarning() + .addKeyValue(ENTITY_PATH_KEY, entityName) + .addKeyValue("recoveryKind", kind) + .addKeyValue("callSite", callSite) + .log("Performing {} recovery before retry.", kind, error); + + // Close the operation-scoped send link so the next retry creates a fresh one. + // Using a per-operation AtomicReference (not a class-level field) prevents concurrent send + // operations from accidentally closing each other's links. + // Use closeAsync() rather than dispose() to avoid blocking the Reactor thread. + Mono recovery = Mono.empty(); + final AmqpSendLink link = linkRef != null ? linkRef.getAndSet(null) : null; + if (link != null) { + recovery = Mono.defer(() -> { + Mono close = link.closeAsync(); + return close != null ? close : Mono.empty(); + }).onErrorResume(closeErr -> { + logger.atVerbose() + .addKeyValue(ENTITY_PATH_KEY, entityName) + .log("Error closing stale send link during recovery.", closeErr); + return Mono.empty(); + }); + } + linkName.set(null); + + // For CONNECTION errors, invalidate the cached connection so the next get() closes it + // and creates a fresh one. Matches Go SDK's Namespace.Recover(). + if (kind == RecoveryKind.CONNECTION) { + recovery = recovery.then(Mono.fromRunnable(() -> connectionCacheWrapper.invalidateConnection())); + } + + return recovery; + } + + /** + * Ensures the error is retriable by the standard {@link RetryUtil#createRetry} filter. + * That filter only accepts {@link TimeoutException} or transient {@link AmqpException}. + * Non-AMQP errors like {@code IllegalStateException("Cannot publish when disposed")} + * that {@link RecoveryKind} classifies as LINK would otherwise be rejected, bypassing + * the recovery we just performed. 
This wraps such errors as transient AmqpException. + */ + private static Throwable asRetriable(Throwable error) { + if (error instanceof TimeoutException + || (error instanceof AmqpException && ((AmqpException) error).isTransient())) { + return error; + } + final RecoveryKind kind = RecoveryKind.classify(error); + if (kind == RecoveryKind.LINK || kind == RecoveryKind.CONNECTION) { + return new AmqpException(true, error.getMessage(), error, null); + } + return error; + } + private String entityId() { return " " + ENTITY_PATH_KEY + ":" + entityName + (viaEntityName != null ? " " + VIA_ENTITY_NAME_KEY + ":" + viaEntityName : "") + " "; diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionAcquirer.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionAcquirer.java index ee9a0a9e0399..a090f9f7c159 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionAcquirer.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionAcquirer.java @@ -9,6 +9,7 @@ import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.implementation.StringUtil; import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2; +import com.azure.core.amqp.implementation.RecoveryKind; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.implementation.MessagingEntityType; import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode; @@ -137,12 +138,25 @@ private Mono acquireIntern(String sessionId) { return acquireSession(sessionId).timeout(tryTimeout) .retryWhen(Retry.from(signals -> signals.flatMap(signal -> { final Throwable t = signal.failure(); + final RecoveryKind kind = RecoveryKind.classify(t); + if (kind == RecoveryKind.CONNECTION) { + logger.atWarning() + 
.addKeyValue(ENTITY_PATH_KEY, entityPath) + .log("Connection-level error acquiring session, forcing connection recovery.", t); + connectionCacheWrapper.invalidateConnection(); + return Mono.delay(Duration.ZERO); + } if (isTimeoutError(t)) { logger.atVerbose() .addKeyValue(ENTITY_PATH_KEY, entityPath) .addKeyValue("attempt", signal.totalRetriesInARow()) .log("Timeout while acquiring session '{}'.", sessionName(sessionId), t); - // retry session acquire using Schedulers.parallel() and free the QPid thread. + return Mono.delay(Duration.ZERO); + } + if (kind == RecoveryKind.LINK) { + logger.atWarning() + .addKeyValue(ENTITY_PATH_KEY, entityPath) + .log("Link-level error acquiring session, retrying.", t); return Mono.delay(Duration.ZERO); } return publishError(sessionId, t, true); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java index b5760c5afe0e..b5299fe2776a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java @@ -8,6 +8,7 @@ import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.exception.SessionErrorContext; import com.azure.core.amqp.implementation.MessageSerializer; +import com.azure.core.amqp.implementation.RecoveryKind; import com.azure.core.amqp.implementation.StringUtil; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.implementation.DispositionStatus; @@ -282,24 +283,37 @@ Mono getActiveLink() { .timeout(operationTimeout) .then(Mono.just(link)))).retryWhen(Retry.from(retrySignals -> retrySignals.flatMap(signal -> { final Throwable failure = signal.failure(); + final RecoveryKind kind = 
RecoveryKind.classify(failure); LOGGER.atInfo() .addKeyValue(ENTITY_PATH_KEY, entityPath) .addKeyValue("attempt", signal.totalRetriesInARow()) + .addKeyValue("recoveryKind", kind) .log("Error occurred while getting unnamed session.", failure); if (isDisposed.get()) { return Mono.error( new AmqpException(false, "SessionManager is already disposed.", failure, getErrorContext())); - } else if (failure instanceof TimeoutException) { + } + + if (kind == RecoveryKind.CONNECTION) { + LOGGER.atWarning() + .addKeyValue(ENTITY_PATH_KEY, entityPath) + .log("Connection-level error in session manager, forcing connection recovery.", failure); + connectionCacheWrapper.invalidateConnection(); + } + + // Mono.delay(Duration.ZERO) instead of Mono.empty() — the delay schedules the retry on the + // 'parallel' Scheduler, freeing the QPid (proton) thread for other IO. Using Mono.empty() would + // block the QPid thread and slow down / block message pumping in other sessions. + if (failure instanceof TimeoutException) { return Mono.delay(Duration.ZERO); } else if (failure instanceof AmqpException && ((AmqpException) failure).getErrorCondition() == AmqpErrorCondition.TIMEOUT_ERROR) { // The link closed remotely with 'Detach {errorCondition:com.microsoft:timeout}' frame because // the broker waited for N seconds (60 sec hard limit today) but there was no free or new session. - // - // Given N seconds elapsed since the last session acquire attempt, request for a session on - // the 'parallel' Scheduler and free the 'QPid' thread for other IO. - // + return Mono.delay(Duration.ZERO); + } else if (kind == RecoveryKind.LINK || kind == RecoveryKind.CONNECTION) { + // Link or connection-level error — retry to acquire a fresh link (or connection). 
return Mono.delay(Duration.ZERO); } else { final long id = System.nanoTime(); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java index 20ca897316e9..204eb1db52f7 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java @@ -6,6 +6,8 @@ import com.azure.core.amqp.AmqpEndpointState; import com.azure.core.amqp.AmqpRetryMode; import com.azure.core.amqp.AmqpRetryOptions; +import com.azure.core.amqp.exception.AmqpErrorCondition; +import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.AmqpTransaction; import com.azure.core.amqp.AmqpTransportType; import com.azure.core.amqp.FixedAmqpRetryPolicy; @@ -668,7 +670,8 @@ void failedSendMessageReportsMetrics(boolean isV2) { when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); - when(sendLink.send(any(Message.class))).thenThrow(new RuntimeException("foo")); + when(sendLink.send(any(Message.class))) + .thenReturn(Mono.error(new AmqpException(false, AmqpErrorCondition.NOT_FOUND, "entity not found", null))); // Act StepVerifier.create(sender.sendMessage(new ServiceBusMessage(TEST_CONTENTS))) diff --git a/sdk/servicebus/ci.yml b/sdk/servicebus/ci.yml index 58823bab7a69..49564ad549aa 100644 --- a/sdk/servicebus/ci.yml +++ b/sdk/servicebus/ci.yml @@ -13,6 +13,7 @@ trigger: - sdk/servicebus/azure-messaging-servicebus-stress/ - sdk/servicebus/azure-messaging-servicebus-track2-perf/ - sdk/servicebus/build/ + - sdk/core/azure-core-amqp/ exclude: - sdk/servicebus/pom.xml - 
sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -33,6 +34,7 @@ pr: - sdk/servicebus/azure-messaging-servicebus-stress/ - sdk/servicebus/azure-messaging-servicebus-track2-perf/ - sdk/servicebus/build/ + - sdk/core/azure-core-amqp/ exclude: - sdk/servicebus/pom.xml - sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -59,3 +61,6 @@ extends: # required by the above perf libraries - name: perf-test-core groupId: com.azure + - name: azure-core-amqp + groupId: com.azure + # Build azure-core-amqp from source (needed for RecoveryKind, tiered retry)