Add tiered send/receive recovery to azure-core-amqp and azure-messaging-servicebus #48460
EldertGrootenboer wants to merge 30 commits into Azure:main
Conversation
Add RecoveryKind error classification and recovery-aware retry to azure-core-amqp. Apply tiered recovery to all Service Bus sender, receiver, and session paths. On LINK errors: dispose stale link/session, retry with fresh resources. On CONNECTION errors: force-close the cached connection, retry with fresh connection. Includes quick-retry optimization and didQuickRetry deduplication. Fixes Azure#44688
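The tiered classification this commit describes can be sketched roughly as follows. This is a hedged, illustrative stand-in: the real azure-core-amqp type is RecoveryKind and it maps AmqpErrorCondition values, whereas `RecoveryTier` below uses plain exception types and message matching just to show how each tier drives a different cleanup before retry.

```java
import java.util.concurrent.TimeoutException;

// Illustrative sketch only -- the real classifier is RecoveryKind.classify()
// over AmqpErrorCondition; this version matches on exception shape/message.
enum RecoveryTier {
    NONE, LINK, CONNECTION, FATAL;

    static RecoveryTier classify(Throwable error) {
        if (error instanceof TimeoutException) {
            return NONE; // transient: plain retry, no cleanup needed
        }
        String message = error.getMessage() == null ? "" : error.getMessage();
        if (error instanceof IllegalStateException && message.contains("disposed")) {
            return LINK; // stale link: dispose link/session, retry with fresh resources
        }
        if (message.contains("connection:forced")) {
            return CONNECTION; // force-close the cached connection, retry on a fresh one
        }
        return FATAL; // unrecognized: propagate to the caller, do not retry
    }
}
```

Usage follows the commit's pattern: classify the failure, run the tier's cleanup, then let the retry policy re-attempt on the now-clean state.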
Add azure-core-amqp to AdditionalModules in ci.yml and trigger paths so CI builds it from source alongside servicebus. Update pom.xml to reference 2.12.0-beta.1 with current tag (cross-module PR -- to be revisited after azure-core-amqp is released to Maven Central). Note: uses current tag temporarily; reviewer to confirm release sequencing.
Pull request overview
Copilot reviewed 13 out of 13 changed files in this pull request and generated 6 comments.
Address 6 comments from Copilot PR review on Azure#48460:
- sendFluxInternal: wrap only batchList (link acquisition) with withRetryAndRecovery, not the full sendOperation. Wrapping the outer operation caused the user-provided Flux to be re-subscribed on each retry and nested retries with sendBatchInternal.
- scheduleMessageInternal: change getSendLink to getSendLinkWithRetry so schedule operations get the same tiered recovery as other send paths.
- ReactorConnectionCache.forceCloseConnection: use connection.dispose() instead of closeAsync() so isDisposed() returns true synchronously. This ensures cacheInvalidateIf invalidates the cached reference immediately on the next get() call.
- RetryUtil: replace Math.random() with ThreadLocalRandom.current() to eliminate shared RNG contention and improve testability.
- performRecovery comment: remove the contradictory 'happens organically' comment that conflicted with the explicit forceCloseConnection() call.
- ServiceBusReceiverAsyncClient: add an error handler to the subscribe() call inside the LINK recovery callback so failures are logged and do not silently leak.
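The first fix above (retrying only link acquisition so the user's Flux is not re-subscribed) comes down to keeping the retry boundary around the resource-acquisition step alone, with the already-collected batch held outside it. A blocking, Reactor-free sketch of that idea, where `RetryBoundarySketch` and `acquireWithRetry` are hypothetical names, not the actual sendFluxInternal code:

```java
import java.util.function.Supplier;

// Hypothetical sketch: only the acquisition step sits inside the retry loop,
// so a retry never re-consumes the caller's already-collected messages.
final class RetryBoundarySketch {
    /** Retries only the acquisition step up to maxAttempts times. */
    static <L> L acquireWithRetry(Supplier<L> acquireLink, int maxAttempts) {
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return acquireLink.get(); // only this step is inside the retry boundary
            } catch (RuntimeException e) {
                last = e; // the real code would classify + recover here before retrying
            }
        }
        throw last; // retry budget exhausted
    }
}
```

In the real (reactive) code the same boundary is expressed by wrapping just the link-acquisition Mono with the retry operator, while the user's Flux is collected once, upstream of it.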
Pull request overview
Copilot reviewed 13 out of 13 changed files in this pull request and generated 7 comments.
- T1: ServiceBusSessionAcquirer: return delay after forceCloseConnection for RecoveryKind.CONNECTION so session acquisition retries after connection recovery.
- T2: ServiceBusSessionManager: merge LINK and CONNECTION into a single delay branch so CONNECTION errors retry instead of falling through to publishError.
- T3: ServiceBusSenderAsyncClient: narrow withRetryAndRecovery to wrap only link acquisition (getSendLink); messages.collect() moved outside the retry boundary to avoid re-subscribing the user Flux on retry.
- T4: RecoveryKind: reclassify OPERATION_CANCELLED from NONE to LINK because core-amqp raises it when the AMQP layer unexpectedly aborts or disconnects, which requires link recovery (e.g. the ReceiverUnsettledDeliveries remote Released outcome).
- T5: RecoveryKind: reclassify RESOURCE_LIMIT_EXCEEDED from FATAL to NONE to match ReactorSender.isGeneralSendError(), which treats it as retriable alongside SERVER_BUSY and TIMEOUT.
- T6: RetryUtilTest: add four tests for createRetryWithRecovery covering FATAL no-retry, the LINK recovery callback, the CONNECTION recovery callback, and retry budget exhaustion; use virtual time for backoff-delay scenarios.
- T7: RecoveryKindTest: rename the operationCancelled test to expect a LINK result; add a new test asserting RESOURCE_LIMIT_EXCEEDED classifies as NONE.
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.
- T8: RetryUtil - clamp final jittered delay to maxDelay so retryOptions are
consistently respected. Previously jitter was applied after the pre-jitter cap
which could produce a delay exceeding retryOptions.getMaxDelay(). Also cap
baseDelay to maxDelay in FIXED mode (FIXED previously used baseDelay without
checking against maxDelay, unlike the EXPONENTIAL path).
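The clamping T8 describes can be illustrated with a small sketch. `RetryDelaySketch`, its `Mode` enum, and the parameter names are hypothetical, not the actual RetryUtil internals; the sketch only shows the two fixes: capping the FIXED base delay, and clamping the final post-jitter value to maxDelay.

```java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of the T8 clamping, not the real RetryUtil code.
final class RetryDelaySketch {
    enum Mode { FIXED, EXPONENTIAL }

    /** Computes a jittered retry delay that never exceeds maxDelay. */
    static Duration computeDelay(Mode mode, Duration baseDelay, Duration maxDelay,
                                 int attempt, double jitterFactor) {
        long maxMs = maxDelay.toMillis();
        long baseMs = Math.min(baseDelay.toMillis(), maxMs); // cap FIXED base too (T8)
        long preJitterMs = mode == Mode.FIXED
            ? baseMs
            : Math.min(baseMs * (1L << Math.min(attempt, 30)), maxMs);
        // Jitter may push the delay above the pre-jitter cap...
        double jitter = jitterFactor <= 0
            ? 1.0
            : 1.0 + ThreadLocalRandom.current().nextDouble(-jitterFactor, jitterFactor);
        long jitteredMs = (long) (preJitterMs * jitter);
        // ...so clamp the *final* jittered value to maxDelay (the T8 fix).
        return Duration.ofMillis(Math.min(Math.max(jitteredMs, 0), maxMs));
    }
}
```

Before the fix, only the pre-jitter value was capped, so a positive jitter sample could still exceed retryOptions.getMaxDelay().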
- T9: ServiceBusReceiverAsyncClient - fix misleading log message in the LINK/
CONNECTION recovery callback. The error handler on connectionProcessor.subscribe
fires only when obtaining the connection fails (not when removeSession fails,
since it returns a boolean). Renamed to "Error obtaining connection during {}
recovery." Also log the boolean result of removeSession at VERBOSE level to
confirm whether a stale session was actually evicted.
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.
…overy
- T10: In ServiceBusSenderAsyncClient.performRecovery(), replace link.dispose() with link.closeAsync().subscribe(...). ReactorSender.dispose() calls closeAsync().block(tryTimeout), which blocks the Reactor thread when invoked from a recovery callback on a non-blocking scheduler.
- T11: In ReactorConnectionCache.forceCloseConnection(), replace connection.dispose() with a non-blocking equivalent: set a new forceInvalidate AtomicBoolean flag before starting connection.closeAsync().subscribe(...). The cacheInvalidateIf predicate now checks forceInvalidate in addition to isDisposed(), ensuring the cache is invalidated synchronously (by the flag) while the close handshake completes asynchronously. ReactorConnection.dispose() has the same blocking pattern.
- T12: Update the comment in RetryUtil.createRetryWithRecovery() to remove the misleading claim that the quick-retry path matches Go's ResetAttempts(). The Java implementation uses a didQuickRetry flag only (no attempt counter reset); subsequent retries continue standard exponential backoff from the running count.
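The T11 flag-plus-predicate idea can be sketched without Reactor. `ConnectionCacheSketch` is a hypothetical stand-in for ReactorConnectionCache, with a plain Runnable in place of the async closeAsync() subscription; the point is that the flag invalidates the cache immediately while the close completes in the background.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical sketch of the T11 pattern, not the real ReactorConnectionCache.
final class ConnectionCacheSketch<C> {
    private final Supplier<C> factory;
    private final AtomicBoolean forceInvalidate = new AtomicBoolean(false);
    private C cached;

    ConnectionCacheSketch(Supplier<C> factory) { this.factory = factory; }

    /** Returns the cached connection, replacing it if it was force-invalidated. */
    synchronized C get() {
        // The invalidate-if predicate checks the flag in addition to "is disposed".
        if (cached == null || forceInvalidate.getAndSet(false)) {
            cached = factory.get(); // fresh connection on next use
        }
        return cached;
    }

    /** Non-blocking: set the flag first, then start the asynchronous close. */
    void forceCloseConnection(Runnable asyncClose) {
        forceInvalidate.set(true);  // cache is invalid immediately...
        asyncClose.run();           // ...while the close handshake runs asynchronously
    }
}
```

Contrast with the earlier dispose()-based version: dispose() made isDisposed() true synchronously but blocked the calling thread; the flag gives the same synchronous invalidation without blocking.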
…nd CONNECTION
The recovery comment said session removal was for LINK errors only, but the code runs removeSession() for both LINK and CONNECTION recovery. Update the comment to mention both tiers and clarify that CONNECTION additionally invalidates the cached connection.
What this PR does
Adds tiered send/receive recovery to azure-core-amqp and azure-messaging-servicebus, matching the recovery pattern used by the Go, .NET, Python, and JS SDKs. When an AMQP operation fails, the error is classified into a recovery tier — NONE, LINK, CONNECTION, or FATAL — and the appropriate resources are closed before retrying. This resolves a long-standing issue where a single stale AMQP connection or link could cause a Service Bus sender or receiver to become permanently stuck, requiring a JVM restart.
Changes
azure-core-amqp (shared — benefits Event Hubs too)
- `RecoveryKind` (new): error classification enum with `classify(Throwable)` that maps every `AmqpErrorCondition` to NONE/LINK/CONNECTION/FATAL. Also classifies `IllegalStateException("Cannot publish ... disposed")` as LINK so a stale `ReactorSender` is recovered instead of treated as fatal.
- `ReactorConnectionCache.invalidateConnection()` (new): marks the cached connection for force-invalidation via an `AtomicReference<String> forceInvalidateConnectionId` keyed on the connection's id. The next `get()` consumes the marker, closes the targeted connection, and supplies a fresh one. Targeting a specific connection id prevents the A-B-A race where a freshly created replacement connection could be evicted by a stale marker.
- `AmqpChannelProcessor.forceCloseChannel()` (new): same role for the v1 connection cache.
- `RetryUtil`: no new public API. Recovery is composed at each call site using the existing `withRetry(...)` overloads with an `onErrorResume(recoverBeforeRetry...)` step that classifies the error, performs tier-appropriate cleanup (close link, invalidate connection), then re-emits the error so `withRetry` retries on the now-clean state.

azure-messaging-servicebus
- `ServiceBusSenderAsyncClient`: send paths (`createMessageBatch`, `scheduleMessageInternal`, `sendBatchInternal`, `sendFluxInternal`) wrap link acquisition with `RetryUtil.withRetry(...)` + `onErrorResume(recoverBeforeRetry)`. A new `getSendLinkAndSizeWithRetry()` helper wraps both `getSendLink()` and `link.getLinkSize()` inside the same retry boundary so a transient size-negotiation failure also triggers tiered recovery.
- `ServiceBusReceiverAsyncClient`: the LINK/CONNECTION recovery callback calls `connectionCacheWrapper.invalidateConnection()` on CONNECTION errors.
- `ServiceBusSessionAcquirer`: `Retry.from(...)` includes a `RecoveryKind.CONNECTION` branch that calls `connectionCacheWrapper.invalidateConnection()` followed by `Mono.delay(Duration.ZERO)` so the session-acquisition retry continues on a fresh connection instead of falling through to terminal error handling.
- `ServiceBusSessionManager`: LINK and CONNECTION errors share a single retry branch via `Mono.delay(Duration.ZERO)`, with CONNECTION additionally calling `invalidateConnection()`.
- `ConnectionCacheWrapper.invalidateConnection()`: delegates to v1 (`AmqpChannelProcessor.forceCloseChannel()`) or v2 (`ReactorConnectionCache.invalidateConnection()`) depending on which cache is active.

Tests
- `RecoveryKindTest` (new, 215 lines): covers all error condition classifications, including the LINK reclassification for `OPERATION_CANCELLED`, the NONE reclassification for `RESOURCE_LIMIT_EXCEEDED`, and the narrowed `IllegalStateException("Cannot publish ... disposed")` LINK match (with negative cases for unrelated disposal messages).
- `ReactorConnectionCacheTest` (3 new tests): `invalidateConnection()` causes the next `get()` to close the cached connection and supply a new one; A-B-A safety (a stale marker does not invalidate a freshly created connection); no-op when no connection is cached.
- `ServiceBusSenderAsyncClientTest`: updated `failedSendMessageReportsMetrics` to use a FATAL error (the broadened retry filter would otherwise retry on the previous test error).

Cross-SDK parity
| Concern | Other SDKs | Java (this PR) |
| --- | --- | --- |
| Error classification | `GetRecoveryKind()` (Go), `HasLinkCommunicationError`, `shutdown_handler` | `RecoveryKind.classify()` |
| Link recovery | `FaultTolerantAmqpObject` (.NET), `closeLink()` | `closeAsync()` + `removeSession()` |
| Connection recovery | `Namespace.Recover()` (Go), `ActiveConnection.GetOrCreateAsync`, `refreshConnection()` | `invalidateConnection()` (target-id keyed) |
| Retry composition | | `onErrorResume(recoverBeforeRetry)` before re-emit |

Related issues
Directly fixes or would have prevented:
- `operation-cancelled` errors