Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
a3faf26
Add tiered send/receive recovery matching Go SDK parity
EldertGrootenboerMS Mar 18, 2026
5e28c33
fix(ci): build azure-core-amqp from source for tiered recovery PR
EldertGrootenboerMS Mar 18, 2026
f0c6e53
Merge branch 'main' into fix/servicebus-tiered-send-recovery
EldertGrootenboer Mar 18, 2026
0bd7282
Merge branch 'main' into fix/servicebus-tiered-send-recovery
EldertGrootenboer Mar 18, 2026
6116b7b
fix(review): address Copilot PR review feedback on tiered recovery
EldertGrootenboerMS Mar 18, 2026
f3a29a7
fix(review): address second Copilot review on tiered recovery
EldertGrootenboerMS Mar 18, 2026
79c52f9
fix(review): address third Copilot review on tiered recovery
EldertGrootenboerMS Mar 18, 2026
1fec9e5
fix(servicebus): make link/connection force-close non-blocking in rec…
EldertGrootenboerMS Mar 18, 2026
d4c64fd
fix(amqp): prevent NONE failure from consuming quick-retry flag
EldertGrootenboerMS Mar 18, 2026
52d4435
fix(amqp): classify disposed-link IllegalStateException as LINK recovery
EldertGrootenboerMS Mar 18, 2026
ce10aa2
fix(amqp): narrow disposed-ISE match to prevent tier misclassification
EldertGrootenboerMS Mar 18, 2026
5448c97
fix(amqp): clarify virtual-time reason in retry tests
EldertGrootenboerMS Mar 18, 2026
4bd93bb
fix(amqp): tie forceCloseConnection invalidation to specific connecti…
EldertGrootenboerMS Mar 18, 2026
2e80396
fix(amqp): rename test methods to camelCase for checkstyle compliance
EldertGrootenboerMS Mar 18, 2026
1860dd3
fix(amqp): use distinct log message for force-invalidation path
EldertGrootenboerMS Mar 18, 2026
c2d2086
Merge branch 'main' into fix/servicebus-tiered-send-recovery
EldertGrootenboer Mar 19, 2026
bf2b560
fix(amqp): guard backoff overflow and normalize RecoveryKind message …
EldertGrootenboerMS Mar 19, 2026
e76c2d8
fix(servicebus): scope send-link recovery to per-operation reference,…
EldertGrootenboerMS Mar 19, 2026
3eea3b2
fix(servicebus): include exception in connection-recovery warning log
EldertGrootenboerMS Mar 19, 2026
1f9bf92
fix(ci): move azure-core-amqp source comment to correct entry
EldertGrootenboerMS Mar 19, 2026
c0e2d8b
Merge branch 'main' into fix/servicebus-tiered-send-recovery
EldertGrootenboer Mar 19, 2026
0c6cb7a
refactor: address reviewer feedback on tiered recovery
EldertGrootenboerMS Apr 2, 2026
cb1e86a
fix(servicebus): restore comment explaining Mono.delay(Duration.ZERO)…
EldertGrootenboerMS Apr 9, 2026
a3b639c
fix(servicebus): include exception in recovery warning logs
EldertGrootenboerMS Apr 9, 2026
ea2961b
docs(servicebus): clarify invalidateConnection v1/v2 behavior in javadoc
EldertGrootenboerMS Apr 9, 2026
591a8c7
Merge branch 'main' into fix/servicebus-tiered-send-recovery
EldertGrootenboerMS May 6, 2026
43a27f4
test(amqp): add unit tests for ReactorConnectionCache.invalidateConne…
EldertGrootenboerMS May 6, 2026
43d5ff1
fix(servicebus): wrap getLinkSize() in retry+recovery boundary
EldertGrootenboerMS May 6, 2026
e1b382a
fix(servicebus): remove unused static import for RetryUtil.withRetry
EldertGrootenboerMS May 6, 2026
95e9070
docs(servicebus): clarify session-removal recovery comment for LINK a…
EldertGrootenboerMS May 7, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,15 @@ private void setAndClearChannel() {
close(oldChannel);
}

/**
* Force-closes the current cached channel so that the next subscriber receives a fresh one.
* This is used for connection-level recovery when the current connection is stale
* but the processor has not detected it (e.g., heartbeats echoed by intermediate infrastructure).
*/
public void forceCloseChannel() {
setAndClearChannel();
}

/**
* Checks the current state of the channel for this channel and returns true if the channel is null or if this
* processor is disposed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public final class ReactorConnectionCache<T extends ReactorConnection> implement
// any dependent type; instead, the dependent type must acquire Connection only through the cache route,
// i.e., by subscribing to 'createOrGetCachedConnection' via 'get()' getter.
private volatile T currentConnection;
Comment thread
EldertGrootenboer marked this conversation as resolved.
// Holds the ID of the connection that invalidateConnection() asked to force-invalidate.
// Only the connection whose getId() matches this value will be invalidated by cacheInvalidateIf;
// a freshly created connection with a different ID is never accidentally invalidated.
private final AtomicReference<String> forceInvalidateConnectionId = new AtomicReference<>(null);
private final State state = new State();

/**
Expand Down Expand Up @@ -113,12 +117,25 @@ public ReactorConnectionCache(Supplier<T> connectionSupplier, String fullyQualif
}
}).cacheInvalidateIf(c -> {
if (c.isDisposed()) {
// Connection disposed for any reason. Clean up the force-invalidate marker if it
// was targeting this connection so it is not accidentally consumed by a future
// connection that happens to have the same ID.
forceInvalidateConnectionId.compareAndSet(c.getId(), null);
withConnectionId(logger, c.getId()).log("The connection is closed, requesting a new connection.");
return true;
} else {
// Emit cached connection.
return false;
}
final String targetId = forceInvalidateConnectionId.get();
Comment thread
EldertGrootenboer marked this conversation as resolved.
if (targetId != null
&& targetId.equals(c.getId())
&& forceInvalidateConnectionId.compareAndSet(targetId, null)) {
// invalidateConnection() marked this connection dirty.
// Close it here — the cache owns the connection lifecycle.
withConnectionId(logger, c.getId()).log("Force-invalidating and closing connection for recovery.");
closeConnection(c, logger, "Force-invalidated for recovery.");
return true;
}
// No forced invalidation targeted this connection — emit it from cache.
return false;
});
}

Expand Down Expand Up @@ -172,6 +189,32 @@ public boolean isCurrentConnectionClosed() {
return (currentConnection != null && currentConnection.isDisposed()) || terminated;
}

/**
* Marks the current cached connection for invalidation so that the next {@link #get()} call
* closes it and creates a fresh connection. This is used for connection-level recovery when
* the current connection is in a stale state that the cache's normal error detection (via
* endpoint state signals) has not detected — for example, when intermediate infrastructure
* (load balancers, NAT gateways) is echoing AMQP heartbeats on behalf of a dead connection.
*
* <p>The actual close is deferred to the cache chain's {@code cacheInvalidateIf} predicate
* inside {@link #get()}, keeping the connection lifecycle local to the cache. This avoids
* the concurrency concern of two threads touching the cached connection simultaneously.</p>
*
* <p>This is modeled after the Go SDK's {@code Namespace.Recover()} which explicitly closes
* the old connection and increments the connection revision.</p>
*
* <p>This method is safe to call concurrently. If the connection is already closed or being
* closed, this is a no-op.</p>
*/
public void invalidateConnection() {
final T connection = currentConnection;
if (connection != null && !connection.isDisposed()) {
Comment thread
EldertGrootenboer marked this conversation as resolved.
withConnectionId(logger, connection.getId())
.log("Marking connection for force-invalidation. Next get() will close and replace it.");
Comment thread
EldertGrootenboer marked this conversation as resolved.
forceInvalidateConnectionId.set(connection.getId());
}
Comment thread
EldertGrootenboer marked this conversation as resolved.
Comment thread
EldertGrootenboer marked this conversation as resolved.
Comment thread
EldertGrootenboer marked this conversation as resolved.
}

/**
* Terminate so that consumers will no longer be able to request connection. If there is a current (cached)
* connection then it will be closed.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.implementation;

import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;

import java.util.Locale;
import java.util.concurrent.TimeoutException;

/**
* Classifies errors into recovery tiers, determining what resources should be closed
* between retry attempts. This follows the tiered recovery pattern used by the Go, .NET,
* Python, and JS Azure SDKs.
*
* <ul>
* <li>{@link #NONE} — Retry on the same link (server-busy, timeouts).</li>
* <li>{@link #LINK} — Close the send/receive link; next retry creates a fresh link on the same connection.</li>
* <li>{@link #CONNECTION} — Close the entire connection; next retry creates a fresh connection and link.</li>
* <li>{@link #FATAL} — Do not retry (unauthorized, not-found, message too large).</li>
* </ul>
*/
public enum RecoveryKind {
/**
* No recovery needed — retry on the same link and connection.
* Applies to: server-busy, timeouts, resource-limit-exceeded.
*/
NONE,

/**
* Close the link (and its session) before retrying. The next retry creates a fresh link
* on the same connection.
* Applies to: link:detach-forced, link:stolen, transient AMQP errors on the link.
*/
LINK,

/**
* Close the entire connection before retrying. The next retry creates a fresh connection,
* session, and link.
* Applies to: connection:forced, connection:framing-error, proton:io, internal-error.
*/
CONNECTION,

/**
* Do not retry — the error is permanent.
* Applies to: unauthorized-access, not-found, message-size-exceeded.
*/
FATAL;

/**
* Classifies the given error into a {@link RecoveryKind} that determines what resources
* should be invalidated between retry attempts.
*
* @param error The error to classify.
* @return The recovery kind for the given error.
*/
public static RecoveryKind classify(Throwable error) {
if (error == null) {
return NONE;
}

// Timeouts — retry on same link, the link may still be healthy.
if (error instanceof TimeoutException) {
return NONE;
}

if (error instanceof AmqpException) {
final AmqpException amqpError = (AmqpException) error;
final AmqpErrorCondition condition = amqpError.getErrorCondition();

if (condition != null) {
switch (condition) {
// Connection-level errors — close the entire connection.
case CONNECTION_FORCED:
case CONNECTION_FRAMING_ERROR:
case CONNECTION_REDIRECT:
case PROTON_IO:
case INTERNAL_ERROR:
return CONNECTION;

// Link-level errors — close the link, keep the connection.
case LINK_DETACH_FORCED:
case LINK_STOLEN:
case LINK_REDIRECT:
case PARTITION_NOT_OWNED_ERROR:
case TRANSFER_LIMIT_EXCEEDED:
// operation-cancelled can signal "AMQP layer unexpectedly aborted or disconnected"
// (e.g. ReceiverUnsettledDeliveries remote Released outcome), requiring link recovery.
case OPERATION_CANCELLED:
return LINK;

// Fatal errors — do not retry.
case NOT_FOUND:
case UNAUTHORIZED_ACCESS:
case LINK_PAYLOAD_SIZE_EXCEEDED:
case NOT_ALLOWED:
Comment thread
EldertGrootenboer marked this conversation as resolved.
case NOT_IMPLEMENTED:
case ENTITY_DISABLED_ERROR:
case ENTITY_ALREADY_EXISTS:
case PUBLISHER_REVOKED_ERROR:
case ARGUMENT_ERROR:
case ARGUMENT_OUT_OF_RANGE_ERROR:
case ILLEGAL_STATE:
case MESSAGE_LOCK_LOST:
case STORE_LOCK_LOST_ERROR:
return FATAL;

// Server-busy, timeouts, and resource-limit errors — retry on same link.
// RESOURCE_LIMIT_EXCEEDED is treated as transient here because ReactorSender
// groups it alongside SERVER_BUSY and TIMEOUT in its send-error retry logic.
case SERVER_BUSY_ERROR:
case TIMEOUT_ERROR:
case RESOURCE_LIMIT_EXCEEDED:
return NONE;
Comment thread
EldertGrootenboer marked this conversation as resolved.

// Session/lock errors — link-level recovery.
// Session lock loss means the session link is invalid and
// a fresh link must be acquired for a new session.
case SESSION_LOCK_LOST:
case SESSION_CANNOT_BE_LOCKED:
case SESSION_NOT_FOUND:
case MESSAGE_NOT_FOUND:
return LINK;

Comment thread
EldertGrootenboer marked this conversation as resolved.
default:
break;
}
}

// Transient AMQP errors without a specific condition — link recovery.
if (amqpError.isTransient()) {
return LINK;
}

// Non-transient AMQP errors without a recognized condition — fatal.
return FATAL;
}

// RequestResponseChannelClosedException — link-level (parent connection disposing).
if (error instanceof RequestResponseChannelClosedException) {
return LINK;
}

// IllegalStateException thrown by a disposed ReactorSender (e.g., "Cannot publish
// message when disposed." or "Cannot publish data batch when disposed."). This is
// a link-staleness signal: the link was closed (possibly by a concurrent recovery
// path) before the in-flight send could complete. LINK recovery creates a fresh
// link on the next retry.
// Match both "Cannot publish" and "disposed" to avoid misclassifying unrelated
// disposal signals (e.g., "Connection is disposed. Cannot get management instance.").
if (error instanceof IllegalStateException) {
final String msg = error.getMessage();
if (msg != null) {
final String normalizedMsg = msg.toLowerCase(Locale.ROOT);
if (normalizedMsg.contains("cannot publish") && normalizedMsg.contains("disposed")) {
return LINK;
}
}
Comment thread
EldertGrootenboer marked this conversation as resolved.
}
Comment thread
EldertGrootenboer marked this conversation as resolved.

// Unknown non-AMQP errors — treat as fatal (don't retry application or SDK bugs).
// The Go SDK defaults to CONNECTION for unknown errors, but those are AMQP-layer
// errors (io.EOF, net.Error). Java's non-AMQP exceptions (e.g., AzureException,
// RuntimeException) should fail fast rather than trigger connection recovery.
return FATAL;
Comment thread
EldertGrootenboer marked this conversation as resolved.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,5 @@ static Retry createRetry(AmqpRetryOptions options) {
.filter(error -> error instanceof TimeoutException
|| (error instanceof AmqpException && ((AmqpException) error).isTransient()));
}

}
Comment thread
EldertGrootenboer marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,129 @@ public void shouldRefreshCacheOnErrorInCachedConnection() {
}
}

@Test
public void invalidateConnectionReplacesCachedConnectionOnNextGet() {
final ConnectionSupplier connectionSupplier = new ConnectionSupplier();
final ReactorConnectionCache<ReactorConnection> connectionCache
= new ReactorConnectionCache<>(connectionSupplier, FQDN, ENTITY_PATH, retryPolicy, new HashMap<>());
try {
final ReactorConnection[] c = new ReactorConnection[1];
final Mono<ReactorConnection> connectionMono = connectionCache.get();
// Populate the cache with the first connection.
StepVerifier.create(connectionMono, 0)
.thenRequest(1)
.then(() -> connectionSupplier.emitEndpointState(EndpointState.ACTIVE))
.expectNextMatches(con -> {
Assertions.assertFalse(con.isDisposed());
c[0] = con;
return true;
})
.expectComplete()
.verify(VERIFY_TIMEOUT);
connectionSupplier.assertInvocationCount(1);

// Mark the current connection for force-invalidation.
connectionCache.invalidateConnection();

// Next subscription must close the marked connection and serve a brand-new one.
StepVerifier.create(connectionMono, 0)
.thenRequest(1)
.then(() -> connectionSupplier.emitEndpointState(EndpointState.ACTIVE))
.expectNextMatches(con -> {
Assertions.assertFalse(con.isDisposed());
Assertions.assertNotEquals(c[0], con);
return true;
})
.expectComplete()
.verify(VERIFY_TIMEOUT);

// The previously cached connection should be closed by the cacheInvalidateIf branch.
Assertions.assertTrue(c[0].isDisposed());
// A second connection had to be supplied because the first was force-invalidated.
connectionSupplier.assertInvocationCount(2);
} finally {
connectionSupplier.dispose();
connectionCache.dispose();
}
}

@Test
public void invalidateConnectionDoesNotInvalidateNewlyCreatedConnection() {
// Asserts the AB-A safety property: invalidate() targets a specific connection-id, so a
// fresh connection created after the invalidation marker was consumed must NOT be evicted
// by a stale marker.
final ConnectionSupplier connectionSupplier = new ConnectionSupplier();
final ReactorConnectionCache<ReactorConnection> connectionCache
= new ReactorConnectionCache<>(connectionSupplier, FQDN, ENTITY_PATH, retryPolicy, new HashMap<>());
try {
final ReactorConnection[] c = new ReactorConnection[2];
final Mono<ReactorConnection> connectionMono = connectionCache.get();
StepVerifier.create(connectionMono, 0)
.thenRequest(1)
.then(() -> connectionSupplier.emitEndpointState(EndpointState.ACTIVE))
.expectNextMatches(con -> {
c[0] = con;
return true;
})
.expectComplete()
.verify(VERIFY_TIMEOUT);

connectionCache.invalidateConnection();

// Consume the invalidation marker and obtain the replacement connection.
StepVerifier.create(connectionMono, 0)
.thenRequest(1)
.then(() -> connectionSupplier.emitEndpointState(EndpointState.ACTIVE))
.expectNextMatches(con -> {
c[1] = con;
Assertions.assertNotEquals(c[0], c[1]);
return true;
})
.expectComplete()
.verify(VERIFY_TIMEOUT);

// The marker has been consumed; another get() must serve the same (cached) replacement
// connection without creating a third one.
StepVerifier.create(connectionMono, 0).thenRequest(1).expectNextMatches(con -> {
Assertions.assertEquals(c[1], con);
Assertions.assertFalse(con.isDisposed());
return true;
}).expectComplete().verify(VERIFY_TIMEOUT);

connectionSupplier.assertInvocationCount(2);
} finally {
connectionSupplier.dispose();
connectionCache.dispose();
}
}

@Test
public void invalidateConnectionWithoutCachedConnectionIsNoOp() {
final ConnectionSupplier connectionSupplier = new ConnectionSupplier();
final ReactorConnectionCache<ReactorConnection> connectionCache
= new ReactorConnectionCache<>(connectionSupplier, FQDN, ENTITY_PATH, retryPolicy, new HashMap<>());
try {
// No connection has been supplied yet.
connectionCache.invalidateConnection();
connectionSupplier.assertInvocationCount(0);

// First get() after the no-op invalidate() must still produce a connection normally.
StepVerifier.create(connectionCache.get(), 0)
.thenRequest(1)
.then(() -> connectionSupplier.emitEndpointState(EndpointState.ACTIVE))
.expectNextMatches(con -> {
Assertions.assertFalse(con.isDisposed());
return true;
})
.expectComplete()
.verify(VERIFY_TIMEOUT);
connectionSupplier.assertInvocationCount(1);
} finally {
connectionSupplier.dispose();
connectionCache.dispose();
}
}

@Test
public void shouldBubbleUpNonRetriableError() {
final ConnectionSupplier connectionSupplier = new ConnectionSupplier();
Expand Down
Loading
Loading