Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.temporal.opentracing;

import io.temporal.common.interceptors.ActivityClientCallsInterceptor;
import io.temporal.common.interceptors.ActivityClientInterceptorBase;
import io.temporal.opentracing.internal.ContextAccessor;
import io.temporal.opentracing.internal.OpenTracingActivityClientCallsInterceptor;
import io.temporal.opentracing.internal.SpanFactory;

public class OpenTracingActivityClientInterceptor extends ActivityClientInterceptorBase {
private final OpenTracingOptions options;
private final SpanFactory spanFactory;
private final ContextAccessor contextAccessor;

public OpenTracingActivityClientInterceptor() {
this(OpenTracingOptions.getDefaultInstance());
}

public OpenTracingActivityClientInterceptor(OpenTracingOptions options) {
this.options = options;
this.spanFactory = new SpanFactory(options);
this.contextAccessor = new ContextAccessor(options);
}

@Override
public ActivityClientCallsInterceptor activityClientCallsInterceptor(
ActivityClientCallsInterceptor next) {
return new OpenTracingActivityClientCallsInterceptor(
next, options, spanFactory, contextAccessor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,23 @@ public class SpanCreationContext {
private final String runId;
private final String parentWorkflowId;
private final String parentRunId;
private final String activityId;

private SpanCreationContext(
SpanOperationType spanOperationType,
String actionName,
String workflowId,
String runId,
String parentWorkflowId,
String parentRunId) {
String parentRunId,
String activityId) {
this.spanOperationType = spanOperationType;
this.actionName = actionName;
this.workflowId = workflowId;
this.runId = runId;
this.parentWorkflowId = parentWorkflowId;
this.parentRunId = parentRunId;
this.activityId = activityId;
}

public SpanOperationType getSpanOperationType() {
Expand Down Expand Up @@ -59,6 +62,10 @@ public String getParentRunId() {
return parentRunId;
}

public @Nullable String getActivityId() {
return activityId;
}

public static Builder newBuilder() {
return new Builder();
}
Expand All @@ -70,6 +77,7 @@ public static final class Builder {
private String runId;
private String parentWorkflowId;
private String parentRunId;
private String activityId;

private Builder() {}

Expand Down Expand Up @@ -103,9 +111,20 @@ public Builder setParentRunId(String parentRunId) {
return this;
}

public Builder setActivityId(String activityId) {
this.activityId = activityId;
return this;
}

public SpanCreationContext build() {
return new SpanCreationContext(
spanOperationType, actionName, workflowId, runId, parentWorkflowId, parentRunId);
spanOperationType,
actionName,
workflowId,
runId,
parentWorkflowId,
parentRunId,
activityId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ public enum SpanOperationType {
HANDLE_SIGNAL("HandleSignal"),
HANDLE_UPDATE("HandleUpdate"),
START_NEXUS_OPERATION("StartNexusOperation"),
START_STANDALONE_ACTIVITY("StartStandaloneActivity"),
RUN_STANDALONE_ACTIVITY("RunStandaloneActivity"),
GET_STANDALONE_ACTIVITY_RESULT("GetStandaloneActivityResult"),
DESCRIBE_STANDALONE_ACTIVITY("DescribeStandaloneActivity"),
CANCEL_STANDALONE_ACTIVITY("CancelStandaloneActivity"),
TERMINATE_STANDALONE_ACTIVITY("TerminateStandaloneActivity"),
LIST_STANDALONE_ACTIVITIES("ListStandaloneActivities"),
COUNT_STANDALONE_ACTIVITIES("CountStandaloneActivities"),
RUN_START_NEXUS_OPERATION("RunStartNexusOperationHandler"),
RUN_CANCEL_NEXUS_OPERATION("RunCancelNexusOperationHandler");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
public class StandardTagNames {
public static final String WORKFLOW_ID = "workflowId";
public static final String RUN_ID = "runId";
public static final String ACTIVITY_ID = "activityId";
public static final String PARENT_WORKFLOW_ID = "parentWorkflowId";
public static final String PARENT_RUN_ID = "parentRunId";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ protected Map<String, String> getSpanTags(SpanCreationContext context) {
return ImmutableMap.of(
StandardTagNames.WORKFLOW_ID, context.getWorkflowId(),
StandardTagNames.RUN_ID, context.getRunId());
case START_STANDALONE_ACTIVITY:
case RUN_STANDALONE_ACTIVITY:
case GET_STANDALONE_ACTIVITY_RESULT:
case DESCRIBE_STANDALONE_ACTIVITY:
case CANCEL_STANDALONE_ACTIVITY:
case TERMINATE_STANDALONE_ACTIVITY:
return ImmutableMap.of(StandardTagNames.ACTIVITY_ID, context.getActivityId());
case LIST_STANDALONE_ACTIVITIES:
case COUNT_STANDALONE_ACTIVITIES:
case RUN_START_NEXUS_OPERATION:
case RUN_CANCEL_NEXUS_OPERATION:
case HANDLE_QUERY:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package io.temporal.opentracing.internal;

import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.temporal.common.interceptors.ActivityClientCallsInterceptor;
import io.temporal.common.interceptors.ActivityClientCallsInterceptorBase;
import io.temporal.opentracing.OpenTracingOptions;
import io.temporal.opentracing.SpanOperationType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

public class OpenTracingActivityClientCallsInterceptor extends ActivityClientCallsInterceptorBase {
private final SpanFactory spanFactory;
private final Tracer tracer;
private final ContextAccessor contextAccessor;

public OpenTracingActivityClientCallsInterceptor(
ActivityClientCallsInterceptor next,
OpenTracingOptions options,
SpanFactory spanFactory,
ContextAccessor contextAccessor) {
super(next);
this.spanFactory = spanFactory;
this.tracer = options.getTracer();
this.contextAccessor = contextAccessor;
}

@Override
public StartActivityOutput startActivity(StartActivityInput input) {
Span activityStartSpan =
contextAccessor.writeSpanContextToHeader(
() ->
spanFactory
.createStandaloneActivityStartSpan(
tracer, input.getActivityType(), input.getOptions().getId())
.start(),
input.getHeader(),
tracer);
try (Scope ignored = tracer.scopeManager().activate(activityStartSpan)) {
return super.startActivity(input);
} finally {
activityStartSpan.finish();
}
}

@Override
public <R> GetActivityResultOutput<R> getActivityResult(GetActivityResultInput<R> input)
throws TimeoutException {
Span span =
spanFactory
.createStandaloneActivityOperationSpan(
tracer, SpanOperationType.GET_STANDALONE_ACTIVITY_RESULT, input.getActivityId())
.start();
try (Scope ignored = tracer.scopeManager().activate(span)) {
return super.getActivityResult(input);
} finally {
span.finish();
}
}

@Override
public <R> CompletableFuture<GetActivityResultOutput<R>> getActivityResultAsync(
GetActivityResultInput<R> input) {
Span span =
spanFactory
.createStandaloneActivityOperationSpan(
tracer, SpanOperationType.GET_STANDALONE_ACTIVITY_RESULT, input.getActivityId())
.start();
try (Scope ignored = tracer.scopeManager().activate(span)) {
return super.getActivityResultAsync(input)
.whenComplete(
(result, throwable) -> {
span.finish();
});
} catch (Throwable t) {
span.finish();
throw t;
}
}

@Override
public DescribeActivityOutput describeActivity(DescribeActivityInput input) {
Span span =
spanFactory
.createStandaloneActivityOperationSpan(
tracer, SpanOperationType.DESCRIBE_STANDALONE_ACTIVITY, input.getId())
.start();
try (Scope ignored = tracer.scopeManager().activate(span)) {
return super.describeActivity(input);
} finally {
span.finish();
}
}

@Override
public CancelActivityOutput cancelActivity(CancelActivityInput input) {
Span span =
spanFactory
.createStandaloneActivityOperationSpan(
tracer, SpanOperationType.CANCEL_STANDALONE_ACTIVITY, input.getId())
.start();
try (Scope ignored = tracer.scopeManager().activate(span)) {
return super.cancelActivity(input);
} finally {
span.finish();
}
}

@Override
public TerminateActivityOutput terminateActivity(TerminateActivityInput input) {
Span span =
spanFactory
.createStandaloneActivityOperationSpan(
tracer, SpanOperationType.TERMINATE_STANDALONE_ACTIVITY, input.getId())
.start();
try (Scope ignored = tracer.scopeManager().activate(span)) {
return super.terminateActivity(input);
} finally {
span.finish();
}
}

@Override
public ListActivitiesOutput listActivities(ListActivitiesInput input) {
Span span =
spanFactory
.createStandaloneActivityQuerySpan(
tracer, SpanOperationType.LIST_STANDALONE_ACTIVITIES, input.getQuery())
.start();
try (Scope ignored = tracer.scopeManager().activate(span)) {
return super.listActivities(input);
} finally {
span.finish();
}
}

@Override
public CountActivitiesOutput countActivities(CountActivitiesInput input) {
Span span =
spanFactory
.createStandaloneActivityQuerySpan(
tracer, SpanOperationType.COUNT_STANDALONE_ACTIVITIES, input.getQuery())
.start();
try (Scope ignored = tracer.scopeManager().activate(span)) {
return super.countActivities(input);
} finally {
span.finish();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,22 @@ public ActivityOutput execute(ActivityInput input) {
contextAccessor.readSpanContextFromHeader(input.getHeader(), tracer);
ActivityInfo activityInfo = activityExecutionContext.getInfo();
Span activityRunSpan =
spanFactory
.createActivityRunSpan(
tracer,
activityInfo.getActivityType(),
activityInfo.getWorkflowId(),
activityInfo.getWorkflowRunId(),
rootSpanContext)
.start();
activityInfo.isInWorkflow()
? spanFactory
.createActivityRunSpan(
tracer,
activityInfo.getActivityType(),
activityInfo.getWorkflowId(),
activityInfo.getWorkflowRunId(),
rootSpanContext)
.start()
: spanFactory
.createStandaloneActivityRunSpan(
tracer,
activityInfo.getActivityType(),
activityInfo.getActivityId(),
rootSpanContext)
.start();
try (Scope scope = tracer.scopeManager().activate(activityRunSpan)) {
return super.execute(input);
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,52 @@ public Tracer.SpanBuilder createCancelNexusOperationSpan(
return createSpan(context, tracer, nexusStartSpanContext, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createStandaloneActivityStartSpan(
Tracer tracer, String activityType, String activityId) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(SpanOperationType.START_STANDALONE_ACTIVITY)
.setActionName(activityType)
.setActivityId(activityId)
.build();
return createSpan(context, tracer, null, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createStandaloneActivityRunSpan(
Tracer tracer,
String activityType,
String activityId,
SpanContext activityStartSpanContext) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(SpanOperationType.RUN_STANDALONE_ACTIVITY)
.setActionName(activityType)
.setActivityId(activityId)
.build();
return createSpan(context, tracer, activityStartSpanContext, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createStandaloneActivityOperationSpan(
Tracer tracer, SpanOperationType operationType, String activityId) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(operationType)
.setActionName("StandaloneActivity")

@444am 444am Jun 11, 2026

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ActionName has to be set for getSpanName(), but ActivityType is not provided in GetActivityResultInput / DescribeActivityInput etc. To minimize the cardinality I'm using a fixed placeholder here as of now. Let me know if you believe repeatedly using activityId for distinction would be better.

.setActivityId(activityId)
.build();
return createSpan(context, tracer, null, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createStandaloneActivityQuerySpan(
Tracer tracer, SpanOperationType operationType, String query) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(operationType)
.setActionName(query)
.build();
return createSpan(context, tracer, null, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createWorkflowStartUpdateSpan(
Tracer tracer, String updateName, String workflowId, String runId) {
SpanCreationContext context =
Expand Down
Loading