Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions api/src/main/java/io/grpc/ClientStreamTracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,23 @@ public void streamCreated(@Grpc.TransportAttr Attributes transportAttrs, Metadat
public void createPendingStream() {
}

/**
* A delay segment started with a specific reason during load balancing.
*
* @param reasonToken the reason for the delay, e.g., "pick_first:connecting"
* @since 1.82.0
*/
public void delayStarted(String reasonToken) {
}

/**
* The current delay segment ended.
*
* @since 1.82.0
*/
public void delayEnded() {
}

/**
* Headers has been sent to the socket.
*/
Expand Down
31 changes: 26 additions & 5 deletions api/src/main/java/io/grpc/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -549,25 +549,30 @@ public static final class PickResult {
// True if the result is created by withDrop()
private final boolean drop;
@Nullable private final String authorityOverride;
@Nullable private final String delayReasonToken;

private PickResult(
@Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory,
Status status, boolean drop) {
this.subchannel = subchannel;
this.streamTracerFactory = streamTracerFactory;
this.status = checkNotNull(status, "status");
this.drop = drop;
this.authorityOverride = null;
this(subchannel, streamTracerFactory, status, drop, null, null);
}

private PickResult(
@Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory,
Status status, boolean drop, @Nullable String authorityOverride) {
this(subchannel, streamTracerFactory, status, drop, authorityOverride, null);
}

private PickResult(
@Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory,
Status status, boolean drop, @Nullable String authorityOverride,
@Nullable String delayReasonToken) {
this.subchannel = subchannel;
this.streamTracerFactory = streamTracerFactory;
this.status = checkNotNull(status, "status");
this.drop = drop;
this.authorityOverride = authorityOverride;
this.delayReasonToken = delayReasonToken;
}

/**
Expand Down Expand Up @@ -727,6 +732,22 @@ public static PickResult withNoResult() {
return NO_RESULT;
}

/**
* No decision could be made. The RPC will stay buffered with a specific reason.
*
* @since 1.82.0
*/
public static PickResult withNoResult(String delayReasonToken) {
Preconditions.checkNotNull(delayReasonToken, "delayReasonToken");
return new PickResult(null, null, Status.OK, false, null, delayReasonToken);
}

/** Returns the delay reason token if any. */
@Nullable
public String getDelayReasonToken() {
return delayReasonToken;
}

/** Returns the authority override if any. */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11656")
@Nullable
Expand Down
47 changes: 42 additions & 5 deletions core/src/main/java/io/grpc/internal/DelayedClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ public final ClientStream newStream(
synchronized (lock) {
PickerState newerState = pickerState;
if (state == newerState) {
return createPendingStream(args, tracers, pickResult);
String token = pickResult != null ? pickResult.getDelayReasonToken() : null;
return createPendingStream(args, tracers, pickResult, token);
}
state = newerState;
}
Expand All @@ -173,8 +174,8 @@ public final ClientStream newStream(
*/
@GuardedBy("lock")
private PendingStream createPendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers,
PickResult pickResult) {
PendingStream pendingStream = new PendingStream(args, tracers);
PickResult pickResult, @Nullable String delayReasonToken) {
PendingStream pendingStream = new PendingStream(args, tracers, delayReasonToken);
if (args.getCallOptions().isWaitForReady() && pickResult != null && pickResult.hasResult()) {
pendingStream.lastPickStatus = pickResult.getStatus();
}
Expand Down Expand Up @@ -303,6 +304,7 @@ final void reprocess(@Nullable SubchannelPicker picker) {
final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
callOptions.isWaitForReady());
if (transport != null) {
stream.endDelay();
Executor executor = defaultAppExecutor;
// createRealStream may be expensive. It will start real streams on the transport. If
// there are pending requests, they will be serialized too, which may be expensive. Since
Expand All @@ -315,7 +317,9 @@ final void reprocess(@Nullable SubchannelPicker picker) {
executor.execute(runnable);
}
toRemove.add(stream);
} // else: stay pending
} else { // stay pending
stream.updateDelayReason(pickResult.getDelayReasonToken());
}
}

synchronized (lock) {
Expand Down Expand Up @@ -361,11 +365,43 @@ private class PendingStream extends DelayedStream {
private final Context context = Context.current();
private final ClientStreamTracer[] tracers;
private volatile Status lastPickStatus;
@Nullable private String delayReasonToken;

private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, @Nullable String initialToken) {
super("connecting_and_lb");
this.args = args;
this.tracers = tracers;
this.delayReasonToken = initialToken;
if (initialToken != null) {
for (ClientStreamTracer tracer : tracers) {
tracer.delayStarted(initialToken);
}
}
}

void updateDelayReason(String newToken) {
if (!java.util.Objects.equals(delayReasonToken, newToken)) {
if (delayReasonToken != null) {
for (ClientStreamTracer tracer : tracers) {
tracer.delayEnded();
}
}
delayReasonToken = newToken;
if (newToken != null) {
for (ClientStreamTracer tracer : tracers) {
tracer.delayStarted(newToken);
}
}
}
}

void endDelay() {
if (delayReasonToken != null) {
for (ClientStreamTracer tracer : tracers) {
tracer.delayEnded();
}
delayReasonToken = null;
}
}

/** Runnable may be null. */
Expand All @@ -391,6 +427,7 @@ private Runnable createRealStream(ClientTransport transport, String authorityOve

@Override
public void cancel(Status reason) {
endDelay();
super.cancel(reason);
synchronized (lock) {
if (reportTransportTerminated != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
* list and sticking to the first that works.
*/
final class PickFirstLoadBalancer extends LoadBalancer {
private static final PickResult CONNECTING_RESULT = PickResult.withNoResult("pick_first:connecting");
private final Helper helper;
private Subchannel subchannel;
private ConnectivityState currentState = IDLE;
Expand Down Expand Up @@ -83,7 +84,7 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) {

// The channel state does not get updated when doing name resolving today, so for the moment
// let LB report CONNECTION and call subchannel.requestConnection() immediately.
updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
updateBalancingState(CONNECTING, new FixedResultPicker(CONNECTING_RESULT));
subchannel.requestConnection();
} else {
subchannel.updateAddresses(servers);
Expand Down Expand Up @@ -135,7 +136,7 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
case CONNECTING:
// It's safe to use RequestConnectionPicker here, so when coming from IDLE we could leave
// the current picker in-place. But ignoring the potential optimization is simpler.
picker = new FixedResultPicker(PickResult.withNoResult());
picker = new FixedResultPicker(CONNECTING_RESULT);
break;
case READY:
picker = new FixedResultPicker(PickResult.withSubchannel(subchannel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,54 @@ public void pendingStream_appendTimeoutInsight_waitForReady_withLastPickFailure(
+ " connecting_and_lb_delay=[0-9]+ns, was_still_waiting]");
}

@Test
public void streamDelayMetrics() {
ClientStreamTracer mockTracer = mock(ClientStreamTracer.class);
ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer };

SubchannelPicker connectingPicker = mock(SubchannelPicker.class);
when(connectingPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withNoResult("pick_first:connecting"));

delayedTransport.reprocess(connectingPicker);
ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers);

InOrder inOrder = inOrder(mockTracer);
inOrder.verify(mockTracer).delayStarted("pick_first:connecting");

SubchannelPicker customDelayPicker = mock(SubchannelPicker.class);
when(customDelayPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withNoResult("rls:lookup_pending"));

delayedTransport.reprocess(customDelayPicker);

inOrder.verify(mockTracer).delayEnded();
inOrder.verify(mockTracer).delayStarted("rls:lookup_pending");

delayedTransport.reprocess(mockPicker);

inOrder.verify(mockTracer).delayEnded();
}

@Test
public void streamDelayMetrics_cancelled() {
ClientStreamTracer mockTracer = mock(ClientStreamTracer.class);
ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer };

SubchannelPicker connectingPicker = mock(SubchannelPicker.class);
when(connectingPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withNoResult("pick_first:connecting"));

delayedTransport.reprocess(connectingPicker);
ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers);

verify(mockTracer).delayStarted("pick_first:connecting");

stream.cancel(Status.CANCELLED);

verify(mockTracer).delayEnded();
}

private static TransportProvider newTransportProvider(final ClientTransport transport) {
return new TransportProvider() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,9 @@ public void pickAfterResolved() throws Exception {
verify(mockSubchannel).requestConnection();

// Calling pickSubchannel() twice gave the same result
assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs),
pickerCaptor.getValue().pickSubchannel(mockArgs));
PickResult result = pickerCaptor.getValue().pickSubchannel(mockArgs);
assertThat(result.getDelayReasonToken()).isEqualTo("pick_first:connecting");
assertEquals(result, pickerCaptor.getValue().pickSubchannel(mockArgs));

verifyNoMoreInteractions(mockHelper);
}
Expand Down
2 changes: 1 addition & 1 deletion rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
convertRlsServerStatus(response.getStatus(),
lbPolicyConfig.getRouteLookupConfig().lookupService()));
} else {
return PickResult.withNoResult();
return PickResult.withNoResult("rls:lookup_pending");
}
}

Expand Down
2 changes: 2 additions & 0 deletions rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception {
PickResult res = picker.pickSubchannel(searchSubchannelArgs);
assertThat(res.getStatus().isOk()).isTrue();
assertThat(res.getSubchannel()).isNull();
assertThat(res.getDelayReasonToken()).isEqualTo("rls:lookup_pending");
// Cache is warm, but still unconnected
res = picker.pickSubchannel(searchSubchannelArgs);
inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class));
Expand Down Expand Up @@ -493,6 +494,7 @@ public void lb_working_withoutDefaultTarget() throws Exception {
PickResult res = picker.pickSubchannel(searchSubchannelArgs);
assertThat(res.getStatus().isOk()).isTrue();
assertThat(res.getSubchannel()).isNull();
assertThat(res.getDelayReasonToken()).isEqualTo("rls:lookup_pending");
// Cache is warm, but still unconnected
res = picker.pickSubchannel(searchSubchannelArgs);
inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class));
Expand Down
5 changes: 3 additions & 2 deletions util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@
* EquivalentAddressGroup}s from the {@link NameResolver}.
*/
final class RoundRobinLoadBalancer extends MultiChildLoadBalancer {
private static final PickResult CONNECTING_RESULT = PickResult.withNoResult("round_robin:connecting");
private final AtomicInteger sequence = new AtomicInteger(new Random().nextInt());
private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());
private SubchannelPicker currentPicker = new FixedResultPicker(CONNECTING_RESULT);

public RoundRobinLoadBalancer(Helper helper) {
super(helper);
Expand All @@ -68,7 +69,7 @@ protected void updateOverallBalancingState() {
}

if (isConnecting) {
updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
updateBalancingState(CONNECTING, new FixedResultPicker(CONNECTING_RESULT));
} else {
updateBalancingState(TRANSIENT_FAILURE, createReadyPicker(getChildLbStates()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
public class RoundRobinLoadBalancerTest {
private static final Attributes.Key<String> MAJOR_KEY = Attributes.Key.create("major-key");
private static final SubchannelPicker EMPTY_PICKER =
new FixedResultPicker(PickResult.withNoResult());
new FixedResultPicker(PickResult.withNoResult("round_robin:connecting"));

@Rule public final MockitoRule mocks = MockitoJUnit.rule();

Expand Down
1 change: 1 addition & 0 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
errorPrefix() + "Unable to find non-dynamic cluster"));
}
// The dynamic cluster must not have loaded yet
helper.updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult("cds:discovery_pending")));
return Status.OK;
}
if (!clusterConfigOr.hasValue()) {
Expand Down
12 changes: 11 additions & 1 deletion xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,17 @@ public void updateBalancingState(final ConnectivityState newState,
}
ConnectivityState oldState = connectivityState;
connectivityState = newState;
picker = newPicker;
picker = new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
PickResult childResult = newPicker.pickSubchannel(args);
if (!childResult.hasResult() && childResult.getDelayReasonToken() != null) {
return PickResult.withNoResult(
"priority_" + priority + ":" + childResult.getDelayReasonToken());
}
return childResult;
}
};

if (deletionTimer != null && deletionTimer.isPending()) {
return;
Expand Down
8 changes: 5 additions & 3 deletions xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,8 @@ public static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) {
}

private static final class RingHashPicker extends SubchannelPicker {
private static final PickResult RING_HASH_CONNECTING_RESULT =
PickResult.withNoResult("ring_hash:connecting");
private final SynchronizationContext syncContext;
private final List<RingEntry> ring;
// Avoid synchronization between pickSubchannel and subchannel's connectivity state change,
Expand Down Expand Up @@ -453,7 +455,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
// RPCs can be buffered if the next subchannel is pending (per A62). Otherwise, RPCs
// are failed unless there is a READY connection.
if (subchannelView.connectivityState == CONNECTING) {
return PickResult.withNoResult();
return RING_HASH_CONNECTING_RESULT;
}

if (subchannelView.connectivityState == IDLE) {
Expand All @@ -463,7 +465,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
}
});

return PickResult.withNoResult(); // Indicates that this should be retried after backoff
return RING_HASH_CONNECTING_RESULT; // Indicates that this should be retried after backoff
}
}
} else {
Expand All @@ -487,7 +489,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
}
}
if (requestedConnection) {
return PickResult.withNoResult();
return RING_HASH_CONNECTING_RESULT;
}
}

Expand Down
Loading
Loading