diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingActivityClientInterceptor.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingActivityClientInterceptor.java new file mode 100644 index 000000000..0addec6d5 --- /dev/null +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingActivityClientInterceptor.java @@ -0,0 +1,30 @@ +package io.temporal.opentracing; + +import io.temporal.common.interceptors.ActivityClientCallsInterceptor; +import io.temporal.common.interceptors.ActivityClientInterceptorBase; +import io.temporal.opentracing.internal.ContextAccessor; +import io.temporal.opentracing.internal.OpenTracingActivityClientCallsInterceptor; +import io.temporal.opentracing.internal.SpanFactory; + +public class OpenTracingActivityClientInterceptor extends ActivityClientInterceptorBase { + private final OpenTracingOptions options; + private final SpanFactory spanFactory; + private final ContextAccessor contextAccessor; + + public OpenTracingActivityClientInterceptor() { + this(OpenTracingOptions.getDefaultInstance()); + } + + public OpenTracingActivityClientInterceptor(OpenTracingOptions options) { + this.options = options; + this.spanFactory = new SpanFactory(options); + this.contextAccessor = new ContextAccessor(options); + } + + @Override + public ActivityClientCallsInterceptor activityClientCallsInterceptor( + ActivityClientCallsInterceptor next) { + return new OpenTracingActivityClientCallsInterceptor( + next, options, spanFactory, contextAccessor); + } +} diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanCreationContext.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanCreationContext.java index a2756afdd..c419a0497 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanCreationContext.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanCreationContext.java @@ -14,6 +14,7 @@ public class SpanCreationContext { private final String runId; private final String parentWorkflowId; private final String parentRunId; + private final String activityId; private SpanCreationContext( SpanOperationType spanOperationType, @@ -21,13 +22,15 @@ private SpanCreationContext( String workflowId, String runId, String parentWorkflowId, - String parentRunId) { + String parentRunId, + String activityId) { this.spanOperationType = spanOperationType; this.actionName = actionName; this.workflowId = workflowId; this.runId = runId; this.parentWorkflowId = parentWorkflowId; this.parentRunId = parentRunId; + this.activityId = activityId; } public SpanOperationType getSpanOperationType() { @@ -59,6 +62,10 @@ public String getParentRunId() { return parentRunId; } + public @Nullable String getActivityId() { + return activityId; + } + public static Builder newBuilder() { return new Builder(); } @@ -70,6 +77,7 @@ public static final class Builder { private String runId; private String parentWorkflowId; private String parentRunId; + private String activityId; private Builder() {} @@ -103,9 +111,20 @@ public Builder setParentRunId(String parentRunId) { return this; } + public Builder setActivityId(String activityId) { + this.activityId = activityId; + return this; + } + public SpanCreationContext build() { return new SpanCreationContext( - spanOperationType, actionName, workflowId, runId, parentWorkflowId, parentRunId); + spanOperationType, + actionName, + workflowId, + runId, + parentWorkflowId, + parentRunId, + activityId); } } } diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java index 2f8a27429..36e8e291b 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java @@ -16,6 +16,14 @@ public enum SpanOperationType { HANDLE_SIGNAL("HandleSignal"), HANDLE_UPDATE("HandleUpdate"), START_NEXUS_OPERATION("StartNexusOperation"), + START_STANDALONE_ACTIVITY("StartStandaloneActivity"), + RUN_STANDALONE_ACTIVITY("RunStandaloneActivity"), + GET_STANDALONE_ACTIVITY_RESULT("GetStandaloneActivityResult"), + DESCRIBE_STANDALONE_ACTIVITY("DescribeStandaloneActivity"), + CANCEL_STANDALONE_ACTIVITY("CancelStandaloneActivity"), + TERMINATE_STANDALONE_ACTIVITY("TerminateStandaloneActivity"), + LIST_STANDALONE_ACTIVITIES("ListStandaloneActivities"), + COUNT_STANDALONE_ACTIVITIES("CountStandaloneActivities"), RUN_START_NEXUS_OPERATION("RunStartNexusOperationHandler"), RUN_CANCEL_NEXUS_OPERATION("RunCancelNexusOperationHandler"); diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/StandardTagNames.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/StandardTagNames.java index de9cf3f4e..d1fb48e47 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/StandardTagNames.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/StandardTagNames.java @@ -3,6 +3,7 @@ public class StandardTagNames { public static final String WORKFLOW_ID = "workflowId"; public static final String RUN_ID = "runId"; + public static final String ACTIVITY_ID = "activityId"; public static final String PARENT_WORKFLOW_ID = "parentWorkflowId"; public static final String PARENT_RUN_ID = "parentRunId"; diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java index 1734f3a36..d0978e457 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java @@ -84,6 +84,15 @@ protected Map getSpanTags(SpanCreationContext context) { return ImmutableMap.of( StandardTagNames.WORKFLOW_ID, context.getWorkflowId(), StandardTagNames.RUN_ID, context.getRunId()); + case START_STANDALONE_ACTIVITY: + case RUN_STANDALONE_ACTIVITY: + case GET_STANDALONE_ACTIVITY_RESULT: + case DESCRIBE_STANDALONE_ACTIVITY: + case CANCEL_STANDALONE_ACTIVITY: + case TERMINATE_STANDALONE_ACTIVITY: + return ImmutableMap.of(StandardTagNames.ACTIVITY_ID, context.getActivityId()); + case LIST_STANDALONE_ACTIVITIES: + case COUNT_STANDALONE_ACTIVITIES: case RUN_START_NEXUS_OPERATION: case RUN_CANCEL_NEXUS_OPERATION: case HANDLE_QUERY: diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityClientCallsInterceptor.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityClientCallsInterceptor.java new file mode 100644 index 000000000..b66e37faa --- /dev/null +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityClientCallsInterceptor.java @@ -0,0 +1,151 @@ +package io.temporal.opentracing.internal; + +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.Tracer; +import io.temporal.common.interceptors.ActivityClientCallsInterceptor; +import io.temporal.common.interceptors.ActivityClientCallsInterceptorBase; +import io.temporal.opentracing.OpenTracingOptions; +import io.temporal.opentracing.SpanOperationType; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; + +public class OpenTracingActivityClientCallsInterceptor extends ActivityClientCallsInterceptorBase { + private final SpanFactory spanFactory; + private final Tracer tracer; + private final ContextAccessor contextAccessor; + + public OpenTracingActivityClientCallsInterceptor( + ActivityClientCallsInterceptor next, + OpenTracingOptions options, + SpanFactory spanFactory, + ContextAccessor contextAccessor) { + super(next); + this.spanFactory = spanFactory; + this.tracer = options.getTracer(); + this.contextAccessor = contextAccessor; + } + + @Override + public StartActivityOutput startActivity(StartActivityInput input) { + Span activityStartSpan = + contextAccessor.writeSpanContextToHeader( + () -> + spanFactory + .createStandaloneActivityStartSpan( + tracer, input.getActivityType(), input.getOptions().getId()) + .start(), + input.getHeader(), + tracer); + try (Scope ignored = tracer.scopeManager().activate(activityStartSpan)) { + return super.startActivity(input); + } finally { + activityStartSpan.finish(); + } + } + + @Override + public GetActivityResultOutput getActivityResult(GetActivityResultInput input) + throws TimeoutException { + Span span = + spanFactory + .createStandaloneActivityOperationSpan( + tracer, SpanOperationType.GET_STANDALONE_ACTIVITY_RESULT, input.getActivityId()) + .start(); + try (Scope ignored = tracer.scopeManager().activate(span)) { + return super.getActivityResult(input); + } finally { + span.finish(); + } + } + + @Override + public CompletableFuture> getActivityResultAsync( + GetActivityResultInput input) { + Span span = + spanFactory + .createStandaloneActivityOperationSpan( + tracer, SpanOperationType.GET_STANDALONE_ACTIVITY_RESULT, input.getActivityId()) + .start(); + try (Scope ignored = tracer.scopeManager().activate(span)) { + return super.getActivityResultAsync(input) + .whenComplete( + (result, throwable) -> { + span.finish(); + }); + } catch (Throwable t) { + span.finish(); + throw t; + } + } + + @Override + public DescribeActivityOutput describeActivity(DescribeActivityInput input) { + Span span = + spanFactory + .createStandaloneActivityOperationSpan( + tracer, SpanOperationType.DESCRIBE_STANDALONE_ACTIVITY, input.getId()) + .start(); + try (Scope ignored = tracer.scopeManager().activate(span)) { + return super.describeActivity(input); + } finally { + span.finish(); + } + } + + @Override + public CancelActivityOutput cancelActivity(CancelActivityInput input) { + Span span = + spanFactory + .createStandaloneActivityOperationSpan( + tracer, SpanOperationType.CANCEL_STANDALONE_ACTIVITY, input.getId()) + .start(); + try (Scope ignored = tracer.scopeManager().activate(span)) { + return super.cancelActivity(input); + } finally { + span.finish(); + } + } + + @Override + public TerminateActivityOutput terminateActivity(TerminateActivityInput input) { + Span span = + spanFactory + .createStandaloneActivityOperationSpan( + tracer, SpanOperationType.TERMINATE_STANDALONE_ACTIVITY, input.getId()) + .start(); + try (Scope ignored = tracer.scopeManager().activate(span)) { + return super.terminateActivity(input); + } finally { + span.finish(); + } + } + + @Override + public ListActivitiesOutput listActivities(ListActivitiesInput input) { + Span span = + spanFactory + .createStandaloneActivityQuerySpan( + tracer, SpanOperationType.LIST_STANDALONE_ACTIVITIES, input.getQuery()) + .start(); + try (Scope ignored = tracer.scopeManager().activate(span)) { + return super.listActivities(input); + } finally { + span.finish(); + } + } + + @Override + public CountActivitiesOutput countActivities(CountActivitiesInput input) { + Span span = + spanFactory + .createStandaloneActivityQuerySpan( + tracer, SpanOperationType.COUNT_STANDALONE_ACTIVITIES, input.getQuery()) + .start(); + try (Scope ignored = tracer.scopeManager().activate(span)) { + return super.countActivities(input); + } finally { + span.finish(); + } + } +} diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityInboundCallsInterceptor.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityInboundCallsInterceptor.java index 2091df7ec..81b97ea68 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityInboundCallsInterceptor.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingActivityInboundCallsInterceptor.java @@ -46,14 +46,22 @@ public ActivityOutput execute(ActivityInput input) { contextAccessor.readSpanContextFromHeader(input.getHeader(), tracer); ActivityInfo activityInfo = activityExecutionContext.getInfo(); Span activityRunSpan = - spanFactory - .createActivityRunSpan( - tracer, - activityInfo.getActivityType(), - activityInfo.getWorkflowId(), - activityInfo.getWorkflowRunId(), - rootSpanContext) - .start(); + activityInfo.isInWorkflow() + ? spanFactory + .createActivityRunSpan( + tracer, + activityInfo.getActivityType(), + activityInfo.getWorkflowId(), + activityInfo.getWorkflowRunId(), + rootSpanContext) + .start() + : spanFactory + .createStandaloneActivityRunSpan( + tracer, + activityInfo.getActivityType(), + activityInfo.getActivityId(), + rootSpanContext) + .start(); try (Scope scope = tracer.scopeManager().activate(activityRunSpan)) { return super.execute(input); } catch (Throwable t) { diff --git a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java index 945d777a6..d7f498014 100644 --- a/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java +++ b/contrib/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java @@ -186,6 +186,52 @@ public Tracer.SpanBuilder createCancelNexusOperationSpan( return createSpan(context, tracer, nexusStartSpanContext, References.FOLLOWS_FROM); } + public Tracer.SpanBuilder createStandaloneActivityStartSpan( + Tracer tracer, String activityType, String activityId) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.START_STANDALONE_ACTIVITY) + .setActionName(activityType) + .setActivityId(activityId) + .build(); + return createSpan(context, tracer, null, References.FOLLOWS_FROM); + } + + public Tracer.SpanBuilder createStandaloneActivityRunSpan( + Tracer tracer, + String activityType, + String activityId, + SpanContext activityStartSpanContext) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.RUN_STANDALONE_ACTIVITY) + .setActionName(activityType) + .setActivityId(activityId) + .build(); + return createSpan(context, tracer, activityStartSpanContext, References.FOLLOWS_FROM); + } + + public Tracer.SpanBuilder createStandaloneActivityOperationSpan( + Tracer tracer, SpanOperationType operationType, String activityId) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(operationType) + .setActionName("StandaloneActivity") + .setActivityId(activityId) + .build(); + return createSpan(context, tracer, null, References.FOLLOWS_FROM); + } + + public Tracer.SpanBuilder createStandaloneActivityQuerySpan( + Tracer tracer, SpanOperationType operationType, String query) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(operationType) + .setActionName(query) + .build(); + return createSpan(context, tracer, null, References.FOLLOWS_FROM); + } + public Tracer.SpanBuilder createWorkflowStartUpdateSpan( Tracer tracer, String updateName, String workflowId, String runId) { SpanCreationContext context = diff --git a/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityClientTracingTest.java b/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityClientTracingTest.java new file mode 100644 index 000000000..0e22ee475 --- /dev/null +++ b/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityClientTracingTest.java @@ -0,0 +1,296 @@ +package io.temporal.opentracing; + +import static org.junit.Assert.*; + +import io.opentracing.mock.MockSpan; +import io.opentracing.mock.MockTracer; +import io.opentracing.util.ThreadLocalScopeManager; +import io.temporal.api.workflowservice.v1.CountActivityExecutionsResponse; +import io.temporal.client.ActivityExecutionCount; +import io.temporal.client.StartActivityOptions; +import io.temporal.common.interceptors.ActivityClientCallsInterceptor; +import io.temporal.common.interceptors.ActivityClientCallsInterceptorBase; +import io.temporal.common.interceptors.Header; +import io.temporal.opentracing.internal.ContextAccessor; +import io.temporal.opentracing.internal.OpenTracingActivityClientCallsInterceptor; +import io.temporal.opentracing.internal.SpanFactory; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit tests for {@link OpenTracingActivityClientCallsInterceptor}. Verifies that each intercepted + * method creates a span with the expected operation name and tags. Uses a stub next-interceptor so + * no server is required. + */ +public class StandaloneActivityClientTracingTest { + + private final MockTracer mockTracer = + new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP); + + private final OpenTracingOptions otOptions = + OpenTracingOptions.newBuilder().setTracer(mockTracer).build(); + + private OpenTracingActivityClientCallsInterceptor interceptor; + + @Before + public void setUp() { + mockTracer.reset(); + interceptor = + new OpenTracingActivityClientCallsInterceptor( + new StubActivityClientCallsInterceptor(), + otOptions, + new SpanFactory(otOptions), + new ContextAccessor(otOptions)); + } + + @After + public void tearDown() { + mockTracer.reset(); + } + + @Test + public void testStartActivityCreatesSpanWithHeaderPropagation() { + StartActivityOptions opts = + StartActivityOptions.newBuilder() + .setId("act-123") + .setTaskQueue("tq") + .setScheduleToCloseTimeout(Duration.ofMinutes(1)) + .build(); + Header header = Header.empty(); + ActivityClientCallsInterceptor.StartActivityInput input = + new ActivityClientCallsInterceptor.StartActivityInput( + "MyActivity", Collections.emptyList(), opts, header); + + interceptor.startActivity(input); + + List spans = mockTracer.finishedSpans(); + assertEquals(1, spans.size()); + MockSpan span = spans.get(0); + assertEquals("StartStandaloneActivity:MyActivity", span.operationName()); + assertEquals("act-123", span.tags().get("activityId")); + assertFalse("Trace context should be propagated into header", header.getValues().isEmpty()); + } + + @Test + public void testGetActivityResultCreatesSpan() throws TimeoutException { + ActivityClientCallsInterceptor.GetActivityResultInput input = + new ActivityClientCallsInterceptor.GetActivityResultInput<>("act-456", null, String.class); + + interceptor.getActivityResult(input); + + List spans = mockTracer.finishedSpans(); + assertEquals(1, spans.size()); + MockSpan span = spans.get(0); + assertEquals("GetStandaloneActivityResult:StandaloneActivity", span.operationName()); + assertEquals("act-456", span.tags().get("activityId")); + } + + @Test + public void testGetActivityResultAsyncCreatesSpan() throws Exception { + ActivityClientCallsInterceptor.GetActivityResultInput input = + new ActivityClientCallsInterceptor.GetActivityResultInput<>("act-789", null, String.class); + + CompletableFuture> future = + interceptor.getActivityResultAsync(input); + future.get(); + + List spans = mockTracer.finishedSpans(); + assertEquals(1, spans.size()); + MockSpan span = spans.get(0); + assertEquals("GetStandaloneActivityResult:StandaloneActivity", span.operationName()); + assertEquals("act-789", span.tags().get("activityId")); + } + + @Test + public void testGetActivityResultAsyncFinishesSpanWhenNextThrowsSynchronously() { + OpenTracingActivityClientCallsInterceptor throwingInterceptor = + new OpenTracingActivityClientCallsInterceptor( + new SynchronouslyThrowingActivityClientCallsInterceptor(), + otOptions, + new SpanFactory(otOptions), + new ContextAccessor(otOptions)); + ActivityClientCallsInterceptor.GetActivityResultInput input = + new ActivityClientCallsInterceptor.GetActivityResultInput<>( + "act-throws", null, String.class); + + try { + throwingInterceptor.getActivityResultAsync(input); + fail("Expected getActivityResultAsync to throw"); + } catch (IllegalStateException expected) { + assertEquals("sync failure", expected.getMessage()); + } + + List spans = mockTracer.finishedSpans(); + assertEquals(1, spans.size()); + MockSpan span = spans.get(0); + assertEquals("GetStandaloneActivityResult:StandaloneActivity", span.operationName()); + assertEquals("act-throws", span.tags().get("activityId")); + } + + @Test + public void testDescribeActivityCreatesSpan() { + ActivityClientCallsInterceptor.DescribeActivityInput input = + new ActivityClientCallsInterceptor.DescribeActivityInput("act-desc", null); + + interceptor.describeActivity(input); + + List spans = mockTracer.finishedSpans(); + assertEquals(1, spans.size()); + MockSpan span = spans.get(0); + assertEquals("DescribeStandaloneActivity:StandaloneActivity", span.operationName()); + assertEquals("act-desc", span.tags().get("activityId")); + } + + @Test + public void testCancelActivityCreatesSpan() { + ActivityClientCallsInterceptor.CancelActivityInput input = + new ActivityClientCallsInterceptor.CancelActivityInput("act-cancel", null, "reason"); + + interceptor.cancelActivity(input); + + List spans = mockTracer.finishedSpans(); + assertEquals(1, spans.size()); + MockSpan span = spans.get(0); + assertEquals("CancelStandaloneActivity:StandaloneActivity", span.operationName()); + assertEquals("act-cancel", span.tags().get("activityId")); + } + + @Test + public void testTerminateActivityCreatesSpan() { + ActivityClientCallsInterceptor.TerminateActivityInput input = + new ActivityClientCallsInterceptor.TerminateActivityInput("act-term", null, "reason"); + + interceptor.terminateActivity(input); + + List spans = mockTracer.finishedSpans(); + assertEquals(1, spans.size()); + MockSpan span = spans.get(0); + assertEquals("TerminateStandaloneActivity:StandaloneActivity", span.operationName()); + assertEquals("act-term", span.tags().get("activityId")); + } + + @Test + public void testListActivitiesCreatesSpan() { + ActivityClientCallsInterceptor.ListActivitiesInput input = + new ActivityClientCallsInterceptor.ListActivitiesInput("TaskQueue = 'tq'"); + + interceptor.listActivities(input); + + List spans = mockTracer.finishedSpans(); + assertEquals(1, spans.size()); + MockSpan span = spans.get(0); + assertEquals("ListStandaloneActivities:TaskQueue = 'tq'", span.operationName()); + assertNull(span.tags().get("activityId")); + } + + @Test + public void testCountActivitiesCreatesSpan() { + ActivityClientCallsInterceptor.CountActivitiesInput input = + new ActivityClientCallsInterceptor.CountActivitiesInput("TaskQueue = 'tq'"); + + interceptor.countActivities(input); + + List spans = mockTracer.finishedSpans(); + assertEquals(1, spans.size()); + MockSpan span = spans.get(0); + assertEquals("CountStandaloneActivities:TaskQueue = 'tq'", span.operationName()); + assertNull(span.tags().get("activityId")); + } + + @Test + public void testStartActivitySpanIsChildOfActiveSpan() { + MockSpan parentSpan = mockTracer.buildSpan("ClientFunction").start(); + try (io.opentracing.Scope ignored = mockTracer.scopeManager().activate(parentSpan)) { + StartActivityOptions opts = + StartActivityOptions.newBuilder() + .setId("act-child") + .setTaskQueue("tq") + .setScheduleToCloseTimeout(Duration.ofMinutes(1)) + .build(); + interceptor.startActivity( + new ActivityClientCallsInterceptor.StartActivityInput( + "MyActivity", Collections.emptyList(), opts, Header.empty())); + } finally { + parentSpan.finish(); + } + + List spans = mockTracer.finishedSpans(); + assertEquals(2, spans.size()); + + MockSpan activitySpan = spans.get(0); + assertEquals("StartStandaloneActivity:MyActivity", activitySpan.operationName()); + assertEquals(parentSpan.context().spanId(), activitySpan.parentId()); + } + + private static class StubActivityClientCallsInterceptor + extends ActivityClientCallsInterceptorBase { + + StubActivityClientCallsInterceptor() { + super(null); + } + + @Override + public StartActivityOutput startActivity(StartActivityInput input) { + return new StartActivityOutput(input.getOptions().getId(), null); + } + + @Override + public GetActivityResultOutput getActivityResult(GetActivityResultInput input) + throws TimeoutException { + return new GetActivityResultOutput<>(null); + } + + @Override + public CompletableFuture> getActivityResultAsync( + GetActivityResultInput input) { + return CompletableFuture.completedFuture(new GetActivityResultOutput<>(null)); + } + + @Override + public DescribeActivityOutput describeActivity(DescribeActivityInput input) { + return new DescribeActivityOutput(null); + } + + @Override + public CancelActivityOutput cancelActivity(CancelActivityInput input) { + return new CancelActivityOutput(); + } + + @Override + public TerminateActivityOutput terminateActivity(TerminateActivityInput input) { + return new TerminateActivityOutput(); + } + + @Override + public ListActivitiesOutput listActivities(ListActivitiesInput input) { + return new ListActivitiesOutput(Stream.empty()); + } + + @Override + public CountActivitiesOutput countActivities(CountActivitiesInput input) { + return new CountActivitiesOutput( + new ActivityExecutionCount(CountActivityExecutionsResponse.getDefaultInstance())); + } + } + + private static class SynchronouslyThrowingActivityClientCallsInterceptor + extends ActivityClientCallsInterceptorBase { + + SynchronouslyThrowingActivityClientCallsInterceptor() { + super(null); + } + + @Override + public CompletableFuture> getActivityResultAsync( + GetActivityResultInput input) { + throw new IllegalStateException("sync failure"); + } + } +} diff --git a/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityWorkerTracingTest.java b/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityWorkerTracingTest.java new file mode 100644 index 000000000..90a599a83 --- /dev/null +++ b/contrib/temporal-opentracing/src/test/java/io/temporal/opentracing/StandaloneActivityWorkerTracingTest.java @@ -0,0 +1,92 @@ +package io.temporal.opentracing; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.opentracing.Span; +import io.opentracing.mock.MockSpan; +import io.opentracing.mock.MockTracer; +import io.opentracing.util.ThreadLocalScopeManager; +import io.temporal.activity.ActivityExecutionContext; +import io.temporal.activity.ActivityInfo; +import io.temporal.common.interceptors.ActivityInboundCallsInterceptor; +import io.temporal.common.interceptors.Header; +import io.temporal.opentracing.internal.ContextAccessor; +import io.temporal.opentracing.internal.OpenTracingActivityInboundCallsInterceptor; +import io.temporal.opentracing.internal.SpanFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** Unit tests for standalone activity tracing on the worker side. */ +public class StandaloneActivityWorkerTracingTest { + + private final MockTracer mockTracer = + new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP); + + private final OpenTracingOptions otOptions = + OpenTracingOptions.newBuilder().setTracer(mockTracer).build(); + + private final SpanFactory spanFactory = new SpanFactory(otOptions); + private final ContextAccessor contextAccessor = new ContextAccessor(otOptions); + + private OpenTracingActivityInboundCallsInterceptor interceptor; + + @Before + public void setUp() { + mockTracer.reset(); + interceptor = + new OpenTracingActivityInboundCallsInterceptor( + new StubActivityInboundCallsInterceptor(), otOptions, spanFactory, contextAccessor); + } + + @After + public void tearDown() { + mockTracer.reset(); + } + + @Test + public void testStandaloneActivityRunCreatesSpanWithActivityId() { + Header header = Header.empty(); + Span activityStartSpan = + contextAccessor.writeSpanContextToHeader( + () -> + spanFactory + .createStandaloneActivityStartSpan( + mockTracer, "MyStandaloneActivity", "act-run") + .start(), + header, + mockTracer); + activityStartSpan.finish(); + + ActivityExecutionContext executionContext = mock(ActivityExecutionContext.class); + ActivityInfo activityInfo = mock(ActivityInfo.class); + when(activityInfo.isInWorkflow()).thenReturn(false); + when(activityInfo.getActivityType()).thenReturn("MyStandaloneActivity"); + when(activityInfo.getActivityId()).thenReturn("act-run"); + when(executionContext.getInfo()).thenReturn(activityInfo); + + interceptor.init(executionContext); + interceptor.execute(new ActivityInboundCallsInterceptor.ActivityInput(header, new Object[0])); + + OpenTracingSpansHelper spansHelper = new OpenTracingSpansHelper(mockTracer.finishedSpans()); + MockSpan startSpan = + spansHelper.getSpanByOperationName("StartStandaloneActivity:MyStandaloneActivity"); + MockSpan runSpan = + spansHelper.getSpanByOperationName("RunStandaloneActivity:MyStandaloneActivity"); + assertEquals("act-run", runSpan.tags().get("activityId")); + assertEquals(startSpan.context().spanId(), runSpan.parentId()); + } + + private static class StubActivityInboundCallsInterceptor + implements ActivityInboundCallsInterceptor { + @Override + public void init(ActivityExecutionContext context) {} + + @Override + public ActivityOutput execute(ActivityInput input) { + return new ActivityOutput(null); + } + } +}