Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import com.google.pubsub.v1.SubscriptionName;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
Expand Down Expand Up @@ -688,17 +690,21 @@ public void run() {
messageWrapper,
ackHandler.ackRequestData.getAckId(),
exactlyOnceDeliveryEnabled.get());
if (shouldSetMessageFuture()) {
// This is the message future that is propagated to the user
SettableApiFuture<AckResponse> messageFuture =
ackHandler.getMessageFutureIfExists();
final AckReplyConsumerWithResponse ackReplyConsumerWithResponse =
new AckReplyConsumerWithResponseImpl(ackReplySettableApiFuture, messageFuture);
receiverWithAckResponse.receiveMessage(message, ackReplyConsumerWithResponse);
} else {
final AckReplyConsumer ackReplyConsumer =
new AckReplyConsumerImpl(ackReplySettableApiFuture);
receiver.receiveMessage(message, ackReplyConsumer);
final Span processSpan = messageWrapper.getSubscribeProcessSpan();
try (Scope ignored = processSpan != null ? processSpan.makeCurrent() : Scope.noop()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In Java, a try-with-resources statement gracefully handles a null resource by doing nothing (no close() is called and no exception is thrown). We can simplify this by using null instead of Scope.noop(), which avoids the dependency on the Scope.noop() API and is more idiomatic.

Suggested change
try (Scope ignored = processSpan != null ? processSpan.makeCurrent() : Scope.noop()) {
try (Scope ignored = processSpan != null ? processSpan.makeCurrent() : null) {

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the library is targeting Java 8, where a try-with-resources with a null resource will throw.

if (shouldSetMessageFuture()) {
// This is the message future that is propagated to the user
SettableApiFuture<AckResponse> messageFuture =
ackHandler.getMessageFutureIfExists();
final AckReplyConsumerWithResponse ackReplyConsumerWithResponse =
new AckReplyConsumerWithResponseImpl(
ackReplySettableApiFuture, messageFuture);
receiverWithAckResponse.receiveMessage(message, ackReplyConsumerWithResponse);
} else {
final AckReplyConsumer ackReplyConsumer =
new AckReplyConsumerImpl(ackReplySettableApiFuture);
receiver.receiveMessage(message, ackReplyConsumer);
}
}
} catch (Exception e) {
ackReplySettableApiFuture.setException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ void setSubscribeSchedulerSpan(Span span) {
this.subscribeSchedulerSpan = span;
}

Span getSubscribeProcessSpan() {
return subscribeProcessSpan;
}

void setSubscribeProcessSpan(Span span) {
this.subscribeProcessSpan = span;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,16 @@
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
Expand Down Expand Up @@ -76,6 +82,8 @@ public class MessageDispatcherTest {
private static final Duration ACK_EXPIRATION_PADDING_DEFAULT =
Subscriber.ACK_EXPIRATION_PADDING_DEFAULT;

@Rule public final OpenTelemetryRule openTelemetryTesting = OpenTelemetryRule.create();

private Distribution mockAckLatencyDistribution;

private MessageDispatcher.AckProcessor mockAckProcessor;
Expand Down Expand Up @@ -950,4 +958,63 @@ public void testAckDuringNackImmediatelyShutdown() throws Exception {
.sendAckOperations(
argThat(new CustomArgumentMatchers.AckRequestDataListMatcher(Collections.emptyList())));
}

@Test
public void testProcessSpanIsCurrentDuringReceiveMessage() {
Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
OpenTelemetryPubsubTracer pubsubTracer =
new OpenTelemetryPubsubTracer(openTelemetryTracer, true);

// Capture the current span inside the receiveMessage callback.
AtomicReference<Span> capturedSpan = new AtomicReference<>();
MessageReceiver capturingReceiver =
(message, ackReplyConsumer) -> {
capturedSpan.set(Span.current());
ackReplyConsumer.ack();
};

MessageDispatcher messageDispatcher =
getMessageDispatcherWithTracer(capturingReceiver, pubsubTracer);

messageDispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));

// The captured span should be valid and should be the process span created by the dispatcher.
Span span = capturedSpan.get();
assertThat(span).isNotNull();
assertThat(span.getSpanContext().isValid()).isTrue();

List<SpanData> allSpans = openTelemetryTesting.getSpans();
boolean foundProcessSpan =
allSpans.stream()
.anyMatch(
s ->
s.getName().contains("process")
&& s.getSpanContext().equals(span.getSpanContext()));
assertThat(foundProcessSpan).isTrue();
}

private MessageDispatcher getMessageDispatcherWithTracer(
MessageReceiver messageReceiver, OpenTelemetryPubsubTracer tracer) {
MessageDispatcher messageDispatcher =
MessageDispatcher.newBuilder(messageReceiver)
.setAckProcessor(mockAckProcessor)
.setAckExpirationPadding(ACK_EXPIRATION_PADDING_DEFAULT)
.setMaxAckExtensionPeriod(MAX_ACK_EXTENSION_PERIOD)
.setMinDurationPerAckExtension(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION)
.setMinDurationPerAckExtensionDefaultUsed(true)
.setMaxDurationPerAckExtension(Subscriber.DEFAULT_MAX_ACK_DEADLINE_EXTENSION)
.setMaxDurationPerAckExtensionDefaultUsed(true)
.setAckLatencyDistribution(mock(Distribution.class))
.setFlowController(mock(FlowController.class))
.setExecutor(MoreExecutors.directExecutor())
.setSubscriptionName(MOCK_SUBSCRIPTION_NAME)
.setSystemExecutor(systemExecutor)
.setApiClock(clock)
.setTracer(tracer)
.setSubscriberShutdownSettings(SubscriberShutdownSettings.newBuilder().build())
.build();

messageDispatcher.setMessageDeadlineSeconds(MIN_ACK_DEADLINE_SECONDS);
return messageDispatcher;
}
}
Loading