Add graceful shutdown drain to ServiceBusProcessorClient#48192

Open
EldertGrootenboer wants to merge 41 commits into Azure:main from EldertGrootenboer:fix/servicebus-processor-graceful-shutdown-45716

Conversation

Contributor

@EldertGrootenboer EldertGrootenboer commented Mar 2, 2026

Fixes #45716

When ServiceBusProcessorClient.close() is called while message handlers are still executing, the receiver is disposed immediately, causing in-flight handlers to fail with IllegalStateException: Cannot perform operation on a disposed receiver.

What this PR does

Adds drain-before-dispose logic to all processor shutdown paths. An AtomicInteger handler counter with Object monitor wait/notify blocks close() until all in-flight message handlers complete (or a 30-second timeout expires) before subscription cancellation/disposal:

  • MessagePump (V2 non-session) — drainHandlers() added
  • ServiceBusProcessor.RollingMessagePump (V2 lifecycle) — calls pump.drainHandlers() before disposable.dispose()
  • SessionsMessagePump.RollingSessionReceiver (V2 session) — per-session drain in terminate() before workerScheduler.dispose()
  • ServiceBusProcessorClient (V1) — drainV1Handlers() before receiverSubscriptions.cancel()

This mirrors the .NET SDK's StopProcessingAsync behavior, which awaits Task.WhenAll on in-flight handlers before disposing.
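
The counter-plus-monitor pattern described above can be reduced to a small sketch. This is illustrative only, not the SDK source; the names (DrainGate, enterHandler, exitHandler, drainHandlers) are hypothetical, but the shape matches the description: an AtomicInteger in-flight count, an Object monitor, notifyAll when the count reaches zero, and a timed wait loop in the drain.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reduction of the drain-before-dispose pattern described above.
final class DrainGate {
    private final AtomicInteger activeHandlerCount = new AtomicInteger();
    private final Object drainLock = new Object();

    void enterHandler() {
        activeHandlerCount.incrementAndGet();
    }

    void exitHandler() {
        // Notify under the lock so a waiter between its check and wait() cannot miss it.
        if (activeHandlerCount.decrementAndGet() == 0) {
            synchronized (drainLock) {
                drainLock.notifyAll();
            }
        }
    }

    /** Blocks until all in-flight handlers finish; returns false on timeout. */
    boolean drainHandlers(Duration timeout) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        synchronized (drainLock) {
            while (activeHandlerCount.get() > 0) {
                long remainingMs = (deadline - System.nanoTime()) / 1_000_000;
                if (remainingMs <= 0) {
                    return false; // timed out; caller disposes anyway
                }
                try {
                    drainLock.wait(remainingMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return true;
    }
}
```

The loop re-checks the count after every wakeup, so spurious wakeups are harmless, and the caller proceeds with disposal either way once the timeout expires.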

V1 lifecycle hardening (added during review)

ServiceBusProcessorClient.close() originally held the instance monitor across the entire drain wait, which let any in-flight handler calling a synchronized accessor on the same client (isRunning(), getIdentifier()) stall shutdown for the full drain timeout. Releasing the monitor across the drain opened additional races with start()/stop()/restartMessageReceiver() and concurrent close() calls. The PR now:

  • Releases the instance monitor across the V1 drain wait.
  • Adds a v1CloseInProgress AtomicBoolean (claimed via compareAndSet) so start(), stop(), restartMessageReceiver() return early during shutdown and only one concurrent close() performs cleanup.
  • Caches the V1 identifier so getIdentifier() returns a stable value during/after close() without lazy-creating a fresh receiver. getIdentifier() may now return null on the V1 path when close() is in progress on a brand-new processor that never started (Javadoc updated).
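
The compareAndSet ownership guard in the second bullet can be sketched as follows. The class and method names here (LifecycleGuard, tryBeginClose, isClosing) are hypothetical stand-ins, not the actual ServiceBusProcessorClient fields:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch of the v1CloseInProgress guard described above.
final class LifecycleGuard {
    private final AtomicBoolean closeInProgress = new AtomicBoolean();

    /** Only the first concurrent close() wins ownership and performs cleanup. */
    boolean tryBeginClose() {
        return closeInProgress.compareAndSet(false, true);
    }

    /** start()/stop()/restart paths return early while a close is draining. */
    boolean isClosing() {
        return closeInProgress.get();
    }
}
```

With plain set(true)/clear semantics two concurrent close() calls could both run cleanup; compareAndSet makes the loser return immediately while the owner finishes the work.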

Scope

This PR is intentionally scoped to the azure-messaging-servicebus module. Earlier revisions also added drainTimeout property scaffolding to four Spring modules (spring-cloud-azure-autoconfigure, spring-cloud-azure-service, spring-messaging-azure-servicebus, plus its test counterpart) and a corresponding sdk/spring/CHANGELOG.md entry. Those were reverted because the Spring modules still depend on azure-messaging-servicebus:7.17.17 (no drainTimeout(Duration) builder API) and ServiceBusSessionProcessorClientBuilderFactory doesn't wire properties.getDrainTimeout() to the underlying client builder, so the property would have been a no-op end-to-end. The Spring drainTimeout wiring should ship as a separate PR after the Spring servicebus dep is bumped to 7.18.0+ and the builder factory is updated.

Tests

12 tests in ServiceBusProcessorGracefulShutdownTest covering the full drain + lifecycle surface:

  • Drain happy paths: v2CloseShouldWaitForInFlightHandlerBeforeClosingClient, v1CloseShouldWaitForInFlightHandlerBeforeClosingClient
  • Drain timeout / re-entrancy: v2DrainShouldRespectTimeout, v2DrainFromWithinHandlerShouldNotDeadlock, v1ReentrantCloseWaitsForOtherConcurrentHandlers
  • Closing-flag dispatch gate: v2ClosingFlagPreventsNewHandlersAfterDrainStarts, v1ClosingFlagPreventsNewHandlersAfterDrainStarts, v1StartAfterCloseResetsClosingFlag
  • V1 lifecycle hardening: v1CloseShouldNotHoldClientMonitorDuringDrain, v1ConcurrentStartDuringCloseDrainIsIgnored, v1GetIdentifierDuringAndAfterCloseDoesNotCreateNewReceiver, v1ConcurrentCloseCallsDoNotRace

All lifecycle coordination in tests uses deterministic waitFor(...) polling (predicate-based) instead of fixed Thread.sleep waits so the suite is robust on slow/contended CI.

Full module test suite: 956 tests pass, 0 failures, 0 errors.

When ServiceBusProcessorClient.close() is called while message handlers
are still executing, the receiver was disposed immediately, causing
in-flight handlers to fail with IllegalStateException.

Add drain-before-dispose logic using an AtomicInteger handler counter
and Object monitor wait/notify to all processor shutdown paths:
- MessagePump (V2 non-session)
- ServiceBusProcessor.RollingMessagePump (V2 non-session lifecycle)
- SessionsMessagePump.RollingSessionReceiver (V2 session)
- ServiceBusProcessorClient V1 close path

The drain executes before subscription cancellation/disposal, with a
configurable timeout (default 30s) to prevent indefinite blocking.

Includes 3 regression tests in ServiceBusProcessorGracefulShutdownTest.
Copilot AI review requested due to automatic review settings March 2, 2026 23:04
@EldertGrootenboer EldertGrootenboer added bug This issue requires a change to an existing behavior in the product in order to be resolved. Service Bus Client This issue points to a problem in the data-plane of the library. labels Mar 2, 2026
@EldertGrootenboer EldertGrootenboer added the customer-reported Issues that are reported by GitHub users external to the Azure organization. label Mar 2, 2026
@EldertGrootenboer EldertGrootenboer requested a review from a team as a code owner March 2, 2026 23:04

Copilot AI left a comment


Pull request overview

This PR adds “drain before dispose” behavior to Service Bus processor shutdown paths to avoid failing in-flight message handlers (and their settlement calls) with IllegalStateException when the underlying receiver is disposed during close().

Changes:

  • Track in-flight handler execution and block shutdown briefly to allow handlers to complete (with a 30s timeout).
  • Apply draining to V2 non-session (MessagePump/RollingMessagePump), V2 session (RollingSessionReceiver.terminate()), and V1 (ServiceBusProcessorClient.close()).
  • Add regression tests covering V2 non-session and V1 shutdown draining plus a drain-timeout test.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

Reviewed files:
  • sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java: adds handler counting plus drainHandlers(Duration), used to block shutdown until in-flight handlers finish or the timeout expires.
  • sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessor.java: wires draining into RollingMessagePump.dispose() before disposing the subscription.
  • sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java: adds per-session handler counting and a drain during termination before disposing the worker scheduler.
  • sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java: adds V1 handler counting plus drainV1Handlers(Duration), invoked during close() before subscription cancellation.
  • sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java: adds new unit tests validating drain behavior for V2 non-session, V1, and drain timeout.
Comments suppressed due to low confidence (1)

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java:458

  • SessionsMessagePump.RollingSessionReceiver now includes new drain-before-dispose behavior, but the added logic isn’t covered by the new regression tests. There are already isolated unit tests for SessionsMessagePump behavior; it should be possible to extend them to verify that termination waits for an in-flight handler (or respects the timeout) for the session path as well.

Please add a unit test that exercises session termination while a handler is blocked, to prevent regressions in this new shutdown behavior.

            // Drain in-flight message handlers BEFORE disposing the worker scheduler.
            // Disposing the scheduler interrupts handler threads (via ScheduledExecutorService.shutdownNow()).
            // Draining first ensures handlers can complete message settlement before threads are interrupted.
            // See https://github.com/Azure/azure-sdk-for-java/issues/45716
            drainHandlers(DRAIN_TIMEOUT);
            workerScheduler.dispose();

- Add ThreadLocal<Boolean> flag to detect when drainHandlers is called
  from within a message handler (e.g., user calls close() inside
  processMessage callback)
- Guard all three drain paths: MessagePump.drainHandlers(),
  ServiceBusProcessorClient.drainV1Handlers(), and
  SessionsMessagePump.RollingSessionReceiver.drainHandlers()
- When re-entrant call detected, skip drain with warning log and return
  immediately to avoid self-deadlock
- Add v2DrainFromWithinHandlerShouldNotDeadlock test verifying the guard
  prevents deadlock when drain is called from handler thread
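
The guard in that commit can be reduced to the sketch below. The names (ReentrantDrainGuard, runHandler, tryDrain) are hypothetical; the point is the ThreadLocal marker that lets the drain detect it is running on a handler thread and bail out instead of self-deadlocking:

```java
import java.util.function.Supplier;

// Illustrative sketch (not the SDK source) of the re-entrant drain guard.
final class ReentrantDrainGuard {
    private final ThreadLocal<Boolean> isHandlerThread =
        ThreadLocal.withInitial(() -> Boolean.FALSE);

    <T> T runHandler(Supplier<T> handler) {
        isHandlerThread.set(Boolean.TRUE);
        try {
            return handler.get();
        } finally {
            isHandlerThread.remove(); // clear the pooled thread's entry
        }
    }

    /** Returns false (drain skipped) when called from inside a handler. */
    boolean tryDrain() {
        if (isHandlerThread.get()) {
            // Waiting here would self-deadlock: this thread's own handler holds
            // the in-flight count and cannot release it until it returns.
            return false;
        }
        // ... a real implementation would wait for in-flight handlers here ...
        return true;
    }
}
```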

Copilot AI left a comment


Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.

Comments suppressed due to low confidence (3)

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:161

  • This test uses a fixed Thread.sleep(200) to “give dispose a moment to start”. Fixed sleeps are prone to flakiness on slow/loaded CI agents (either too short or unnecessarily long). Prefer a synchronization point that directly observes the expected state (e.g., assertFalse(disposeDone.await(...)), a latch signaled right before/after entering drain, or Mockito’s timed verification APIs) so the test doesn’t depend on timing heuristics.
        // Give dispose a moment to start; it should be blocked in drainHandlers().
        Thread.sleep(200);

        // Verify: client has NOT been closed yet (handler is still running, drain is blocking dispose).
        verify(client, never()).close();
        assertFalse(handlerCompleted.get(), "Handler should still be in-flight");

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:256

  • This test uses a fixed Thread.sleep(200) to “give close a moment to start”. As written, it can be flaky under variable scheduling/CPU contention. Use an explicit synchronization condition (e.g., assertFalse(closeDone.await(...)) or a latch that confirms the close thread is blocked in the drain) instead of a fixed sleep.
        // Give close a moment to start; it should be blocked in drainV1Handlers().
        Thread.sleep(200);

        // Verify: client has NOT been closed yet (handler is still running, drain is blocking close).
        verify(asyncClient, never()).close();
        assertFalse(handlerCompleted.get(), "Handler should still be in-flight");

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:54

  • The class-level "Coverage Matrix" JavaDoc is now out of sync with the actual tests in this class: it lists only the V2/V1/timeout scenarios, but the class also includes a re-entrant drain regression test (v2DrainFromWithinHandlerShouldNotDeadlock). Please update the matrix to reflect all covered scenarios so readers don’t miss this important case.
 * <h3>Coverage Matrix</h3>
 * <ul>
 *   <li><b>V2 Non-Session</b> — {@link #v2CloseShouldWaitForInFlightHandlerBeforeClosingClient()}:
 *       Tests drain in {@code RollingMessagePump.dispose()} → {@code MessagePump.drainHandlers()}</li>
 *   <li><b>V1 Non-Session</b> — {@link #v1CloseShouldWaitForInFlightHandlerBeforeClosingClient()}:
 *       Tests drain in {@code ServiceBusProcessorClient.close()} → {@code drainV1Handlers()}</li>
 *   <li><b>Drain Timeout</b> — {@link #v2DrainShouldRespectTimeout()}:
 *       Tests {@code MessagePump.drainHandlers()} timeout behavior directly</li>

After drainHandlers() returns but before the Flux subscription is disposed, flatMap can dispatch a new handler that attempts settlement on a closing client. Add a volatile boolean closing flag to MessagePump and SessionsMessagePump.RollingSessionReceiver, set at the start of drainHandlers() and checked at the top of handleMessage(). Handlers that see the flag skip processing and return immediately. V1 path is unaffected (isRunning already gates subscription.request). New test: v2ClosingFlagPreventsNewHandlersAfterDrainStarts.
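
A minimal sketch of the closing-flag gate just described, with hypothetical names (ClosingGate, beginDrain, handleMessage) standing in for the real fields:

```java
// Illustrative sketch of the volatile closing flag described above.
final class ClosingGate {
    private volatile boolean closing;
    private int handled;

    void beginDrain() {
        closing = true; // set at the start of drainHandlers()
    }

    /** Checked at the top of handleMessage(): late dispatches are dropped. */
    boolean handleMessage(String message) {
        if (closing) {
            return false; // skip processing; the client is shutting down
        }
        handled++;
        return true;
    }

    int handledCount() {
        return handled;
    }
}
```

The volatile write in beginDrain() guarantees a handler dispatched after the drain starts observes the flag, closing the window between drain completion and subscription disposal.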

Copilot AI left a comment


Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated no new comments.

Comments suppressed due to low confidence (3)

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java:176

  • isHandlerThread is an instance ThreadLocal. Setting it back to FALSE leaves an entry in the thread’s ThreadLocalMap for each MessagePump instance that ever ran on that pooled thread. Since pumps can be recreated (rolling/retry) and worker threads are long-lived, this can accumulate stale entries and increase memory usage over time. Prefer calling isHandlerThread.remove() in the finally block instead of set(Boolean.FALSE) to ensure the entry is cleared.
        } finally {
            isHandlerThread.set(Boolean.FALSE);
            if (activeHandlerCount.decrementAndGet() == 0) {
                synchronized (drainLock) {
                    drainLock.notifyAll();

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java:503

  • isV1HandlerThread is an instance ThreadLocal. Resetting it with set(Boolean.FALSE) keeps a per-processor entry in the thread’s ThreadLocalMap, which can accumulate on pooled threads over the lifetime of the application. Prefer isV1HandlerThread.remove() in the finally block to fully clear the entry after each callback.
                    } finally {
                        isV1HandlerThread.set(Boolean.FALSE);
                        if (activeV1HandlerCount.decrementAndGet() == 0) {
                            synchronized (v1DrainLock) {
                                v1DrainLock.notifyAll();

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java:571

  • isHandlerThread is an instance ThreadLocal. Setting it to FALSE leaves an entry behind in the thread’s ThreadLocalMap, and since session pumps/receivers can be recreated while using long-lived pooled threads, this can accumulate stale entries over time. Prefer isHandlerThread.remove() in the finally block to clear the ThreadLocal after each handler execution.
            } finally {
                isHandlerThread.set(Boolean.FALSE);
                if (activeHandlerCount.decrementAndGet() == 0) {
                    synchronized (drainLock) {
                        drainLock.notifyAll();
                    }

On pooled threads (Reactor boundedElastic), set(FALSE) leaves a stale entry in the ThreadLocalMap after the pump is GC'd. remove() clears the entry immediately, following Java best practice for ThreadLocal cleanup on long-lived worker threads.
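
The cleanup style the review recommends can be seen in a small sketch (illustrative names): remove() deletes the per-instance entry from the pooled thread's ThreadLocalMap, while set(Boolean.FALSE) would leave it behind for the lifetime of the worker thread.

```java
// Illustrative contrast for the ThreadLocal cleanup discussed above.
final class HandlerThreadFlag {
    private final ThreadLocal<Boolean> isHandlerThread =
        ThreadLocal.withInitial(() -> Boolean.FALSE);

    void runHandler(Runnable handler) {
        isHandlerThread.set(Boolean.TRUE);
        try {
            handler.run();
        } finally {
            // set(Boolean.FALSE) would keep a per-instance entry alive in the
            // pooled thread's ThreadLocalMap; remove() deletes the entry so the
            // map holds nothing for this instance once the handler returns.
            isHandlerThread.remove();
        }
    }

    boolean insideHandler() {
        return isHandlerThread.get(); // withInitial restores FALSE after remove()
    }
}
```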

Copilot AI left a comment


Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (4)

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:157

  • These Thread.sleep(...) calls make the tests timing-dependent and can be flaky under load/slow CI. Prefer synchronizing on explicit signals (additional latches) or Mockito’s time-based verification (verify(mock, after(ms).never())..., timeout(ms) with zero invocations) so the test waits deterministically without assuming scheduler timing.
        Thread.sleep(200);

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:251

  • These Thread.sleep(...) calls make the tests timing-dependent and can be flaky under load/slow CI. Prefer synchronizing on explicit signals (additional latches) or Mockito’s time-based verification (verify(mock, after(ms).never())..., timeout(ms) with zero invocations) so the test waits deterministically without assuming scheduler timing.
        Thread.sleep(200);

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:481

  • These Thread.sleep(...) calls make the tests timing-dependent and can be flaky under load/slow CI. Prefer synchronizing on explicit signals (additional latches) or Mockito’s time-based verification (verify(mock, after(ms).never())..., timeout(ms) with zero invocations) so the test waits deterministically without assuming scheduler timing.
        Thread.sleep(500);

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java:483

  • The drain implementation (counter + lock + re-entrancy guard + timeout loop) is duplicated across MessagePump, SessionsMessagePump, and V1 (ServiceBusProcessorClient). To reduce divergence risk and make future fixes (e.g., re-entrancy behavior) consistent, consider extracting a small shared helper (package-private) that encapsulates the counter/lock/wait-notify pattern and exposes a clear “drain result”.
        private void drainHandlers(Duration timeout) {
            closing = true;
            if (isHandlerThread.get()) {
                // Re-entrant call from within a session message handler (e.g., user called close() inside processMessage).
                // Waiting here would self-deadlock because this thread's handler incremented the counter and
                // cannot decrement it until it returns. Skip the drain — remaining handlers (if any) will
                // complete naturally after this handler returns.
                logger.atWarning()
                    .log("drainHandlers called from within a session message handler (re-entrant). "
                        + "Skipping drain to avoid self-deadlock.");
                return;
            }

When a handler calls close() re-entrantly, the drain now waits for OTHER
concurrent handlers to complete (threshold=1) before cancelling subscriptions
and closing the client. Previously the drain returned immediately, which
could interrupt concurrent handlers mid-settlement.

Applied consistently across V1 (ServiceBusProcessorClient), V2 (MessagePump),
and sessions (SessionsMessagePump). Notification threshold updated from == 0
to <= 1 so the re-entrant waiter gets notified.
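
The threshold change can be sketched as follows. Names are illustrative and the real implementation differs, but the two pieces the commit message describes are both here: the waiter's threshold is 1 on the re-entrant path (excluding its own handler), and the notification fires at count <= 1 so that waiter still wakes.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of the re-entrant drain threshold described above.
final class ThresholdDrain {
    private final AtomicInteger active = new AtomicInteger();
    private final Object lock = new Object();
    private final ThreadLocal<Boolean> inHandler =
        ThreadLocal.withInitial(() -> Boolean.FALSE);

    void runHandler(Runnable handler) {
        inHandler.set(Boolean.TRUE);
        active.incrementAndGet();
        try {
            handler.run();
        } finally {
            // Notify at <= 1 so a re-entrant waiter (threshold 1) also wakes up.
            if (active.decrementAndGet() <= 1) {
                synchronized (lock) {
                    lock.notifyAll();
                }
            }
            inHandler.remove();
        }
    }

    /** Waits for the OTHER in-flight handlers when called from inside one. */
    boolean drain(Duration timeout) {
        final int threshold = inHandler.get() ? 1 : 0; // exclude the calling handler
        final long deadline = System.nanoTime() + timeout.toNanos();
        synchronized (lock) {
            while (active.get() > threshold) {
                long ms = (deadline - System.nanoTime()) / 1_000_000;
                if (ms <= 0) {
                    return false;
                }
                try {
                    lock.wait(ms);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return true;
    }
}
```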
…ceiver leak during/after close

After close() releases the instance monitor across the drain wait, a concurrent getIdentifier() call could see asyncClient=null (just nulled by close()'s cleanup or never set) and fall through to the lazy createNewReceiver() path, leaving a fresh receiver behind that close() is no longer responsible for disposing.

getIdentifier() now caches the identifier in a volatile field whenever it observes a live asyncClient, and close() captures it once more before nulling the client. When asyncClient is null but cachedV1Identifier is set, getIdentifier() returns the cached value without lazy-creating a receiver. The original lazy-init behavior is preserved only for the first-ever call before any start().

Adds regression test v1GetIdentifierDuringAndAfterCloseDoesNotCreateNewReceiver verifying the receiver builder is invoked exactly once across start() + getIdentifier() during drain + getIdentifier() after close.
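
A reduced sketch of the caching behavior described above, with hypothetical names; a plain String stands in for the asyncClient reference and a counter stands in for receiver creation:

```java
// Illustrative sketch of the volatile identifier cache described above.
final class IdentifierCache {
    private volatile String liveClientIdentifier; // stands in for asyncClient's id
    private volatile String cachedIdentifier;
    private int receiversCreated;

    void start(String identifier) {
        liveClientIdentifier = identifier;
    }

    void close() {
        String id = liveClientIdentifier;
        if (id != null) {
            cachedIdentifier = id; // capture once more before nulling the client
        }
        liveClientIdentifier = null;
    }

    String getIdentifier() {
        String live = liveClientIdentifier;
        if (live != null) {
            cachedIdentifier = live; // cache whenever a live client is observed
            return live;
        }
        if (cachedIdentifier != null) {
            return cachedIdentifier; // no lazy receiver creation during/after close()
        }
        receiversCreated++; // first-ever call before start(): original lazy init
        liveClientIdentifier = "lazy-receiver";
        return liveClientIdentifier;
    }

    int receiversCreatedCount() {
        return receiversCreated;
    }
}
```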

Addresses Copilot feedback on PR Azure#48192.
@EldertGrootenboer EldertGrootenboer requested a review from Copilot May 7, 2026 00:18

Copilot AI left a comment


Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 2 comments.

… null instead of empty string

Two fixes addressing Copilot review feedback on PR Azure#48192:

1. Concurrent close() ownership: previous version used v1CloseInProgress.set(true)/finally clear, which let two concurrent close() calls both proceed through drain + cleanup. The first to finish would clear the flag and let a concurrent start() create new resources, which the still-running second close() could then dispose. close() now uses compareAndSet(false, true) so only the first call wins ownership; subsequent concurrent close() calls return immediately. The processor still gets fully closed - the owner finishes the work.

2. getIdentifier() consistency: previously returned an empty string when v1CloseInProgress was true with no cached identifier - a sentinel value callers don't expect. Now returns null, matching the V2 path's behavior when no identifier is available.

Adds regression test v1ConcurrentCloseCallsDoNotRace verifying two concurrent close() calls: the second returns within 1 second (instead of waiting 30s drain timeout) and the asyncClient is closed exactly once.

Copilot AI left a comment


Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 1 comment.


Copilot AI left a comment


Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 2 comments.

Update getIdentifier() Javadoc to explicitly allow null and describe the two cases when null can be returned: V2 path before first start(), and V1 path when close() is in progress on a brand-new processor that never started.

Addresses Copilot feedback on PR Azure#48192.

Copilot AI left a comment


Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 1 comment.


Copilot AI left a comment


Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 2 comments.

…ning() polling

Four lifecycle tests added in this session used Thread.sleep(200) to wait for close() to enter the drain window before triggering the next concurrent action. On slow/contended CI the close thread may not reach that point within 200ms, leading to flaky failures: a concurrent close()/start()/getIdentifier() could win the race and run before v1CloseInProgress was set.

Replace each Thread.sleep(200) with waitFor(() -> !processorClient.isRunning(), ...) - close() sets isRunning=false inside its first synchronized block, so once the predicate returns true we know close has already taken ownership. The helper polls every 5ms with a 5s deadline so it bounds total wait time without depending on wall-clock timing.
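
A helper with the described shape (5 ms poll interval, bounded deadline) might look like this; it is illustrative and the actual test utility may differ:

```java
import java.util.function.BooleanSupplier;

// Illustrative sketch of a deterministic predicate-polling test helper.
final class PollUtil {
    /** Polls condition every 5 ms until it is true or deadlineMillis elapses. */
    static boolean waitFor(BooleanSupplier condition, long deadlineMillis) {
        final long deadline = System.currentTimeMillis() + deadlineMillis;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                return condition.getAsBoolean(); // one final check at the deadline
            }
            try {
                Thread.sleep(5); // poll interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return condition.getAsBoolean();
            }
        }
        return true;
    }
}
```

Unlike a fixed sleep, this returns as soon as the observed state transition happens and only consumes the full deadline on genuine failure.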

Tests fixed:

- v1CloseShouldNotHoldClientMonitorDuringDrain

- v1ConcurrentStartDuringCloseDrainIsIgnored

- v1GetIdentifierDuringAndAfterCloseDoesNotCreateNewReceiver

- v1ConcurrentCloseCallsDoNotRace

Addresses Copilot feedback on PR Azure#48192.

Copilot AI left a comment


Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 1 comment.


Copilot AI left a comment


Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 1 comment.

…stic waitFor polling

Audit-and-fix sweep across the rest of ServiceBusProcessorGracefulShutdownTest after the previous round addressed the four lifecycle tests added this session. Same flakiness applies wherever a fixed sleep is used to assume an asynchronous lifecycle transition has happened.

Tests fixed:

- v2CloseShouldWaitForInFlightHandlerBeforeClosingClient: poll for disposeThread to be in WAITING/TIMED_WAITING state instead of fixed Thread.sleep(200).

- v1CloseShouldWaitForInFlightHandlerBeforeClosingClient: poll for !processorClient.isRunning() instead of fixed Thread.sleep(200).

- v1ReentrantCloseWaitsForOtherConcurrentHandlers: poll for !processorClient.isRunning() instead of fixed Thread.sleep(300).

- v1ClosingFlagPreventsNewHandlersAfterDrainStarts: poll for !processorClient.isRunning() instead of fixed Thread.sleep(200).

- v2ClosingFlagPreventsNewHandlersAfterDrainStarts: poll for drainThread state to be WAITING/TIMED_WAITING (deterministic prefix), then a small 300ms buffer for the timed reactive emission to flow through (the source flux's delayElements(200ms) can't be made fully deterministic without exposing internal MessagePump state for tests).

All 12 ServiceBusProcessorGracefulShutdownTest tests pass.

Copilot AI left a comment


Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated no new comments.


Copilot AI left a comment


Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 1 comment.

Comment thread on sdk/spring/CHANGELOG.md (outdated):
The Spring property additions for drainTimeout were never wired end-to-end:

1. All Spring modules depend on com.azure:azure-messaging-servicebus:7.17.17, which does not include the new drainTimeout(Duration) builder API (added here in 7.18.0-beta.2).

2. ServiceBusSessionProcessorClientBuilderFactory wires properties.getSessionIdleTimeout() to builder::sessionIdleTimeout, but no equivalent wiring was added for drainTimeout. So even with the dependency bump, setting the property would have no effect.

Setting the property in Spring configuration would have appeared to work but had no runtime effect, misleading users. Revert all four property POJOs and the corresponding sdk/spring/CHANGELOG.md entries; the Spring drainTimeout plumbing should ship as a separate PR after the Spring servicebus dependency is bumped to 7.18.0+ and the builder factory is updated.

Files reverted:

- sdk/spring/spring-cloud-azure-autoconfigure/.../AzureServiceBusProperties.java

- sdk/spring/spring-cloud-azure-service/.../ServiceBusProcessorClientProperties.java

- sdk/spring/spring-cloud-azure-service/.../ServiceBusProcessorClientTestProperties.java

- sdk/spring/spring-messaging-azure-servicebus/.../ProcessorProperties.java

- sdk/spring/CHANGELOG.md (drainTimeout entries)

Addresses Copilot feedback on PR Azure#48192.
@EldertGrootenboer
Contributor Author

@rujche Heads up — I reverted the Spring property additions (and the corresponding sdk/spring/CHANGELOG.md entries) in 481f1bf after Copilot pointed out that they would not have had any runtime effect:

  1. The Spring modules pin com.azure:azure-messaging-servicebus:7.17.17, which does not include the new drainTimeout(Duration) builder API (added in 7.18.0-beta.2 here).
  2. ServiceBusSessionProcessorClientBuilderFactory does not wire properties.getDrainTimeout() to the underlying builder::drainTimeout, so even with the dep bump the property would be a no-op.

The Spring drainTimeout plumbing should ship as a separate PR after the Spring servicebus dep is bumped and the builder factory is updated. With that revert, this PR no longer touches sdk/spring/, so an sdk/spring/CHANGELOG.md entry is no longer warranted here. Could you please dismiss the changes-requested review when you get a chance? Thanks!


Copilot AI left a comment


Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.

…lag test via Sinks.Many

Two fixes addressing Copilot review feedback on PR Azure#48192:

1. V2 lifecycle race - close() releases the outer ServiceBusProcessorClient monitor before delegating to processorV2.close() (whose internal drain blocks). Without an explicit guard, a concurrent start()/stop() could acquire the outer monitor during the V2 drain window and call processorV2.start(), leaving the inner processor running after the outer close() returned. Add a v2CloseInProgress AtomicBoolean (claimed via compareAndSet) symmetric to the V1 guard. start(), stop(), and concurrent close() respect it.

2. v2ClosingFlagPreventsNewHandlersAfterDrainStarts test - replace delayElements(200ms)+Thread.sleep(300) with Sinks.Many for fully controlled emission timing. Test now: emits message1 explicitly via sink, waits for handler1Started, starts drainThread, waits deterministically for drainThread to be in WAITING/TIMED_WAITING (closing=true set), THEN emits message2 via sink. Uses Mockito.after(500).never().complete(message2) to assert message2 was never completed - this fails fast if regression delivers message2 to the consumer.

Adds regression test v2ConcurrentStartDuringCloseDrainIsIgnored mirroring the V1 test, exercising the V2 path through ServiceBusProcessorClient with options.setV2(true) and verifying that a concurrent start() during the V2 drain window does not invoke the receiver builder a second time.

All 13 ServiceBusProcessorGracefulShutdownTest tests pass; full module: 957 tests, 0 failures.

Copilot AI left a comment


Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.

…eterministic test signal

Two fixes addressing Copilot review feedback on PR Azure#48192:

1. Connection monitor NPE - the periodic task scheduled in start() dereferenced this.asyncClient.get().isConnectionClosed() without a null check. Even though close() disposes monitorDisposable before nulling asyncClient, a tick already in flight could race past the dispose and observe asyncClient == null. Snapshot asyncClient at the start of the lambda and return early if null.

2. v2ClosingFlagPreventsNewHandlersAfterDrainStarts test - the verify(client, after(500).never()).complete(message2) assertion was misleading because the MessagePump in this test is constructed with enableAutoDisposition=false, so client.complete is never called regardless of dispatch. Drop the Mockito timeout and rely on drainHandlers' own activeHandlerCount semantics: drain returns when the count drops to 0, which can only happen after BOTH handler1 and message2's handleMessage (whose closing-flag check still increments/decrements the counter) have completed. Drain completion is therefore a deterministic synchronization point proving that message2 has been processed - if the closing flag worked, handler2ProcessMessageInvoked stays false.

All 13 ServiceBusProcessorGracefulShutdownTest tests pass.
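The snapshot-then-check fix in point 1 can be sketched with plain JDK types (ConnectionMonitor and tick() are illustrative names; the real code lives in the processor's periodic connection-monitor task):

```java
import java.util.concurrent.atomic.AtomicReference;

// Snapshot-then-check: a periodic tick reads the reference exactly once and
// bails out if close() has already nulled it, instead of dereferencing a
// value that can change between the null check and the use.
class ConnectionMonitor {
    final AtomicReference<AutoCloseable> asyncClient = new AtomicReference<>();

    /** Returns true when the tick actually ran against a live client. */
    boolean tick() {
        AutoCloseable client = asyncClient.get();   // single snapshot
        if (client == null) {
            return false;                           // close() won the race; skip this tick
        }
        // real code would call client.isConnectionClosed() and, if closed,
        // trigger restartMessageReceiver()
        return true;
    }
}
```

Because the lambda holds its own local reference after the snapshot, a concurrent close() nulling the field cannot produce an NPE mid-tick; at worst the tick runs once against an already-disposed client.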

Copilot AI left a comment


Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.

…methods

Update Javadoc for MessagePump.drainHandlers(), SessionsMessagePump.RollingSessionReceiver.drainHandlers(), and ServiceBusProcessorClient.drainV1Handlers() to document that on the re-entrant path (invoked from within a message handler), the calling handler is excluded from the wait condition to avoid self-deadlock - so these methods may return while the calling handler is still executing or settling its message.

MessagePump.drainHandlers' return contract is updated accordingly: 'true if all in-flight handlers (excluding the calling handler on the re-entrant path) completed within the timeout'.
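The documented re-entrant contract can be sketched end to end with JDK primitives (HandlerDrain is an illustrative stand-in; the real counters live in MessagePump and its siblings):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Drain that excludes the calling handler: when invoked from a handler
// thread (isHandlerThread set), one in-flight handler -- the caller
// itself -- is tolerated in the wait condition to avoid self-deadlock.
class HandlerDrain {
    private final AtomicInteger activeHandlerCount = new AtomicInteger();
    private final ThreadLocal<Boolean> isHandlerThread = new ThreadLocal<>();
    private final Object drainLock = new Object();

    void enterHandler() {
        activeHandlerCount.incrementAndGet();
        isHandlerThread.set(Boolean.TRUE);
    }

    void exitHandler() {
        isHandlerThread.remove();
        if (activeHandlerCount.decrementAndGet() == 0) {
            synchronized (drainLock) {
                drainLock.notifyAll();   // wake the drain when the last handler exits
            }
        }
    }

    /** True if all in-flight handlers (excluding the caller on the re-entrant path) finished in time. */
    boolean drainHandlers(long timeoutMillis) throws InterruptedException {
        int allowed = Boolean.TRUE.equals(isHandlerThread.get()) ? 1 : 0;
        long deadline = System.currentTimeMillis() + timeoutMillis;
        synchronized (drainLock) {
            while (activeHandlerCount.get() > allowed) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false;        // timed out with foreign handlers still active
                }
                drainLock.wait(remaining);
            }
        }
        return true;
    }
}
```

The ThreadLocal is what makes "excluding the calling handler" work: a drain issued from inside a handler sees allowed = 1 and returns even though its own count slot is still held, which is exactly why the Javadoc warns the caller may still be executing or settling its message.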

Addresses Copilot feedback on PR Azure#48192.

Copilot AI left a comment


Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.

…uration) builder API

Cover the new public builder setters drainTimeout(Duration) on ServiceBusProcessorClientBuilder and ServiceBusSessionProcessorClientBuilder:

- processorBuilderDrainTimeoutValidation - asserts NullPointerException for null and IllegalArgumentException for Duration.ZERO and negative durations on both builders, matching the contract documented on ServiceBusProcessorClientOptions.setDrainTimeout(Duration).

- processorBuilderDrainTimeoutPositiveAccepted - asserts a positive Duration is accepted on both builders and that buildProcessorClient() returns a non-null client (verifies propagation through the builder pipeline without throwing during build).

Aligns drainTimeout coverage with existing validation tests for sessionIdleTimeout and maxAutoLockRenewDuration. All 8 ServiceBusClientBuilderUnitTest tests pass; full module: 959 tests, 0 failures.
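The contract these tests pin down (null rejected with NullPointerException, zero or negative with IllegalArgumentException) can be sketched in isolation; DrainTimeoutValidator is an illustrative helper, not the builder's actual internal structure:

```java
import java.time.Duration;
import java.util.Objects;

// Validation contract exercised by processorBuilderDrainTimeoutValidation:
// null -> NPE, zero or negative -> IAE, positive -> accepted and returned.
final class DrainTimeoutValidator {
    static Duration validate(Duration drainTimeout) {
        Objects.requireNonNull(drainTimeout, "'drainTimeout' cannot be null.");
        if (drainTimeout.isZero() || drainTimeout.isNegative()) {
            throw new IllegalArgumentException("'drainTimeout' must be positive.");
        }
        return drainTimeout;
    }
}
```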

Addresses Copilot feedback on PR Azure#48192.

Copilot AI left a comment


Pull request overview

Copilot reviewed 12 out of 12 changed files in this pull request and generated no new comments.


Copilot AI left a comment


Pull request overview

Copilot reviewed 12 out of 12 changed files in this pull request and generated no new comments.


Copilot AI left a comment


Pull request overview

Copilot reviewed 12 out of 12 changed files in this pull request and generated 1 comment.

Comment on lines 146 to 153
private void handleMessage(ServiceBusReceivedMessage message) {
    instrumentation.instrumentProcess(message, ReceiverKind.PROCESSOR, msg -> {
        final Disposable lockRenewDisposable;
        if (enableAutoLockRenew) {
            lockRenewDisposable = client.beginLockRenewal(message);
        } else {
            lockRenewDisposable = Disposables.disposed();
        }
        activeHandlerCount.incrementAndGet();
        isHandlerThread.set(Boolean.TRUE);
        try {
            if (closing) {
                logger.atVerbose().log("Skipping handler execution, pump is closing.");
                return;
            }
Contributor Author


Good observation, but verified — the implementation already handles this. handleMessage increments activeHandlerCount then immediately checks closing and returns inside a try/finally that decrements the counter:

private void handleMessage(ServiceBusReceivedMessage message) {
    activeHandlerCount.incrementAndGet();
    isHandlerThread.set(Boolean.TRUE);
    try {
        if (closing) {
            logger.atVerbose().log("Skipping handler execution, pump is closing.");
            return;
        }
        // ... real work ...
    } finally {
        isHandlerThread.remove();
        if (activeHandlerCount.decrementAndGet() == 0) {
            synchronized (drainLock) {
                drainLock.notifyAll();
            }
        }
    }
}

The skip path increments and decrements within microseconds, so drainHandlers' wait condition effectively never observes the counter held above 0 by a skip-path invocation. Even when it does, the decrement triggers drainLock.notifyAll() once the count reaches 0, so any handler that arrives during shutdown immediately wakes the drain wait if it is the last in-flight handler.

In other words: the drain only blocks for handlers whose real work is in flight (those that started before closing=true was set and are now executing the message body). Skip-path invocations complete near-instantly and never extend the drain.

The new v2ClosingFlagPreventsNewHandlersAfterDrainStarts test (fully deterministic in d6f4fa9's predecessor commit e3c06e0) explicitly relies on this: messageSink.tryEmitNext(message2) is called after drain has set closing=true, and the test still asserts drainHandlers returns true after handler1 completes — which can only happen if message2's increment+skip+decrement raced to completion ahead of the drain's exit condition. If skip-path invocations could keep the drain pinned, that test would hang for the full drain timeout.

…builder structural test

The previous Spring revert (481f1bf) removed the drainTimeout property additions on the grounds that the wiring was missing and the Spring servicebus dependency was still pinned to 7.17.17. The From-Source CI run picked that up and failed in spring-cloud-azure-service:

ServiceBusSessionProcessorClientBuilderFactoryTests.supportSdkBuilderAllProperties:151 expected: <true> but was: <false>

Builder class owned property names: [..., drainTimeout, ...]

Unsupported property names: [draintimeout]

This is a structural integrity test that walks the SDK builder via reflection (against the from-source build, so it sees the new drainTimeout(Duration) method) and asserts every builder property has a corresponding Spring property. Without the Spring property, the test fails.

Re-add the property scaffolding only (no factory wiring, no CHANGELOG entry):

- ServiceBusProcessorClientProperties: getDrainTimeout() with javadoc explaining the deferred wiring.

- AzureServiceBusProperties.Processor: drainTimeout field + getter/setter.

- ServiceBusProcessorClientTestProperties: drainTimeout field + getter/setter.

- ProcessorProperties (spring-messaging-azure-servicebus): drainTimeout field + getter/setter.

The factory wiring (ServiceBus(Session)?ProcessorClientBuilderFactory) and the actual builder.drainTimeout(...) call cannot land yet because the Spring modules still pin azure-messaging-servicebus 7.17.17 (no drainTimeout(Duration) method); adding the wiring would break compile in non-from-source CI. The wiring will land in a follow-up PR once the Spring servicebus dep is bumped.
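The scaffolding in question is just the getter/setter pair the reflective structural test looks for; a sketch of its shape (the real classes are the four Spring property types listed above, and no factory reads the value yet):

```java
import java.time.Duration;

// Property scaffolding only: a drainTimeout bean property that satisfies
// supportSdkBuilderAllProperties' reflective walk. Deliberately unwired --
// builder.drainTimeout(...) cannot be called until the Spring modules bump
// their azure-messaging-servicebus dependency past 7.17.17.
class ProcessorProperties {
    private Duration drainTimeout;

    public Duration getDrainTimeout() {
        return drainTimeout;
    }

    public void setDrainTimeout(Duration drainTimeout) {
        this.drainTimeout = drainTimeout;
    }
}
```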

Addresses CI failure on PR Azure#48192.

Labels

bug, Client, customer-reported, Service Bus

Projects

Status: In Progress

Development

Successfully merging this pull request may close these issues.

[QUERY] Service Bus Processor Graceful Shutdown

5 participants