Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#### Bugs Fixed

#### Other Changes
* Replaced per-client `Schedulers.newSingle()` schedulers in `GlobalEndpointManager` and `GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker` with shared `BoundedElastic` schedulers in `CosmosSchedulers` to prevent thread count from scaling linearly with client/tenant count. - See [PR 49062](https://github.com/Azure/azure-sdk-for-java/pull/49062)

### 4.80.0 (2026-05-01)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public class CosmosSchedulers {
private final static String OPEN_CONNECTIONS_BOUNDED_ELASTIC_THREAD_NAME = "open-connections-bounded-elastic";
private final static String ASYNC_CACHE_BACKGROUND_REFRESH_THREAD_NAME = "async-cache-background-refresh-bounded-elastic";
private final static String FAULT_INJECTION_CONNECTION_ERROR_THREAD_NAME = "fault-injection-connection-error-bounded-elastic";
private final static String GLOBAL_ENDPOINT_MANAGER_THREAD_NAME = "global-endpoint-manager-bounded-elastic";
private final static String PARTITION_AVAILABILITY_CHECK_THREAD_NAME = "partition-availability-check-bounded-elastic";

private final static int TTL_FOR_SCHEDULER_WORKER_IN_SECONDS = 60; // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS

Expand Down Expand Up @@ -97,4 +99,26 @@ public class CosmosSchedulers {
TTL_FOR_SCHEDULER_WORKER_IN_SECONDS,
true
);

// Shared scheduler for GlobalEndpointManager background location refresh tasks.
// Using a shared bounded elastic scheduler instead of per-client Schedulers.newSingle()
// to prevent thread count from scaling linearly with client/tenant count.
public final static Scheduler GLOBAL_ENDPOINT_MANAGER_BOUNDED_ELASTIC = Schedulers.newBoundedElastic(
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
GLOBAL_ENDPOINT_MANAGER_THREAD_NAME,
TTL_FOR_SCHEDULER_WORKER_IN_SECONDS,
true
);

// Shared scheduler for per-partition circuit breaker availability staleness checks.
// Using a shared bounded elastic scheduler instead of per-client Schedulers.newSingle()
// to prevent thread count from scaling linearly with client/tenant count.
public final static Scheduler PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC = Schedulers.newBoundedElastic(
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
PARTITION_AVAILABILITY_CHECK_THREAD_NAME,
TTL_FOR_SCHEDULER_WORKER_IN_SECONDS,
true
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.net.URI;
import java.time.Duration;
Expand All @@ -23,6 +22,7 @@
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
Expand All @@ -34,8 +34,6 @@
public class GlobalEndpointManager implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(GlobalEndpointManager.class);

private static final CosmosDaemonThreadFactory theadFactory = new CosmosDaemonThreadFactory("cosmos-global-endpoint-mgr");

private final int backgroundRefreshLocationTimeIntervalInMS;
private final int backgroundRefreshJitterMaxInSeconds;
private final LocationCache locationCache;
Expand All @@ -45,7 +43,7 @@ public class GlobalEndpointManager implements AutoCloseable {
private final DatabaseAccountManagerInternal owner;
private final AtomicBoolean isRefreshing;
private final AtomicBoolean refreshInBackground;
private final Scheduler scheduler = Schedulers.newSingle(theadFactory);
private final AtomicReference<Disposable> backgroundRefreshDisposable = new AtomicReference<>();
private volatile boolean isClosed;
private volatile DatabaseAccount latestDatabaseAccount;
private final AtomicBoolean hasThinClientReadLocations = new AtomicBoolean(false);
Expand Down Expand Up @@ -194,7 +192,10 @@ public boolean canUseMultipleWriteLocations(RxDocumentServiceRequest request) {
public void close() {
this.isClosed = true;
this.perPartitionAutomaticFailoverConfigModifier = null;
this.scheduler.dispose();
Disposable disposable = this.backgroundRefreshDisposable.getAndSet(null);
if (disposable != null && !disposable.isDisposed()) {
disposable.dispose();
}
logger.debug("GlobalEndpointManager closed.");
}
Comment thread
xinlian12 marked this conversation as resolved.

Expand Down Expand Up @@ -322,7 +323,11 @@ private Mono<Void> refreshLocationPrivateAsync(DatabaseAccount databaseAccount)
}

private void startRefreshLocationTimerAsync() {
startRefreshLocationTimerAsync(false).subscribe();
Disposable newDisposable = startRefreshLocationTimerAsync(false).subscribe();
Disposable oldDisposable = this.backgroundRefreshDisposable.getAndSet(newDisposable);
if (oldDisposable != null && !oldDisposable.isDisposed()) {
oldDisposable.dispose();
}
}

private Mono<Void> startRefreshLocationTimerAsync(boolean initialization) {
Expand Down Expand Up @@ -371,7 +376,7 @@ private Mono<Void> startRefreshLocationTimerAsync(boolean initialization) {

this.startRefreshLocationTimerAsync();
return Mono.empty();
}).subscribeOn(scheduler);
}).subscribeOn(CosmosSchedulers.GLOBAL_ENDPOINT_MANAGER_BOUNDED_ELASTIC);
}

public boolean hasThinClientReadLocations() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.azure.cosmos.CosmosDiagnostics;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.GlobalEndpointManager;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.OperationType;
Expand All @@ -22,10 +23,9 @@
import com.azure.cosmos.implementation.routing.RegionalRoutingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.net.URI;
import java.time.Duration;
Expand Down Expand Up @@ -56,9 +56,7 @@ public class GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker impleme
private final ConcurrentHashMap<RegionalRoutingContext, String> regionalRoutingContextToRegion;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final AtomicBoolean isPartitionRecoveryTaskRunning = new AtomicBoolean(false);
private final Scheduler partitionRecoveryScheduler = Schedulers.newSingle(
"partition-availability-staleness-check",
true);
private final AtomicReference<Disposable> partitionRecoveryDisposable = new AtomicReference<>();

public GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker(GlobalEndpointManager globalEndpointManager) {
this.partitionKeyRangeToLocationSpecificUnavailabilityInfo = new ConcurrentHashMap<>();
Expand All @@ -73,8 +71,12 @@ public GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker(GlobalEndpoin
}

public void init() {
if (this.consecutiveExceptionBasedCircuitBreaker.isPartitionLevelCircuitBreakerEnabled() && !this.isPartitionRecoveryTaskRunning.get()) {
this.updateStaleLocationInfo().subscribeOn(this.partitionRecoveryScheduler).doOnSubscribe(ignore -> this.isPartitionRecoveryTaskRunning.set(true)).subscribe();
if (this.consecutiveExceptionBasedCircuitBreaker.isPartitionLevelCircuitBreakerEnabled()
&& this.isPartitionRecoveryTaskRunning.compareAndSet(false, true)) {

this.partitionRecoveryDisposable.set(this.updateStaleLocationInfo()
.subscribeOn(CosmosSchedulers.PARTITION_AVAILABILITY_CHECK_BOUNDED_ELASTIC)
.subscribe());
}
}

Expand Down Expand Up @@ -449,7 +451,10 @@ public void setGlobalAddressResolver(GlobalAddressResolver globalAddressResolver
@Override
public void close() {
this.isClosed.set(true);
this.partitionRecoveryScheduler.dispose();
Disposable disposable = this.partitionRecoveryDisposable.getAndSet(null);
if (disposable != null && !disposable.isDisposed()) {
disposable.dispose();
}
}
Comment thread
xinlian12 marked this conversation as resolved.

private class PartitionLevelLocationUnavailabilityInfo {
Expand Down
Loading