diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a5083cbf2..7bf551c73 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -114,6 +114,7 @@ jobs: --dynamic-config-value 'component.callbacks.allowedAddresses=[{"Pattern":"localhost:7243","AllowInsecure":true}]' \ --dynamic-config-value frontend.activityAPIsEnabled=true \ --dynamic-config-value activity.enableStandalone=true \ + --dynamic-config-value activity.startDelayEnabled=true \ --dynamic-config-value nexusoperation.enableStandalone=true \ --dynamic-config-value history.enableChasm=true \ --dynamic-config-value history.enableTransitionHistory=true & diff --git a/temporal-sdk/src/main/java/io/temporal/client/StartActivityOptions.java b/temporal-sdk/src/main/java/io/temporal/client/StartActivityOptions.java index 7b9bf4677..7eed75447 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/StartActivityOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/client/StartActivityOptions.java @@ -45,6 +45,7 @@ public static final class Builder { private @Nullable String staticSummary; private @Nullable String staticDetails; private @Nullable Priority priority; + private @Nullable Duration startDelay; private Builder() {} @@ -65,6 +66,7 @@ private Builder(StartActivityOptions options) { this.staticSummary = options.staticSummary; this.staticDetails = options.staticDetails; this.priority = options.priority; + this.startDelay = options.startDelay; } /** Required. A unique identifier for this activity in the namespace. */ @@ -159,6 +161,20 @@ public Builder setPriority(Priority priority) { return this; } + /** + * Time to wait before dispatching the first activity task. The delay is one-shot — retry + * attempts do not re-apply it. {@code ScheduleToStart} and {@code ScheduleToClose} timeouts + * begin counting only after the delay elapses. Must be non-negative; {@code null} or {@link + * Duration#ZERO} mean no delay. + */ + public Builder setStartDelay(Duration startDelay) { + if (startDelay != null && startDelay.isNegative()) { + throw new IllegalArgumentException("startDelay must be non-negative, got " + startDelay); + } + this.startDelay = startDelay; + return this; + } + public StartActivityOptions build() { Preconditions.checkArgument(!Strings.isNullOrEmpty(id), "id must not be null or empty"); Preconditions.checkArgument( @@ -183,6 +199,7 @@ public StartActivityOptions build() { private final @Nullable String staticSummary; private final @Nullable String staticDetails; private final @Nullable Priority priority; + private final @Nullable Duration startDelay; private StartActivityOptions(Builder builder) { this.id = builder.id; @@ -198,6 +215,7 @@ private StartActivityOptions(Builder builder) { this.staticSummary = builder.staticSummary; this.staticDetails = builder.staticDetails; this.priority = builder.priority; + this.startDelay = builder.startDelay; } public Builder toBuilder() { @@ -265,6 +283,11 @@ public Priority getPriority() { return priority; } + @Nullable + public Duration getStartDelay() { + return startDelay; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -282,7 +305,8 @@ public boolean equals(Object o) { && Objects.equals(typedSearchAttributes, that.typedSearchAttributes) && Objects.equals(staticSummary, that.staticSummary) && Objects.equals(staticDetails, that.staticDetails) - && Objects.equals(priority, that.priority); + && Objects.equals(priority, that.priority) + && Objects.equals(startDelay, that.startDelay); } @Override @@ -300,7 +324,8 @@ public int hashCode() { typedSearchAttributes, staticSummary, staticDetails, - priority); + priority, + startDelay); } @Override @@ -332,6 +357,8 @@ public String toString() { + staticDetails + "', priority=" + priority + + ", startDelay=" + + startDelay + '}'; } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootActivityClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootActivityClientInvoker.java index 16e8c8095..96a9e70f5 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootActivityClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootActivityClientInvoker.java @@ -97,6 +97,9 @@ public StartActivityOutput startActivity(StartActivityInput input) { if (options.getPriority() != null) { request.setPriority(ProtoConverters.toProto(options.getPriority())); } + if (options.getStartDelay() != null) { + request.setStartDelay(ProtobufTimeUtils.toProtoDuration(options.getStartDelay())); + } io.temporal.api.common.v1.Header grpcHeader = HeaderUtils.toHeaderGrpc(input.getHeader(), null); request.setHeader(grpcHeader); diff --git a/temporal-sdk/src/test/java/io/temporal/client/StartActivityOptionsTest.java b/temporal-sdk/src/test/java/io/temporal/client/StartActivityOptionsTest.java index dfee844b5..96d048268 100644 --- a/temporal-sdk/src/test/java/io/temporal/client/StartActivityOptionsTest.java +++ b/temporal-sdk/src/test/java/io/temporal/client/StartActivityOptionsTest.java @@ -32,6 +32,23 @@ public void testMissingTimeoutFails() { StartActivityOptions.newBuilder().setId("id").setTaskQueue("q").build(); } + @Test(expected = IllegalArgumentException.class) + public void testNegativeStartDelayFails() { + StartActivityOptions.newBuilder().setStartDelay(Duration.ofSeconds(-1)); + } + + @Test + public void testZeroStartDelayAccepted() { + StartActivityOptions opts = + StartActivityOptions.newBuilder() + .setId("id") + .setTaskQueue("q") + .setStartToCloseTimeout(Duration.ofSeconds(5)) + .setStartDelay(Duration.ZERO) + .build(); + assertEquals(Duration.ZERO, opts.getStartDelay()); + } + @Test public void testToBuilder() { StartActivityOptions original = @@ -64,6 +81,7 @@ public void testToBuilderPreservesAllFields() { .setStaticSummary("summary") .setStaticDetails("details") .setPriority(priority) + .setStartDelay(Duration.ofSeconds(7)) .build(); StartActivityOptions copy = original.toBuilder().build(); @@ -80,5 +98,6 @@ public void testToBuilderPreservesAllFields() { assertEquals("summary", copy.getStaticSummary()); assertEquals("details", copy.getStaticDetails()); assertEquals(priority, copy.getPriority()); + assertEquals(Duration.ofSeconds(7), copy.getStartDelay()); } } diff --git a/temporal-sdk/src/test/java/io/temporal/client/functional/StandaloneActivityTest.java b/temporal-sdk/src/test/java/io/temporal/client/functional/StandaloneActivityTest.java index a54f846ce..5be3226dc 100644 --- a/temporal-sdk/src/test/java/io/temporal/client/functional/StandaloneActivityTest.java +++ b/temporal-sdk/src/test/java/io/temporal/client/functional/StandaloneActivityTest.java @@ -40,6 +40,8 @@ * test server may not support the standalone activity APIs. */ public class StandaloneActivityTest { + // TODO: enable tests disabled with ths when time-skipping is available + private static boolean RUN_TIME_SENSITIVE_TESTS = false; // --------------------------------------------------------------------------- // Activity interfaces and implementations @@ -96,6 +98,12 @@ public interface AlwaysFailActivity { void alwaysFail(); } + @ActivityInterface + public interface RetryThenSucceedActivity { + @ActivityMethod(name = "RetryThenSucceed") + int run(); + } + /** Snapshot of {@link ActivityInfo} fields captured inside an activity body. */ public static class ActivityInfoSnapshot { public String activityId; @@ -200,6 +208,17 @@ public void alwaysFail() { } } + public static class RetryThenSucceedActivityImpl implements RetryThenSucceedActivity { + @Override + public int run() { + int attempt = Activity.getExecutionContext().getInfo().getAttempt(); + if (attempt == 1) { + throw ApplicationFailure.newFailure("fail on attempt 1", "test-type"); + } + return attempt; + } + } + // --------------------------------------------------------------------------- // Test rule // --------------------------------------------------------------------------- @@ -215,7 +234,8 @@ public void alwaysFail() { new InspectInfoActivityImpl(), new EchoVoidActivityImpl(), new ConcatActivityImpl(), - new AlwaysFailActivityImpl()) + new AlwaysFailActivityImpl(), + new RetryThenSucceedActivityImpl()) .build(); // --------------------------------------------------------------------------- @@ -986,6 +1006,165 @@ public void testOnlyStartToCloseTimeoutIsValid() { newActivityClient().execute(SimpleActivity.class, SimpleActivity::execute, opts, "x")); } + // --------------------------------------------------------------------------- + // Start delay + // --------------------------------------------------------------------------- + + @Test + public void testStartDelayDelaysFirstDispatch() { + assumeTrue(SDKTestWorkflowRule.useExternalService); + Duration delay = Duration.ofSeconds(2); + StartActivityOptions opts = + StartActivityOptions.newBuilder() + .setId(uniqueId()) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setScheduleToCloseTimeout(Duration.ofMinutes(5)) + .setStartDelay(delay) + .build(); + + ActivityHandle handle = + newActivityClient().start(SimpleActivity.class, SimpleActivity::execute, opts, "hello"); + assertEquals("echo:hello", handle.getResult()); + + ActivityExecutionDescription desc = handle.describe(); + Duration between = Duration.between(desc.getScheduledTime(), desc.getLastStartedTime()); + assertTrue( + "lastStartedTime - scheduledTime should be >= startDelay - 500ms, was " + between, + between.compareTo(delay.minusMillis(500)) >= 0); + } + + @Test + public void testStartDelayPreservesScheduleToStartTimeout() { + assumeTrue(RUN_TIME_SENSITIVE_TESTS); + assumeTrue(SDKTestWorkflowRule.useExternalService); + StartActivityOptions opts = + StartActivityOptions.newBuilder() + .setId(uniqueId()) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setStartToCloseTimeout(Duration.ofMinutes(5)) + .setScheduleToStartTimeout(Duration.ofSeconds(1)) + .setStartDelay(Duration.ofSeconds(2)) + .build(); + String result = + newActivityClient().execute(SimpleActivity.class, SimpleActivity::execute, opts, "x"); + assertEquals("echo:x", result); + } + + @Test + public void testStartDelayPreservesScheduleToCloseTimeout() { + assumeTrue(RUN_TIME_SENSITIVE_TESTS); + assumeTrue(SDKTestWorkflowRule.useExternalService); + StartActivityOptions opts = + StartActivityOptions.newBuilder() + .setId(uniqueId()) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setScheduleToCloseTimeout(Duration.ofSeconds(1)) + .setStartDelay(Duration.ofSeconds(2)) + .build(); + String result = + newActivityClient().execute(SimpleActivity.class, SimpleActivity::execute, opts, "x"); + assertEquals("echo:x", result); + } + + @Test + public void testStartDelayNotReappliedOnRetry() { + assumeTrue(RUN_TIME_SENSITIVE_TESTS); + assumeTrue(SDKTestWorkflowRule.useExternalService); + StartActivityOptions opts = + StartActivityOptions.newBuilder() + .setId(uniqueId()) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setScheduleToCloseTimeout(Duration.ofMinutes(5)) + .setStartDelay(Duration.ofSeconds(2)) + .setRetryOptions( + RetryOptions.newBuilder() + .setInitialInterval(Duration.ofMillis(100)) + .setMaximumAttempts(5) + .build()) + .build(); + long startMs = System.currentTimeMillis(); + int finalAttempt = + newActivityClient() + .execute(RetryThenSucceedActivity.class, RetryThenSucceedActivity::run, opts); + long elapsedMs = System.currentTimeMillis() - startMs; + + // Bug-trap: confirm the activity actually retried rather than succeeding silently on attempt 1. + assertTrue( + "activity should have retried at least once (final attempt was " + finalAttempt + ")", + finalAttempt >= 2); + + // If start delay were re-applied to retries, elapsed would be ~2 * startDelay (~4000ms). + // Without re-application: ~2000ms delay + ~100ms retry interval + worker overhead. + assertTrue( + "retry should not re-apply startDelay; elapsed was " + elapsedMs + "ms", elapsedMs < 3500); + } + + @Test + public void testCancelDuringStartDelay() { + assumeTrue(SDKTestWorkflowRule.useExternalService); + StartActivityOptions opts = + StartActivityOptions.newBuilder() + .setId(uniqueId()) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setScheduleToCloseTimeout(Duration.ofMinutes(5)) + .setStartDelay(Duration.ofHours(1)) + .build(); + ActivityHandle handle = + newActivityClient().start(SimpleActivity.class, SimpleActivity::execute, opts, "x"); + handle.cancel("test cancel during start delay"); + + assertEventually( + Duration.ofSeconds(10), + () -> + assertEquals( + ActivityExecutionStatus.ACTIVITY_EXECUTION_STATUS_CANCELED, + handle.describe().getStatus())); + } + + @Test + public void testTerminateDuringStartDelay() { + assumeTrue(SDKTestWorkflowRule.useExternalService); + StartActivityOptions opts = + StartActivityOptions.newBuilder() + .setId(uniqueId()) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setScheduleToCloseTimeout(Duration.ofMinutes(5)) + .setStartDelay(Duration.ofHours(1)) + .build(); + ActivityHandle handle = + newActivityClient().start(SimpleActivity.class, SimpleActivity::execute, opts, "x"); + handle.terminate("test terminate during start delay"); + + assertEventually( + Duration.ofSeconds(10), + () -> + assertEquals( + ActivityExecutionStatus.ACTIVITY_EXECUTION_STATUS_TERMINATED, + handle.describe().getStatus())); + } + + @Test + public void testZeroStartDelayBehavesAsUnset() { + assumeTrue(RUN_TIME_SENSITIVE_TESTS); + assumeTrue(SDKTestWorkflowRule.useExternalService); + StartActivityOptions opts = + StartActivityOptions.newBuilder() + .setId(uniqueId()) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setScheduleToCloseTimeout(Duration.ofMinutes(5)) + .setStartDelay(Duration.ZERO) + .build(); + ActivityHandle handle = + newActivityClient().start(SimpleActivity.class, SimpleActivity::execute, opts, "x"); + assertEquals("echo:x", handle.getResult()); + + ActivityExecutionDescription desc = handle.describe(); + Duration between = Duration.between(desc.getScheduledTime(), desc.getLastStartedTime()); + assertTrue( + "Duration.ZERO should not introduce dispatch latency, was " + between, + between.compareTo(Duration.ofSeconds(1)) < 0); + } + // --------------------------------------------------------------------------- // Interceptor helpers // ---------------------------------------------------------------------------