Skip to content

Commit 3db46fd

Browse files
committed
add recycle backoff test
1 parent dbdc249 commit 3db46fd

2 files changed

Lines changed: 108 additions & 9 deletions

File tree

java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.bigtable.v2.TelemetryConfiguration;
2424
import com.google.cloud.bigtable.data.v2.internal.csm.tracers.DebugTagTracer;
2525
import com.google.common.annotations.VisibleForTesting;
26+
import com.google.common.base.Ticker;
2627
import com.google.common.collect.HashMultiset;
2728
import com.google.common.collect.Multiset;
2829
import io.grpc.CallOptions;
@@ -100,6 +101,8 @@ public class ChannelPoolDpImpl implements ChannelPool {
100101
@GuardedBy("this")
101102
private boolean closed = false;
102103

104+
private final Ticker ticker;
105+
103106
@GuardedBy("this")
104107
private long lastRecycleNano = 0;
105108

@@ -111,7 +114,14 @@ public ChannelPoolDpImpl(
111114
ChannelPoolConfiguration config,
112115
DebugTagTracer debugTagTracer,
113116
ScheduledExecutorService executor) {
114-
this(channelSupplier, config, DEFAULT_LOG_NAME, debugTagTracer, executor, Clock.systemUTC());
117+
this(
118+
channelSupplier,
119+
config,
120+
DEFAULT_LOG_NAME,
121+
debugTagTracer,
122+
executor,
123+
Clock.systemUTC(),
124+
Ticker.systemTicker());
115125
}
116126

117127
public ChannelPoolDpImpl(
@@ -120,7 +130,14 @@ public ChannelPoolDpImpl(
120130
String logName,
121131
DebugTagTracer debugTagTracer,
122132
ScheduledExecutorService executor) {
123-
this(channelSupplier, config, logName, debugTagTracer, executor, Clock.systemUTC());
133+
this(
134+
channelSupplier,
135+
config,
136+
logName,
137+
debugTagTracer,
138+
executor,
139+
Clock.systemUTC(),
140+
Ticker.systemTicker());
124141
}
125142

126143
public ChannelPoolDpImpl(
@@ -130,8 +147,20 @@ public ChannelPoolDpImpl(
130147
DebugTagTracer debugTagTracer,
131148
ScheduledExecutorService executor,
132149
Clock clock) {
150+
this(channelSupplier, config, logName, debugTagTracer, executor, clock, Ticker.systemTicker());
151+
}
152+
153+
public ChannelPoolDpImpl(
154+
Supplier<ManagedChannel> channelSupplier,
155+
ChannelPoolConfiguration config,
156+
String logName,
157+
DebugTagTracer debugTagTracer,
158+
ScheduledExecutorService executor,
159+
Clock clock,
160+
Ticker ticker) {
133161
this.poolLogId = String.format("%d-%s", INDEX.getAndIncrement(), logName);
134162
this.clock = clock;
163+
this.ticker = ticker;
135164
this.channelSupplier = channelSupplier;
136165
this.executor = executor;
137166
updateConfig(config);
@@ -351,11 +380,12 @@ private void recycleChannel(ChannelWrapper channelWrapper) {
351380
return;
352381
}
353382

354-
if (lastRecycleNano > System.nanoTime() - recycleBackoff.toNanos()) {
383+
long nowNano = ticker.read();
384+
if (nowNano - lastRecycleNano < recycleBackoff.toNanos()) {
355385
return;
356386
}
357387

358-
lastRecycleNano = System.nanoTime();
388+
lastRecycleNano = nowNano;
359389
recycleBackoff = recycleBackoff.multipliedBy(2);
360390
if (recycleBackoff.compareTo(MAX_RECYCLE_BACKOFF) > 0) {
361391
recycleBackoff = MAX_RECYCLE_BACKOFF;
@@ -388,9 +418,6 @@ private synchronized void serviceChannelsSafe() {
388418
log(Level.FINE, "Servicing channels");
389419
dumpState();
390420

391-
Instant now = Instant.now(clock);
392-
Instant createdAtThreshold = now.minus(Duration.ofMinutes(50));
393-
394421
// Thin out the channels in each group, so that each AFEGroup only has 1 channel
395422
for (AfeChannelGroup group : channelGroups) {
396423
if (LOGGER.isLoggable(Level.FINEST) && group.channels.size() > 1) {

java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImplTest.java

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.google.cloud.bigtable.data.v2.internal.channels.SessionStream.Listener;
3131
import com.google.cloud.bigtable.data.v2.internal.csm.NoopMetrics;
3232
import com.google.cloud.bigtable.data.v2.internal.csm.tracers.DebugTagTracer;
33+
import com.google.common.base.Ticker;
3334
import io.grpc.Attributes;
3435
import io.grpc.CallOptions;
3536
import io.grpc.ClientCall;
@@ -41,6 +42,8 @@
4142
import java.util.Base64;
4243
import java.util.List;
4344
import java.util.concurrent.ScheduledExecutorService;
45+
import java.util.concurrent.TimeUnit;
46+
import java.util.concurrent.atomic.AtomicLong;
4447
import java.util.function.Supplier;
4548
import org.junit.jupiter.api.Test;
4649
import org.junit.jupiter.api.extension.ExtendWith;
@@ -235,7 +238,6 @@ void testDownsize() {
235238
listener.onClose(Status.OK, new Metadata());
236239
}
237240

238-
when(clock.instant()).thenReturn(Instant.now());
239241
pool.serviceChannels();
240242
verify(channel, times(numChannels - pool.minGroups)).shutdown();
241243

@@ -295,7 +297,6 @@ void testDownsizeToOptimal() {
295297
// I.e. dumpState
296298
// FINE: ChannelPool channelGroups: 5, channels: 5, starting channels: 0, totalStreams: 19,
297299
// AFEs: 5, distribution: [4, 4, 4, 4, 3]
298-
when(clock.instant()).thenReturn(Instant.now());
299300
pool.serviceChannels();
300301

301302
// Should scale down to 4 channels. 19 / 5 round up = 4.
@@ -563,4 +564,75 @@ void testCancelledDoesNotIncrementFailures() {
563564

564565
pool.close();
565566
}
567+
568+
@Test
569+
void testRecycleChannelBackoff() {
570+
when(channelSupplier.get()).thenReturn(channel);
571+
when(channel.newCall(any(), any())).thenReturn(clientCall);
572+
doNothing().when(clientCall).start(listener.capture(), any());
573+
574+
Ticker ticker = mock(Ticker.class);
575+
long startNanos = TimeUnit.SECONDS.toNanos(1);
576+
final AtomicLong time = new AtomicLong(startNanos);
577+
when(ticker.read()).thenAnswer(invocation -> time.get());
578+
579+
ChannelPoolDpImpl pool =
580+
new ChannelPoolDpImpl(
581+
channelSupplier,
582+
defaultConfig,
583+
"pool",
584+
debugTagTracer,
585+
bgExecutor,
586+
Clock.systemUTC(),
587+
ticker);
588+
589+
// --- First Recycle ---
590+
for (int i = 0; i < 5; i++) {
591+
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
592+
.start(mock(Listener.class), new Metadata());
593+
listener.getValue().onClose(Status.UNAVAILABLE, new Metadata());
594+
}
595+
// Should be recycled once
596+
verify(channel, times(1)).shutdown();
597+
verify(channelSupplier, times(2)).get(); // 1 initial + 1 recycle
598+
599+
// --- Second Recycle (Immediate, same time) ---
600+
// Time has not advanced. Backoff is now 2ms.
601+
for (int i = 0; i < 5; i++) {
602+
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
603+
.start(mock(Listener.class), new Metadata());
604+
listener.getValue().onClose(Status.UNAVAILABLE, new Metadata());
605+
}
606+
// Should NOT be recycled again because of backoff
607+
verify(channel, times(1)).shutdown();
608+
verify(channelSupplier, times(2)).get();
609+
610+
// --- Third Recycle (After partial backoff, still blocked) ---
611+
// Advance time by 1ms (backoff is 2ms, so still blocked)
612+
time.addAndGet(TimeUnit.MILLISECONDS.toNanos(1));
613+
614+
for (int i = 0; i < 5; i++) {
615+
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
616+
.start(mock(Listener.class), new Metadata());
617+
listener.getValue().onClose(Status.UNAVAILABLE, new Metadata());
618+
}
619+
// Should still NOT be recycled
620+
verify(channel, times(1)).shutdown();
621+
verify(channelSupplier, times(2)).get();
622+
623+
// --- Fourth Recycle (After full backoff) ---
624+
// Advance time by another 2ms (total 3ms from last recycle, which is > 2ms backoff)
625+
time.addAndGet(TimeUnit.MILLISECONDS.toNanos(2));
626+
627+
for (int i = 0; i < 5; i++) {
628+
pool.newStream(FakeSessionGrpc.getOpenSessionMethod(), CallOptions.DEFAULT)
629+
.start(mock(Listener.class), new Metadata());
630+
listener.getValue().onClose(Status.UNAVAILABLE, new Metadata());
631+
}
632+
// Now it should be recycled again
633+
verify(channel, times(2)).shutdown();
634+
verify(channelSupplier, times(3)).get();
635+
636+
pool.close();
637+
}
566638
}

0 commit comments

Comments
 (0)