Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ jobs:
--dynamic-config-value 'component.callbacks.allowedAddresses=[{"Pattern":"localhost:7243","AllowInsecure":true}]' \
--dynamic-config-value frontend.activityAPIsEnabled=true \
--dynamic-config-value activity.enableStandalone=true \
--dynamic-config-value activity.startDelayEnabled=true \
--dynamic-config-value nexusoperation.enableStandalone=true \
--dynamic-config-value history.enableChasm=true \
--dynamic-config-value history.enableTransitionHistory=true &
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public static final class Builder {
private @Nullable String staticSummary;
private @Nullable String staticDetails;
private @Nullable Priority priority;
private @Nullable Duration startDelay;

private Builder() {}

Expand All @@ -65,6 +66,7 @@ private Builder(StartActivityOptions options) {
this.staticSummary = options.staticSummary;
this.staticDetails = options.staticDetails;
this.priority = options.priority;
this.startDelay = options.startDelay;
}

/** Required. A unique identifier for this activity in the namespace. */
Expand Down Expand Up @@ -159,6 +161,20 @@ public Builder setPriority(Priority priority) {
return this;
}

/**
* Time to wait before dispatching the first activity task. The delay is one-shot — retry
* attempts do not re-apply it. {@code ScheduleToStart} and {@code ScheduleToClose} timeouts
* begin counting only after the delay elapses. Must be non-negative; {@code null} or {@link
* Duration#ZERO} mean no delay.
*/
public Builder setStartDelay(Duration startDelay) {
if (startDelay != null && startDelay.isNegative()) {
throw new IllegalArgumentException("startDelay must be non-negative, got " + startDelay);
}
this.startDelay = startDelay;
return this;
}

public StartActivityOptions build() {
Preconditions.checkArgument(!Strings.isNullOrEmpty(id), "id must not be null or empty");
Preconditions.checkArgument(
Expand All @@ -183,6 +199,7 @@ public StartActivityOptions build() {
private final @Nullable String staticSummary;
private final @Nullable String staticDetails;
private final @Nullable Priority priority;
private final @Nullable Duration startDelay;

private StartActivityOptions(Builder builder) {
this.id = builder.id;
Expand All @@ -198,6 +215,7 @@ private StartActivityOptions(Builder builder) {
this.staticSummary = builder.staticSummary;
this.staticDetails = builder.staticDetails;
this.priority = builder.priority;
this.startDelay = builder.startDelay;
}

public Builder toBuilder() {
Expand Down Expand Up @@ -265,6 +283,11 @@ public Priority getPriority() {
return priority;
}

@Nullable
public Duration getStartDelay() {
return startDelay;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -282,7 +305,8 @@ public boolean equals(Object o) {
&& Objects.equals(typedSearchAttributes, that.typedSearchAttributes)
&& Objects.equals(staticSummary, that.staticSummary)
&& Objects.equals(staticDetails, that.staticDetails)
&& Objects.equals(priority, that.priority);
&& Objects.equals(priority, that.priority)
&& Objects.equals(startDelay, that.startDelay);
}

@Override
Expand All @@ -300,7 +324,8 @@ public int hashCode() {
typedSearchAttributes,
staticSummary,
staticDetails,
priority);
priority,
startDelay);
}

@Override
Expand Down Expand Up @@ -332,6 +357,8 @@ public String toString() {
+ staticDetails
+ "', priority="
+ priority
+ ", startDelay="
+ startDelay
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ public StartActivityOutput startActivity(StartActivityInput input) {
if (options.getPriority() != null) {
request.setPriority(ProtoConverters.toProto(options.getPriority()));
}
if (options.getStartDelay() != null) {
request.setStartDelay(ProtobufTimeUtils.toProtoDuration(options.getStartDelay()));
}

io.temporal.api.common.v1.Header grpcHeader = HeaderUtils.toHeaderGrpc(input.getHeader(), null);
request.setHeader(grpcHeader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,23 @@ public void testMissingTimeoutFails() {
StartActivityOptions.newBuilder().setId("id").setTaskQueue("q").build();
}

@Test(expected = IllegalArgumentException.class)
public void testNegativeStartDelayFails() {
StartActivityOptions.newBuilder().setStartDelay(Duration.ofSeconds(-1));
}

@Test
public void testZeroStartDelayAccepted() {
StartActivityOptions opts =
StartActivityOptions.newBuilder()
.setId("id")
.setTaskQueue("q")
.setStartToCloseTimeout(Duration.ofSeconds(5))
.setStartDelay(Duration.ZERO)
.build();
assertEquals(Duration.ZERO, opts.getStartDelay());
}

@Test
public void testToBuilder() {
StartActivityOptions original =
Expand Down Expand Up @@ -64,6 +81,7 @@ public void testToBuilderPreservesAllFields() {
.setStaticSummary("summary")
.setStaticDetails("details")
.setPriority(priority)
.setStartDelay(Duration.ofSeconds(7))
.build();

StartActivityOptions copy = original.toBuilder().build();
Expand All @@ -80,5 +98,6 @@ public void testToBuilderPreservesAllFields() {
assertEquals("summary", copy.getStaticSummary());
assertEquals("details", copy.getStaticDetails());
assertEquals(priority, copy.getPriority());
assertEquals(Duration.ofSeconds(7), copy.getStartDelay());
}
}
Comment thread
GregoryTravis marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
* test server may not support the standalone activity APIs.
*/
public class StandaloneActivityTest {
// TODO: enable tests disabled with ths when time-skipping is available
private static boolean RUN_TIME_SENSITIVE_TESTS = false;

// ---------------------------------------------------------------------------
// Activity interfaces and implementations
Expand Down Expand Up @@ -96,6 +98,12 @@ public interface AlwaysFailActivity {
void alwaysFail();
}

@ActivityInterface
public interface RetryThenSucceedActivity {
@ActivityMethod(name = "RetryThenSucceed")
int run();
}

/** Snapshot of {@link ActivityInfo} fields captured inside an activity body. */
public static class ActivityInfoSnapshot {
public String activityId;
Expand Down Expand Up @@ -200,6 +208,17 @@ public void alwaysFail() {
}
}

public static class RetryThenSucceedActivityImpl implements RetryThenSucceedActivity {
@Override
public int run() {
int attempt = Activity.getExecutionContext().getInfo().getAttempt();
if (attempt == 1) {
throw ApplicationFailure.newFailure("fail on attempt 1", "test-type");
}
return attempt;
}
}

// ---------------------------------------------------------------------------
// Test rule
// ---------------------------------------------------------------------------
Expand All @@ -215,7 +234,8 @@ public void alwaysFail() {
new InspectInfoActivityImpl(),
new EchoVoidActivityImpl(),
new ConcatActivityImpl(),
new AlwaysFailActivityImpl())
new AlwaysFailActivityImpl(),
new RetryThenSucceedActivityImpl())
.build();

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -986,6 +1006,165 @@ public void testOnlyStartToCloseTimeoutIsValid() {
newActivityClient().execute(SimpleActivity.class, SimpleActivity::execute, opts, "x"));
}

// ---------------------------------------------------------------------------
// Start delay
// ---------------------------------------------------------------------------

@Test
public void testStartDelayDelaysFirstDispatch() {
assumeTrue(SDKTestWorkflowRule.useExternalService);
Duration delay = Duration.ofSeconds(2);
StartActivityOptions opts =
StartActivityOptions.newBuilder()
.setId(uniqueId())
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setScheduleToCloseTimeout(Duration.ofMinutes(5))
.setStartDelay(delay)
.build();

ActivityHandle<String> handle =
newActivityClient().start(SimpleActivity.class, SimpleActivity::execute, opts, "hello");
assertEquals("echo:hello", handle.getResult());

ActivityExecutionDescription desc = handle.describe();
Duration between = Duration.between(desc.getScheduledTime(), desc.getLastStartedTime());
assertTrue(
"lastStartedTime - scheduledTime should be >= startDelay - 500ms, was " + between,
between.compareTo(delay.minusMillis(500)) >= 0);
}

@Test
public void testStartDelayPreservesScheduleToStartTimeout() {
assumeTrue(RUN_TIME_SENSITIVE_TESTS);
assumeTrue(SDKTestWorkflowRule.useExternalService);
StartActivityOptions opts =
StartActivityOptions.newBuilder()
.setId(uniqueId())
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setStartToCloseTimeout(Duration.ofMinutes(5))
.setScheduleToStartTimeout(Duration.ofSeconds(1))
.setStartDelay(Duration.ofSeconds(2))
.build();
String result =
newActivityClient().execute(SimpleActivity.class, SimpleActivity::execute, opts, "x");
assertEquals("echo:x", result);
}

@Test
public void testStartDelayPreservesScheduleToCloseTimeout() {
assumeTrue(RUN_TIME_SENSITIVE_TESTS);
assumeTrue(SDKTestWorkflowRule.useExternalService);
StartActivityOptions opts =
StartActivityOptions.newBuilder()
.setId(uniqueId())
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setScheduleToCloseTimeout(Duration.ofSeconds(1))
.setStartDelay(Duration.ofSeconds(2))
.build();
String result =
newActivityClient().execute(SimpleActivity.class, SimpleActivity::execute, opts, "x");
assertEquals("echo:x", result);
}

@Test
public void testStartDelayNotReappliedOnRetry() {
assumeTrue(RUN_TIME_SENSITIVE_TESTS);
assumeTrue(SDKTestWorkflowRule.useExternalService);
StartActivityOptions opts =
StartActivityOptions.newBuilder()
.setId(uniqueId())
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setScheduleToCloseTimeout(Duration.ofMinutes(5))
.setStartDelay(Duration.ofSeconds(2))
.setRetryOptions(
RetryOptions.newBuilder()
.setInitialInterval(Duration.ofMillis(100))
.setMaximumAttempts(5)
.build())
.build();
long startMs = System.currentTimeMillis();
int finalAttempt =
newActivityClient()
.execute(RetryThenSucceedActivity.class, RetryThenSucceedActivity::run, opts);
long elapsedMs = System.currentTimeMillis() - startMs;

// Bug-trap: confirm the activity actually retried rather than succeeding silently on attempt 1.
assertTrue(
"activity should have retried at least once (final attempt was " + finalAttempt + ")",
finalAttempt >= 2);

// If start delay were re-applied to retries, elapsed would be ~2 * startDelay (~4000ms).
// Without re-application: ~2000ms delay + ~100ms retry interval + worker overhead.
assertTrue(
"retry should not re-apply startDelay; elapsed was " + elapsedMs + "ms", elapsedMs < 3500);
}

@Test
public void testCancelDuringStartDelay() {
assumeTrue(SDKTestWorkflowRule.useExternalService);
StartActivityOptions opts =
StartActivityOptions.newBuilder()
.setId(uniqueId())
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setScheduleToCloseTimeout(Duration.ofMinutes(5))
.setStartDelay(Duration.ofHours(1))
.build();
ActivityHandle<String> handle =
newActivityClient().start(SimpleActivity.class, SimpleActivity::execute, opts, "x");
handle.cancel("test cancel during start delay");

assertEventually(
Duration.ofSeconds(10),
() ->
assertEquals(
ActivityExecutionStatus.ACTIVITY_EXECUTION_STATUS_CANCELED,
handle.describe().getStatus()));
}

@Test
public void testTerminateDuringStartDelay() {
assumeTrue(SDKTestWorkflowRule.useExternalService);
StartActivityOptions opts =
StartActivityOptions.newBuilder()
.setId(uniqueId())
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setScheduleToCloseTimeout(Duration.ofMinutes(5))
.setStartDelay(Duration.ofHours(1))
.build();
ActivityHandle<String> handle =
newActivityClient().start(SimpleActivity.class, SimpleActivity::execute, opts, "x");
handle.terminate("test terminate during start delay");

assertEventually(
Duration.ofSeconds(10),
() ->
assertEquals(
ActivityExecutionStatus.ACTIVITY_EXECUTION_STATUS_TERMINATED,
handle.describe().getStatus()));
}

@Test
public void testZeroStartDelayBehavesAsUnset() {
assumeTrue(RUN_TIME_SENSITIVE_TESTS);
assumeTrue(SDKTestWorkflowRule.useExternalService);
StartActivityOptions opts =
StartActivityOptions.newBuilder()
.setId(uniqueId())
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setScheduleToCloseTimeout(Duration.ofMinutes(5))
.setStartDelay(Duration.ZERO)
.build();
ActivityHandle<String> handle =
newActivityClient().start(SimpleActivity.class, SimpleActivity::execute, opts, "x");
assertEquals("echo:x", handle.getResult());

ActivityExecutionDescription desc = handle.describe();
Duration between = Duration.between(desc.getScheduledTime(), desc.getLastStartedTime());
assertTrue(
"Duration.ZERO should not introduce dispatch latency, was " + between,
between.compareTo(Duration.ofSeconds(1)) < 0);
}

// ---------------------------------------------------------------------------
// Interceptor helpers
// ---------------------------------------------------------------------------
Expand Down
Loading