diff --git a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md index fce49f300f01..0d10d5dbaf73 100644 --- a/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md @@ -4,10 +4,14 @@ ### Features Added +- Added `drainTimeout(Duration)` to `ServiceBusProcessorClientBuilder` and `ServiceBusSessionProcessorClientBuilder` to configure the maximum wait time for in-flight message handlers during processor shutdown. Defaults to 30 seconds. + ### Breaking Changes ### Bugs Fixed +- Fixed `ServiceBusProcessorClient.close()` disposing the receiver before in-flight message handlers could complete settlement, causing `IllegalStateException`. The processor now drains active handlers before closing. ([#45716](https://github.com/Azure/azure-sdk-for-java/issues/45716)) + ### Other Changes ## 7.17.17 (2026-01-29) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java index 8d0ebeaa85cc..38c373538bb2 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java @@ -6,6 +6,7 @@ import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.implementation.instrumentation.ReceiverKind; import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.reactivestreams.Publisher; import reactor.core.Disposable; import reactor.core.Disposables; @@ -17,6 +18,8 @@ import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Function; @@ -56,6 +59,16 @@ final class MessagePump { private final boolean enableAutoLockRenew; private final Scheduler workerScheduler; private final ServiceBusReceiverInstrumentation instrumentation; + private final AtomicInteger activeHandlerCount = new AtomicInteger(0); + private final Object drainLock = new Object(); + private final ThreadLocal isHandlerThread = ThreadLocal.withInitial(() -> Boolean.FALSE); + private volatile boolean closing; + // True when the receive mode is PEEK_LOCK, in which case it is safe to skip handler dispatch + // for messages that arrive after closing=true (the broker still owns the lock and will + // redeliver). False for RECEIVE_AND_DELETE, where the broker has already removed the message + // before delivery - skipping the handler in that mode would lose the message permanently, so + // we must always invoke processMessage even during the drain window. + private final boolean skipDuringDrain; /** * Instantiate {@link MessagePump} that pumps messages emitted by the given {@code client}. The messages @@ -85,6 +98,13 @@ final class MessagePump { this.concurrency = concurrency; this.enableAutoDisposition = enableAutoDisposition; this.enableAutoLockRenew = client.isAutoLockRenewRequested(); + // Cached at construction so the hot path (handleMessage) reads a primitive instead of + // walking the receiver options each call. PEEK_LOCK is safe to skip during drain (broker + // redelivers); RECEIVE_AND_DELETE is not (message would be lost). When the client cannot + // report a receive mode (test mocks that didn't stub getReceiverOptions()) default to the + // safer no-skip behavior so messages cannot be dropped silently. 
+ final ReceiverOptions options = client.getReceiverOptions(); + this.skipDuringDrain = options != null && options.getReceiveMode() == ServiceBusReceiveMode.PEEK_LOCK; if (concurrency > 1) { this.workerScheduler = Schedulers.boundedElastic(); } else { @@ -138,24 +158,59 @@ private Mono pollConnectionState() { } private void handleMessage(ServiceBusReceivedMessage message) { - instrumentation.instrumentProcess(message, ReceiverKind.PROCESSOR, msg -> { - final Disposable lockRenewDisposable; - if (enableAutoLockRenew) { - lockRenewDisposable = client.beginLockRenewal(message); - } else { - lockRenewDisposable = Disposables.disposed(); + // Fast-path early return: avoid counting skip-path invocations against the drain. Under + // sustained throughput with concurrency > 1, the upstream subscription is still live + // while drainHandlers() is waiting (the subscription is only disposed after drain returns), + // so flatMap keeps dispatching messages. If we incremented the counter for every skip + // we could keep activeHandlerCount > 0 long enough to push drain to its timeout. Reading + // the volatile flag here ensures messages that arrive after closing=true is set are + // dropped without touching the drain's exit condition. + // + // Skip is gated on PEEK_LOCK only: in that mode the broker still owns the lock and will + // redeliver any message we drop. In RECEIVE_AND_DELETE, the broker has already removed + // the message before delivery, so dropping it here would lose it permanently - those + // messages must always reach processMessage even during the drain window. + if (closing && skipDuringDrain) { + logger.atVerbose().log("Skipping handler execution (early), pump is closing."); + return; + } + activeHandlerCount.incrementAndGet(); + isHandlerThread.set(Boolean.TRUE); + try { + // closing may have flipped between the early check above and this point + // (a check-then-act race). 
Re-check inside the counted region so the rare race-loser + // still skips work; the increment will be balanced by the decrement in finally and + // notifyAll the drain. Same RECEIVE_AND_DELETE exemption applies. + if (closing && skipDuringDrain) { + logger.atVerbose().log("Skipping handler execution, pump is closing."); + return; } - final Throwable error = notifyMessage(message); - if (enableAutoDisposition) { - if (error == null) { - complete(message); + instrumentation.instrumentProcess(message, ReceiverKind.PROCESSOR, msg -> { + final Disposable lockRenewDisposable; + if (enableAutoLockRenew) { + lockRenewDisposable = client.beginLockRenewal(message); } else { - abandon(message); + lockRenewDisposable = Disposables.disposed(); + } + final Throwable error = notifyMessage(message); + if (enableAutoDisposition) { + if (error == null) { + complete(message); + } else { + abandon(message); + } + } + lockRenewDisposable.dispose(); + return error; + }); + } finally { + isHandlerThread.remove(); + if (activeHandlerCount.decrementAndGet() <= 1) { + synchronized (drainLock) { + drainLock.notifyAll(); } } - lockRenewDisposable.dispose(); - return error; - }); + } } private Throwable notifyMessage(ServiceBusReceivedMessage message) { @@ -193,6 +248,64 @@ private void abandon(ServiceBusReceivedMessage message) { } } + /** + * Wait for in-flight message handlers to complete, up to the specified timeout. + * This is called during processor close to ensure graceful shutdown — messages currently + * being processed are allowed to complete (including settlement) before the underlying client + * is disposed. + * + *

<p>Re-entrant semantics: when invoked from within a message handler + * (i.e. the calling thread is the handler thread itself), this method waits only for + * other concurrent handlers to complete and excludes the calling handler from the + * wait condition - waiting for the calling handler to finish would self-deadlock. In that + * case, this method may return {@code true} while the calling handler is still executing.</p>
+ * + * @param timeout the maximum time to wait for in-flight handlers to complete. + * @return {@code true} if all in-flight handlers (excluding the calling handler on the + * re-entrant path) completed within the timeout, {@code false} otherwise. + */ + boolean drainHandlers(Duration timeout) { + closing = true; + final int threshold; + if (isHandlerThread.get()) { + // Re-entrant call from within a message handler (e.g., user called close() inside processMessage). + // Cannot wait for this thread's own handler to complete (would self-deadlock), but we can + // wait for OTHER concurrent handlers to finish settlement before the underlying client is disposed. + threshold = 1; + if (activeHandlerCount.get() <= threshold) { + return true; + } + logger.atInfo() + .addKeyValue("otherActiveHandlers", activeHandlerCount.get() - 1) + .log("drainHandlers called from within a message handler (re-entrant). " + + "Waiting for other active handlers to complete."); + } else { + threshold = 0; + } + final long deadline = System.nanoTime() + timeout.toNanos(); + synchronized (drainLock) { + while (activeHandlerCount.get() > threshold) { + final long remainingNanos = deadline - System.nanoTime(); + if (remainingNanos <= 0) { + logger.atWarning() + .addKeyValue("activeHandlers", activeHandlerCount.get()) + .log("Drain timeout expired with active handlers still running."); + return false; + } + try { + final long millis = TimeUnit.NANOSECONDS.toMillis(remainingNanos); + final int nanos = (int) (remainingNanos % 1_000_000); + drainLock.wait(millis, nanos); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.atWarning().log("Drain interrupted while waiting for in-flight handlers."); + return false; + } + } + } + return true; + } + private void logCPUResourcesConcurrencyMismatch() { final int cores = Runtime.getRuntime().availableProcessors(); final int poolSize = DEFAULT_BOUNDED_ELASTIC_SIZE; diff --git 
a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java index c4f868e13124..f9ccc0f52bf2 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java @@ -1819,6 +1819,25 @@ public ServiceBusSessionProcessorClientBuilder maxConcurrentCalls(int maxConcurr return this; } + /** + * Sets the maximum time to wait for in-flight message handlers to complete when the processor is closed. + * When {@link ServiceBusProcessorClient#close()} is called, the processor will wait up to this duration + * for handlers that are currently processing messages to finish before shutting down. This ensures + * that messages being processed can complete settlement (complete, abandon, etc.) without encountering + * a disposed receiver. + * + *

<p>If not specified, defaults to 30 seconds.</p>
+ * + * @param drainTimeout The maximum time to wait for in-flight handlers. Must be positive. + * @return The updated {@link ServiceBusSessionProcessorClientBuilder} object. + * @throws NullPointerException if {@code drainTimeout} is null. + * @throws IllegalArgumentException if {@code drainTimeout} is zero or negative. + */ + public ServiceBusSessionProcessorClientBuilder drainTimeout(Duration drainTimeout) { + processorClientOptions.setDrainTimeout(drainTimeout); + return this; + } + /** * Disables auto-complete and auto-abandon of received messages. By default, a successfully processed message is * {@link ServiceBusReceivedMessageContext#complete() completed}. If an error happens when @@ -2121,7 +2140,7 @@ ServiceBusReceiverAsyncClient buildAsyncClientForProcessor() { SessionsMessagePump buildPumpForProcessor(ClientLogger logger, Consumer processMessage, Consumer processError, - int concurrencyPerSession) { + int concurrencyPerSession, Duration drainTimeout) { if (enableAutoComplete && receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) { LOGGER.warning("'enableAutoComplete' is not needed in for RECEIVE_AND_DELETE mode."); enableAutoComplete = false; @@ -2165,7 +2184,7 @@ SessionsMessagePump buildPumpForProcessor(ClientLogger logger, return new SessionsMessagePump(clientIdentifier, connectionCacheWrapper.getFullyQualifiedNamespace(), entityPath, receiveMode, instrumentation, sessionAcquirer, maxAutoLockRenewDuration, sessionIdleTimeout, maxConcurrentSessions, concurrencyPerSession, prefetchCount, enableAutoComplete, messageSerializer, - retryPolicy, processMessage, processError, onTerminate); + retryPolicy, processMessage, processError, onTerminate, drainTimeout); } /** @@ -2555,6 +2574,25 @@ public ServiceBusProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCall return this; } + /** + * Sets the maximum time to wait for in-flight message handlers to complete when the processor is closed. 
+ * When {@link ServiceBusProcessorClient#close()} is called, the processor will wait up to this duration + * for handlers that are currently processing messages to finish before shutting down. This ensures + * that messages being processed can complete settlement (complete, abandon, etc.) without encountering + * a disposed receiver. + * + *

<p>If not specified, defaults to 30 seconds.</p>
+ * + * @param drainTimeout The maximum time to wait for in-flight handlers. Must be positive. + * @return The updated {@link ServiceBusProcessorClientBuilder} object. + * @throws NullPointerException if {@code drainTimeout} is null. + * @throws IllegalArgumentException if {@code drainTimeout} is zero or negative. + */ + public ServiceBusProcessorClientBuilder drainTimeout(Duration drainTimeout) { + processorClientOptions.setDrainTimeout(drainTimeout); + return this; + } + /** * Disables auto-complete and auto-abandon of received messages. By default, a successfully processed message is * {@link ServiceBusReceivedMessageContext#complete() completed}. If an error happens when diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessor.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessor.java index 96abd6baa79c..c1cc6558d6f0 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessor.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessor.java @@ -12,6 +12,7 @@ import reactor.util.retry.Retry; import java.time.Duration; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -31,6 +32,7 @@ final class ServiceBusProcessor { private final Consumer processError; private final int concurrency; private final Boolean enableAutoDisposition; + private final Duration drainTimeout; private boolean isRunning; private RollingMessagePump rollingMessagePump; @@ -42,10 +44,11 @@ final class ServiceBusProcessor { * @param processError The consumer to report the errors. * @param concurrency The parallelism, i.e., how many invocations of {@code processMessage} should happen in parallel. 
* @param enableAutoDisposition Indicate if auto-complete or abandon should be enabled. + * @param drainTimeout The maximum time to wait for in-flight message handlers to complete during shutdown. */ ServiceBusProcessor(ServiceBusClientBuilder.ServiceBusReceiverClientBuilder builder, Consumer processMessage, Consumer processError, - int concurrency, boolean enableAutoDisposition) { + int concurrency, boolean enableAutoDisposition, Duration drainTimeout) { this.kind = Kind.NON_SESSION; this.nonSessionBuilder = builder; this.sessionBuilder = null; @@ -53,6 +56,9 @@ final class ServiceBusProcessor { this.processMessage = processMessage; this.concurrency = concurrency; this.enableAutoDisposition = enableAutoDisposition; + // Fail fast at construction time so a null timeout cannot surface as an NPE later in + // RollingMessagePump.dispose() / MessagePump.drainHandlers(...) during shutdown. + this.drainTimeout = Objects.requireNonNull(drainTimeout, "'drainTimeout' cannot be null."); synchronized (lock) { this.isRunning = false; @@ -66,10 +72,11 @@ final class ServiceBusProcessor { * @param processMessage The consumer to invoke for each message. * @param processError The consumer to report the errors. * @param concurrency The parallelism, i.e., how many invocations of {@code processMessage} should happen in parallel per session. + * @param drainTimeout The maximum time to wait for in-flight message handlers to complete during shutdown. 
*/ ServiceBusProcessor(ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder builder, Consumer processMessage, Consumer processError, - int concurrency) { + int concurrency, Duration drainTimeout) { this.kind = Kind.SESSION; this.sessionBuilder = builder; this.nonSessionBuilder = null; @@ -77,6 +84,9 @@ final class ServiceBusProcessor { this.processMessage = processMessage; this.concurrency = concurrency; this.enableAutoDisposition = null; + // Fail fast at construction time so a null timeout cannot surface as an NPE later in + // RollingMessagePump.dispose() / SessionsMessagePump.drainHandlers(...) during shutdown. + this.drainTimeout = Objects.requireNonNull(drainTimeout, "'drainTimeout' cannot be null."); synchronized (lock) { this.isRunning = false; @@ -92,9 +102,10 @@ void start() { isRunning = true; if (kind == Kind.NON_SESSION) { rollingMessagePump = new RollingMessagePump(nonSessionBuilder, processMessage, processError, - concurrency, enableAutoDisposition); + concurrency, enableAutoDisposition, drainTimeout); } else { - rollingMessagePump = new RollingMessagePump(sessionBuilder, processMessage, processError, concurrency); + rollingMessagePump + = new RollingMessagePump(sessionBuilder, processMessage, processError, concurrency, drainTimeout); } p = rollingMessagePump; } @@ -149,8 +160,10 @@ static final class RollingMessagePump extends AtomicBoolean { private final Consumer processMessage; private final Consumer processError; private final Boolean enableAutoDisposition; + private final Duration drainTimeout; private final Disposable.Composite disposable = Disposables.composite(); private final AtomicReference clientIdentifier = new AtomicReference<>(); + private volatile MessagePump currentPump; /** * Instantiate {@link RollingMessagePump} that stream messages using {@link MessagePump}. @@ -162,10 +175,11 @@ static final class RollingMessagePump extends AtomicBoolean { * @param processError The consumer to report the errors. 
* @param concurrency The parallelism, i.e., how many invocations of {@code processMessage} should happen in parallel. * @param enableAutoDisposition Indicate if auto-complete or abandon should be enabled. + * @param drainTimeout The maximum time to wait for in-flight handlers during shutdown. */ RollingMessagePump(ServiceBusClientBuilder.ServiceBusReceiverClientBuilder builder, Consumer processMessage, Consumer processError, - int concurrency, boolean enableAutoDisposition) { + int concurrency, boolean enableAutoDisposition, Duration drainTimeout) { this.logger = new ClientLogger(RollingMessagePump.class); this.kind = Kind.NON_SESSION; this.nonSessionBuilder = builder; @@ -174,6 +188,7 @@ static final class RollingMessagePump extends AtomicBoolean { this.processError = processError; this.processMessage = processMessage; this.enableAutoDisposition = enableAutoDisposition; + this.drainTimeout = drainTimeout; } /** @@ -186,11 +201,11 @@ static final class RollingMessagePump extends AtomicBoolean { * in parallel for each session. * @param processMessage The consumer to invoke for each message. * @param processError The consumer to report the errors. - + * @param drainTimeout The maximum time to wait for in-flight handlers during shutdown. 
*/ RollingMessagePump(ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder builder, Consumer processMessage, Consumer processError, - int concurrencyPerSession) { + int concurrencyPerSession, Duration drainTimeout) { this.logger = new ClientLogger(RollingMessagePump.class); this.kind = Kind.SESSION; this.sessionBuilder = builder; @@ -199,6 +214,7 @@ static final class RollingMessagePump extends AtomicBoolean { this.processMessage = processMessage; this.concurrency = concurrencyPerSession; this.enableAutoDisposition = null; + this.drainTimeout = drainTimeout; } /** @@ -228,14 +244,15 @@ Mono beginIntern() { clientIdentifier.set(client.getIdentifier()); final MessagePump pump = new MessagePump(client, processMessage, processError, concurrency, enableAutoDisposition); + currentPump = pump; return pump.begin(); }, client -> { client.close(); }, true); } else { pumping = Mono.using(() -> { - final SessionsMessagePump pump - = sessionBuilder.buildPumpForProcessor(logger, processMessage, processError, concurrency); + final SessionsMessagePump pump = sessionBuilder.buildPumpForProcessor(logger, processMessage, + processError, concurrency, drainTimeout); return pump; }, pump -> { clientIdentifier.set(pump.getIdentifier()); @@ -256,7 +273,21 @@ String getClientIdentifier() { } void dispose() { + // Drain in-flight message handlers BEFORE disposing the subscription. + // Disposing cancels the reactive chain, which interrupts handler threads (via Reactor's + // publishOn worker disposal). Draining first ensures handlers can complete message + // settlement before the client is closed. + // See https://github.com/Azure/azure-sdk-for-java/issues/45716 + final MessagePump pump = currentPump; + if (pump != null) { + pump.drainHandlers(drainTimeout); + } disposable.dispose(); + // Clear the reference now that the pump's underlying client and subscription are gone + // so we don't retain it past its useful lifetime. 
A subsequent start() cycle will + // assign a fresh pump in beginIntern() before any drain on this RollingMessagePump + // can run again. + currentPump = null; } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java index 508b0e2c6b2e..b353252f0103 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java @@ -9,16 +9,19 @@ import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder; import com.azure.messaging.servicebus.implementation.ServiceBusProcessorClientOptions; import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Disposable; import reactor.core.scheduler.Schedulers; +import java.time.Duration; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -203,6 +206,36 @@ public final class ServiceBusProcessorClient implements AutoCloseable { private Disposable monitorDisposable; private boolean wasStopped = false; private final ServiceBusProcessor processorV2; + // V1 handler tracking for graceful shutdown (not used in V2 path). 
+ private final AtomicInteger activeV1HandlerCount = new AtomicInteger(0); + private final Object v1DrainLock = new Object(); + private final ThreadLocal isV1HandlerThread = ThreadLocal.withInitial(() -> Boolean.FALSE); + private volatile boolean v1Closing; + // True while close() is between snapshotting state and finishing cleanup. Used to prevent + // start()/stop()/restartMessageReceiver() from racing with an in-progress shutdown - close() + // releases the instance monitor across the drain wait so other lifecycle methods could + // otherwise interleave and create state that close() then tears down. start() may be invoked + // again after close() returns to begin a new processing cycle (the flag is cleared in close()'s + // finally block). + private final AtomicBoolean v1CloseInProgress = new AtomicBoolean(false); + // Same gate as v1CloseInProgress but for the V2 path. close() releases the instance monitor + // before delegating to processorV2.close() (whose internal drain blocks for in-flight handler + // settlement). Without this gate, a concurrent start()/stop() call could acquire the outer + // monitor during the drain window and call processorV2.start(), leaving the inner processor + // running after the outer close() returns. + private final AtomicBoolean v2CloseInProgress = new AtomicBoolean(false); + // Most-recent identifier captured from the V1 asyncClient. Lets getIdentifier() return a + // stable value during/after close() without lazy-creating a fresh receiver that close() would + // not dispose. Refreshed every time getIdentifier() observes a live asyncClient and once more + // by close() before nulling it. + private volatile String cachedV1Identifier; + // Namespace + entity path captured from the V1 asyncClient. Used by handleError() so that + // processError can be invoked even after asyncClient has been nulled by close() cleanup - + // e.g. 
a re-entrant close() called from within processMessage that completes after the + // calling handler throws. Without these, handleError() would NPE on asyncClient.get() and + // silently swallow the user's exception. + private volatile String cachedV1FullyQualifiedNamespace; + private volatile String cachedV1EntityPath; /** * Constructor to create a sessions-enabled processor. @@ -231,8 +264,8 @@ public final class ServiceBusProcessorClient implements AutoCloseable { this.subscriptionName = subscriptionName; if (processorOptions.isV2()) { final int concurrencyPerSession = this.processorOptions.getMaxConcurrentCalls(); - this.processorV2 - = new ServiceBusProcessor(sessionReceiverBuilder, processMessage, processError, concurrencyPerSession); + this.processorV2 = new ServiceBusProcessor(sessionReceiverBuilder, processMessage, processError, + concurrencyPerSession, processorOptions.getDrainTimeout()); this.tracer = null; } else { this.processorV2 = null; @@ -269,7 +302,7 @@ public final class ServiceBusProcessorClient implements AutoCloseable { final int concurrency = this.processorOptions.getMaxConcurrentCalls(); final boolean enableAutoDisposition = !this.processorOptions.isDisableAutoComplete(); this.processorV2 = new ServiceBusProcessor(receiverBuilder, processMessage, processError, concurrency, - enableAutoDisposition); + enableAutoDisposition, processorOptions.getDrainTimeout()); this.tracer = null; } else { this.processorV2 = null; @@ -293,14 +326,30 @@ public final class ServiceBusProcessorClient implements AutoCloseable { */ public synchronized void start() { if (processorV2 != null) { + if (v2CloseInProgress.get()) { + // close() is mid-shutdown on the V2 path - don't restart processorV2 underneath + // it. Caller can retry start() once close() has returned. 
+ LOGGER.info("Processor close() is in progress; ignoring start() call."); + return; + } processorV2.start(); return; } + if (v1CloseInProgress.get()) { + // close() is mid-shutdown - it has set isRunning=false and will tear down state when + // the drain returns. Refuse to start so we don't create resources that close() then + // immediately discards. Caller can retry start() once close() has returned. + LOGGER.info("Processor close() is in progress; ignoring start() call."); + return; + } if (isRunning.getAndSet(true)) { LOGGER.info("Processor is already running"); return; } + // Reset shutdown-only state so the processor can restart after a close() cycle. + v1Closing = false; + if (wasStopped) { wasStopped = false; LOGGER.warning( @@ -321,7 +370,14 @@ public synchronized void start() { // (parallel scheduler backing Flux.interval), so that we don't block any of the parallel threads. if (monitorDisposable == null) { monitorDisposable = Schedulers.boundedElastic().schedulePeriodically(() -> { - if (this.asyncClient.get().isConnectionClosed()) { + // Snapshot asyncClient before dereferencing - close() nulls the field after + // disposing this monitor, but a tick already in flight can still race past + // monitorDisposable.dispose() and observe asyncClient == null. + final ServiceBusReceiverAsyncClient currentClient = this.asyncClient.get(); + if (currentClient == null) { + return; + } + if (currentClient.isConnectionClosed()) { restartMessageReceiver(null); } }, SCHEDULER_INTERVAL_IN_SECONDS, SCHEDULER_INTERVAL_IN_SECONDS, TimeUnit.SECONDS); @@ -334,9 +390,19 @@ public synchronized void start() { */ public synchronized void stop() { if (processorV2 != null) { + if (v2CloseInProgress.get()) { + LOGGER.info("Processor close() is in progress; ignoring stop() call."); + return; + } processorV2.stop(); return; } + if (v1CloseInProgress.get()) { + // close() has already set isRunning=false and is draining. 
A concurrent stop() would + // be a no-op against the same state and could confuse the wasStopped semantics. + LOGGER.info("Processor close() is in progress; ignoring stop() call."); + return; + } wasStopped = true; isRunning.set(false); } @@ -344,23 +410,98 @@ public synchronized void stop() { /** * Stops message processing and closes the processor. The receiving links and sessions are closed and calling * {@link #start()} will create a new processing cycle with new links and new sessions. + * + *

<p>This method blocks while waiting for in-flight message handlers to complete (up to the configured + * drain timeout, default 30 seconds) before cancelling subscriptions and closing the underlying client. + * This ensures handlers can finish message settlement without encountering a disposed receiver. Callers + * should avoid invoking {@code close()} on latency-sensitive threads. If the drain timeout expires, the + * processor proceeds with shutdown regardless.</p>
+ * + *

<p>When {@code close()} is invoked from within a {@code processMessage} callback, the drain + * logic waits only for other concurrent message handlers to complete. The calling handler is not included + * in the in-flight drain threshold to avoid self-deadlock, so shutdown may proceed (including cancelling + * subscriptions and closing the underlying client) while that callback is still executing or settling its + * message. The configured drain timeout continues to apply to the handlers that are being awaited.</p>
+ * + * @see Issue #45716 */ @Override - public synchronized void close() { - if (processorV2 != null) { - processorV2.close(); - return; + public void close() { + final ServiceBusProcessor v2Snapshot; + final Duration drainTimeout; + final boolean wonV2Close; + + // Snapshot state and mark the processor as closing while holding the monitor, but RELEASE + // the monitor before performing the blocking drain. Holding the monitor across the drain + // would stall shutdown for the full drain timeout if any in-flight handler calls a + // synchronized accessor (isRunning(), getIdentifier()) on this client: the handler would + // block on the monitor while close() is waiting for that handler to finish. + synchronized (this) { + v2Snapshot = processorV2; + if (v2Snapshot == null) { + // Only the first concurrent close() takes ownership of the V1 shutdown. Subsequent + // close() calls return early so they do not race with the owner's cleanup (e.g. + // they could otherwise re-enter sync2 after start() created fresh state and dispose + // those resources). The processor still gets closed - the owner finishes the work. + if (!v1CloseInProgress.compareAndSet(false, true)) { + return; + } + isRunning.set(false); + v1Closing = true; + drainTimeout = processorOptions.getDrainTimeout(); + wonV2Close = false; + } else { + // V2 path: claim ownership symmetrically so concurrent start()/stop() return early + // and a second concurrent close() returns immediately rather than re-invoking + // processorV2.close() while the owner is still draining. + wonV2Close = v2CloseInProgress.compareAndSet(false, true); + drainTimeout = null; + } } - isRunning.set(false); - receiverSubscriptions.keySet().forEach(Subscription::cancel); - receiverSubscriptions.clear(); - if (monitorDisposable != null) { - monitorDisposable.dispose(); - monitorDisposable = null; + + if (v2Snapshot != null) { + if (!wonV2Close) { + return; + } + try { + // V2 path: drain happens inside processorV2.close(). 
Invoked outside the monitor + // so handlers calling isRunning()/getIdentifier() during drain do not stall shutdown. + v2Snapshot.close(); + } finally { + v2CloseInProgress.set(false); + } + return; } - if (asyncClient.get() != null) { - asyncClient.get().close(); - asyncClient.set(null); + + try { + // V1 path: drain in-flight message handlers BEFORE cancelling subscriptions. + // Cancelling subscriptions triggers Reactor's publishOn worker disposal, which interrupts + // handler threads. Draining first ensures handlers can complete message settlement + // before the underlying client is closed. + // See https://github.com/Azure/azure-sdk-for-java/issues/45716 + drainV1Handlers(drainTimeout); + + // Re-acquire the monitor for the (non-blocking) cleanup so it does not race with + // start()/restartMessageReceiver(). Concurrent close() calls remain safe: every step + // below is guarded by a null check or operates on already-cleared collections. + synchronized (this) { + receiverSubscriptions.keySet().forEach(Subscription::cancel); + receiverSubscriptions.clear(); + if (monitorDisposable != null) { + monitorDisposable.dispose(); + monitorDisposable = null; + } + if (asyncClient.get() != null) { + // Capture the identifier before disposing so a later getIdentifier() call can + // return the same value without lazy-creating a fresh receiver. + cachedV1Identifier = asyncClient.get().getIdentifier(); + asyncClient.get().close(); + asyncClient.set(null); + } + } + } finally { + // Clear the in-progress flag last so a subsequent start() can begin a fresh cycle. + v1CloseInProgress.set(false); } } @@ -410,17 +551,47 @@ public String getSubscriptionName() { /** * Gets the identifier of the instance of {@link ServiceBusProcessorClient}. * - * @return The identifier that can identify the instance of {@link ServiceBusProcessorClient}. + *

The identifier is captured from the underlying receiver as soon as one exists. After + * {@link #close() close()} returns, this method continues to return the identifier of the + * receiver that was active before shutdown so callers (logs, diagnostics) get a stable value.

+ * + * @return The identifier that can identify the instance of {@link ServiceBusProcessorClient}, + * or {@code null} if no identifier is available yet. {@code null} can be returned in two + * cases: (1) on the V2 path before the first call to {@link #start() start()} has created + * the underlying processor, and (2) on the V1 path when {@link #close() close()} is in + * progress on a brand-new processor that has never been started (so no identifier was + * ever cached). In all other cases - while running, after {@code start()}, during/after + * {@code close()} on a previously-started processor - this method returns a non-null + * identifier. */ public synchronized String getIdentifier() { if (processorV2 != null) { return processorV2.getIdentifier(); } - if (asyncClient.get() == null) { - asyncClient.set(createNewReceiver()); + final ServiceBusReceiverAsyncClient current = asyncClient.get(); + if (current != null) { + // Live client - refresh the cache and return its identifier. + cachedV1Identifier = current.getIdentifier(); + return cachedV1Identifier; } - - return asyncClient.get().getIdentifier(); + if (cachedV1Identifier != null) { + // The processor was started at some point and we captured the identifier. Return it + // without lazy-creating a new receiver - otherwise a getIdentifier() call during + // close()'s drain window or after close() returned would leak a receiver that the + // shutdown path is no longer responsible for. + return cachedV1Identifier; + } + if (v1CloseInProgress.get()) { + // First-ever call but close() is mid-shutdown and never had a chance to cache. + // Don't create a receiver that close() won't dispose; return null - the V2 path also + // returns null when no identifier is available, keeping the API contract consistent. + return null; + } + // First call before any start() - preserve the lazy-init behavior so getIdentifier() + // returns a non-null identifier even before the processor is started. 
+ asyncClient.set(createNewReceiver()); + cachedV1Identifier = asyncClient.get().getIdentifier(); + return cachedV1Identifier; } private synchronized void receiveMessages() { @@ -430,6 +601,21 @@ private synchronized void receiveMessages() { return; } ServiceBusReceiverAsyncClient receiverClient = asyncClient.get(); + // Cache the namespace + entity path so handleError() works even after close() has nulled + // asyncClient (re-entrant close from within processMessage). Refreshed on every receive + // cycle so a restart against a different builder picks up new values. + cachedV1FullyQualifiedNamespace = receiverClient.getFullyQualifiedNamespace(); + cachedV1EntityPath = receiverClient.getEntityPath(); + // Cache the receive mode so the onNext hot-path reads a primitive instead of walking the + // receiver options each call. PEEK_LOCK is safe to skip during drain (broker still owns + // the lock and will redeliver any message we drop). RECEIVE_AND_DELETE is not - the broker + // has already removed the message before delivery, so dropping it here would lose it + // permanently. Only PEEK_LOCK takes the drain-aware fast path. When the client cannot + // report a receive mode (test mocks that didn't stub getReceiverOptions()) default to the + // safer no-skip behavior so messages cannot be dropped silently. + final ReceiverOptions receiverOptions = receiverClient.getReceiverOptions(); + final boolean skipDuringDrain + = receiverOptions != null && receiverOptions.getReceiveMode() == ServiceBusReceiveMode.PEEK_LOCK; @SuppressWarnings({ "unchecked", "rawtypes" }) CoreSubscriber[] subscribers @@ -449,36 +635,67 @@ public void onSubscribe(Subscription subscription) { @SuppressWarnings("try") @Override public void onNext(ServiceBusMessageContext serviceBusMessageContext) { - Context span = serviceBusMessageContext.getMessage() != null - ? 
serviceBusMessageContext.getMessage().getContext() - : Context.NONE; - Exception exception = null; - AutoCloseable scope = tracer.makeSpanCurrent(span); + // Errors must always flow to the user's processError callback even during + // shutdown - they don't touch the receiver and the application has the right + // to know what failed before close completes. Only message dispatches take + // the drain-aware fast path: avoids counting skip-path invocations against + // the drain so a busy upstream cannot keep activeV1HandlerCount > 0 while + // close() is waiting. The skip is gated on PEEK_LOCK only - in + // RECEIVE_AND_DELETE the broker has already removed the message, so dropping + // it here would lose it permanently. + final boolean isErrorContext = serviceBusMessageContext.hasError(); + if (!isErrorContext && v1Closing && skipDuringDrain) { + LOGGER.verbose("Skipping V1 handler execution (early), processor is closing."); + return; + } + activeV1HandlerCount.incrementAndGet(); + isV1HandlerThread.set(Boolean.TRUE); try { - if (serviceBusMessageContext.hasError()) { - handleError(serviceBusMessageContext.getThrowable()); - } else { - ServiceBusReceivedMessageContext serviceBusReceivedMessageContext - = new ServiceBusReceivedMessageContext(receiverClient, serviceBusMessageContext); - - try { - processMessage.accept(serviceBusReceivedMessageContext); - } catch (Exception ex) { - handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK)); - - if (!processorOptions.isDisableAutoComplete()) { - LOGGER.warning("Error when processing message. Abandoning message.", ex); - abandonMessage(serviceBusMessageContext, receiverClient); + if (!isErrorContext && v1Closing && skipDuringDrain) { + // v1Closing may have flipped between the early check above and this + // point (a check-then-act race). Race-loser invocations still skip + // work; the increment is balanced by the decrement in finally. 
+ LOGGER.verbose("Skipping V1 handler execution, processor is closing."); + return; + } + Context span = serviceBusMessageContext.getMessage() != null + ? serviceBusMessageContext.getMessage().getContext() + : Context.NONE; + Exception exception = null; + AutoCloseable scope = tracer.makeSpanCurrent(span); + try { + if (isErrorContext) { + handleError(serviceBusMessageContext.getThrowable()); + } else { + ServiceBusReceivedMessageContext serviceBusReceivedMessageContext + = new ServiceBusReceivedMessageContext(receiverClient, serviceBusMessageContext); + + try { + processMessage.accept(serviceBusReceivedMessageContext); + } catch (Exception ex) { + handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK)); + + if (!processorOptions.isDisableAutoComplete()) { + LOGGER.warning("Error when processing message. Abandoning message.", ex); + abandonMessage(serviceBusMessageContext, receiverClient); + } + exception = ex; } - exception = ex; } - } - if (isRunning.get()) { - LOGGER.verbose("Requesting 1 more message from upstream"); - subscription.request(1); + if (isRunning.get()) { + LOGGER.verbose("Requesting 1 more message from upstream"); + subscription.request(1); + } + } finally { + tracer.endSpan(exception, span, scope); } } finally { - tracer.endSpan(exception, span, scope); + isV1HandlerThread.remove(); + if (activeV1HandlerCount.decrementAndGet() <= 1) { + synchronized (v1DrainLock) { + v1DrainLock.notifyAll(); + } + } } } @@ -526,9 +743,24 @@ private void abandonMessage(ServiceBusMessageContext serviceBusMessageContext, private void handleError(Throwable throwable) { try { - ServiceBusReceiverAsyncClient client = asyncClient.get(); - final String fullyQualifiedNamespace = client.getFullyQualifiedNamespace(); - final String entityPath = client.getEntityPath(); + // Read the cached values so processError still fires after asyncClient has been + // nulled by close() cleanup - e.g. 
a re-entrant close() from within processMessage + // that completes before the calling handler throws. Falls back to the live client + // when the cache hasn't been populated yet (first-ever error before any receive + // cycle), and only then to defaults so the error context is never NPE-suppressed. + String fullyQualifiedNamespace = cachedV1FullyQualifiedNamespace; + String entityPath = cachedV1EntityPath; + if (fullyQualifiedNamespace == null || entityPath == null) { + final ServiceBusReceiverAsyncClient client = asyncClient.get(); + if (client != null) { + if (fullyQualifiedNamespace == null) { + fullyQualifiedNamespace = client.getFullyQualifiedNamespace(); + } + if (entityPath == null) { + entityPath = client.getEntityPath(); + } + } + } processError.accept(new ServiceBusErrorContext(throwable, fullyQualifiedNamespace, entityPath)); } catch (Exception ex) { LOGGER.verbose("Error from error handler. Ignoring error.", ex); @@ -536,6 +768,11 @@ private void handleError(Throwable throwable) { } private synchronized void restartMessageReceiver(Subscription requester) { + if (v1CloseInProgress.get()) { + // Connection monitor or onError fallback fired during shutdown; don't recreate the + // receiver - close() is about to dispose it anyway. + return; + } if (!isRunning()) { return; } @@ -555,4 +792,56 @@ private ServiceBusReceiverAsyncClient createNewReceiver() { ? this.sessionReceiverBuilder.buildAsyncClientForProcessor() : this.receiverBuilder.buildAsyncClientForProcessor(); } + + /** + * Wait for in-flight V1 message handlers to complete, up to the specified timeout. + * Called during V1 close to ensure graceful shutdown before disposing the underlying client. + * + *

Re-entrant semantics: when invoked from within a V1 message handler + * (e.g. the user calls {@link #close() close()} inside their {@code processMessage} + * callback), this method waits only for other concurrent handlers and excludes the + * calling handler from the wait condition - waiting for the calling handler to finish would + * self-deadlock. In that case, this method may return while the calling handler is still + * executing or settling its message.

+ * + * @param timeout the maximum time to wait for handlers to complete. + */ + private void drainV1Handlers(Duration timeout) { + v1Closing = true; + final int threshold; + if (isV1HandlerThread.get()) { + // Re-entrant call from within a V1 message handler (e.g., user called close() inside processMessage). + // Cannot wait for this thread's own handler to complete (would self-deadlock), but we can + // wait for OTHER concurrent handlers to finish settlement before cancelling subscriptions + // and closing the underlying client. + threshold = 1; + if (activeV1HandlerCount.get() <= threshold) { + return; + } + LOGGER.info("drainV1Handlers called from within a V1 message handler (re-entrant). " + + "Waiting for {} other active handler(s) to complete.", activeV1HandlerCount.get() - 1); + } else { + threshold = 0; + } + final long deadline = System.nanoTime() + timeout.toNanos(); + synchronized (v1DrainLock) { + while (activeV1HandlerCount.get() > threshold) { + final long remainingNanos = deadline - System.nanoTime(); + if (remainingNanos <= 0) { + LOGGER.warning("V1 drain timeout expired with {} active handlers still running.", + activeV1HandlerCount.get()); + return; + } + try { + final long millis = TimeUnit.NANOSECONDS.toMillis(remainingNanos); + final int nanos = (int) (remainingNanos % 1_000_000); + v1DrainLock.wait(millis, nanos); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warning("V1 drain interrupted while waiting for in-flight handlers."); + return; + } + } + } + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java index 90bf2db4f0aa..3dd1febd98bb 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java +++ 
b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java @@ -35,7 +35,9 @@ import java.util.Objects; import java.util.Locale; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -148,9 +150,16 @@ final class SessionsMessagePump { private final Consumer processMessage; private final Consumer processError; private final Runnable onTerminate; + private final Duration drainTimeout; private final AtomicReference> rollingReceiversRef = new AtomicReference<>(EMPTY); private final SessionReceiversTracker receiversTracker; private final Mono nextSession; + // True when the receive mode is PEEK_LOCK. Cached here from the ctor's receiveMode parameter + // (which is otherwise only forwarded to SessionReceiversTracker) so each rolling receiver can + // be told whether it is safe to skip handler dispatch during drain. RECEIVE_AND_DELETE + // sessions must always invoke processMessage even during shutdown - see RollingSessionReceiver + // for the data-loss rationale. 
+ private final boolean skipDuringDrain; SessionsMessagePump(String identifier, String fullyQualifiedNamespace, String entityPath, ServiceBusReceiveMode receiveMode, ServiceBusReceiverInstrumentation instrumentation, @@ -158,7 +167,7 @@ final class SessionsMessagePump { int maxConcurrentSessions, int concurrencyPerSession, int prefetch, boolean enableAutoDisposition, MessageSerializer serializer, AmqpRetryPolicy retryPolicy, Consumer processMessage, Consumer processError, - Runnable onTerminate) { + Runnable onTerminate, Duration drainTimeout) { this.pumpId = COUNTER.incrementAndGet(); final Map loggingContext = new HashMap<>(3); loggingContext.put(PUMP_ID_KEY, pumpId); @@ -184,9 +193,11 @@ final class SessionsMessagePump { this.processMessage = Objects.requireNonNull(processMessage, "'processMessage' cannot be null."); this.processError = Objects.requireNonNull(processError, "'processError' cannot be null."); this.onTerminate = Objects.requireNonNull(onTerminate, "'onTerminate' cannot be null."); + this.drainTimeout = Objects.requireNonNull(drainTimeout, "'drainTimeout' cannot be null."); this.receiversTracker = new SessionReceiversTracker(logger, maxConcurrentSessions, fullyQualifiedNamespace, entityPath, receiveMode, instrumentation); this.nextSession = new NextSession(pumpId, fullyQualifiedNamespace, entityPath, sessionAcquirer).mono(); + this.skipDuringDrain = receiveMode == ServiceBusReceiveMode.PEEK_LOCK; } String getIdentifier() { @@ -272,10 +283,10 @@ private Mono terminate(TerminalSignalType signalType) { private List createRollingSessionReceivers() { final ArrayList rollingReceivers = new ArrayList<>(maxConcurrentSessions); for (int rollerId = 1; rollerId <= maxConcurrentSessions; rollerId++) { - final RollingSessionReceiver rollingReceiver - = new RollingSessionReceiver(pumpId, rollerId, instrumentation, fullyQualifiedNamespace, entityPath, - nextSession, maxSessionLockRenew, sessionIdleTimeout, concurrencyPerSession, prefetch, - enableAutoDisposition, 
serializer, retryPolicy, processMessage, processError, receiversTracker); + final RollingSessionReceiver rollingReceiver = new RollingSessionReceiver(pumpId, rollerId, instrumentation, + fullyQualifiedNamespace, entityPath, nextSession, maxSessionLockRenew, sessionIdleTimeout, + concurrencyPerSession, prefetch, enableAutoDisposition, serializer, retryPolicy, processMessage, + processError, receiversTracker, drainTimeout, skipDuringDrain); rollingReceivers.add(rollingReceiver); } return rollingReceivers; @@ -368,11 +379,22 @@ private static final class RollingSessionReceiver extends AtomicReference isHandlerThread = ThreadLocal.withInitial(() -> Boolean.FALSE); + private volatile boolean closing; + // True when the receive mode is PEEK_LOCK, in which case it is safe to skip handler + // dispatch for messages that arrive after closing=true (the broker still owns the lock + // and will redeliver). False for RECEIVE_AND_DELETE, where the broker has already removed + // the message before delivery - skipping the handler in that mode would lose the message + // permanently, so we must always invoke processMessage even during the drain window. 
+ private final boolean skipDuringDrain; private final Consumer processMessage; private final Consumer processError; private final boolean enableAutoDisposition; private final Duration maxSessionLockRenew; private final Duration sessionIdleTimeout; + private final Duration drainTimeout; private final MessageSerializer serializer; private final ServiceBusReceiverInstrumentation instrumentation; private final ServiceBusTracer tracer; @@ -385,7 +407,7 @@ private static final class RollingSessionReceiver extends AtomicReference processMessage, Consumer processError, - SessionReceiversTracker receiversTracker) { + SessionReceiversTracker receiversTracker, Duration drainTimeout, boolean skipDuringDrain) { super(INIT); this.pumpId = pumpId; final Map loggingContext = new HashMap<>(3); @@ -406,6 +428,8 @@ private static final class RollingSessionReceiver extends AtomicReference nextSessionReceiverStream @@ -446,10 +470,70 @@ private Mono terminate(TerminalSignalType signalType, Scheduler workerSche // by the ServiceBusSessionReactorReceiver. logger.atInfo().log("Roller terminated. rollerId:" + rollerId + " signal:" + signalType); nextSessionStream.close(); + // Drain in-flight message handlers BEFORE disposing the worker scheduler. + // Disposing the scheduler interrupts handler threads (via ScheduledExecutorService.shutdownNow()). + // Draining first ensures handlers can complete message settlement before threads are interrupted. + // See https://github.com/Azure/azure-sdk-for-java/issues/45716 + drainHandlers(drainTimeout); workerScheduler.dispose(); return Mono.empty(); } + /** + * Wait for in-flight session message handlers to complete, up to the specified timeout. + * Called during session receiver termination to ensure graceful shutdown — messages + * currently being processed are allowed to complete (including settlement) before the + * worker scheduler is disposed. + * + *

Re-entrant semantics: when invoked from within a session message + * handler (i.e. the calling thread is the handler thread itself), this method waits only + * for other concurrent handlers on this session and excludes the calling handler + * from the wait condition - waiting for the calling handler to finish would self-deadlock. + * In that case, this method may return while the calling handler is still executing.

+ * + * @param timeout the maximum time to wait for in-flight handlers to complete. + */ + private void drainHandlers(Duration timeout) { + closing = true; + final int threshold; + if (isHandlerThread.get()) { + // Re-entrant call from within a session message handler (e.g., user called close() inside processMessage). + // Cannot wait for this thread's own handler to complete (would self-deadlock), but we can + // wait for OTHER concurrent handlers to finish settlement before disposing the worker scheduler. + threshold = 1; + if (activeHandlerCount.get() <= threshold) { + return; + } + logger.atInfo() + .addKeyValue("otherActiveHandlers", activeHandlerCount.get() - 1) + .log("drainHandlers called from within a session message handler (re-entrant). " + + "Waiting for other active handlers to complete."); + } else { + threshold = 0; + } + final long deadline = System.nanoTime() + timeout.toNanos(); + synchronized (drainLock) { + while (activeHandlerCount.get() > threshold) { + final long remainingNanos = deadline - System.nanoTime(); + if (remainingNanos <= 0) { + logger.atWarning() + .addKeyValue("activeHandlers", activeHandlerCount.get()) + .log("Session drain timeout expired with active handlers still running."); + return; + } + try { + final long millis = TimeUnit.NANOSECONDS.toMillis(remainingNanos); + final int nanos = (int) (remainingNanos % 1_000_000); + drainLock.wait(millis, nanos); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.atWarning().log("Session drain interrupted while waiting for in-flight handlers."); + return; + } + } + } + } + private ServiceBusSessionReactorReceiver nextSessionReceiver(ServiceBusSessionAcquirer.Session nextSession) { final State lastState = super.get(); if (lastState == TERMINATED) { @@ -486,22 +570,58 @@ private void handleMessage(Message qpidMessage) { final ServiceBusReceivedMessage message = serializer.deserialize(qpidMessage, ServiceBusReceivedMessage.class); - 
instrumentation.instrumentProcess(message, ReceiverKind.PROCESSOR, msg -> { - logger.atVerbose() - .addKeyValue(SESSION_ID_KEY, message.getSessionId()) - .addKeyValue(MESSAGE_ID_LOGGING_KEY, message.getMessageId()) - .log("Received message."); + // Fast-path early return: avoid counting skip-path invocations against the drain. + // Under sustained throughput, the per-session messageFlux is still live while + // drainHandlers() is waiting (the worker scheduler is only disposed after drain + // returns), so flatMap keeps dispatching messages. If we incremented the counter for + // every skip we could keep activeHandlerCount > 0 long enough to push drain to its + // timeout. Reading the volatile flag here ensures messages that arrive after + // closing=true is set are dropped without touching the drain's exit condition. + // + // Skip is gated on PEEK_LOCK only: in that mode the broker still owns the lock and + // will redeliver any message we drop. In RECEIVE_AND_DELETE, the broker has already + // removed the message before delivery, so dropping it here would lose it permanently + // - those messages must always reach processMessage even during the drain window. + if (closing && skipDuringDrain) { + logger.atVerbose().log("Skipping handler execution (early), session pump is closing."); + return; + } - final Throwable error = notifyMessage(msg); - if (enableAutoDisposition) { - if (error == null) { - complete(msg); - } else { - abandon(msg); + activeHandlerCount.incrementAndGet(); + isHandlerThread.set(Boolean.TRUE); + try { + // closing may have flipped between the early check above and this point + // (a check-then-act race). Re-check inside the counted region so the rare + // race-loser still skips work; the increment will be balanced by the decrement + // in finally, which will notifyAll the drain. Same RECEIVE_AND_DELETE exemption applies. 
+ if (closing && skipDuringDrain) { + logger.atVerbose().log("Skipping handler execution, session pump is closing."); + return; + } + instrumentation.instrumentProcess(message, ReceiverKind.PROCESSOR, msg -> { + logger.atVerbose() + .addKeyValue(SESSION_ID_KEY, message.getSessionId()) + .addKeyValue(MESSAGE_ID_LOGGING_KEY, message.getMessageId()) + .log("Received message."); + + final Throwable error = notifyMessage(msg); + if (enableAutoDisposition) { + if (error == null) { + complete(msg); + } else { + abandon(msg); + } + } + return error; + }); + } finally { + isHandlerThread.remove(); + if (activeHandlerCount.decrementAndGet() <= 1) { + synchronized (drainLock) { + drainLock.notifyAll(); } } - return error; - }); + } } private Throwable notifyMessage(ServiceBusReceivedMessage message) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusProcessorClientOptions.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusProcessorClientOptions.java index af1fd2a7b962..117f8d0b2cc2 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusProcessorClientOptions.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusProcessorClientOptions.java @@ -4,17 +4,24 @@ package com.azure.messaging.servicebus.implementation; import com.azure.core.annotation.Fluent; +import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.ServiceBusProcessorClient; +import java.time.Duration; + /** * Additional options to configure {@link ServiceBusProcessorClient}. 
*/ @Fluent public final class ServiceBusProcessorClientOptions { + private static final ClientLogger LOGGER = new ClientLogger(ServiceBusProcessorClientOptions.class); + private static final Duration DEFAULT_DRAIN_TIMEOUT = Duration.ofSeconds(30); + private int maxConcurrentCalls = 1; private boolean disableAutoComplete; private boolean isV2; + private Duration drainTimeout = DEFAULT_DRAIN_TIMEOUT; /** * Returns true if the auto-complete and auto-abandon feature is disabled. @@ -60,4 +67,32 @@ public ServiceBusProcessorClientOptions setV2(boolean isV2) { public boolean isV2() { return isV2; } + + /** + * Returns the maximum time to wait for in-flight message handlers to complete during processor shutdown. + * @return the drain timeout duration. + */ + public Duration getDrainTimeout() { + return drainTimeout; + } + + /** + * Sets the maximum time to wait for in-flight message handlers to complete during processor shutdown. + * Defaults to 30 seconds. + * + * @param drainTimeout the maximum time to wait for in-flight handlers. Must be positive. + * @return The updated instance of {@link ServiceBusProcessorClientOptions}. + * @throws NullPointerException if {@code drainTimeout} is null. + * @throws IllegalArgumentException if {@code drainTimeout} is zero or negative. 
+ */ + public ServiceBusProcessorClientOptions setDrainTimeout(Duration drainTimeout) { + if (drainTimeout == null) { + throw LOGGER.logExceptionAsError(new NullPointerException("'drainTimeout' cannot be null.")); + } + if (drainTimeout.isZero() || drainTimeout.isNegative()) { + throw LOGGER.logExceptionAsError(new IllegalArgumentException("'drainTimeout' must be positive.")); + } + this.drainTimeout = drainTimeout; + return this; + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderUnitTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderUnitTest.java index 045562f3b88f..e928dafaf8d2 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderUnitTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderUnitTest.java @@ -198,6 +198,60 @@ public void testEntityNameInConnectionString() { assertNotNull(builder.buildAsyncClient()); } + /** + * Verifies that {@code drainTimeout(Duration)} on both processor builders rejects null + * (NullPointerException) and zero/negative values (IllegalArgumentException), matching the + * documented contract on {@code ServiceBusProcessorClientOptions.setDrainTimeout(Duration)}. + */ + @Test + public void processorBuilderDrainTimeoutValidation() { + // Non-session processor builder. + assertThrowsExactly(NullPointerException.class, + () -> createMinimalValidClientBuilder().processor().drainTimeout(null)); + assertThrowsExactly(IllegalArgumentException.class, + () -> createMinimalValidClientBuilder().processor().drainTimeout(Duration.ZERO)); + assertThrowsExactly(IllegalArgumentException.class, + () -> createMinimalValidClientBuilder().processor().drainTimeout(Duration.ofSeconds(-1))); + + // Session processor builder. 
+ assertThrowsExactly(NullPointerException.class, + () -> createMinimalValidClientBuilder().sessionProcessor().drainTimeout(null)); + assertThrowsExactly(IllegalArgumentException.class, + () -> createMinimalValidClientBuilder().sessionProcessor().drainTimeout(Duration.ZERO)); + assertThrowsExactly(IllegalArgumentException.class, + () -> createMinimalValidClientBuilder().sessionProcessor().drainTimeout(Duration.ofSeconds(-1))); + } + + /** + * Verifies that a positive {@code drainTimeout(Duration)} is accepted on both processor + * builders and propagates through to a buildable processor client (no validation exception + * during build). + */ + @Test + public void processorBuilderDrainTimeoutPositiveAccepted() { + // Non-session processor builder accepts a positive duration and builds successfully. + final ServiceBusProcessorClient processorClient = createMinimalValidClientBuilder().processor() + .queueName("fakequeue") + .drainTimeout(Duration.ofSeconds(5)) + .processMessage(x -> { + }) + .processError(x -> { + }) + .buildProcessorClient(); + assertNotNull(processorClient); + + // Session processor builder accepts a positive duration and builds successfully. 
+ final ServiceBusProcessorClient sessionProcessorClient = createMinimalValidClientBuilder().sessionProcessor() + .queueName("fakequeue") + .drainTimeout(Duration.ofSeconds(5)) + .processMessage(x -> { + }) + .processError(x -> { + }) + .buildProcessorClient(); + assertNotNull(sessionProcessorClient); + } + private static ServiceBusClientBuilder createMinimalValidClientBuilder() { return new ServiceBusClientBuilder().credential(FAKE_TOKEN_CREDENTIAL).fullyQualifiedNamespace(NAMESPACE_NAME); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java new file mode 100644 index 000000000000..761d5cb7de17 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java @@ -0,0 +1,1549 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+ +package com.azure.messaging.servicebus; + +import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusReceiverClientBuilder; +import com.azure.messaging.servicebus.ServiceBusProcessor.RollingMessagePump; +import com.azure.messaging.servicebus.implementation.ServiceBusProcessorClientOptions; +import com.azure.messaging.servicebus.implementation.instrumentation.ReceiverKind; +import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.junit.jupiter.api.parallel.Isolated; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for graceful shutdown behavior during processor close. + *

+ * Validates that when a processor is closed, in-flight message handlers are allowed to complete + * (including message settlement) before the underlying client is disposed. This prevents + * {@link IllegalStateException} when handlers attempt to settle messages on a disposed receiver. + *

+ *
+ * <p><b>Coverage Matrix</b></p>
+ * <ul>
+ * <li><b>V2 Non-Session</b> — {@link #v2CloseShouldWaitForInFlightHandlerBeforeClosingClient()}:
+ * Tests drain in {@code RollingMessagePump.dispose()} → {@code MessagePump.drainHandlers()}</li>
+ * <li><b>V1 Non-Session</b> — {@link #v1CloseShouldWaitForInFlightHandlerBeforeClosingClient()}:
+ * Tests drain in {@code ServiceBusProcessorClient.close()} → {@code drainV1Handlers()}</li>
+ * <li><b>Drain Timeout</b> — {@link #v2DrainShouldRespectTimeout()}:
+ * Tests {@code MessagePump.drainHandlers()} timeout behavior directly</li>
+ * <li><b>Re-entrant (single)</b> — {@link #v2DrainFromWithinHandlerShouldNotDeadlock()}:
+ * Tests re-entrant drain with no other concurrent handlers (returns true immediately)</li>
+ * <li><b>Closing Flag</b> — {@link #v2ClosingFlagPreventsNewHandlersAfterDrainStarts()}:
+ * Tests that the V2 closing flag prevents new handler dispatch during drain</li>
+ * <li><b>V1 Closing Flag</b> — {@link #v1ClosingFlagPreventsNewHandlersAfterDrainStarts()}:
+ * Tests that the V1 closing flag prevents new handler dispatch in the drain-to-cancel window</li>
+ * <li><b>V1 Restart</b> — {@link #v1StartAfterCloseResetsClosingFlag()}:
+ * Tests that {@code start()} after {@code close()} resets {@code v1Closing} so handlers run</li>
+ * <li><b>Re-entrant (concurrent)</b> — {@link #v1ReentrantCloseWaitsForOtherConcurrentHandlers()}:
+ * Tests re-entrant drain with concurrent handlers — waits for other handlers before closing</li>
+ * <li><b>Monitor Released During Drain</b> — {@link #v1CloseShouldNotHoldClientMonitorDuringDrain()}:
+ * Tests that {@code close()} releases the instance monitor across the drain wait, so handlers
+ * calling synchronized accessors (e.g. {@code isRunning()}) do not stall shutdown until the
+ * drain timeout expires</li>
+ * <li><b>Concurrent Start During Close</b> — {@link #v1ConcurrentStartDuringCloseDrainIsIgnored()}:
+ * Tests that a concurrent {@code start()} during {@code close()}'s drain window is ignored
+ * so it does not create new resources that the in-progress {@code close()} would tear down</li>
+ * <li><b>getIdentifier() During Close</b> — {@link #v1GetIdentifierDuringAndAfterCloseDoesNotCreateNewReceiver()}:
+ * Tests that {@code getIdentifier()} returns the cached identifier during/after close instead of
+ * lazy-creating a new receiver that would leak past the shutdown path</li>
+ * <li><b>Concurrent Close Ownership</b> — {@link #v1ConcurrentCloseCallsDoNotRace()}:
+ * Tests that only the first concurrent {@code close()} performs cleanup; the others return
+ * immediately so they cannot dispose state created after the owner cleared the in-progress flag</li>
+ * <li><b>V2 Concurrent Start During Close</b> — {@link #v2ConcurrentStartDuringCloseDrainIsIgnored()}:
+ * Tests that a concurrent {@code start()} during {@code processorV2.close()}'s drain window
+ * is ignored, mirroring the V1 guarantee</li>
+ * <li><b>RECEIVE_AND_DELETE No Skip (V2)</b> — {@link #v2ReceiveAndDeleteModeDoesNotSkipDuringDrain()}:
+ * Tests that the V2 pump's drain skip-path does NOT drop messages in RECEIVE_AND_DELETE mode —
+ * the broker has already removed those messages, so skipping would lose them permanently</li>
+ * <li><b>RECEIVE_AND_DELETE No Skip (V1)</b> — {@link #v1ReceiveAndDeleteModeDoesNotSkipDuringDrain()}:
+ * Mirrors the V2 guarantee for the V1 onNext path</li>
+ * <li><b>V2 Session</b> — Not directly unit-testable. The drain in
+ * {@code SessionsMessagePump.RollingSessionReceiver.terminate()} uses the identical
+ * {@code AtomicInteger} + {@code Object} monitor wait/notifyAll pattern as {@code MessagePump}.
+ * {@code SessionsMessagePump} requires a {@code ServiceBusSessionAcquirer} (AMQP connections)
+ * and {@code RollingSessionReceiver} is a private inner class, making unit testing infeasible.
+ * The session drain behavior should be verified via live/integration tests.</li>
+ * </ul>
+ * + * @see Issue #45716 + */ +@Execution(ExecutionMode.SAME_THREAD) +@Isolated +public class ServiceBusProcessorGracefulShutdownTest { + private static final ServiceBusReceiverInstrumentation INSTRUMENTATION + = new ServiceBusReceiverInstrumentation(null, null, "FQDN", "entityPath", null, ReceiverKind.PROCESSOR); + + private AutoCloseable mocksCloseable; + + @BeforeEach + public void setup() { + mocksCloseable = MockitoAnnotations.openMocks(this); + } + + @AfterEach + public void teardown() throws Exception { + Mockito.framework().clearInlineMock(this); + if (mocksCloseable != null) { + mocksCloseable.close(); + } + } + + /** + * Polls the supplied predicate every 5 ms (up to 5 seconds) and fails the test if it never + * becomes true. Used to wait for asynchronous lifecycle transitions deterministically without + * relying on a fixed {@link Thread#sleep(long)}. + */ + private static void waitFor(java.util.function.BooleanSupplier condition, String description) + throws InterruptedException { + final long deadline = System.nanoTime() + Duration.ofSeconds(5).toNanos(); + while (!condition.getAsBoolean()) { + if (System.nanoTime() > deadline) { + throw new AssertionError("Timed out waiting for " + description); + } + Thread.sleep(5); + } + } + + /** + * Returns a real {@link ReceiverOptions} configured for PEEK_LOCK. Every test in this file + * targets the PEEK_LOCK shutdown semantics (broker re-delivers any message dropped during + * drain), so production code reading {@code client.getReceiverOptions().getReceiveMode()} + * must see PEEK_LOCK on the mocked async clients to take the drain-aware fast path. A + * RECEIVE_AND_DELETE-specific test would build a different value here. + * + *

Uses the real {@code ReceiverOptions} factory rather than a Mockito mock to avoid the + * "UnfinishedStubbing" trap that arises when this helper is invoked inside another + * {@code when(...).thenReturn(...)} clause.

+ */ + private static ReceiverOptions peekLockOptions() { + return ReceiverOptions.createNonSessionOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, null, false); + } + + /** + * Verifies that when the V2 processor pump is disposed, in-flight message handlers + * are allowed to complete before the underlying client is closed. + *

+ * Regression test for #45716. + * Before the fix, disposing the pump would immediately cancel the reactive chain (interrupting + * handler threads via Reactor's publishOn worker disposal), then close the client. Handlers + * that called {@code client.complete(message).block()} would fail with + * {@link IllegalStateException}: "Cannot perform operation on a disposed receiver". + *

+ *

+ * The fix drains in-flight handlers in {@code RollingMessagePump.dispose()} BEFORE disposing + * the subscription, ensuring handlers complete message settlement first. + *

+ */ + @Test + public void v2CloseShouldWaitForInFlightHandlerBeforeClosingClient() throws InterruptedException { + final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class); + final ServiceBusReceiverClientBuilder builder = mock(ServiceBusReceiverClientBuilder.class); + final ServiceBusReceiverAsyncClient client = mock(ServiceBusReceiverAsyncClient.class); + + when(builder.buildAsyncClientForProcessor()).thenReturn(client); + when(client.getInstrumentation()).thenReturn(INSTRUMENTATION); + when(client.getReceiverOptions()).thenReturn(peekLockOptions()); + when(client.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(client.getEntityPath()).thenReturn("entityPath"); + when(client.isConnectionClosed()).thenReturn(false); + when(client.isAutoLockRenewRequested()).thenReturn(false); + // Emit one message on boundedElastic then hang. publishOn ensures the handler doesn't block + // the subscription thread when concurrency=1 (which uses Schedulers.immediate() for the worker). + when(client.nonSessionProcessorReceiveV2()) + .thenReturn(Flux.concat(Flux.just(message), Flux.never()) + .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + when(client.complete(any())).thenReturn(Mono.empty()); + doNothing().when(client).close(); + + // Latches to coordinate between the handler thread and the test thread. + final CountDownLatch handlerStarted = new CountDownLatch(1); + final CountDownLatch handlerCanProceed = new CountDownLatch(1); + final AtomicBoolean handlerCompleted = new AtomicBoolean(false); + + // The handler signals when it starts, then waits for the test to allow it to proceed. 
+ final Consumer messageConsumer = (messageContext) -> { + handlerStarted.countDown(); + try { + handlerCanProceed.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + handlerCompleted.set(true); + }; + + final RollingMessagePump pump = new RollingMessagePump(builder, messageConsumer, e -> { + }, 1, true, Duration.ofSeconds(30)); + + // Start the pump. + pump.begin(); + + // Wait for the handler to start processing the message. + assertTrue(handlerStarted.await(5, TimeUnit.SECONDS), "Handler should have started processing"); + + // Dispose the pump while the handler is still in-flight. + // dispose() now drains FIRST (before cancelling the subscription), so it blocks until + // the handler completes. Run on a separate thread to avoid blocking the test. + final CountDownLatch disposeDone = new CountDownLatch(1); + final Thread disposeThread = new Thread(() -> { + pump.dispose(); + disposeDone.countDown(); + }); + disposeThread.start(); + + try { + // Wait deterministically for dispose to be blocked in drainHandlers() (the drain + // sets the closing flag and blocks on the in-flight handler counter monitor, so the + // thread transitions to WAITING/TIMED_WAITING). Avoids the flakiness of a fixed sleep. + waitFor( + () -> disposeThread.getState() == Thread.State.WAITING + || disposeThread.getState() == Thread.State.TIMED_WAITING, + "dispose() to be blocked in drainHandlers()"); + + // Verify: client has NOT been closed yet (handler is still running, drain is blocking dispose). + verify(client, never()).close(); + assertFalse(handlerCompleted.get(), "Handler should still be in-flight"); + + // Now let the handler complete. + handlerCanProceed.countDown(); + + // Wait for dispose to finish. 
+ assertTrue(disposeDone.await(5, TimeUnit.SECONDS), "Dispose should complete after handler finishes"); + assertTrue(handlerCompleted.get(), "Handler should have completed"); + + // Verify the client was closed (after the handler completed and drain returned). + verify(client, timeout(2000)).close(); + // Verify complete was called (auto-disposition is enabled). + verify(client).complete(any()); + } finally { + handlerCanProceed.countDown(); + disposeThread.join(5000); + } + } + + /** + * Verifies that when the V1 processor is closed, in-flight message handlers are allowed to + * complete before the underlying client is closed. + *

+ * Regression test for #45716. + * Before the fix, the V1 path would cancel subscriptions (which interrupts handler threads + * via Reactor's publishOn worker disposal) and then immediately close the async client. + *

+ *

+ * The fix drains in-flight handlers BEFORE cancelling subscriptions. Setting + * {@code isRunning = false} prevents new message requests while the drain waits for + * in-flight handlers to complete. + *

+ */ + @Test + public void v1CloseShouldWaitForInFlightHandlerBeforeClosingClient() throws InterruptedException { + final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class); + final Flux messageFlux = Flux.concat(Flux.just(message), Flux.never()); + + final ServiceBusReceiverClientBuilder receiverBuilder = mock(ServiceBusReceiverClientBuilder.class); + final ServiceBusReceiverAsyncClient asyncClient = mock(ServiceBusReceiverAsyncClient.class); + + when(receiverBuilder.buildAsyncClientForProcessor()).thenReturn(asyncClient); + when(asyncClient.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(asyncClient.getEntityPath()).thenReturn("entityPath"); + when(asyncClient.isConnectionClosed()).thenReturn(false); + final ServiceBusReceiverInstrumentation instrumentation + = new ServiceBusReceiverInstrumentation(null, null, "FQDN", "entityPath", null, ReceiverKind.PROCESSOR); + when(asyncClient.getInstrumentation()).thenReturn(instrumentation); + when(asyncClient.getReceiverOptions()).thenReturn(peekLockOptions()); + // V1 path uses receiveMessagesWithContext, publishOn(boundedElastic) matches real behavior + // and ensures the handler runs on a separate thread (needed for drain testing). + when(asyncClient.receiveMessagesWithContext()).thenReturn(messageFlux.map(ServiceBusMessageContext::new) + .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + doNothing().when(asyncClient).close(); + + // Latches to coordinate between the handler thread and the test thread. 
+ final CountDownLatch handlerStarted = new CountDownLatch(1); + final CountDownLatch handlerCanProceed = new CountDownLatch(1); + final AtomicBoolean handlerCompleted = new AtomicBoolean(false); + + final Consumer messageConsumer = (messageContext) -> { + handlerStarted.countDown(); + try { + handlerCanProceed.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + handlerCompleted.set(true); + }; + + // Build V1 processor (isV2 = false by NOT setting options.setV2(true)) + final ServiceBusProcessorClientOptions options + = new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1); + // V1 path: do not set V2 + final ServiceBusProcessorClient processorClient + = new ServiceBusProcessorClient(receiverBuilder, "entityPath", null, null, messageConsumer, error -> { + }, options); + + // Start the processor (V1 path). + processorClient.start(); + + // Wait for the handler to start processing the message. + assertTrue(handlerStarted.await(5, TimeUnit.SECONDS), "Handler should have started processing"); + + // Close the processor while the handler is still in-flight. + // close() now drains FIRST (before cancelling subscriptions), so it blocks until + // the handler completes. Run on a separate thread to avoid blocking the test. + final CountDownLatch closeDone = new CountDownLatch(1); + final Thread closeThread = new Thread(() -> { + processorClient.close(); + closeDone.countDown(); + }); + closeThread.start(); + + try { + // Wait deterministically for close() to enter drainV1Handlers(). close() sets + // isRunning=false inside its first synchronized block before entering drain, so once + // the predicate returns true we know close has at least taken ownership. + waitFor(() -> !processorClient.isRunning(), "close() to have set isRunning=false"); + + // Verify: client has NOT been closed yet (handler is still running, drain is blocking close). 
+ verify(asyncClient, never()).close(); + assertFalse(handlerCompleted.get(), "Handler should still be in-flight"); + + // Now let the handler complete. + handlerCanProceed.countDown(); + + // Wait for close to finish. + assertTrue(closeDone.await(5, TimeUnit.SECONDS), "Close should complete after handler finishes"); + assertTrue(handlerCompleted.get(), "Handler should have completed"); + + // Verify the client was closed (after the handler completed). + verify(asyncClient, timeout(2000)).close(); + } finally { + handlerCanProceed.countDown(); + closeThread.join(5000); + } + } + + /** + * Verifies that the V2 drain mechanism respects the timeout. If a handler takes longer than + * the drain timeout, {@code drainHandlers} returns false and the processor doesn't hang + * indefinitely. + *

+ * This tests the drain mechanism directly on a {@link MessagePump} without going through + * the full RollingMessagePump dispose path. The handler blocks forever, and we verify + * that {@code drainHandlers()} with a short timeout returns false after approximately + * the timeout duration. + *

+ */ + @Test + public void v2DrainShouldRespectTimeout() throws InterruptedException { + final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class); + final ServiceBusReceiverClientBuilder builder = mock(ServiceBusReceiverClientBuilder.class); + final ServiceBusReceiverAsyncClient client = mock(ServiceBusReceiverAsyncClient.class); + + when(builder.buildAsyncClientForProcessor()).thenReturn(client); + when(client.getInstrumentation()).thenReturn(INSTRUMENTATION); + when(client.getReceiverOptions()).thenReturn(peekLockOptions()); + when(client.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(client.getEntityPath()).thenReturn("entityPath"); + when(client.isConnectionClosed()).thenReturn(false); + when(client.isAutoLockRenewRequested()).thenReturn(false); + when(client.nonSessionProcessorReceiveV2()) + .thenReturn(Flux.concat(Flux.just(message), Flux.never()) + .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + when(client.complete(any())).thenReturn(Mono.empty()); + doNothing().when(client).close(); + + // Handler blocks forever (never releases the latch). + final CountDownLatch handlerStarted = new CountDownLatch(1); + final CountDownLatch neverReleasedLatch = new CountDownLatch(1); + + final Consumer messageConsumer = (messageContext) -> { + handlerStarted.countDown(); + try { + neverReleasedLatch.await(); // Block forever + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }; + + final MessagePump pump = new MessagePump(client, messageConsumer, e -> { + }, 1, false); + + // Subscribe to start pumping. + final AtomicReference subscription = new AtomicReference<>(); + subscription.set(pump.begin().subscribe()); + + // Wait for the handler to start. + assertTrue(handlerStarted.await(5, TimeUnit.SECONDS), "Handler should have started processing"); + + // Call drainHandlers with a very short timeout while the handler is still running. 
+ // DO NOT dispose the subscription first — disposing cancels the reactive chain, which + // interrupts the handler's thread via Reactor's publishOn worker disposal. The drain must + // be called while the subscription (and handler) is still active. + final long startTime = System.nanoTime(); + final boolean drained = pump.drainHandlers(Duration.ofMillis(500)); + final long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); + + // Drain should return false (timed out) and should take close to 500ms. + assertFalse(drained, "Drain should return false when timeout expires with active handlers"); + assertTrue(elapsed >= 400, + "Drain should wait at least close to the timeout duration, but took " + elapsed + "ms"); + assertTrue(elapsed < 3000, "Drain should not take excessively long, but took " + elapsed + "ms"); + + // Clean up: release the blocked handler and dispose the subscription. + neverReleasedLatch.countDown(); + subscription.get().dispose(); + } + + /** + * Verifies that calling {@code drainHandlers()} from within a message handler (re-entrant) + * does not deadlock. This simulates a user calling {@code processor.close()} from inside + * their {@code processMessage} callback when only this handler is active (no concurrent handlers). + *

+ * Without the re-entrancy guard, the handler thread would enter {@code drainHandlers()}, + * which waits for {@code activeHandlerCount} to reach 0. But the handler itself has + * incremented the counter and won't decrement it until it returns — classic self-deadlock. + *

+ *

+ * The fix detects the re-entrant call via a {@link ThreadLocal} flag and uses a threshold of 1 + * (only wait for OTHER handlers). With no other handlers active, it returns {@code true} + * immediately. + *

+ */ + @Test + public void v2DrainFromWithinHandlerShouldNotDeadlock() throws InterruptedException { + final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class); + final ServiceBusReceiverAsyncClient client = mock(ServiceBusReceiverAsyncClient.class); + + when(client.getInstrumentation()).thenReturn(INSTRUMENTATION); + when(client.getReceiverOptions()).thenReturn(peekLockOptions()); + when(client.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(client.getEntityPath()).thenReturn("entityPath"); + when(client.isConnectionClosed()).thenReturn(false); + when(client.isAutoLockRenewRequested()).thenReturn(false); + when(client.nonSessionProcessorReceiveV2()) + .thenReturn(Flux.concat(Flux.just(message), Flux.never()) + .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + when(client.complete(any())).thenReturn(Mono.empty()); + doNothing().when(client).close(); + + final CountDownLatch handlerStarted = new CountDownLatch(1); + final CountDownLatch handlerDone = new CountDownLatch(1); + final AtomicBoolean drainReturnedTrue = new AtomicBoolean(false); + + // Create the pump first, then reference it inside the handler via AtomicReference. + final AtomicReference pumpRef = new AtomicReference<>(); + + final Consumer messageConsumer = (messageContext) -> { + handlerStarted.countDown(); + // Simulate user calling close() from within processMessage, which triggers drainHandlers(). + // With only the current handler active (no other concurrent handlers), the re-entrant drain + // should return true immediately (nothing to drain) instead of deadlocking. + boolean result = pumpRef.get().drainHandlers(Duration.ofSeconds(5)); + drainReturnedTrue.set(result); + handlerDone.countDown(); + }; + + final MessagePump pump = new MessagePump(client, messageConsumer, e -> { + }, 1, false); + pumpRef.set(pump); + + // Subscribe to start pumping. 
+ final AtomicReference subscription = new AtomicReference<>(); + subscription.set(pump.begin().subscribe()); + + // Wait for the handler to start and complete (should NOT deadlock). + assertTrue(handlerStarted.await(5, TimeUnit.SECONDS), "Handler should have started processing"); + assertTrue(handlerDone.await(5, TimeUnit.SECONDS), + "Handler should have completed without deadlocking on re-entrant drainHandlers()"); + + // Re-entrant drain with no other concurrent handlers should return true (nothing to drain). + assertTrue(drainReturnedTrue.get(), + "Re-entrant drainHandlers() with no other concurrent handlers should return true"); + + // Clean up. + subscription.get().dispose(); + } + + /** + * Verifies that the closing flag prevents new message handlers from processing after + * {@code drainHandlers()} is called. + *

+ * Race condition scenario: with {@code flatMap(concurrency=2)}, two messages are emitted. + * The first handler blocks (simulating in-flight work). {@code drainHandlers()} is called + * on a separate thread, which sets {@code closing = true} and waits for the first handler + * to complete. A second message arrives while closing is true. The second handler should + * see the closing flag and skip processing. + *

+ *

+ * Without the closing flag, the second handler could start real work (including settlement) + * between drain returning and subscription disposal, reintroducing the original failure mode. + *

+ */ + @Test + public void v2ClosingFlagPreventsNewHandlersAfterDrainStarts() throws InterruptedException { + final ServiceBusReceivedMessage message1 = mock(ServiceBusReceivedMessage.class); + final ServiceBusReceivedMessage message2 = mock(ServiceBusReceivedMessage.class); + final ServiceBusReceiverAsyncClient client = mock(ServiceBusReceiverAsyncClient.class); + + when(client.getInstrumentation()).thenReturn(INSTRUMENTATION); + when(client.getReceiverOptions()).thenReturn(peekLockOptions()); + when(client.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(client.getEntityPath()).thenReturn("entityPath"); + when(client.isConnectionClosed()).thenReturn(false); + when(client.isAutoLockRenewRequested()).thenReturn(false); + when(client.complete(any())).thenReturn(Mono.empty()); + doNothing().when(client).close(); + + // Use a Sinks.Many for fully controlled emission timing - we choose exactly when each + // message enters the pipeline rather than relying on a wall-clock delay. concurrency=2 + // lets flatMap dispatch handlers in parallel. + final reactor.core.publisher.Sinks.Many messageSink + = reactor.core.publisher.Sinks.many().unicast().onBackpressureBuffer(); + when(client.nonSessionProcessorReceiveV2()) + .thenReturn(messageSink.asFlux().publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + + final CountDownLatch handler1Started = new CountDownLatch(1); + final CountDownLatch handler1CanProceed = new CountDownLatch(1); + final AtomicBoolean handler2ProcessMessageInvoked = new AtomicBoolean(false); + + final Consumer messageConsumer = (messageContext) -> { + if (messageContext.getMessage() == message1) { + handler1Started.countDown(); + try { + handler1CanProceed.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } else { + // If this executes, the closing flag did NOT prevent the second handler. 
+ handler2ProcessMessageInvoked.set(true); + } + }; + + final MessagePump pump = new MessagePump(client, messageConsumer, e -> { + }, 2, false); + + final AtomicReference subscription = new AtomicReference<>(); + subscription.set(pump.begin().subscribe()); + + // Emit message1 explicitly and wait for handler1 to start. + messageSink.tryEmitNext(message1); + assertTrue(handler1Started.await(5, TimeUnit.SECONDS), "Handler1 should have started processing"); + + // Call drainHandlers on a separate thread. This sets closing=true and waits for handler1. + final CountDownLatch drainDone = new CountDownLatch(1); + final AtomicBoolean drainResult = new AtomicBoolean(false); + final Thread drainThread = new Thread(() -> { + drainResult.set(pump.drainHandlers(Duration.ofSeconds(10))); + drainDone.countDown(); + }); + drainThread.start(); + + // Wait deterministically for drainHandlers() to enter its wait loop. The drain thread + // sets closing=true and then blocks on the in-flight handler counter monitor, so once + // it transitions to WAITING/TIMED_WAITING we know the closing flag has been set. Avoids + // the flakiness of a fixed sleep on slow/contended CI. + waitFor( + () -> drainThread.getState() == Thread.State.WAITING + || drainThread.getState() == Thread.State.TIMED_WAITING, + "drainHandlers() to enter waiting state (closing flag set)"); + + // Now emit message2 - it enters the pipeline AFTER closing=true has been set, so its + // onNext path must observe closing=true and skip dispatch. + messageSink.tryEmitNext(message2); + + // Release handler1 so the drain can complete. + handler1CanProceed.countDown(); + + // Wait for drain to finish. drainHandlers returns when activeHandlerCount drops to 0, + // which can only happen after BOTH handler1 (in-flight) and message2's handleMessage + // (closing-flag check increments and decrements the counter even when it skips dispatch) + // have completed. 
Drain completion is therefore a deterministic signal that message2 has + // been processed - if the closing flag worked, the consumer was never invoked for + // message2; if it did not, handler2ProcessMessageInvoked would be true. + assertTrue(drainDone.await(5, TimeUnit.SECONDS), "Drain should complete after handler1 finishes"); + assertTrue(drainResult.get(), "Drain should return true (all handlers completed)"); + + assertFalse(handler2ProcessMessageInvoked.get(), "Second handler should have been skipped by the closing flag"); + + // Clean up. + subscription.get().dispose(); + } + + /** + * Verifies that when a V1 handler calls {@code close()} re-entrantly with other concurrent + * handlers running, the re-entrant drain waits for those other handlers to complete before + * proceeding to cancel subscriptions and close the underlying client. + *

+ * With {@code maxConcurrentCalls=2}, two handlers run concurrently on separate + * {@code boundedElastic} threads. Handler B calls {@code processorClient.close()} while + * Handler A is still processing. {@code drainV1Handlers()} detects the re-entrant call + * (threshold=1) and waits until only the calling handler remains before allowing + * {@code close()} to proceed with subscription cancellation and client disposal. + *

+ *

+ * Without this fix, the re-entrant drain would return immediately, and {@code close()} + * would cancel subscriptions and call {@code asyncClient.close()} while Handler A + * is mid-settlement, reintroducing the original failure mode from issue #45716. + *

+ */ + @Test + public void v1ReentrantCloseWaitsForOtherConcurrentHandlers() throws InterruptedException { + final ServiceBusReceivedMessage message1 = mock(ServiceBusReceivedMessage.class); + final ServiceBusReceivedMessage message2 = mock(ServiceBusReceivedMessage.class); + + final ServiceBusReceiverClientBuilder receiverBuilder = mock(ServiceBusReceiverClientBuilder.class); + final ServiceBusReceiverAsyncClient asyncClient = mock(ServiceBusReceiverAsyncClient.class); + + when(receiverBuilder.buildAsyncClientForProcessor()).thenReturn(asyncClient); + when(asyncClient.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(asyncClient.getEntityPath()).thenReturn("entityPath"); + when(asyncClient.isConnectionClosed()).thenReturn(false); + when(asyncClient.getInstrumentation()).thenReturn(INSTRUMENTATION); + when(asyncClient.getReceiverOptions()).thenReturn(peekLockOptions()); + // Two messages arrive on separate parallel rails, processed concurrently. + when(asyncClient.receiveMessagesWithContext()).thenReturn(Flux.just(message1, message2) + .map(ServiceBusMessageContext::new) + .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + doNothing().when(asyncClient).close(); + + final CountDownLatch handler1Started = new CountDownLatch(1); + final CountDownLatch handler1CanProceed = new CountDownLatch(1); + final CountDownLatch handler2Started = new CountDownLatch(1); + final AtomicBoolean handler1Completed = new AtomicBoolean(false); + final AtomicReference processorRef = new AtomicReference<>(); + + final Consumer messageConsumer = (messageContext) -> { + if (messageContext.getMessage() == message1) { + handler1Started.countDown(); + try { + handler1CanProceed.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + handler1Completed.set(true); + } else { + handler2Started.countDown(); + // Wait for handler1 to start before calling close(), ensuring both handlers + // are running concurrently when the 
re-entrant close occurs. + try { + handler1Started.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + // Re-entrant close from within a handler. The drain should wait for handler1 + // (the other concurrent handler) before proceeding to close the client. + processorRef.get().close(); + } + }; + + final ServiceBusProcessorClientOptions options + = new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(2); + final ServiceBusProcessorClient processorClient + = new ServiceBusProcessorClient(receiverBuilder, "entityPath", null, null, messageConsumer, error -> { + }, options); + processorRef.set(processorClient); + + processorClient.start(); + + // Wait for both handlers to start. + assertTrue(handler1Started.await(5, TimeUnit.SECONDS), "Handler1 should have started processing"); + assertTrue(handler2Started.await(5, TimeUnit.SECONDS), "Handler2 should have started processing"); + + // Wait deterministically for handler2's re-entrant close() to take ownership and enter + // drainV1Handlers(). close() sets isRunning=false inside its first synchronized block, + // so once the predicate returns true we know the re-entrant close has progressed past + // ownership acquisition. + waitFor(() -> !processorClient.isRunning(), "handler2's re-entrant close() to have set isRunning=false"); + + // Handler1 is still running, handler2 is blocked in close() waiting for handler1 to finish. + verify(asyncClient, never()).close(); + assertFalse(handler1Completed.get(), "Handler1 should still be in-flight"); + + // Release handler1. + handler1CanProceed.countDown(); + + // Handler2's close() should now complete (handler1 finished, drain threshold reached). 
+ verify(asyncClient, timeout(5000)).close(); + assertTrue(handler1Completed.get(), "Handler1 should have completed before client was closed"); + } + + /** + * Verifies that the V1 closing flag prevents new message handlers from executing user + * callback/settlement after {@code drainV1Handlers()} is triggered. + *

+ * Race condition scenario: with {@code maxConcurrentCalls=1}, a single message is + * in-flight when {@code close()} is called. {@code drainV1Handlers()} sets + * {@code v1Closing = true} and waits for the handler. Meanwhile, the subscriber still + * has an outstanding {@code request(1)}, so a second message can arrive via + * {@code onNext} during the drain-to-cancel window. The closing flag ensures that + * no user callback runs for messages arriving after shutdown begins. + *

+ */ + @Test + public void v1ClosingFlagPreventsNewHandlersAfterDrainStarts() throws InterruptedException { + final ServiceBusReceivedMessage message1 = mock(ServiceBusReceivedMessage.class); + final ServiceBusReceivedMessage message2 = mock(ServiceBusReceivedMessage.class); + + final ServiceBusReceiverClientBuilder receiverBuilder = mock(ServiceBusReceiverClientBuilder.class); + final ServiceBusReceiverAsyncClient asyncClient = mock(ServiceBusReceiverAsyncClient.class); + + when(receiverBuilder.buildAsyncClientForProcessor()).thenReturn(asyncClient); + when(asyncClient.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(asyncClient.getEntityPath()).thenReturn("entityPath"); + when(asyncClient.isConnectionClosed()).thenReturn(false); + when(asyncClient.getInstrumentation()).thenReturn(INSTRUMENTATION); + when(asyncClient.getReceiverOptions()).thenReturn(peekLockOptions()); + // Emit message1 immediately, then message2 after a delay (simulating a message arriving + // during the drain-to-cancel window). + when(asyncClient.receiveMessagesWithContext()).thenReturn(Flux + .concat(Flux.just(message1), Flux.just(message2).delayElements(Duration.ofMillis(300)), + Flux.never()) + .map(ServiceBusMessageContext::new) + .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + doNothing().when(asyncClient).close(); + + final CountDownLatch handler1Started = new CountDownLatch(1); + final CountDownLatch handler1CanProceed = new CountDownLatch(1); + final AtomicBoolean handler2ProcessMessageInvoked = new AtomicBoolean(false); + + final Consumer messageConsumer = (messageContext) -> { + if (messageContext.getMessage() == message1) { + handler1Started.countDown(); + try { + handler1CanProceed.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } else { + // If this executes, the V1 closing flag did NOT prevent the second handler. 
+ handler2ProcessMessageInvoked.set(true); + } + }; + + final ServiceBusProcessorClientOptions options + = new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1); + final ServiceBusProcessorClient processorClient + = new ServiceBusProcessorClient(receiverBuilder, "entityPath", null, null, messageConsumer, error -> { + }, options); + + processorClient.start(); + + // Wait for handler1 to start processing. + assertTrue(handler1Started.await(5, TimeUnit.SECONDS), "Handler1 should have started processing"); + + // Close the processor on a separate thread. This sets v1Closing=true and drains handler1. + final CountDownLatch closeDone = new CountDownLatch(1); + final Thread closeThread = new Thread(() -> { + processorClient.close(); + closeDone.countDown(); + }); + closeThread.start(); + + try { + // Wait deterministically for close() to enter drainV1Handlers and have set + // v1Closing=true (close() sets isRunning=false inside its first synchronized block + // before entering drain, so once the predicate returns true we know close has + // taken ownership). + waitFor(() -> !processorClient.isRunning(), "close() to have set isRunning=false"); + + // Release handler1 so the drain completes. After drain returns, close() proceeds to + // cancel subscriptions. Message2 may arrive in this window via the outstanding request(1). + handler1CanProceed.countDown(); + + // Wait for close to finish. + assertTrue(closeDone.await(5, TimeUnit.SECONDS), "Close should complete"); + } finally { + handler1CanProceed.countDown(); + closeThread.join(5000); + } + + // The second message's handler should NOT have invoked processMessage because v1Closing was true. + assertFalse(handler2ProcessMessageInvoked.get(), + "Second handler should have been skipped by the V1 closing flag"); + } + + /** + * Verifies that calling {@code start()} after {@code close()} resets the {@code v1Closing} + * flag so that the processor can begin a new processing cycle. 
Without the reset, all + * {@code onNext} calls would short-circuit and no messages would be processed. + */ + @Test + public void v1StartAfterCloseResetsClosingFlag() throws InterruptedException { + final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class); + + final ServiceBusReceiverClientBuilder receiverBuilder = mock(ServiceBusReceiverClientBuilder.class); + final ServiceBusReceiverAsyncClient asyncClient1 = mock(ServiceBusReceiverAsyncClient.class); + final ServiceBusReceiverAsyncClient asyncClient2 = mock(ServiceBusReceiverAsyncClient.class); + + // First call returns asyncClient1 (constructor), second returns asyncClient2 (restart after close). + when(receiverBuilder.buildAsyncClientForProcessor()).thenReturn(asyncClient1, asyncClient2); + + for (ServiceBusReceiverAsyncClient client : new ServiceBusReceiverAsyncClient[] { + asyncClient1, + asyncClient2 }) { + when(client.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(client.getEntityPath()).thenReturn("entityPath"); + when(client.isConnectionClosed()).thenReturn(false); + when(client.getInstrumentation()).thenReturn(INSTRUMENTATION); + when(client.getReceiverOptions()).thenReturn(peekLockOptions()); + doNothing().when(client).close(); + } + + // First cycle: emit nothing (just close immediately). + when(asyncClient1.receiveMessagesWithContext()).thenReturn( + Flux.never().publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + + // Second cycle: emit one message, then never complete. 
+ when(asyncClient2.receiveMessagesWithContext()).thenReturn(Flux.just(message) + .map(ServiceBusMessageContext::new) + .concatWith(Flux.never()) + .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + + final CountDownLatch messageProcessed = new CountDownLatch(1); + + final Consumer messageConsumer = (messageContext) -> { + messageProcessed.countDown(); + }; + + final ServiceBusProcessorClientOptions options + = new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1); + final ServiceBusProcessorClient processorClient + = new ServiceBusProcessorClient(receiverBuilder, "entityPath", null, null, messageConsumer, error -> { + }, options); + + try { + // First cycle: start then close (sets v1Closing=true during drain). + processorClient.start(); + processorClient.close(); + + // Second cycle: start again. If v1Closing is not reset, onNext will skip the handler. + processorClient.start(); + + // Verify the handler runs, proving v1Closing was reset. + assertTrue(messageProcessed.await(5, TimeUnit.SECONDS), + "Handler should run after restart, proving v1Closing was reset"); + } finally { + processorClient.close(); + } + } + + /** + * Verifies that {@code close()} releases the {@code ServiceBusProcessorClient} instance monitor + * while waiting for in-flight handlers to drain. If the monitor were held throughout the drain, + * any handler that called a {@code synchronized} accessor on the same client (e.g. + * {@link ServiceBusProcessorClient#isRunning()}, {@link ServiceBusProcessorClient#getIdentifier()}) + * would block on the monitor that {@code close()} holds while {@code close()} waits for that + * handler's count to reach zero - a stalemate that resolves only when the drain timeout elapses. + * Releasing the monitor across the drain wait lets the handler call those accessors freely, + * complete, and decrement the in-flight counter so {@code close()} returns promptly. + *

+ * Regression test for the deadlock concern raised on + * PR #48192. + *

+ */ + @Test + public void v1CloseShouldNotHoldClientMonitorDuringDrain() throws InterruptedException { + final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class); + final Flux messageFlux = Flux.concat(Flux.just(message), Flux.never()); + + final ServiceBusReceiverClientBuilder receiverBuilder = mock(ServiceBusReceiverClientBuilder.class); + final ServiceBusReceiverAsyncClient asyncClient = mock(ServiceBusReceiverAsyncClient.class); + + when(receiverBuilder.buildAsyncClientForProcessor()).thenReturn(asyncClient); + when(asyncClient.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(asyncClient.getEntityPath()).thenReturn("entityPath"); + when(asyncClient.isConnectionClosed()).thenReturn(false); + when(asyncClient.getInstrumentation()).thenReturn(INSTRUMENTATION); + when(asyncClient.getReceiverOptions()).thenReturn(peekLockOptions()); + when(asyncClient.receiveMessagesWithContext()).thenReturn(messageFlux.map(ServiceBusMessageContext::new) + .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + doNothing().when(asyncClient).close(); + + // The handler will (1) signal that it has started, (2) wait until close() has been invoked + // on a different thread, then (3) call isRunning() - which is synchronized on the same + // monitor that close() acquires. Without the fix, that call blocks until the drain timeout + // expires; with the fix, it returns immediately because close() released the monitor before + // waiting for the drain. 
+ final CountDownLatch handlerStarted = new CountDownLatch(1); + final CountDownLatch closeStarted = new CountDownLatch(1); + final AtomicReference handlerSawIsRunning = new AtomicReference<>(); + final AtomicReference clientRef = new AtomicReference<>(); + + final Consumer messageConsumer = (messageContext) -> { + handlerStarted.countDown(); + try { + assertTrue(closeStarted.await(5, TimeUnit.SECONDS), "close() should have started"); + handlerSawIsRunning.set(clientRef.get().isRunning()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }; + + // Use the default 30-second drain timeout: a regression that re-introduces the monitor hold + // would force close() to wait the full 30s, which is well beyond the 5s assertion below. + final ServiceBusProcessorClientOptions options + = new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1); + final ServiceBusProcessorClient processorClient + = new ServiceBusProcessorClient(receiverBuilder, "entityPath", null, null, messageConsumer, error -> { + }, options); + clientRef.set(processorClient); + + processorClient.start(); + + assertTrue(handlerStarted.await(5, TimeUnit.SECONDS), "Handler should have started"); + + final long startNanos = System.nanoTime(); + final CountDownLatch closeDone = new CountDownLatch(1); + final Thread closeThread = new Thread(() -> { + processorClient.close(); + closeDone.countDown(); + }); + closeThread.start(); + + try { + // Wait deterministically for close() to take ownership before signalling the handler + // to call isRunning(). Polling avoids the flakiness of a fixed Thread.sleep on + // slow/contended CI. + waitFor(() -> !processorClient.isRunning(), "close() to have set isRunning=false"); + closeStarted.countDown(); + + assertTrue(closeDone.await(5, TimeUnit.SECONDS), + "close() should complete promptly after the handler returns. 
If it stalled until " + + "the 30s drain timeout, the instance monitor was held across the drain wait."); + final Duration closeDuration = Duration.ofNanos(System.nanoTime() - startNanos); + assertTrue(closeDuration.getSeconds() < 5, + "close() took " + closeDuration + ", expected < 5s (drain timeout is 30s)."); + + assertTrue(handlerSawIsRunning.get() != null, "Handler should have observed an isRunning() result"); + assertFalse(handlerSawIsRunning.get(), + "isRunning() should return false because close() set isRunning=false before draining"); + } finally { + closeStarted.countDown(); + closeThread.join(5000); + } + } + + /** + * Verifies that a concurrent {@code start()} call during {@code close()}'s drain window is + * ignored, so it does not create new resources that the in-progress {@code close()} would + * immediately tear down. + *

+ * Background: {@code close()} releases the client's instance monitor across the drain wait + * (see {@link #v1CloseShouldNotHoldClientMonitorDuringDrain()}). Without an explicit + * "close in progress" guard, a concurrent {@code start()} could acquire the monitor during + * that window, reset {@code v1Closing}, create a fresh async client, and start the connection + * monitor - only for {@code close()} to proceed into its cleanup phase and dispose those + * brand-new resources, leaving the user with a processor that appears started but has no + * working subscription. + *

+ *

+ * Regression test for the lifecycle race raised on + * PR #48192. + *

+ */ + @Test + public void v1ConcurrentStartDuringCloseDrainIsIgnored() throws InterruptedException { + final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class); + final Flux messageFlux = Flux.concat(Flux.just(message), Flux.never()); + + final ServiceBusReceiverClientBuilder receiverBuilder = mock(ServiceBusReceiverClientBuilder.class); + final ServiceBusReceiverAsyncClient asyncClient = mock(ServiceBusReceiverAsyncClient.class); + + // The builder should be invoked exactly once by the original start(); a concurrent start() + // during close drain must NOT create a second receiver. + when(receiverBuilder.buildAsyncClientForProcessor()).thenReturn(asyncClient); + when(asyncClient.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(asyncClient.getEntityPath()).thenReturn("entityPath"); + when(asyncClient.isConnectionClosed()).thenReturn(false); + when(asyncClient.getInstrumentation()).thenReturn(INSTRUMENTATION); + when(asyncClient.getReceiverOptions()).thenReturn(peekLockOptions()); + when(asyncClient.receiveMessagesWithContext()).thenReturn(messageFlux.map(ServiceBusMessageContext::new) + .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + doNothing().when(asyncClient).close(); + + final CountDownLatch handlerStarted = new CountDownLatch(1); + final CountDownLatch handlerCanProceed = new CountDownLatch(1); + + final Consumer messageConsumer = (messageContext) -> { + handlerStarted.countDown(); + try { + handlerCanProceed.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }; + + final ServiceBusProcessorClientOptions options + = new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1); + final ServiceBusProcessorClient processorClient + = new ServiceBusProcessorClient(receiverBuilder, "entityPath", null, null, messageConsumer, error -> { + }, options); + + processorClient.start(); + assertTrue(handlerStarted.await(5, TimeUnit.SECONDS), "Handler should have 
started"); + + // Run close() on a separate thread - it will block in the drain wait until the handler + // returns (handlerCanProceed has not yet been counted down). + final CountDownLatch closeDone = new CountDownLatch(1); + final Thread closeThread = new Thread(() -> { + processorClient.close(); + closeDone.countDown(); + }); + closeThread.start(); + + try { + // Wait deterministically for close() to take ownership (sets isRunning=false then + // v1CloseInProgress=true inside its first synchronized block). Polling isRunning() + // avoids the flakiness of a fixed Thread.sleep on slow/contended CI. + waitFor(() -> !processorClient.isRunning(), "close() to have set isRunning=false"); + + // Concurrent start() during the drain window. Without the v1CloseInProgress guard, + // this would create a new receiver and mark the processor running again, only for + // close() to subsequently dispose it. With the guard, start() returns without taking + // any action. + processorClient.start(); + + // The receiver builder should still have been invoked exactly once - the concurrent + // start() must not have created a second client. + verify(receiverBuilder, Mockito.times(1)).buildAsyncClientForProcessor(); + // The processor must not be reported as running while close is mid-shutdown. + assertFalse(processorClient.isRunning(), "Processor should not be running during close()'s drain window"); + + // Allow the original handler to complete so close() can finish. + handlerCanProceed.countDown(); + assertTrue(closeDone.await(5, TimeUnit.SECONDS), "close() should complete"); + + // After close() returns, isRunning should still be false and no extra receiver was created. + assertFalse(processorClient.isRunning(), "Processor should be stopped after close()"); + verify(receiverBuilder, Mockito.times(1)).buildAsyncClientForProcessor(); + // Original asyncClient was closed exactly once by close(). 
+ verify(asyncClient, timeout(2000)).close(); + } finally { + handlerCanProceed.countDown(); + closeThread.join(5000); + } + } + + /** + * Verifies that {@code getIdentifier()} does not lazy-create a fresh receiver during or after + * {@code close()}, so it cannot leak a receiver that the shutdown path is no longer responsible + * for disposing. + *

+ * Background: {@code close()} releases the instance monitor across the drain wait. The original + * {@code getIdentifier()} would lazy-create a new receiver whenever {@code asyncClient} was + * {@code null}, so a concurrent {@code getIdentifier()} call after {@code close()} nulled the + * client (or after {@code close()} returned) would invoke the receiver builder again and leave + * a fresh, unmanaged client behind. The fix caches the identifier whenever it is observed and + * once more in {@code close()} before nulling the client, so {@code getIdentifier()} can return + * a stable value without creating a receiver during/after shutdown. + *

+ *

+ * Regression test for the lazy-receiver leak raised on + * PR #48192. + *

+ */ + @Test + public void v1GetIdentifierDuringAndAfterCloseDoesNotCreateNewReceiver() throws InterruptedException { + final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class); + final Flux messageFlux = Flux.concat(Flux.just(message), Flux.never()); + + final ServiceBusReceiverClientBuilder receiverBuilder = mock(ServiceBusReceiverClientBuilder.class); + final ServiceBusReceiverAsyncClient asyncClient = mock(ServiceBusReceiverAsyncClient.class); + + // Builder must be invoked exactly once - by the original start(). Neither getIdentifier() + // during the drain window nor after close() returned may trigger a second invocation. + when(receiverBuilder.buildAsyncClientForProcessor()).thenReturn(asyncClient); + when(asyncClient.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(asyncClient.getEntityPath()).thenReturn("entityPath"); + when(asyncClient.isConnectionClosed()).thenReturn(false); + when(asyncClient.getInstrumentation()).thenReturn(INSTRUMENTATION); + when(asyncClient.getReceiverOptions()).thenReturn(peekLockOptions()); + when(asyncClient.getIdentifier()).thenReturn("processor-id-1"); + when(asyncClient.receiveMessagesWithContext()).thenReturn(messageFlux.map(ServiceBusMessageContext::new) + .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + doNothing().when(asyncClient).close(); + + final CountDownLatch handlerStarted = new CountDownLatch(1); + final CountDownLatch handlerCanProceed = new CountDownLatch(1); + + final Consumer messageConsumer = (messageContext) -> { + handlerStarted.countDown(); + try { + handlerCanProceed.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }; + + final ServiceBusProcessorClientOptions options + = new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1); + final ServiceBusProcessorClient processorClient + = new ServiceBusProcessorClient(receiverBuilder, "entityPath", null, null, messageConsumer, error -> { + }, 
options); + + processorClient.start(); + assertTrue(handlerStarted.await(5, TimeUnit.SECONDS), "Handler should have started"); + + // Confirm getIdentifier() works while running (also seeds the cache). + assertTrue("processor-id-1".equals(processorClient.getIdentifier()), + "getIdentifier() should return the live client's identifier while running"); + + // Begin close on a separate thread; it will block in drainV1Handlers(). + final CountDownLatch closeDone = new CountDownLatch(1); + final Thread closeThread = new Thread(() -> { + processorClient.close(); + closeDone.countDown(); + }); + closeThread.start(); + + try { + // Wait deterministically for close() to enter the drain window. Polling avoids the + // flakiness of a fixed Thread.sleep on slow/contended CI. + waitFor(() -> !processorClient.isRunning(), "close() to have set isRunning=false"); + + // Concurrent getIdentifier() during the drain window. Must NOT create a new receiver + // (cachedV1Identifier was seeded by the call above; before fix, it would have hit the + // synchronized monitor and stalled; with monitor released and no cache, would have + // lazily created a receiver). + assertTrue("processor-id-1".equals(processorClient.getIdentifier()), + "getIdentifier() during close drain should return the cached identifier"); + + // Allow the handler to complete so close() can finish. + handlerCanProceed.countDown(); + assertTrue(closeDone.await(5, TimeUnit.SECONDS), "close() should complete"); + + // After close(), getIdentifier() must still return the cached identifier and must not + // create a new receiver via the builder. + assertTrue("processor-id-1".equals(processorClient.getIdentifier()), + "getIdentifier() after close should return the cached identifier"); + + // Builder was invoked exactly once - no leaked receiver from any getIdentifier() call. 
+ verify(receiverBuilder, Mockito.times(1)).buildAsyncClientForProcessor(); + } finally { + handlerCanProceed.countDown(); + closeThread.join(5000); + } + } + + /** + * Verifies that concurrent {@code close()} calls do not race: only the first call performs + * the V1 cleanup; the others return early so they cannot dispose state created after the + * first call cleared {@code v1CloseInProgress}. + *

+ * Background: Without ownership, two concurrent {@code close()} calls would both proceed + * through drain + cleanup. The first to finish would clear the flag and let a concurrent + * {@code start()} create new resources, which the still-running second {@code close()} would + * then dispose. The fix uses {@code v1CloseInProgress.compareAndSet(false, true)} so only one + * close call wins ownership; the others return immediately. + *

+ *

+ * Regression test for the concurrent-close race raised on + * PR #48192. + *

+ */ + @Test + public void v1ConcurrentCloseCallsDoNotRace() throws InterruptedException { + final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class); + final Flux messageFlux = Flux.concat(Flux.just(message), Flux.never()); + + final ServiceBusReceiverClientBuilder receiverBuilder = mock(ServiceBusReceiverClientBuilder.class); + final ServiceBusReceiverAsyncClient asyncClient = mock(ServiceBusReceiverAsyncClient.class); + + when(receiverBuilder.buildAsyncClientForProcessor()).thenReturn(asyncClient); + when(asyncClient.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(asyncClient.getEntityPath()).thenReturn("entityPath"); + when(asyncClient.isConnectionClosed()).thenReturn(false); + when(asyncClient.getInstrumentation()).thenReturn(INSTRUMENTATION); + when(asyncClient.getReceiverOptions()).thenReturn(peekLockOptions()); + when(asyncClient.getIdentifier()).thenReturn("processor-id"); + when(asyncClient.receiveMessagesWithContext()).thenReturn(messageFlux.map(ServiceBusMessageContext::new) + .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + doNothing().when(asyncClient).close(); + + final CountDownLatch handlerStarted = new CountDownLatch(1); + final CountDownLatch handlerCanProceed = new CountDownLatch(1); + + final Consumer messageConsumer = (messageContext) -> { + handlerStarted.countDown(); + try { + handlerCanProceed.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }; + + final ServiceBusProcessorClientOptions options + = new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1); + final ServiceBusProcessorClient processorClient + = new ServiceBusProcessorClient(receiverBuilder, "entityPath", null, null, messageConsumer, error -> { + }, options); + + processorClient.start(); + assertTrue(handlerStarted.await(5, TimeUnit.SECONDS), "Handler should have started"); + + // First close() owns the V1 shutdown - it will block until the handler returns. 
+ final CountDownLatch firstCloseDone = new CountDownLatch(1); + final Thread firstCloseThread = new Thread(() -> { + processorClient.close(); + firstCloseDone.countDown(); + }); + firstCloseThread.start(); + + // Wait deterministically for the first close() to take ownership (sets isRunning=false + // then v1CloseInProgress=true). Polling isRunning() avoids the flakiness of a fixed + // Thread.sleep on slow/contended CI - if the first close thread were delayed, the second + // close could win ownership and block on the drain, failing the immediate-return + // assertion below. + waitFor(() -> !processorClient.isRunning(), "first close() to have taken ownership"); + + // Second close() while the first is still draining - must return immediately because the + // first close owns the shutdown. We bound it at 1 second to catch a regression where the + // second close also waits on the drain (would take the full 30s drain timeout if it waited + // for the handler that the test deliberately keeps running). + final long secondCloseStart = System.nanoTime(); + final CountDownLatch secondCloseDone = new CountDownLatch(1); + final Thread secondCloseThread = new Thread(() -> { + processorClient.close(); + secondCloseDone.countDown(); + }); + secondCloseThread.start(); + + try { + assertTrue(secondCloseDone.await(1, TimeUnit.SECONDS), + "Second close() should return immediately when another close() owns the shutdown."); + final Duration secondDuration = Duration.ofNanos(System.nanoTime() - secondCloseStart); + assertTrue(secondDuration.toMillis() < 500, + "Second close() took " + secondDuration + ", expected immediate return."); + + // The first close() is still blocked on the handler. Verify the asyncClient has NOT + // been closed yet - the second close() must not have torn things down. + verify(asyncClient, never()).close(); + + // Allow the handler to complete so the first close() can finish. 
+ handlerCanProceed.countDown(); + assertTrue(firstCloseDone.await(5, TimeUnit.SECONDS), "First close() should complete"); + + // The asyncClient is closed exactly once - by the first (owning) close() during cleanup. + verify(asyncClient, timeout(2000).times(1)).close(); + } finally { + handlerCanProceed.countDown(); + firstCloseThread.join(5000); + secondCloseThread.join(5000); + } + } + + /** + * Verifies that a concurrent {@code start()} call during a V2 {@code close()}'s drain window + * is ignored, mirroring the V1 guarantee in {@link #v1ConcurrentStartDuringCloseDrainIsIgnored()}. + *

+ * Background: {@code close()} releases the outer {@code ServiceBusProcessorClient} monitor + * before delegating to {@code processorV2.close()} (whose internal drain blocks for in-flight + * handler settlement). Without an explicit {@code v2CloseInProgress} guard, a concurrent + * {@code start()} could acquire the outer monitor during the drain window and call + * {@code processorV2.start()}, leaving the inner processor running after the outer + * {@code close()} returns - even though the caller observed {@code close()} returning + * successfully. + *

+ *

+ * Regression test for the V2 lifecycle race raised on + * PR #48192. + *

+ */ + @Test + public void v2ConcurrentStartDuringCloseDrainIsIgnored() throws InterruptedException { + final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class); + + final ServiceBusReceiverClientBuilder receiverBuilder = mock(ServiceBusReceiverClientBuilder.class); + final ServiceBusReceiverAsyncClient asyncClient = mock(ServiceBusReceiverAsyncClient.class); + + // Builder must be invoked exactly once - by the original start(). A concurrent start() + // during V2 close drain must NOT invoke the builder a second time. + when(receiverBuilder.buildAsyncClientForProcessor()).thenReturn(asyncClient); + when(asyncClient.getInstrumentation()).thenReturn(INSTRUMENTATION); + when(asyncClient.getReceiverOptions()).thenReturn(peekLockOptions()); + when(asyncClient.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(asyncClient.getEntityPath()).thenReturn("entityPath"); + when(asyncClient.isConnectionClosed()).thenReturn(false); + when(asyncClient.isAutoLockRenewRequested()).thenReturn(false); + when(asyncClient.complete(any())).thenReturn(Mono.empty()); + // Emit one message then hang so the V2 drain has something to wait for. + when(asyncClient.nonSessionProcessorReceiveV2()) + .thenReturn(Flux.concat(Flux.just(message), Flux.never()) + .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + doNothing().when(asyncClient).close(); + + final CountDownLatch handlerStarted = new CountDownLatch(1); + final CountDownLatch handlerCanProceed = new CountDownLatch(1); + + final Consumer messageConsumer = (messageContext) -> { + handlerStarted.countDown(); + try { + handlerCanProceed.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }; + + // V2 path: setV2(true) so the outer ServiceBusProcessorClient delegates to a real + // ServiceBusProcessor (RollingMessagePump under the hood). 
+ final ServiceBusProcessorClientOptions options + = new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1).setV2(true); + final ServiceBusProcessorClient processorClient + = new ServiceBusProcessorClient(receiverBuilder, "entityPath", null, null, messageConsumer, error -> { + }, options); + + processorClient.start(); + assertTrue(handlerStarted.await(5, TimeUnit.SECONDS), "Handler should have started"); + + // Begin V2 close on a separate thread; it will block in processorV2.close()'s drain + // until the handler returns. + final CountDownLatch closeDone = new CountDownLatch(1); + final Thread closeThread = new Thread(() -> { + processorClient.close(); + closeDone.countDown(); + }); + closeThread.start(); + + try { + // Wait deterministically for V2 close to be in-progress. Polling isRunning() is the + // closest observable signal: processorV2's close() sets its internal isRunning=false + // before draining, and our outer v2CloseInProgress flag is set in the same critical + // section that captures processorV2 for the close call. + waitFor(() -> !processorClient.isRunning(), "V2 close() to be in progress"); + + // Concurrent start() during the V2 drain window. Without the v2CloseInProgress guard, + // this would invoke processorV2.start() and create a new RollingMessagePump that the + // outer close() can't track; with the guard, start() returns without taking action. + processorClient.start(); + + // The receiver builder should still have been invoked exactly once - the concurrent + // start() must not have created a second client behind the in-flight close(). + verify(receiverBuilder, Mockito.times(1)).buildAsyncClientForProcessor(); + assertFalse(processorClient.isRunning(), + "Processor should not report running while V2 close()'s drain is in progress"); + + // Allow the handler to complete so V2 close() can finish. 
+ handlerCanProceed.countDown(); + assertTrue(closeDone.await(5, TimeUnit.SECONDS), "V2 close() should complete"); + + // After close() returns, no extra receivers were created and the processor remains stopped. + assertFalse(processorClient.isRunning(), "Processor should be stopped after V2 close()"); + verify(receiverBuilder, Mockito.times(1)).buildAsyncClientForProcessor(); + } finally { + handlerCanProceed.countDown(); + closeThread.join(5000); + } + } + + /** + * Verifies that in {@code RECEIVE_AND_DELETE} mode the V2 pump's drain skip-path does NOT + * drop messages: every message that arrives during the drain window must still reach + * {@code processMessage}. + *

+     * Background: the broker settles RECEIVE_AND_DELETE messages on delivery, so any message
+     * already in the pipeline when {@code drainHandlers()} sets {@code closing=true} has been
+     * removed from the entity. Skipping the user callback for those messages would lose them
+     * permanently. The fix gates the skip on PEEK_LOCK only.
+     *
+ *

+     * Regression test for the data-loss concern raised on
+     * <a href="https://github.com/Azure/azure-sdk-for-java/pull/48192">PR #48192</a>.
+     *
+ */ + @Test + public void v2ReceiveAndDeleteModeDoesNotSkipDuringDrain() throws InterruptedException { + final ServiceBusReceivedMessage message1 = mock(ServiceBusReceivedMessage.class); + final ServiceBusReceivedMessage message2 = mock(ServiceBusReceivedMessage.class); + final ServiceBusReceiverAsyncClient client = mock(ServiceBusReceiverAsyncClient.class); + + when(client.getInstrumentation()).thenReturn(INSTRUMENTATION); + // RECEIVE_AND_DELETE - production code must NOT take the drain skip-path on this client. + when(client.getReceiverOptions()).thenReturn( + ReceiverOptions.createNonSessionOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, 1, null, false)); + when(client.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(client.getEntityPath()).thenReturn("entityPath"); + when(client.isConnectionClosed()).thenReturn(false); + when(client.isAutoLockRenewRequested()).thenReturn(false); + + final reactor.core.publisher.Sinks.Many messageSink + = reactor.core.publisher.Sinks.many().unicast().onBackpressureBuffer(); + when(client.nonSessionProcessorReceiveV2()) + .thenReturn(messageSink.asFlux().publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + doNothing().when(client).close(); + + final CountDownLatch handler1Started = new CountDownLatch(1); + final CountDownLatch handler1CanProceed = new CountDownLatch(1); + final CountDownLatch handler2Invoked = new CountDownLatch(1); + + final Consumer messageConsumer = (messageContext) -> { + if (messageContext.getMessage() == message1) { + handler1Started.countDown(); + try { + handler1CanProceed.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } else { + // RECEIVE_AND_DELETE: this MUST be invoked even while drain is in progress, + // otherwise message2 is permanently lost. + handler2Invoked.countDown(); + } + }; + + // enableAutoDisposition=false: matches RECEIVE_AND_DELETE semantics (no settlement). 
+ // concurrency=2 lets flatMap dispatch both handlers in parallel. + final MessagePump pump = new MessagePump(client, messageConsumer, e -> { + }, 2, false); + final AtomicReference subscription = new AtomicReference<>(); + subscription.set(pump.begin().subscribe()); + + messageSink.tryEmitNext(message1); + assertTrue(handler1Started.await(5, TimeUnit.SECONDS), "Handler1 should have started"); + + // Start drain on a separate thread - it sets closing=true and waits for handler1. + final CountDownLatch drainDone = new CountDownLatch(1); + final Thread drainThread = new Thread(() -> { + pump.drainHandlers(Duration.ofSeconds(10)); + drainDone.countDown(); + }); + drainThread.start(); + + // Wait deterministically for drainHandlers() to enter its wait loop (closing=true is set + // before the wait begins, so once the thread is parked we know the flag is observable). + waitFor( + () -> drainThread.getState() == Thread.State.WAITING + || drainThread.getState() == Thread.State.TIMED_WAITING, + "drainHandlers() to enter waiting state (closing flag set)"); + + // Emit message2 AFTER closing=true. In PEEK_LOCK this would be skipped; in + // RECEIVE_AND_DELETE it MUST be delivered to handler2 because the broker has already + // settled it. + messageSink.tryEmitNext(message2); + + assertTrue(handler2Invoked.await(5, TimeUnit.SECONDS), + "Handler2 must run during RECEIVE_AND_DELETE drain - skipping it would lose the message permanently"); + + // Release handler1 so drain can complete. + handler1CanProceed.countDown(); + assertTrue(drainDone.await(5, TimeUnit.SECONDS), "Drain should complete"); + subscription.get().dispose(); + } + + /** + * Verifies that in {@code RECEIVE_AND_DELETE} mode the V1 processor's drain skip-path does + * NOT drop messages: every message that arrives during the drain window must still reach + * {@code processMessage}. Mirrors the V2 guarantee in + * {@link #v2ReceiveAndDeleteModeDoesNotSkipDuringDrain()}. + *

+     * Regression test for the data-loss concern raised on
+     * <a href="https://github.com/Azure/azure-sdk-for-java/pull/48192">PR #48192</a>.
+     *
+ */ + @Test + public void v1ReceiveAndDeleteModeDoesNotSkipDuringDrain() throws InterruptedException { + final ServiceBusReceivedMessage message1 = mock(ServiceBusReceivedMessage.class); + final ServiceBusReceivedMessage message2 = mock(ServiceBusReceivedMessage.class); + + final ServiceBusReceiverClientBuilder receiverBuilder = mock(ServiceBusReceiverClientBuilder.class); + final ServiceBusReceiverAsyncClient asyncClient = mock(ServiceBusReceiverAsyncClient.class); + when(receiverBuilder.buildAsyncClientForProcessor()).thenReturn(asyncClient); + when(asyncClient.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(asyncClient.getEntityPath()).thenReturn("entityPath"); + when(asyncClient.isConnectionClosed()).thenReturn(false); + when(asyncClient.getInstrumentation()).thenReturn(INSTRUMENTATION); + // RECEIVE_AND_DELETE - V1 onNext must NOT take the v1Closing skip-path on this client. + when(asyncClient.getReceiverOptions()).thenReturn( + ReceiverOptions.createNonSessionOptions(ServiceBusReceiveMode.RECEIVE_AND_DELETE, 1, null, false)); + + final reactor.core.publisher.Sinks.Many messageSink + = reactor.core.publisher.Sinks.many().unicast().onBackpressureBuffer(); + when(asyncClient.receiveMessagesWithContext()).thenReturn(messageSink.asFlux() + .map(ServiceBusMessageContext::new) + .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + doNothing().when(asyncClient).close(); + + final CountDownLatch handler1Started = new CountDownLatch(1); + final CountDownLatch handler1CanProceed = new CountDownLatch(1); + final CountDownLatch handler2Invoked = new CountDownLatch(1); + + final Consumer messageConsumer = (messageContext) -> { + if (messageContext.getMessage() == message1) { + handler1Started.countDown(); + try { + handler1CanProceed.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } else { + handler2Invoked.countDown(); + } + }; + + // maxConcurrentCalls=2 so V1 uses the parallel/runOn 
path (concurrency=1 takes the + // single-subscriber path that only request(1)s after each handler completes, which would + // not deliver message2 because isRunning becomes false during close). + final ServiceBusProcessorClientOptions options + = new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(2); + final ServiceBusProcessorClient processorClient + = new ServiceBusProcessorClient(receiverBuilder, "entityPath", null, null, messageConsumer, error -> { + }, options); + + processorClient.start(); + messageSink.tryEmitNext(message1); + assertTrue(handler1Started.await(5, TimeUnit.SECONDS), "Handler1 should have started"); + + // Begin close on a separate thread. close() will set v1Closing=true and block on the drain. + final CountDownLatch closeDone = new CountDownLatch(1); + final Thread closeThread = new Thread(() -> { + processorClient.close(); + closeDone.countDown(); + }); + closeThread.start(); + + try { + // Wait deterministically for close() to take ownership and set v1Closing=true. + waitFor(() -> !processorClient.isRunning(), "close() to have set isRunning=false"); + + // Emit message2 during the drain window. In PEEK_LOCK this would be skipped; in + // RECEIVE_AND_DELETE the broker has already removed the message, so V1 onNext MUST + // dispatch it to handler2. + messageSink.tryEmitNext(message2); + + assertTrue(handler2Invoked.await(5, TimeUnit.SECONDS), + "Handler2 must run during RECEIVE_AND_DELETE drain - skipping it would lose the message permanently"); + + // Release handler1 so close() can finish. 
+ handler1CanProceed.countDown(); + assertTrue(closeDone.await(5, TimeUnit.SECONDS), "close() should complete"); + } finally { + handler1CanProceed.countDown(); + closeThread.join(5000); + } + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorRollingMessagePumpIsolatedTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorRollingMessagePumpIsolatedTest.java index 703e80d787b7..d453d8ca7c41 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorRollingMessagePumpIsolatedTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorRollingMessagePumpIsolatedTest.java @@ -88,7 +88,7 @@ public ServiceBusReceiverAsyncClient answer(InvocationOnMock invocation) { final RollingMessagePump pump = new RollingMessagePump(builder, m -> { }, e -> { - }, 1, false); + }, 1, false, Duration.ofSeconds(30)); try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) { verifier.create(() -> pump.beginIntern()) @@ -141,7 +141,7 @@ public ServiceBusReceiverAsyncClient answer(InvocationOnMock invocation) { consumedMessages.add(messageContext.getMessage()); }; final RollingMessagePump pump = new RollingMessagePump(builder, messageConsumer, e -> { - }, 1, false); + }, 1, false, Duration.ofSeconds(30)); try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) { verifier.create(() -> pump.beginIntern()).thenAwait(Duration.ofSeconds(30)).verifyErrorSatisfies(e -> { @@ -191,7 +191,7 @@ public ServiceBusReceiverAsyncClient answer(InvocationOnMock invocation) { consumedMessages.add(messageContext.getMessage()); }; final RollingMessagePump pump = new RollingMessagePump(builder, messageConsumer, e -> { - }, 1, false); + }, 1, false, Duration.ofSeconds(30)); try (VirtualTimeStepVerifier verifier = new 
VirtualTimeStepVerifier()) { verifier.create(() -> pump.beginIntern()).thenAwait(Duration.ofSeconds(30)).verifyErrorSatisfies(e -> { @@ -263,7 +263,8 @@ public ServiceBusReceiverAsyncClient answer(InvocationOnMock invocation) { consumedErrors.add(errorContext.getException()); }; - final RollingMessagePump pump = new RollingMessagePump(builder, messageConsumer, errorConsumer, 1, false); + final RollingMessagePump pump + = new RollingMessagePump(builder, messageConsumer, errorConsumer, 1, false, Duration.ofSeconds(30)); try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) { verifier.create(() -> pump.beginIntern()).thenAwait(Duration.ofSeconds(30)).verifyErrorSatisfies(e -> { @@ -316,7 +317,7 @@ public void shouldCompleteMessageOnSuccessfulProcessing() { consumedMessages.add(messageContext.getMessage()); }; final RollingMessagePump pump = new RollingMessagePump(builder, messageConsumer, e -> { - }, 1, true); + }, 1, true, Duration.ofSeconds(30)); try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) { verifier.create(() -> pump.beginIntern()).thenAwait().thenCancel().verify(); @@ -355,7 +356,8 @@ public void shouldAbandonMessageOnErroredProcessing() { consumedErrors.add(errorContext.getException()); }; - final RollingMessagePump pump = new RollingMessagePump(builder, messageConsumer, errorConsumer, 1, true); + final RollingMessagePump pump + = new RollingMessagePump(builder, messageConsumer, errorConsumer, 1, true, Duration.ofSeconds(30)); try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) { verifier.create(() -> pump.beginIntern()).thenAwait().thenCancel().verify(); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorRollingMessagePumpTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorRollingMessagePumpTest.java index 034fc1821286..59d82ed7f956 100644 --- 
a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorRollingMessagePumpTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorRollingMessagePumpTest.java @@ -14,6 +14,7 @@ import reactor.test.StepVerifier; import java.io.IOException; +import java.time.Duration; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.doNothing; @@ -48,7 +49,7 @@ public void shouldThrowIfBeginAfterDisposal() { final RollingMessagePump pump = new RollingMessagePump(builder, m -> { }, e -> { - }, 1, false); + }, 1, false, Duration.ofSeconds(30)); pump.dispose(); assertThrows(IllegalStateException.class, () -> pump.begin()); @@ -64,7 +65,7 @@ public void shouldThrowIfBeginMoreThanOnce() { final RollingMessagePump pump = new RollingMessagePump(builder, m -> { }, e -> { - }, 1, false); + }, 1, false, Duration.ofSeconds(30)); pump.begin(); try { @@ -87,7 +88,7 @@ public void shouldCloseClientOnCancel() { final RollingMessagePump pump = new RollingMessagePump(builder, m -> { }, e -> { - }, 1, false); + }, 1, false, Duration.ofSeconds(30)); StepVerifier.create(pump.beginIntern()).thenAwait().thenCancel().verify(); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SessionsMessagePumpIsolatedTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SessionsMessagePumpIsolatedTest.java index 1f1cf0927508..5d5003efe4a6 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SessionsMessagePumpIsolatedTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SessionsMessagePumpIsolatedTest.java @@ -648,7 +648,7 @@ private SessionsMessagePump createSessionsMessagePump(ServiceBusSessionAcquirer return new SessionsMessagePump("identifier-1", "FQDN", "Orders", 
ServiceBusReceiveMode.PEEK_LOCK, instrumentation, sessionAcquirer, Duration.ZERO, sessionIdleTimeout, maxConcurrentSessions, concurrencyPerSession, 0, enableAutoDisposition, serializer, retryPolicy, processMessage, processError, - onTerminate); + onTerminate, Duration.ofSeconds(30)); } private static final class VirtualTimeStepVerifier implements AutoCloseable { diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/servicebus/properties/AzureServiceBusProperties.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/servicebus/properties/AzureServiceBusProperties.java index 314f938cef45..a4f287cd8366 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/servicebus/properties/AzureServiceBusProperties.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/servicebus/properties/AzureServiceBusProperties.java @@ -273,6 +273,11 @@ public static class Processor extends Consumer implements ServiceBusProcessorCli */ private Duration sessionIdleTimeout; + /** + * Maximum time to wait for in-flight message handlers to complete during processor shutdown. 
+ */ + private Duration drainTimeout; + public Integer getMaxConcurrentCalls() { return maxConcurrentCalls; } @@ -304,6 +309,14 @@ public Duration getSessionIdleTimeout() { public void setSessionIdleTimeout(Duration sessionIdleTimeout) { this.sessionIdleTimeout = sessionIdleTimeout; } + + public Duration getDrainTimeout() { + return drainTimeout; + } + + public void setDrainTimeout(Duration drainTimeout) { + this.drainTimeout = drainTimeout; + } } diff --git a/sdk/spring/spring-cloud-azure-service/src/main/java/com/azure/spring/cloud/service/implementation/servicebus/properties/ServiceBusProcessorClientProperties.java b/sdk/spring/spring-cloud-azure-service/src/main/java/com/azure/spring/cloud/service/implementation/servicebus/properties/ServiceBusProcessorClientProperties.java index 9e5161f5c0f9..be5f3749f55a 100644 --- a/sdk/spring/spring-cloud-azure-service/src/main/java/com/azure/spring/cloud/service/implementation/servicebus/properties/ServiceBusProcessorClientProperties.java +++ b/sdk/spring/spring-cloud-azure-service/src/main/java/com/azure/spring/cloud/service/implementation/servicebus/properties/ServiceBusProcessorClientProperties.java @@ -16,4 +16,14 @@ public interface ServiceBusProcessorClientProperties extends ServiceBusReceiverC Duration getSessionIdleTimeout(); + /** + * Returns the maximum time the processor will wait for in-flight message handlers to + * complete before disposing the underlying receiver during shutdown. Mirrors the + * {@code drainTimeout(Duration)} setter on the underlying Service Bus processor builder. + * + * @return the configured drain timeout, or {@code null} if not set (the underlying SDK + * applies its own default in that case). 
+ */ + Duration getDrainTimeout(); + } diff --git a/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/properties/ServiceBusProcessorClientTestProperties.java b/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/properties/ServiceBusProcessorClientTestProperties.java index 5d3e158902b4..d9500bb1eee0 100644 --- a/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/properties/ServiceBusProcessorClientTestProperties.java +++ b/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/servicebus/properties/ServiceBusProcessorClientTestProperties.java @@ -11,6 +11,7 @@ public class ServiceBusProcessorClientTestProperties extends ServiceBusReceiverC private Integer maxConcurrentCalls; private Integer maxConcurrentSessions; private Duration sessionIdleTimeout; + private Duration drainTimeout; @Override public Integer getMaxConcurrentCalls() { @@ -38,4 +39,13 @@ public Duration getSessionIdleTimeout() { public void setSessionIdleTimeout(Duration sessionIdleTimeout) { this.sessionIdleTimeout = sessionIdleTimeout; } + + @Override + public Duration getDrainTimeout() { + return drainTimeout; + } + + public void setDrainTimeout(Duration drainTimeout) { + this.drainTimeout = drainTimeout; + } } diff --git a/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/properties/ProcessorProperties.java b/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/properties/ProcessorProperties.java index f302c999f496..f155dba04e92 100644 --- a/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/properties/ProcessorProperties.java +++ 
b/sdk/spring/spring-messaging-azure-servicebus/src/main/java/com/azure/spring/messaging/servicebus/core/properties/ProcessorProperties.java @@ -21,6 +21,7 @@ public ProcessorProperties() { private Integer maxConcurrentCalls; private Integer maxConcurrentSessions; private Duration sessionIdleTimeout; + private Duration drainTimeout; @Override public Integer getMaxConcurrentCalls() { @@ -60,4 +61,17 @@ public Duration getSessionIdleTimeout() { public void setSessionIdleTimeout(Duration sessionIdleTimeout) { this.sessionIdleTimeout = sessionIdleTimeout; } + + @Override + public Duration getDrainTimeout() { + return drainTimeout; + } + + /** + * Set the maximum time to wait for in-flight message handlers to complete during processor shutdown. + * @param drainTimeout the drain timeout duration. + */ + public void setDrainTimeout(Duration drainTimeout) { + this.drainTimeout = drainTimeout; + } }