diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md
index 13c8130c8f1d..6eb1b76bad4e 100644
--- a/sdk/cosmos/azure-cosmos/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -9,6 +9,7 @@
 #### Bugs Fixed
 
 #### Other Changes
+* Replaced per-client `Schedulers.newSingle()` schedulers in `GlobalEndpointManager` and `GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker` with shared `BoundedElastic` schedulers in `CosmosSchedulers` to prevent thread count from scaling linearly with client/tenant count. - See [PR 49062](https://github.com/Azure/azure-sdk-for-java/pull/49062)
 
 ### 4.80.0 (2026-05-01)
 
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java
index b7db63c7e10b..65bb2c9a7e9f 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java
@@ -16,6 +16,8 @@ public class CosmosSchedulers {
     private final static String OPEN_CONNECTIONS_BOUNDED_ELASTIC_THREAD_NAME = "open-connections-bounded-elastic";
     private final static String ASYNC_CACHE_BACKGROUND_REFRESH_THREAD_NAME = "async-cache-background-refresh-bounded-elastic";
     private final static String FAULT_INJECTION_CONNECTION_ERROR_THREAD_NAME = "fault-injection-connection-error-bounded-elastic";
+    private final static String GLOBAL_ENDPOINT_MANAGER_THREAD_NAME = "global-endpoint-manager-bounded-elastic";
+    private final static String PARTITION_AVAILABILITY_CHECK_THREAD_NAME = "partition-availability-check-bounded-elastic";
     private final static int TTL_FOR_SCHEDULER_WORKER_IN_SECONDS = 60; // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS
 
 
@@ -97,4 +99,26 @@ public class CosmosSchedulers {
         TTL_FOR_SCHEDULER_WORKER_IN_SECONDS,
         true
     );
+
+    // Shared scheduler for GlobalEndpointManager background location refresh tasks.
+    // Using a shared bounded elastic scheduler instead of per-client Schedulers.newSingle()
+    // to prevent thread count from scaling linearly with client/tenant count.
+    public final static Scheduler GLOBAL_ENDPOINT_MANAGER_BOUNDED_ELASTIC = Schedulers.newBoundedElastic(
+        Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
+        Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
+        GLOBAL_ENDPOINT_MANAGER_THREAD_NAME,
+        TTL_FOR_SCHEDULER_WORKER_IN_SECONDS,
+        true
+    );
+
+    // Shared scheduler for per-partition circuit breaker availability staleness checks.
+    // Using a shared bounded elastic scheduler instead of per-client Schedulers.newSingle()
+    // to prevent thread count from scaling linearly with client/tenant count.
+    public final static Scheduler PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC = Schedulers.newBoundedElastic(
+        Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
+        Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
+        PARTITION_AVAILABILITY_CHECK_THREAD_NAME,
+        TTL_FOR_SCHEDULER_WORKER_IN_SECONDS,
+        true
+    );
 }
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java
index d67f8d77088c..e313b5219713 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java
@@ -9,10 +9,9 @@
 import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Scheduler;
-import reactor.core.scheduler.Schedulers;
 
 import java.net.URI;
 import java.time.Duration;
@@ -23,6 +22,7 @@
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -34,8 +34,6 @@
 public class GlobalEndpointManager implements AutoCloseable {
     private static final Logger logger = LoggerFactory.getLogger(GlobalEndpointManager.class);
 
-    private static final CosmosDaemonThreadFactory theadFactory = new CosmosDaemonThreadFactory("cosmos-global-endpoint-mgr");
-
     private final int backgroundRefreshLocationTimeIntervalInMS;
     private final int backgroundRefreshJitterMaxInSeconds;
     private final LocationCache locationCache;
@@ -45,7 +43,7 @@ public class GlobalEndpointManager implements AutoCloseable {
     private final DatabaseAccountManagerInternal owner;
     private final AtomicBoolean isRefreshing;
     private final AtomicBoolean refreshInBackground;
-    private final Scheduler scheduler = Schedulers.newSingle(theadFactory);
+    private final AtomicReference<Disposable> backgroundRefreshDisposable = new AtomicReference<>();
     private volatile boolean isClosed;
     private volatile DatabaseAccount latestDatabaseAccount;
     private final AtomicBoolean hasThinClientReadLocations = new AtomicBoolean(false);
@@ -194,7 +192,10 @@ public boolean canUseMultipleWriteLocations(RxDocumentServiceRequest request) {
     public void close() {
         this.isClosed = true;
         this.perPartitionAutomaticFailoverConfigModifier = null;
-        this.scheduler.dispose();
+        Disposable disposable = this.backgroundRefreshDisposable.getAndSet(null);
+        if (disposable != null && !disposable.isDisposed()) {
+            disposable.dispose();
+        }
         logger.debug("GlobalEndpointManager closed.");
     }
 
@@ -322,7 +323,17 @@ private Mono refreshLocationPrivateAsync(DatabaseAccount databaseAccount)
     }
 
     private void startRefreshLocationTimerAsync() {
-        startRefreshLocationTimerAsync(false).subscribe();
+        Disposable newDisposable = startRefreshLocationTimerAsync(false).subscribe();
+        Disposable oldDisposable = this.backgroundRefreshDisposable.getAndSet(newDisposable);
+        if (oldDisposable != null && !oldDisposable.isDisposed()) {
+            oldDisposable.dispose();
+        }
+
+        // close() sets isClosed before it swaps the reference to null, so if it ran before the
+        // getAndSet above it never saw this subscription; dispose it here (dispose is idempotent).
+        if (this.isClosed) {
+            newDisposable.dispose();
+        }
     }
 
     private Mono startRefreshLocationTimerAsync(boolean initialization) {
@@ -371,7 +382,7 @@ private Mono startRefreshLocationTimerAsync(boolean initialization) {
 
                         this.startRefreshLocationTimerAsync();
                         return Mono.empty();
-                    }).subscribeOn(scheduler);
+                    }).subscribeOn(CosmosSchedulers.GLOBAL_ENDPOINT_MANAGER_BOUNDED_ELASTIC);
     }
 
     public boolean hasThinClientReadLocations() {
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionCircuitBreaker/GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionCircuitBreaker/GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker.java
index 0213f2255144..cf5ffe7aa874 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionCircuitBreaker/GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionCircuitBreaker/GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker.java
@@ -7,6 +7,7 @@
 import com.azure.cosmos.CosmosDiagnostics;
 import com.azure.cosmos.CosmosException;
 import com.azure.cosmos.implementation.Configs;
+import com.azure.cosmos.implementation.CosmosSchedulers;
 import com.azure.cosmos.implementation.GlobalEndpointManager;
 import com.azure.cosmos.implementation.HttpConstants;
 import com.azure.cosmos.implementation.OperationType;
@@ -22,10 +23,9 @@
 import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Scheduler;
-import reactor.core.scheduler.Schedulers;
 
 import java.net.URI;
 import java.time.Duration;
@@ -56,9 +56,7 @@ public class GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker impleme
     private final ConcurrentHashMap regionalRoutingContextToRegion;
     private final AtomicBoolean isClosed = new AtomicBoolean(false);
     private final AtomicBoolean isPartitionRecoveryTaskRunning = new AtomicBoolean(false);
-    private final Scheduler partitionRecoveryScheduler = Schedulers.newSingle(
-        "partition-availability-staleness-check",
-        true);
+    private final AtomicReference<Disposable> partitionRecoveryDisposable = new AtomicReference<>();
 
     public GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker(GlobalEndpointManager globalEndpointManager) {
         this.partitionKeyRangeToLocationSpecificUnavailabilityInfo = new ConcurrentHashMap<>();
@@ -73,8 +71,19 @@ public GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker(GlobalEndpoin
     }
 
     public void init() {
-        if (this.consecutiveExceptionBasedCircuitBreaker.isPartitionLevelCircuitBreakerEnabled() && !this.isPartitionRecoveryTaskRunning.get()) {
-            this.updateStaleLocationInfo().subscribeOn(this.partitionRecoveryScheduler).doOnSubscribe(ignore -> this.isPartitionRecoveryTaskRunning.set(true)).subscribe();
+        if (this.consecutiveExceptionBasedCircuitBreaker.isPartitionLevelCircuitBreakerEnabled()
+            && this.isPartitionRecoveryTaskRunning.compareAndSet(false, true)) {
+
+            Disposable subscription = this.updateStaleLocationInfo()
+                .subscribeOn(CosmosSchedulers.PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC)
+                .subscribe();
+            this.partitionRecoveryDisposable.set(subscription);
+
+            // close() sets isClosed before it swaps the reference to null, so if it ran before the
+            // set above it never saw this subscription; dispose it here (dispose is idempotent).
+            if (this.isClosed.get()) {
+                subscription.dispose();
+            }
         }
     }
 
@@ -449,7 +458,10 @@ public void setGlobalAddressResolver(GlobalAddressResolver globalAddressResolver
     @Override
     public void close() {
         this.isClosed.set(true);
-        this.partitionRecoveryScheduler.dispose();
+        Disposable disposable = this.partitionRecoveryDisposable.getAndSet(null);
+        if (disposable != null && !disposable.isDisposed()) {
+            disposable.dispose();
+        }
     }
 
     private class PartitionLevelLocationUnavailabilityInfo {