47 commits
0054f09
Add graceful shutdown drain to ServiceBusProcessorClient (#45716)
EldertGrootenboerMS Mar 2, 2026
26baa9a
Add re-entrancy guards to drain methods to prevent self-deadlock
EldertGrootenboerMS Mar 2, 2026
9248595
Add closing flag to prevent handler dispatch after drain starts
EldertGrootenboerMS Mar 3, 2026
aaa21c9
Use ThreadLocal.remove() instead of set(FALSE) in handler finally blocks
EldertGrootenboerMS Mar 3, 2026
d8ec586
Fix spotless formatting in graceful shutdown test
EldertGrootenboerMS Mar 3, 2026
94a93a1
Re-entrant drain waits for other concurrent handlers before closing
EldertGrootenboerMS Mar 3, 2026
18c7f4a
Add close() Javadoc for drain behavior and try/finally cleanup in tests
EldertGrootenboerMS Mar 3, 2026
db62682
Add V1 closing flag to prevent handler dispatch during drain-to-cance…
EldertGrootenboerMS Mar 3, 2026
f361b40
Reset v1Closing flag in start() to support processor restart after close
EldertGrootenboerMS Mar 3, 2026
0809f0b
Merge branch 'main' into fix/servicebus-processor-graceful-shutdown-4…
EldertGrootenboer Mar 16, 2026
d227f13
Merge branch 'main' into fix/servicebus-processor-graceful-shutdown-4…
EldertGrootenboer Mar 16, 2026
5a709c3
Merge branch 'main' into fix/servicebus-processor-graceful-shutdown-4…
EldertGrootenboer Mar 18, 2026
7add307
Merge branch 'main' into fix/servicebus-processor-graceful-shutdown-4…
EldertGrootenboer Mar 18, 2026
6cc2f5b
Merge branch 'main' into fix/servicebus-processor-graceful-shutdown-4…
EldertGrootenboer Mar 18, 2026
74a38e6
feat(servicebus): Add configurable drain timeout for processor shutdown
EldertGrootenboerMS Mar 30, 2026
668692f
fix(servicebus): Use ClientLogger for exception throwing in ServiceBu…
EldertGrootenboerMS Mar 30, 2026
04e15c4
fix(spring): Add drainTimeout property to Spring Cloud Azure ServiceB…
EldertGrootenboerMS Mar 30, 2026
0b55c6e
fix(spring): Remove factory wiring for drainTimeout (compiled against…
EldertGrootenboerMS Mar 30, 2026
2420132
docs(servicebus): Clarify close() re-entrant behavior in Javadoc
EldertGrootenboerMS Mar 30, 2026
78bc74d
Merge branch 'main' into fix/servicebus-processor-graceful-shutdown-4…
EldertGrootenboer Mar 30, 2026
2c68dc8
Merge branch 'main' into fix/servicebus-processor-graceful-shutdown-4…
EldertGrootenboer Mar 31, 2026
e724874
Merge branch 'main' into fix/servicebus-processor-graceful-shutdown-4…
EldertGrootenboer Mar 31, 2026
e35abb2
Merge branch 'main' into fix/servicebus-processor-graceful-shutdown-4…
EldertGrootenboer Mar 31, 2026
ccfb597
Merge branch 'main' into fix/servicebus-processor-graceful-shutdown-4…
EldertGrootenboer Apr 2, 2026
92fb19b
Merge branch 'main' into fix/servicebus-processor-graceful-shutdown-4…
EldertGrootenboer Apr 3, 2026
1f2dd40
Merge branch 'main' into fix/servicebus-processor-graceful-shutdown-4…
EldertGrootenboer Apr 7, 2026
5df3f64
docs(spring): Add CHANGELOG entries for ServiceBus drainTimeout property
EldertGrootenboerMS May 6, 2026
4ae104a
Merge branch 'main' into fix/servicebus-processor-graceful-shutdown-4…
EldertGrootenboer May 6, 2026
afb2728
fix(servicebus): Release client monitor across drain in ServiceBusPro…
EldertGrootenboerMS May 6, 2026
fa70fc3
fix(servicebus): Guard lifecycle methods against concurrent close dra…
EldertGrootenboerMS May 7, 2026
b3b5d89
fix(servicebus): Cache identifier in getIdentifier() to avoid lazy re…
EldertGrootenboerMS May 7, 2026
ead0651
fix(servicebus): Single-owner V1 close via CAS; getIdentifier returns…
EldertGrootenboerMS May 7, 2026
7a23053
docs(servicebus): Document nullable getIdentifier() return value
EldertGrootenboerMS May 7, 2026
0d3023e
test(servicebus): Replace flaky Thread.sleep with deterministic isRun…
EldertGrootenboerMS May 7, 2026
4ed0867
test(servicebus): Replace remaining flaky Thread.sleep with determini…
EldertGrootenboerMS May 7, 2026
481f1bf
fix(spring): Revert non-functional drainTimeout property additions
EldertGrootenboerMS May 7, 2026
886afde
fix(servicebus): Add v2CloseInProgress gate; deterministic v2ClosingF…
EldertGrootenboerMS May 7, 2026
e3c06e0
fix(servicebus): Null-safe connection monitor + drain-completion as d…
EldertGrootenboerMS May 7, 2026
33f1107
docs(servicebus): Document re-entrant drain semantics in three drain …
EldertGrootenboerMS May 7, 2026
c3b1128
test(servicebus): Add validation/propagation tests for drainTimeout(D…
EldertGrootenboerMS May 7, 2026
d6f4fa9
fix(spring): Re-add drainTimeout property scaffolding to satisfy SDK-…
EldertGrootenboerMS May 7, 2026
efad1d4
Merge branch 'main' into fix/servicebus-processor-graceful-shutdown-4…
EldertGrootenboer May 7, 2026
f75c307
fix(servicebus): Skip-path messages no longer extend drain under sust…
EldertGrootenboerMS May 7, 2026
28c481b
fix(servicebus): Deliver V1 errors during shutdown, mirror fast-path …
EldertGrootenboerMS May 7, 2026
ec904c1
docs(spring): Rewrite getDrainTimeout() Javadoc to be version-agnostic
EldertGrootenboerMS May 7, 2026
859723d
fix(servicebus): Validate non-null drainTimeout in ServiceBusProcesso…
EldertGrootenboerMS May 7, 2026
a5d44df
fix(servicebus): Drain skip-path is PEEK_LOCK only to avoid RECEIVE_A…
EldertGrootenboerMS May 7, 2026
4 changes: 4 additions & 0 deletions sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
@@ -4,10 +4,14 @@

### Features Added

- Added `drainTimeout(Duration)` to `ServiceBusProcessorClientBuilder` and `ServiceBusSessionProcessorClientBuilder` to configure the maximum wait time for in-flight message handlers during processor shutdown. Defaults to 30 seconds.

### Breaking Changes

### Bugs Fixed

- Fixed `ServiceBusProcessorClient.close()` disposing the receiver before in-flight message handlers could complete settlement, causing `IllegalStateException`. The processor now drains active handlers before closing. ([#45716](https://github.com/Azure/azure-sdk-for-java/issues/45716))

### Other Changes

## 7.17.17 (2026-01-29)
@@ -6,6 +6,7 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.instrumentation.ReceiverKind;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.Disposables;
@@ -17,6 +18,8 @@
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -56,6 +59,16 @@ final class MessagePump {
private final boolean enableAutoLockRenew;
private final Scheduler workerScheduler;
private final ServiceBusReceiverInstrumentation instrumentation;
private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
private final Object drainLock = new Object();
private final ThreadLocal<Boolean> isHandlerThread = ThreadLocal.withInitial(() -> Boolean.FALSE);
private volatile boolean closing;
// True when the receive mode is PEEK_LOCK, in which case it is safe to skip handler dispatch
// for messages that arrive after closing=true (the broker still owns the lock and will
// redeliver). False for RECEIVE_AND_DELETE, where the broker has already removed the message
// before delivery - skipping the handler in that mode would lose the message permanently, so
// we must always invoke processMessage even during the drain window.
private final boolean skipDuringDrain;

/**
* Instantiate {@link MessagePump} that pumps messages emitted by the given {@code client}. The messages
@@ -85,6 +98,13 @@ final class MessagePump {
this.concurrency = concurrency;
this.enableAutoDisposition = enableAutoDisposition;
this.enableAutoLockRenew = client.isAutoLockRenewRequested();
// Cached at construction so the hot path (handleMessage) reads a primitive instead of
// walking the receiver options each call. PEEK_LOCK is safe to skip during drain (broker
// redelivers); RECEIVE_AND_DELETE is not (message would be lost). When the client cannot
// report a receive mode (test mocks that didn't stub getReceiverOptions()) default to the
// safer no-skip behavior so messages cannot be dropped silently.
final ReceiverOptions options = client.getReceiverOptions();
this.skipDuringDrain = options != null && options.getReceiveMode() == ServiceBusReceiveMode.PEEK_LOCK;
if (concurrency > 1) {
this.workerScheduler = Schedulers.boundedElastic();
} else {
@@ -138,24 +158,59 @@ private Mono<Void> pollConnectionState() {
}

private void handleMessage(ServiceBusReceivedMessage message) {
// Fast-path early return: avoid counting skip-path invocations against the drain. Under
// sustained throughput with concurrency > 1, the upstream subscription is still live
// while drainHandlers() is waiting (the subscription is only disposed after drain returns),
// so flatMap keeps dispatching messages. If we incremented the counter for every skip
// we could keep activeHandlerCount > 0 long enough to push drain to its timeout. Reading
// the volatile flag here ensures messages that arrive after closing=true is set are
// dropped without touching the drain's exit condition.
//
// Skip is gated on PEEK_LOCK only: in that mode the broker still owns the lock and will
// redeliver any message we drop. In RECEIVE_AND_DELETE, the broker has already removed
// the message before delivery, so dropping it here would lose it permanently - those
// messages must always reach processMessage even during the drain window.
if (closing && skipDuringDrain) {
logger.atVerbose().log("Skipping handler execution (early), pump is closing.");
return;
}
activeHandlerCount.incrementAndGet();
isHandlerThread.set(Boolean.TRUE);
try {
// closing may have flipped between the early check above and this point
// (a check-then-act race). Re-check inside the counted region so the rare race-loser
// still skips work; the increment is balanced by the decrement in finally, which also
// notifies the drain. Same RECEIVE_AND_DELETE exemption applies.
if (closing && skipDuringDrain) {
logger.atVerbose().log("Skipping handler execution, pump is closing.");
return;
}
instrumentation.instrumentProcess(message, ReceiverKind.PROCESSOR, msg -> {
final Disposable lockRenewDisposable;
if (enableAutoLockRenew) {
lockRenewDisposable = client.beginLockRenewal(message);
} else {
lockRenewDisposable = Disposables.disposed();
}
final Throwable error = notifyMessage(message);
if (enableAutoDisposition) {
if (error == null) {
complete(message);
} else {
abandon(message);
}
}
lockRenewDisposable.dispose();
return error;
});
} finally {
isHandlerThread.remove();
// Notify at <= 1 rather than only at 0: a re-entrant drainHandlers() call waits for the
// count to fall to 1 (its own handler), while a regular call waits for 0.
if (activeHandlerCount.decrementAndGet() <= 1) {
synchronized (drainLock) {
drainLock.notifyAll();
}
}
}
}

private Throwable notifyMessage(ServiceBusReceivedMessage message) {
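The handler-thread marker used by `handleMessage` can be illustrated in isolation. The following is a minimal sketch, not the SDK code: `HandlerThreadMarker`, `runAsHandler`, and `isHandlerThread()` are hypothetical names chosen for illustration. It shows why `ThreadLocal.remove()` in the `finally` block is enough to restore the "not in a handler" state: after `remove()`, the next `get()` falls back to the initial value.

```java
import java.util.function.Supplier;

// Hypothetical helper, not an SDK class: flags the current thread while a handler body runs.
final class HandlerThreadMarker {
    private final ThreadLocal<Boolean> inHandler = ThreadLocal.withInitial(() -> Boolean.FALSE);

    /** Runs {@code body} with the current thread flagged as a handler thread. */
    <T> T runAsHandler(Supplier<T> body) {
        inHandler.set(Boolean.TRUE);
        try {
            return body.get();
        } finally {
            // remove() drops the per-thread entry; the next get() re-creates it as FALSE.
            inHandler.remove();
        }
    }

    /** True only while the calling thread is inside runAsHandler. */
    boolean isHandlerThread() {
        return inHandler.get();
    }
}
```

A drain method can then consult `isHandlerThread()` to decide whether the caller's own handler must be excluded from the wait condition. This sketch does not support nested `runAsHandler` calls on the same thread.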
@@ -193,6 +248,64 @@ private void abandon(ServiceBusReceivedMessage message) {
}
}

/**
* Wait for in-flight message handlers to complete, up to the specified timeout.
* This is called during processor close to ensure graceful shutdown — messages currently
* being processed are allowed to complete (including settlement) before the underlying client
* is disposed.
*
* <p><strong>Re-entrant semantics:</strong> when invoked from within a message handler
* (i.e. the calling thread is the handler thread itself), this method waits only for
* <em>other</em> concurrent handlers to complete and excludes the calling handler from the
* wait condition - waiting for the calling handler to finish would self-deadlock. In that
* case, this method may return {@code true} while the calling handler is still executing.</p>
*
* @param timeout the maximum time to wait for in-flight handlers to complete.
* @return {@code true} if all in-flight handlers (excluding the calling handler on the
* re-entrant path) completed within the timeout, {@code false} otherwise.
*/
boolean drainHandlers(Duration timeout) {
closing = true;
final int threshold;
if (isHandlerThread.get()) {
// Re-entrant call from within a message handler (e.g., user called close() inside processMessage).
// Cannot wait for this thread's own handler to complete (would self-deadlock), but we can
// wait for OTHER concurrent handlers to finish settlement before the underlying client is disposed.
threshold = 1;
if (activeHandlerCount.get() <= threshold) {
return true;
}
logger.atInfo()
.addKeyValue("otherActiveHandlers", activeHandlerCount.get() - 1)
.log("drainHandlers called from within a message handler (re-entrant). "
+ "Waiting for other active handlers to complete.");
} else {
threshold = 0;
}
final long deadline = System.nanoTime() + timeout.toNanos();
synchronized (drainLock) {
while (activeHandlerCount.get() > threshold) {
final long remainingNanos = deadline - System.nanoTime();
if (remainingNanos <= 0) {
logger.atWarning()
.addKeyValue("activeHandlers", activeHandlerCount.get())
.log("Drain timeout expired with active handlers still running.");
return false;
}
try {
final long millis = TimeUnit.NANOSECONDS.toMillis(remainingNanos);
final int nanos = (int) (remainingNanos % 1_000_000);
drainLock.wait(millis, nanos);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.atWarning().log("Drain interrupted while waiting for in-flight handlers.");
return false;
}
}
}
return true;
}

private void logCPUResourcesConcurrencyMismatch() {
final int cores = Runtime.getRuntime().availableProcessors();
final int poolSize = DEFAULT_BOUNDED_ELASTIC_SIZE;
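The drain logic in this hunk combines an atomic in-flight counter, a monitor used for timed waits, and a volatile closing flag. The sketch below reproduces that pattern in a self-contained form under simplifying assumptions: `DrainSketch`, `handle`, and `drain` are hypothetical names, the re-entrant path (threshold of 1) and the PEEK_LOCK gate are omitted, and `TimeUnit.timedWait` stands in for the manual millis/nanos split.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the pump; illustrates the counter + monitor drain only.
final class DrainSketch {
    private final AtomicInteger active = new AtomicInteger();
    private final Object lock = new Object();
    private volatile boolean closing;

    /** Runs the handler unless a drain has started; returns false when skipped. */
    boolean handle(Runnable handler) {
        if (closing) {
            return false; // skipped without touching the counter
        }
        active.incrementAndGet();
        try {
            handler.run();
            return true;
        } finally {
            // The last handler out wakes any thread blocked in drain().
            if (active.decrementAndGet() == 0) {
                synchronized (lock) {
                    lock.notifyAll();
                }
            }
        }
    }

    /** Waits until no handler is running or the timeout elapses. */
    boolean drain(Duration timeout) {
        closing = true;
        final long deadline = System.nanoTime() + timeout.toNanos();
        synchronized (lock) {
            // Loop guards against spurious wakeups and recomputes the remaining time.
            while (active.get() > 0) {
                final long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    return false; // timed out with handlers still running
                }
                try {
                    TimeUnit.NANOSECONDS.timedWait(lock, remainingNanos);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    return false;
                }
            }
        }
        return true;
    }
}
```

Because the waiter checks the counter while holding the monitor and the handler notifies under the same monitor after decrementing, a wakeup cannot be lost between the check and the wait.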
@@ -1819,6 +1819,25 @@ public ServiceBusSessionProcessorClientBuilder maxConcurrentCalls(int maxConcurr
return this;
}

/**
* Sets the maximum time to wait for in-flight message handlers to complete when the processor is closed.
* When {@link ServiceBusProcessorClient#close()} is called, the processor will wait up to this duration
* for handlers that are currently processing messages to finish before shutting down. This ensures
* that messages being processed can complete settlement (complete, abandon, etc.) without encountering
* a disposed receiver.
*
* <p>If not specified, defaults to 30 seconds.</p>
*
* @param drainTimeout The maximum time to wait for in-flight handlers. Must be positive.
* @return The updated {@link ServiceBusSessionProcessorClientBuilder} object.
* @throws NullPointerException if {@code drainTimeout} is null.
* @throws IllegalArgumentException if {@code drainTimeout} is zero or negative.
*/
public ServiceBusSessionProcessorClientBuilder drainTimeout(Duration drainTimeout) {
processorClientOptions.setDrainTimeout(drainTimeout);
return this;
}

/**
* Disables auto-complete and auto-abandon of received messages. By default, a successfully processed message is
* {@link ServiceBusReceivedMessageContext#complete() completed}. If an error happens when
@@ -2121,7 +2140,7 @@ ServiceBusReceiverAsyncClient buildAsyncClientForProcessor() {

SessionsMessagePump buildPumpForProcessor(ClientLogger logger,
Consumer<ServiceBusReceivedMessageContext> processMessage, Consumer<ServiceBusErrorContext> processError,
int concurrencyPerSession, Duration drainTimeout) {
if (enableAutoComplete && receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) {
LOGGER.warning("'enableAutoComplete' is not needed in for RECEIVE_AND_DELETE mode.");
enableAutoComplete = false;
@@ -2165,7 +2184,7 @@ SessionsMessagePump buildPumpForProcessor(ClientLogger logger,
return new SessionsMessagePump(clientIdentifier, connectionCacheWrapper.getFullyQualifiedNamespace(),
entityPath, receiveMode, instrumentation, sessionAcquirer, maxAutoLockRenewDuration, sessionIdleTimeout,
maxConcurrentSessions, concurrencyPerSession, prefetchCount, enableAutoComplete, messageSerializer,
retryPolicy, processMessage, processError, onTerminate, drainTimeout);
}

/**
@@ -2555,6 +2574,25 @@ public ServiceBusProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCall
return this;
}

/**
* Sets the maximum time to wait for in-flight message handlers to complete when the processor is closed.
* When {@link ServiceBusProcessorClient#close()} is called, the processor will wait up to this duration
* for handlers that are currently processing messages to finish before shutting down. This ensures
* that messages being processed can complete settlement (complete, abandon, etc.) without encountering
* a disposed receiver.
*
* <p>If not specified, defaults to 30 seconds.</p>
*
* @param drainTimeout The maximum time to wait for in-flight handlers. Must be positive.
* @return The updated {@link ServiceBusProcessorClientBuilder} object.
* @throws NullPointerException if {@code drainTimeout} is null.
* @throws IllegalArgumentException if {@code drainTimeout} is zero or negative.
*/
public ServiceBusProcessorClientBuilder drainTimeout(Duration drainTimeout) {
processorClientOptions.setDrainTimeout(drainTimeout);
return this;
}

/**
* Disables auto-complete and auto-abandon of received messages. By default, a successfully processed message is
* {@link ServiceBusReceivedMessageContext#complete() completed}. If an error happens when
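The `drainTimeout(Duration)` Javadoc above promises a 30-second default, a `NullPointerException` for null, and an `IllegalArgumentException` for zero or negative values. The setter that enforces this is not part of the hunks shown, so the following is only a sketch of how such validation could look. `DrainTimeoutOptionsSketch` and its members are hypothetical names, and the SDK's actual options class may differ.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical options holder; mirrors the contract described in the Javadoc, not the SDK source.
final class DrainTimeoutOptionsSketch {
    private static final Duration DEFAULT_DRAIN_TIMEOUT = Duration.ofSeconds(30);

    private Duration drainTimeout = DEFAULT_DRAIN_TIMEOUT;

    /** Stores a positive timeout; rejects null, zero, and negative values. */
    DrainTimeoutOptionsSketch setDrainTimeout(Duration drainTimeout) {
        Objects.requireNonNull(drainTimeout, "'drainTimeout' cannot be null.");
        if (drainTimeout.isZero() || drainTimeout.isNegative()) {
            throw new IllegalArgumentException("'drainTimeout' must be positive.");
        }
        this.drainTimeout = drainTimeout;
        return this;
    }

    /** Returns the configured timeout, or the 30-second default if none was set. */
    Duration getDrainTimeout() {
        return drainTimeout;
    }
}
```

Validating in the options setter keeps both builder methods (`ServiceBusProcessorClientBuilder` and `ServiceBusSessionProcessorClientBuilder`) as thin delegating calls, which matches the shape of the two `drainTimeout` methods in this diff.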