From b50ca84e30c151d7f30c8917a470acbfc7ffa126 Mon Sep 17 00:00:00 2001 From: agravator Date: Wed, 13 May 2026 22:15:34 +0530 Subject: [PATCH 1/2] core,api,xds: Implement load balancing policy delay plumbing This commit implements the plumbing required to propagate delay reason tokens from load balancing policies up to the transport layer and tracers, as specified in the LB policy delay design. --- .../main/java/io/grpc/ClientStreamTracer.java | 17 +++++++ api/src/main/java/io/grpc/LoadBalancer.java | 31 ++++++++++-- .../grpc/internal/DelayedClientTransport.java | 47 ++++++++++++++++-- .../grpc/internal/PickFirstLoadBalancer.java | 5 +- .../internal/DelayedClientTransportTest.java | 48 +++++++++++++++++++ .../internal/PickFirstLoadBalancerTest.java | 5 +- .../java/io/grpc/rls/CachingRlsLbClient.java | 2 +- .../java/io/grpc/rls/RlsLoadBalancerTest.java | 2 + .../io/grpc/util/RoundRobinLoadBalancer.java | 5 +- .../grpc/util/RoundRobinLoadBalancerTest.java | 2 +- .../java/io/grpc/xds/CdsLoadBalancer2.java | 1 + .../io/grpc/xds/PriorityLoadBalancer.java | 12 ++++- .../io/grpc/xds/RingHashLoadBalancer.java | 8 ++-- .../io/grpc/xds/CdsLoadBalancer2Test.java | 22 +++++++++ .../io/grpc/xds/PriorityLoadBalancerTest.java | 30 ++++++++++++ .../io/grpc/xds/RingHashLoadBalancerTest.java | 41 ++++++++++++++++ 16 files changed, 256 insertions(+), 22 deletions(-) diff --git a/api/src/main/java/io/grpc/ClientStreamTracer.java b/api/src/main/java/io/grpc/ClientStreamTracer.java index 42e1fdfebea..07ceb11fa59 100644 --- a/api/src/main/java/io/grpc/ClientStreamTracer.java +++ b/api/src/main/java/io/grpc/ClientStreamTracer.java @@ -57,6 +57,23 @@ public void streamCreated(@Grpc.TransportAttr Attributes transportAttrs, Metadat public void createPendingStream() { } + /** + * A delay segment started with a specific reason during load balancing. + * + * @param reasonToken the reason for the delay, e.g., "pick_first:connecting" + * @since 1.82.0 + */ + public void delayStarted(String reasonToken) { + } + + /** + * The current delay segment ended. + * + * @since 1.82.0 + */ + public void delayEnded() { + } + /** * Headers has been sent to the socket. */ diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index 3187ae8ef1b..d3af8822058 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -549,25 +549,30 @@ public static final class PickResult { // True if the result is created by withDrop() private final boolean drop; @Nullable private final String authorityOverride; + @Nullable private final String delayReasonToken; private PickResult( @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, Status status, boolean drop) { - this.subchannel = subchannel; - this.streamTracerFactory = streamTracerFactory; - this.status = checkNotNull(status, "status"); - this.drop = drop; - this.authorityOverride = null; + this(subchannel, streamTracerFactory, status, drop, null, null); } private PickResult( @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, Status status, boolean drop, @Nullable String authorityOverride) { + this(subchannel, streamTracerFactory, status, drop, authorityOverride, null); + } + + private PickResult( + @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, + Status status, boolean drop, @Nullable String authorityOverride, + @Nullable String delayReasonToken) { this.subchannel = subchannel; this.streamTracerFactory = streamTracerFactory; this.status = checkNotNull(status, "status"); this.drop = drop; this.authorityOverride = authorityOverride; + this.delayReasonToken = delayReasonToken; } /** @@ -727,6 +732,22 @@ public static PickResult withNoResult() { return NO_RESULT; } + /** + * No decision could be made. The RPC will stay buffered with a specific reason. + * + * @since 1.82.0 + */ + public static PickResult withNoResult(String delayReasonToken) { + Preconditions.checkNotNull(delayReasonToken, "delayReasonToken"); + return new PickResult(null, null, Status.OK, false, null, delayReasonToken); + } + + /** Returns the delay reason token if any. */ + @Nullable + public String getDelayReasonToken() { + return delayReasonToken; + } + /** Returns the authority override if any. */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11656") @Nullable diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 5569e1eecf8..d979f50a648 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -157,7 +157,8 @@ public final ClientStream newStream( synchronized (lock) { PickerState newerState = pickerState; if (state == newerState) { - return createPendingStream(args, tracers, pickResult); + String token = pickResult != null ? pickResult.getDelayReasonToken() : null; + return createPendingStream(args, tracers, pickResult, token); } state = newerState; } @@ -173,8 +174,8 @@ public final ClientStream newStream( */ @GuardedBy("lock") private PendingStream createPendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, - PickResult pickResult) { - PendingStream pendingStream = new PendingStream(args, tracers); + PickResult pickResult, @Nullable String delayReasonToken) { + PendingStream pendingStream = new PendingStream(args, tracers, delayReasonToken); if (args.getCallOptions().isWaitForReady() && pickResult != null && pickResult.hasResult()) { pendingStream.lastPickStatus = pickResult.getStatus(); } @@ -303,6 +304,7 @@ final void reprocess(@Nullable SubchannelPicker picker) { final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, callOptions.isWaitForReady()); if (transport != null) { + stream.endDelay(); Executor executor = defaultAppExecutor; // createRealStream may be expensive. It will start real streams on the transport. If // there are pending requests, they will be serialized too, which may be expensive. Since @@ -315,7 +317,9 @@ final void reprocess(@Nullable SubchannelPicker picker) { executor.execute(runnable); } toRemove.add(stream); - } // else: stay pending + } else { // stay pending + stream.updateDelayReason(pickResult.getDelayReasonToken()); + } } synchronized (lock) { @@ -361,11 +365,43 @@ private class PendingStream extends DelayedStream { private final Context context = Context.current(); private final ClientStreamTracer[] tracers; private volatile Status lastPickStatus; + @Nullable private String delayReasonToken; - private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) { + private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, @Nullable String initialToken) { super("connecting_and_lb"); this.args = args; this.tracers = tracers; + this.delayReasonToken = initialToken; + if (initialToken != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayStarted(initialToken); + } + } + } + + void updateDelayReason(String newToken) { + if (!java.util.Objects.equals(delayReasonToken, newToken)) { + if (delayReasonToken != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayEnded(); + } + } + delayReasonToken = newToken; + if (newToken != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayStarted(newToken); + } + } + } + } + + void endDelay() { + if (delayReasonToken != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayEnded(); + } + delayReasonToken = null; + } } /** Runnable may be null. */ @@ -391,6 +427,7 @@ private Runnable createRealStream(ClientTransport transport, String authorityOve @Override public void cancel(Status reason) { + endDelay(); super.cancel(reason); synchronized (lock) { if (reportTransportTerminated != null) { diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java index cf4b4c94e04..b8e501da561 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java @@ -38,6 +38,7 @@ * list and sticking to the first that works. */ final class PickFirstLoadBalancer extends LoadBalancer { + private static final PickResult CONNECTING_RESULT = PickResult.withNoResult("pick_first:connecting"); private final Helper helper; private Subchannel subchannel; private ConnectivityState currentState = IDLE; @@ -83,7 +84,7 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) { // The channel state does not get updated when doing name resolving today, so for the moment // let LB report CONNECTION and call subchannel.requestConnection() immediately. - updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState(CONNECTING, new FixedResultPicker(CONNECTING_RESULT)); subchannel.requestConnection(); } else { subchannel.updateAddresses(servers); @@ -135,7 +136,7 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo case CONNECTING: // It's safe to use RequestConnectionPicker here, so when coming from IDLE we could leave // the current picker in-place. But ignoring the potential optimization is simpler. - picker = new FixedResultPicker(PickResult.withNoResult()); + picker = new FixedResultPicker(CONNECTING_RESULT); break; case READY: picker = new FixedResultPicker(PickResult.withSubchannel(subchannel)); diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index d7e1d4ca4f6..5891d26f344 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -772,6 +772,54 @@ public void pendingStream_appendTimeoutInsight_waitForReady_withLastPickFailure( + " connecting_and_lb_delay=[0-9]+ns, was_still_waiting]"); } + @Test + public void streamDelayMetrics() { + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; + + SubchannelPicker connectingPicker = mock(SubchannelPicker.class); + when(connectingPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("pick_first:connecting")); + + delayedTransport.reprocess(connectingPicker); + ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers); + + InOrder inOrder = inOrder(mockTracer); + inOrder.verify(mockTracer).delayStarted("pick_first:connecting"); + + SubchannelPicker customDelayPicker = mock(SubchannelPicker.class); + when(customDelayPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("rls:lookup_pending")); + + delayedTransport.reprocess(customDelayPicker); + + inOrder.verify(mockTracer).delayEnded(); + inOrder.verify(mockTracer).delayStarted("rls:lookup_pending"); + + delayedTransport.reprocess(mockPicker); + + inOrder.verify(mockTracer).delayEnded(); + } + + @Test + public void streamDelayMetrics_cancelled() { + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; + + SubchannelPicker connectingPicker = mock(SubchannelPicker.class); + when(connectingPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("pick_first:connecting")); + + delayedTransport.reprocess(connectingPicker); + ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers); + + verify(mockTracer).delayStarted("pick_first:connecting"); + + stream.cancel(Status.CANCELLED); + + verify(mockTracer).delayEnded(); + } + private static TransportProvider newTransportProvider(final ClientTransport transport) { return new TransportProvider() { @Override diff --git a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java index 1e130423a45..5bfabd7ea0e 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java @@ -147,8 +147,9 @@ public void pickAfterResolved() throws Exception { verify(mockSubchannel).requestConnection(); // Calling pickSubchannel() twice gave the same result - assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs), - pickerCaptor.getValue().pickSubchannel(mockArgs)); + PickResult result = pickerCaptor.getValue().pickSubchannel(mockArgs); + assertThat(result.getDelayReasonToken()).isEqualTo("pick_first:connecting"); + assertEquals(result, pickerCaptor.getValue().pickSubchannel(mockArgs)); verifyNoMoreInteractions(mockHelper); } diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index a2846fd04c8..2748a2679f1 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -1050,7 +1050,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { convertRlsServerStatus(response.getStatus(), lbPolicyConfig.getRouteLookupConfig().lookupService())); } else { - return PickResult.withNoResult(); + return PickResult.withNoResult("rls:lookup_pending"); } } diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index a52390743a6..d5d94c4dd6e 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -262,6 +262,7 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception { PickResult res = picker.pickSubchannel(searchSubchannelArgs); assertThat(res.getStatus().isOk()).isTrue(); assertThat(res.getSubchannel()).isNull(); + assertThat(res.getDelayReasonToken()).isEqualTo("rls:lookup_pending"); // Cache is warm, but still unconnected res = picker.pickSubchannel(searchSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); @@ -493,6 +494,7 @@ public void lb_working_withoutDefaultTarget() throws Exception { PickResult res = picker.pickSubchannel(searchSubchannelArgs); assertThat(res.getStatus().isOk()).isTrue(); assertThat(res.getSubchannel()).isNull(); + assertThat(res.getDelayReasonToken()).isEqualTo("rls:lookup_pending"); // Cache is warm, but still unconnected res = picker.pickSubchannel(searchSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); diff --git a/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java b/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java index 22940e875ac..33097cce31f 100644 --- a/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java +++ b/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java @@ -41,8 +41,9 @@ * EquivalentAddressGroup}s from the {@link NameResolver}. */ final class RoundRobinLoadBalancer extends MultiChildLoadBalancer { + private static final PickResult CONNECTING_RESULT = PickResult.withNoResult("round_robin:connecting"); private final AtomicInteger sequence = new AtomicInteger(new Random().nextInt()); - private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult()); + private SubchannelPicker currentPicker = new FixedResultPicker(CONNECTING_RESULT); public RoundRobinLoadBalancer(Helper helper) { super(helper); @@ -68,7 +69,7 @@ protected void updateOverallBalancingState() { } if (isConnecting) { - updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState(CONNECTING, new FixedResultPicker(CONNECTING_RESULT)); } else { updateBalancingState(TRANSIENT_FAILURE, createReadyPicker(getChildLbStates())); } diff --git a/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java b/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java index 18854ca1bb6..895cf9b4251 100644 --- a/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java +++ b/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java @@ -86,7 +86,7 @@ public class RoundRobinLoadBalancerTest { private static final Attributes.Key MAJOR_KEY = Attributes.Key.create("major-key"); private static final SubchannelPicker EMPTY_PICKER = - new FixedResultPicker(PickResult.withNoResult()); + new FixedResultPicker(PickResult.withNoResult("round_robin:connecting")); @Rule public final MockitoRule mocks = MockitoJUnit.rule(); diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index f6ee60ab1ef..8bda76a5e68 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -119,6 +119,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { errorPrefix() + "Unable to find non-dynamic cluster")); } // The dynamic cluster must not have loaded yet + helper.updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult("cds:discovery_pending"))); return Status.OK; } if (!clusterConfigOr.hasValue()) { diff --git a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java index 6e4566de76d..d5d000c0dec 100644 --- a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java @@ -322,7 +322,17 @@ public void updateBalancingState(final ConnectivityState newState, } ConnectivityState oldState = connectivityState; connectivityState = newState; - picker = newPicker; + picker = new SubchannelPicker() { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + PickResult childResult = newPicker.pickSubchannel(args); + if (!childResult.hasResult() && childResult.getDelayReasonToken() != null) { + return PickResult.withNoResult( + "priority_" + priority + ":" + childResult.getDelayReasonToken()); + } + return childResult; + } + }; if (deletionTimer != null && deletionTimer.isPending()) { return; diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index 513f4d643ea..5f15658128b 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -356,6 +356,8 @@ public static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) { } private static final class RingHashPicker extends SubchannelPicker { + private static final PickResult RING_HASH_CONNECTING_RESULT = + PickResult.withNoResult("ring_hash:connecting"); private final SynchronizationContext syncContext; private final List ring; // Avoid synchronization between pickSubchannel and subchannel's connectivity state change, @@ -453,7 +455,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { // RPCs can be buffered if the next subchannel is pending (per A62). Otherwise, RPCs // are failed unless there is a READY connection. if (subchannelView.connectivityState == CONNECTING) { - return PickResult.withNoResult(); + return RING_HASH_CONNECTING_RESULT; } if (subchannelView.connectivityState == IDLE) { @@ -463,7 +465,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { } }); - return PickResult.withNoResult(); // Indicates that this should be retried after backoff + return RING_HASH_CONNECTING_RESULT; // Indicates that this should be retried after backoff } } } else { @@ -487,7 +489,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { } } if (requestedConnection) { - return PickResult.withNoResult(); + return RING_HASH_CONNECTING_RESULT; } } diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index ff4813fe6a8..e179446f715 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -343,6 +343,28 @@ public void dynamicCluster() { assertThat(this.lastXdsConfig.getClusters()).doesNotContainKey(clusterName); } + @Test + public void discoverDynamicCluster_pending_emitsToken() { + String clusterName = "cluster2"; + CdsConfig cdsConfig = new CdsConfig(clusterName, /*dynamic=*/ true); + + XdsConfig mockXdsConfig = mock(XdsConfig.class); + when(mockXdsConfig.getClusters()).thenReturn(ImmutableMap.of()); + + loadBalancer.acceptResolvedAddresses(ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setAttributes(Attributes.newBuilder() + .set(XdsAttributes.XDS_CONFIG, mockXdsConfig) + .set(XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY, xdsDepManager) + .build()) + .setLoadBalancingPolicyConfig(cdsConfig) + .build()); + + verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getDelayReasonToken()).isEqualTo("cds:discovery_pending"); + } + @Test public void discoverAggregateCluster_createsPriorityLbPolicy() { CdsLoadBalancerProvider cdsLoadBalancerProvider = new CdsLoadBalancerProvider(lbRegistry); diff --git a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java index beb568be9ce..56feb08f02b 100644 --- a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java @@ -910,6 +910,36 @@ public void noDuplicateOverallBalancingStateUpdate() { verify(helper, times(4)).updateBalancingState(any(), any()); } + @Test + public void priorityPicker_prependsToken() throws Exception { + PriorityChildConfig priorityChildConfig0 = + new PriorityChildConfig(newChildConfig(fooLbProvider, new Object()), true); + PriorityLbConfig priorityLbConfig = + new PriorityLbConfig(ImmutableMap.of("p0", priorityChildConfig0), ImmutableList.of("p0")); + + priorityLb.acceptResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setLoadBalancingPolicyConfig(priorityLbConfig) + .build()); + + Helper helper0 = Iterables.getOnlyElement(fooHelpers); // priority p0 + + SubchannelPicker mockChildPicker = mock(SubchannelPicker.class); + when(mockChildPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("child_token")); + + helper0.updateBalancingState(CONNECTING, mockChildPicker); + + verify(helper, atLeastOnce()) + .updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + + SubchannelPicker priorityPicker = pickerCaptor.getValue(); + PickResult result = priorityPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + + assertThat(result.getDelayReasonToken()).isEqualTo("priority_p0:child_token"); + } + private void assertLatestConnectivityState(ConnectivityState expectedState) { verify(helper, atLeastOnce()) .updateBalancingState(connectivityStateCaptor.capture(), pickerCaptor.capture()); diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index b515ed81158..d7a7892c57f 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -160,6 +160,7 @@ public void subchannelLazyConnectUntilPicked() { PickResult result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); + assertThat(result.getDelayReasonToken()).isEqualTo("ring_hash:connecting"); Subchannel subchannel = Iterables.getOnlyElement(subchannels.values()); int expectedTimes = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() && !PickFirstLoadBalancerProvider.isEnabledHappyEyeballs() ? 1 : 2; @@ -524,6 +525,7 @@ public void pickWithRandomHash_atLeastOneSubchannelConnecting() { PickResult result = picker.pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); // buffer request + assertThat(result.getDelayReasonToken()).isEqualTo("ring_hash:connecting"); verifyConnection(0); } @@ -546,6 +548,7 @@ public void pickWithRandomHash_firstSubchannelInTransientFailure_remainingSubcha PickResult result = picker.pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); // buffer request + assertThat(result.getDelayReasonToken()).isEqualTo("ring_hash:connecting"); verifyConnection(1); } @@ -1161,6 +1164,44 @@ public void tfWithReadyChild_doesNotTriggerIdleChildConnection() { assertThat(connectionRequestedQueue.poll()).isNull(); } + @Test + public void ringHashPicker_passesThroughChildToken() throws Exception { + final SubchannelPicker mockChildPicker = mock(SubchannelPicker.class); + when(mockChildPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("child_delay_token")); + + loadBalancer = new RingHashLoadBalancer(helper, random) { + @Override + protected ChildLbState createChildLbState(Object key) { + return new ChildLbState(key, pickFirstLbProvider) { + @Override + public SubchannelPicker getCurrentPicker() { + return mockChildPicker; + } + + @Override + public ConnectivityState getCurrentState() { + return READY; + } + }; + } + }; + + RingHashConfig config = new RingHashConfig(10, 100, ""); + List servers = createWeightedServerAddrs(1); + + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + + verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + + PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid()); + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + + assertThat(result.getDelayReasonToken()).isEqualTo("child_delay_token"); + } + private List initializeLbSubchannels(RingHashConfig config, List servers, InitializationFlags... initFlags) { From c38ce1dd8f2df6c00a566db8f6cf7b50ee2d3c51 Mon Sep 17 00:00:00 2001 From: agravator Date: Thu, 14 May 2026 09:55:07 +0530 Subject: [PATCH 2/2] fix: tests --- .../grpc/internal/DelayedClientTransport.java | 3 +- .../ForwardingClientStreamTracer.java | 10 ++++ .../grpc/internal/PickFirstLoadBalancer.java | 3 +- .../internal/DelayedClientTransportTest.java | 3 +- .../util/ForwardingClientStreamTracer.java | 10 ++++ .../io/grpc/util/RoundRobinLoadBalancer.java | 3 +- .../java/io/grpc/xds/CdsLoadBalancer2.java | 4 +- .../io/grpc/xds/PriorityLoadBalancer.java | 53 +++++++++++++++---- .../io/grpc/xds/RingHashLoadBalancer.java | 3 +- .../io/grpc/xds/CdsLoadBalancer2Test.java | 16 ++++-- .../io/grpc/xds/PriorityLoadBalancerTest.java | 13 ++--- .../io/grpc/xds/RingHashLoadBalancerTest.java | 37 ------------- 12 files changed, 93 insertions(+), 65 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index d979f50a648..b9269350088 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -367,7 +367,8 @@ private class PendingStream extends DelayedStream { private volatile Status lastPickStatus; @Nullable private String delayReasonToken; - private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, @Nullable String initialToken) { + private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, + @Nullable String initialToken) { super("connecting_and_lb"); this.args = args; this.tracers = tracers; diff --git a/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java b/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java index e7679ea14cc..e4c2b3b9933 100644 --- a/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java +++ b/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java @@ -39,6 +39,16 @@ public void createPendingStream() { delegate().createPendingStream(); } + @Override + public void delayStarted(String reasonToken) { + delegate().delayStarted(reasonToken); + } + + @Override + public void delayEnded() { + delegate().delayEnded(); + } + @Override public void outboundHeaders() { delegate().outboundHeaders(); diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java index b8e501da561..4111700fffe 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java @@ -38,7 +38,8 @@ * list and sticking to the first that works. */ final class PickFirstLoadBalancer extends LoadBalancer { - private static final PickResult CONNECTING_RESULT = PickResult.withNoResult("pick_first:connecting"); + private static final PickResult CONNECTING_RESULT = + PickResult.withNoResult("pick_first:connecting"); private final Helper helper; private Subchannel subchannel; private ConnectivityState currentState = IDLE; diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 5891d26f344..8f34295a701 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -782,7 +782,7 @@ public void streamDelayMetrics() { .thenReturn(PickResult.withNoResult("pick_first:connecting")); delayedTransport.reprocess(connectingPicker); - ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers); + delayedTransport.newStream(method, headers, callOptions, customTracers); InOrder inOrder = inOrder(mockTracer); inOrder.verify(mockTracer).delayStarted("pick_first:connecting"); @@ -812,6 +812,7 @@ public void streamDelayMetrics_cancelled() { delayedTransport.reprocess(connectingPicker); ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers); + stream.start(streamListener); verify(mockTracer).delayStarted("pick_first:connecting"); diff --git a/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java b/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java index 9c9998571e5..1bf24b12a19 100644 --- a/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java +++ b/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java @@ -38,6 +38,16 @@ public void createPendingStream() { delegate().createPendingStream(); } + @Override + public void delayStarted(String reasonToken) { + delegate().delayStarted(reasonToken); + } + + @Override + public void delayEnded() { + delegate().delayEnded(); + } + @Override public void outboundHeaders() { delegate().outboundHeaders(); diff --git a/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java b/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java index 33097cce31f..ab0b2c49c21 100644 --- a/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java +++ b/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java @@ -41,7 +41,8 @@ * EquivalentAddressGroup}s from the {@link NameResolver}. */ final class RoundRobinLoadBalancer extends MultiChildLoadBalancer { - private static final PickResult CONNECTING_RESULT = PickResult.withNoResult("round_robin:connecting"); + private static final PickResult CONNECTING_RESULT = + PickResult.withNoResult("round_robin:connecting"); private final AtomicInteger sequence = new AtomicInteger(new Random().nextInt()); private SubchannelPicker currentPicker = new FixedResultPicker(CONNECTING_RESULT); diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index 8bda76a5e68..8be155ec0f8 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -17,6 +17,7 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.xds.XdsLbPolicies.CDS_POLICY_NAME; import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; @@ -119,7 +120,8 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { errorPrefix() + "Unable to find non-dynamic cluster")); } // The dynamic cluster must not have loaded yet - helper.updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult("cds:discovery_pending"))); + helper.updateBalancingState( + CONNECTING, new FixedResultPicker(PickResult.withNoResult("cds:discovery_pending"))); return Status.OK; } if (!clusterConfigOr.hasValue()) { diff --git a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java index d5d000c0dec..ea26c8cc2bc 100644 --- a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java @@ -322,17 +322,11 @@ public void updateBalancingState(final ConnectivityState newState, } ConnectivityState oldState = connectivityState; connectivityState = newState; - picker = new SubchannelPicker() { - @Override - public PickResult pickSubchannel(PickSubchannelArgs args) { - PickResult childResult = newPicker.pickSubchannel(args); - if (!childResult.hasResult() && childResult.getDelayReasonToken() != null) { - return PickResult.withNoResult( - "priority_" + priority + ":" + childResult.getDelayReasonToken()); - } - return childResult; - } - }; + if (newState == CONNECTING || newState == IDLE) { + picker = new PriorityPicker(newPicker, priority); + } else { + picker = newPicker; + } if (deletionTimer != null && deletionTimer.isPending()) { return; @@ -367,4 +361,41 @@ protected Helper delegate() { } } } + + private static final class PriorityPicker extends SubchannelPicker { + private final SubchannelPicker delegate; + private final String priority; + + PriorityPicker(SubchannelPicker delegate, String priority) { + this.delegate = com.google.common.base.Preconditions.checkNotNull(delegate, "delegate"); + this.priority = com.google.common.base.Preconditions.checkNotNull(priority, "priority"); + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + PickResult childResult = delegate.pickSubchannel(args); + if (!childResult.hasResult() && childResult.getDelayReasonToken() != null) { + return PickResult.withNoResult( + "priority_" + priority + ":" + childResult.getDelayReasonToken()); + } + return childResult; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PriorityPicker that = (PriorityPicker) o; + return delegate.equals(that.delegate) && priority.equals(that.priority); + } + + @Override + public int hashCode() { + return java.util.Objects.hash(delegate, priority); + } + } } diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index 5f15658128b..15cd5dba621 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -465,7 +465,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { } }); - return RING_HASH_CONNECTING_RESULT; // Indicates that this should be retried after backoff + // Indicates that this should be retried after backoff + return RING_HASH_CONNECTING_RESULT; } } } else { diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index e179446f715..51e0d08f223 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -348,19 +348,25 @@ public void discoverDynamicCluster_pending_emitsToken() { String clusterName = "cluster2"; CdsConfig cdsConfig = new CdsConfig(clusterName, /*dynamic=*/ true); - XdsConfig mockXdsConfig = mock(XdsConfig.class); - when(mockXdsConfig.getClusters()).thenReturn(ImmutableMap.of()); + XdsConfig xdsConfig = new XdsConfig(null, null, null, ImmutableMap.of()); loadBalancer.acceptResolvedAddresses(ResolvedAddresses.newBuilder() .setAddresses(Collections.emptyList()) .setAttributes(Attributes.newBuilder() - .set(XdsAttributes.XDS_CONFIG, mockXdsConfig) - .set(XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY, xdsDepManager) + .set(XdsAttributes.XDS_CONFIG, xdsConfig) + .set( + XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY, + new XdsConfig.XdsClusterSubscriptionRegistry() { + @Override + public XdsConfig.Subscription subscribeToCluster(String clusterName) { + return mock(XdsConfig.Subscription.class); + } + }) .build()) .setLoadBalancingPolicyConfig(cdsConfig) .build()); - verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verify(helper).updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); assertThat(result.getDelayReasonToken()).isEqualTo("cds:discovery_pending"); } diff --git a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java index 56feb08f02b..6f0db55a8a7 100644 --- a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java @@ -531,7 +531,8 @@ public void connectingResetFailOverIfSeenReadyOrIdleSinceTransientFailure() { .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); // Nothing important about this verify, other than to provide a baseline - verify(helper).updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); + verify(helper, times(2)) + .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); assertThat(fooBalancers).hasSize(1); assertThat(fooHelpers).hasSize(1); Helper helper0 = Iterables.getOnlyElement(fooHelpers); @@ -547,7 +548,7 @@ public void connectingResetFailOverIfSeenReadyOrIdleSinceTransientFailure() { helper0.updateBalancingState( CONNECTING, EMPTY_PICKER); - verify(helper, times(2)) + verify(helper, times(3)) .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); // failover happens @@ -573,7 +574,7 @@ public void failoverTimerNotRestartedOnDupConnecting() { .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); // Nothing important about this verify, other than to provide a baseline - inOrder.verify(helper) + inOrder.verify(helper, times(2)) .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); assertThat(fooBalancers).hasSize(1); assertThat(fooHelpers).hasSize(1); @@ -591,7 +592,7 @@ public void failoverTimerNotRestartedOnDupConnecting() { fakeClock.forwardTime(5, TimeUnit.SECONDS); assertThat(fooBalancers).hasSize(2); assertThat(fooHelpers).hasSize(2); - inOrder.verify(helper, times(2)) + inOrder.verify(helper, times(3)) .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); Helper helper1 = Iterables.getLast(fooHelpers); @@ -869,7 +870,7 @@ public void raceBetweenShutdownAndChildLbBalancingStateUpdate() { .setAddresses(ImmutableList.of()) .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); - verify(helper).updateBalancingState(eq(CONNECTING), isA(SubchannelPicker.class)); + verify(helper, times(2)).updateBalancingState(eq(CONNECTING), isA(SubchannelPicker.class)); // LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first, // any further balancing state update should be ignored. @@ -907,7 +908,7 @@ public void noDuplicateOverallBalancingStateUpdate() { .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); - verify(helper, times(4)).updateBalancingState(any(), any()); + verify(helper, times(6)).updateBalancingState(any(), any()); } @Test diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index d7a7892c57f..931d1f4df8e 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -1164,43 +1164,6 @@ public void tfWithReadyChild_doesNotTriggerIdleChildConnection() { assertThat(connectionRequestedQueue.poll()).isNull(); } - @Test - public void ringHashPicker_passesThroughChildToken() throws Exception { - final SubchannelPicker mockChildPicker = mock(SubchannelPicker.class); - when(mockChildPicker.pickSubchannel(any(PickSubchannelArgs.class))) - .thenReturn(PickResult.withNoResult("child_delay_token")); - - loadBalancer = new RingHashLoadBalancer(helper, random) { - @Override - protected ChildLbState createChildLbState(Object key) { - return new ChildLbState(key, pickFirstLbProvider) { - @Override - public SubchannelPicker getCurrentPicker() { - return mockChildPicker; - } - - @Override - public ConnectivityState getCurrentState() { - return READY; - } - }; - } - }; - - RingHashConfig config = new RingHashConfig(10, 100, ""); - List servers = createWeightedServerAddrs(1); - - loadBalancer.acceptResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); - - verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - - PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid()); - PickResult result = pickerCaptor.getValue().pickSubchannel(args); - - assertThat(result.getDelayReasonToken()).isEqualTo("child_delay_token"); - } private List initializeLbSubchannels(RingHashConfig config, List servers, InitializationFlags... initFlags) {