From a1dcd44841ac847128d8b1384ddb10a4ccbf6fec Mon Sep 17 00:00:00 2001 From: Oskar Arvidsson Date: Tue, 26 May 2026 15:43:53 -0400 Subject: [PATCH] fix(pubsub): make process span current during receiveMessage() The subscribe process span was created but never set as the current context before calling MessageReceiver.receiveMessage(). This meant user code inside the callback could not create child spans under the process span, and downstream calls were not parented correctly. Wrap the receiveMessage() calls in MessageDispatcher with Span.makeCurrent() so Context.current() contains the process span during user code execution. Add a getter for the process span on PubsubMessageWrapper and a test verifying child span parenting. --- .../cloud/pubsub/v1/MessageDispatcher.java | 28 +++++--- .../cloud/pubsub/v1/PubsubMessageWrapper.java | 4 ++ .../pubsub/v1/MessageDispatcherTest.java | 67 +++++++++++++++++++ 3 files changed, 88 insertions(+), 11 deletions(-) diff --git a/java-pubsub/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/java-pubsub/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index f2f7cd2e7e61..a44d68824749 100644 --- a/java-pubsub/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/java-pubsub/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -29,6 +29,8 @@ import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.ReceivedMessage; import com.google.pubsub.v1.SubscriptionName; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Scope; import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; @@ -688,17 +690,21 @@ public void run() { messageWrapper, ackHandler.ackRequestData.getAckId(), exactlyOnceDeliveryEnabled.get()); - if (shouldSetMessageFuture()) { - // This is the message future that is propagated to the user - SettableApiFuture messageFuture = - ackHandler.getMessageFutureIfExists(); - final AckReplyConsumerWithResponse ackReplyConsumerWithResponse = - new AckReplyConsumerWithResponseImpl(ackReplySettableApiFuture, messageFuture); - receiverWithAckResponse.receiveMessage(message, ackReplyConsumerWithResponse); - } else { - final AckReplyConsumer ackReplyConsumer = - new AckReplyConsumerImpl(ackReplySettableApiFuture); - receiver.receiveMessage(message, ackReplyConsumer); + final Span processSpan = messageWrapper.getSubscribeProcessSpan(); + try (Scope ignored = processSpan != null ? processSpan.makeCurrent() : Scope.noop()) { + if (shouldSetMessageFuture()) { + // This is the message future that is propagated to the user + SettableApiFuture messageFuture = + ackHandler.getMessageFutureIfExists(); + final AckReplyConsumerWithResponse ackReplyConsumerWithResponse = + new AckReplyConsumerWithResponseImpl( + ackReplySettableApiFuture, messageFuture); + receiverWithAckResponse.receiveMessage(message, ackReplyConsumerWithResponse); + } else { + final AckReplyConsumer ackReplyConsumer = + new AckReplyConsumerImpl(ackReplySettableApiFuture); + receiver.receiveMessage(message, ackReplyConsumer); + } } } catch (Exception e) { ackReplySettableApiFuture.setException(e); diff --git a/java-pubsub/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java b/java-pubsub/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java index 19864a26f5a1..c5922ba2e132 100644 --- a/java-pubsub/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java +++ b/java-pubsub/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java @@ -177,6 +177,10 @@ void setSubscribeSchedulerSpan(Span span) { this.subscribeSchedulerSpan = span; } + Span getSubscribeProcessSpan() { + return subscribeProcessSpan; + } + void setSubscribeProcessSpan(Span span) { this.subscribeProcessSpan = span; } diff --git a/java-pubsub/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/java-pubsub/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 1285fadd5938..1167c4d1a069 100644 --- a/java-pubsub/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/java-pubsub/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -29,10 +29,16 @@ import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.ReceivedMessage; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; +import io.opentelemetry.sdk.trace.data.SpanData; import java.time.Duration; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -76,6 +82,8 @@ public class MessageDispatcherTest { private static final Duration ACK_EXPIRATION_PADDING_DEFAULT = Subscriber.ACK_EXPIRATION_PADDING_DEFAULT; + @Rule public final OpenTelemetryRule openTelemetryTesting = OpenTelemetryRule.create(); + private Distribution mockAckLatencyDistribution; private MessageDispatcher.AckProcessor mockAckProcessor; @@ -950,4 +958,63 @@ public void testAckDuringNackImmediatelyShutdown() throws Exception { .sendAckOperations( argThat(new CustomArgumentMatchers.AckRequestDataListMatcher(Collections.emptyList()))); } + + @Test + public void testProcessSpanIsCurrentDuringReceiveMessage() { + Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + OpenTelemetryPubsubTracer pubsubTracer = + new OpenTelemetryPubsubTracer(openTelemetryTracer, true); + + // Capture the current span inside the receiveMessage callback. + AtomicReference capturedSpan = new AtomicReference<>(); + MessageReceiver capturingReceiver = + (message, ackReplyConsumer) -> { + capturedSpan.set(Span.current()); + ackReplyConsumer.ack(); + }; + + MessageDispatcher messageDispatcher = + getMessageDispatcherWithTracer(capturingReceiver, pubsubTracer); + + messageDispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); + + // The captured span should be valid and should be the process span created by the dispatcher. + Span span = capturedSpan.get(); + assertThat(span).isNotNull(); + assertThat(span.getSpanContext().isValid()).isTrue(); + + List allSpans = openTelemetryTesting.getSpans(); + boolean foundProcessSpan = + allSpans.stream() + .anyMatch( + s -> + s.getName().contains("process") + && s.getSpanContext().equals(span.getSpanContext())); + assertThat(foundProcessSpan).isTrue(); + } + + private MessageDispatcher getMessageDispatcherWithTracer( + MessageReceiver messageReceiver, OpenTelemetryPubsubTracer tracer) { + MessageDispatcher messageDispatcher = + MessageDispatcher.newBuilder(messageReceiver) + .setAckProcessor(mockAckProcessor) + .setAckExpirationPadding(ACK_EXPIRATION_PADDING_DEFAULT) + .setMaxAckExtensionPeriod(MAX_ACK_EXTENSION_PERIOD) + .setMinDurationPerAckExtension(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION) + .setMinDurationPerAckExtensionDefaultUsed(true) + .setMaxDurationPerAckExtension(Subscriber.DEFAULT_MAX_ACK_DEADLINE_EXTENSION) + .setMaxDurationPerAckExtensionDefaultUsed(true) + .setAckLatencyDistribution(mock(Distribution.class)) + .setFlowController(mock(FlowController.class)) + .setExecutor(MoreExecutors.directExecutor()) + .setSubscriptionName(MOCK_SUBSCRIPTION_NAME) + .setSystemExecutor(systemExecutor) + .setApiClock(clock) + .setTracer(tracer) + .setSubscriberShutdownSettings(SubscriberShutdownSettings.newBuilder().build()) + .build(); + + messageDispatcher.setMessageDeadlineSeconds(MIN_ACK_DEADLINE_SECONDS); + return messageDispatcher; + } }