Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1376,6 +1376,10 @@ public void continueAsNew(ContinueAsNewInput input) {
attributes.setWorkflowTaskTimeout(
ProtobufTimeUtils.toProtoDuration(options.getWorkflowTaskTimeout()));
}
if (options.getBackoffStartInterval() != null) {
attributes.setBackoffStartInterval(
ProtobufTimeUtils.toProtoDuration(options.getBackoffStartInterval()));
}
if (options.getTaskQueue() != null && !options.getTaskQueue().isEmpty()) {
attributes.setTaskQueue(TaskQueue.newBuilder().setName(options.getTaskQueue()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public static final class Builder {
private String taskQueue;
private RetryOptions retryOptions;
private Duration workflowTaskTimeout;
private Duration backoffStartInterval;
private Map<String, Object> memo;
private Map<String, Object> searchAttributes;
private SearchAttributes typedSearchAttributes;
Expand All @@ -57,6 +58,7 @@ private Builder(ContinueAsNewOptions options) {
this.taskQueue = options.taskQueue;
this.retryOptions = options.retryOptions;
this.workflowTaskTimeout = options.workflowTaskTimeout;
this.backoffStartInterval = options.backoffStartInterval;
this.memo = options.getMemo();
this.searchAttributes = options.getSearchAttributes();
this.typedSearchAttributes = options.getTypedSearchAttributes();
Expand Down Expand Up @@ -85,6 +87,12 @@ public Builder setWorkflowTaskTimeout(Duration workflowTaskTimeout) {
return this;
}

/** Sets the delay before the first workflow task of the continued run is scheduled. */
public Builder setBackoffStartInterval(Duration backoffStartInterval) {
this.backoffStartInterval = backoffStartInterval;
return this;
}

public Builder setMemo(Map<String, Object> memo) {
this.memo = memo;
return this;
Expand Down Expand Up @@ -152,6 +160,7 @@ public ContinueAsNewOptions build() {
taskQueue,
retryOptions,
workflowTaskTimeout,
backoffStartInterval,
memo,
searchAttributes,
typedSearchAttributes,
Expand All @@ -165,6 +174,7 @@ public ContinueAsNewOptions build() {
private final @Nullable String taskQueue;
private final @Nullable RetryOptions retryOptions;
private final @Nullable Duration workflowTaskTimeout;
private final @Nullable Duration backoffStartInterval;
private final @Nullable Map<String, Object> memo;
private final @Nullable Map<String, Object> searchAttributes;
private final @Nullable SearchAttributes typedSearchAttributes;
Expand All @@ -186,10 +196,37 @@ public ContinueAsNewOptions(
@Nullable List<ContextPropagator> contextPropagators,
@SuppressWarnings("deprecation") @Nullable VersioningIntent versioningIntent,
@Nullable InitialVersioningBehavior initialVersioningBehavior) {
this(
workflowRunTimeout,
taskQueue,
retryOptions,
workflowTaskTimeout,
null,
memo,
searchAttributes,
typedSearchAttributes,
contextPropagators,
versioningIntent,
initialVersioningBehavior);
}

public ContinueAsNewOptions(
@Nullable Duration workflowRunTimeout,
@Nullable String taskQueue,
@Nullable RetryOptions retryOptions,
@Nullable Duration workflowTaskTimeout,
@Nullable Duration backoffStartInterval,
@Nullable Map<String, Object> memo,
@Nullable Map<String, Object> searchAttributes,
@Nullable SearchAttributes typedSearchAttributes,
@Nullable List<ContextPropagator> contextPropagators,
@SuppressWarnings("deprecation") @Nullable VersioningIntent versioningIntent,
@Nullable InitialVersioningBehavior initialVersioningBehavior) {
this.workflowRunTimeout = workflowRunTimeout;
this.taskQueue = taskQueue;
this.retryOptions = retryOptions;
this.workflowTaskTimeout = workflowTaskTimeout;
this.backoffStartInterval = backoffStartInterval;
this.memo = memo;
this.searchAttributes = searchAttributes;
this.typedSearchAttributes = typedSearchAttributes;
Expand All @@ -215,6 +252,14 @@ public RetryOptions getRetryOptions() {
return workflowTaskTimeout;
}

/**
* @return the delay before the first workflow task of the continued run is scheduled, or null if
* unset.
*/
public @Nullable Duration getBackoffStartInterval() {
return backoffStartInterval;
}

public @Nullable Map<String, Object> getMemo() {
return memo;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,50 @@
package io.temporal.internal.sync;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
import io.temporal.api.common.v1.SearchAttributes;
import io.temporal.api.common.v1.WorkflowType;
import io.temporal.common.interceptors.Header;
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor.ContinueAsNewInput;
import io.temporal.internal.common.SearchAttributesUtil;
import io.temporal.internal.replay.ReplayWorkflowContext;
import io.temporal.workflow.ContinueAsNewOptions;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

public class SyncWorkflowContextTest {
SyncWorkflowContext context;
ReplayWorkflowContext mockReplayWorkflowContext = mock(ReplayWorkflowContext.class);
ExecutorService threadPool;
DeterministicRunner runner;

@Before
public void setUp() {
this.threadPool = Executors.newCachedThreadPool();
this.context = DummySyncWorkflowContext.newDummySyncWorkflowContext();
this.context.setReplayContext(mockReplayWorkflowContext);
when(mockReplayWorkflowContext.getWorkflowType())
.thenReturn(WorkflowType.newBuilder().setName("dummy-workflow").build());
}

@After
public void tearDown() {
if (runner != null) {
runner.close();
}
threadPool.shutdown();
}

@Test
Expand All @@ -32,6 +57,27 @@ public void testUpsertSearchAttributes() {
verify(mockReplayWorkflowContext, times(1)).upsertSearchAttributes(serializedAttr);
}

@Test
public void testContinueAsNewBackoffStartInterval() {
Duration backoffStartInterval = Duration.ofSeconds(7);
ContinueAsNewOptions options =
ContinueAsNewOptions.newBuilder().setBackoffStartInterval(backoffStartInterval).build();
runner =
DeterministicRunner.newRunner(
threadPool::submit,
context,
() ->
context.continueAsNew(
new ContinueAsNewInput(null, options, new Object[0], Header.empty())));

runner.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS);

ArgumentCaptor<ContinueAsNewWorkflowExecutionCommandAttributes> attributes =
ArgumentCaptor.forClass(ContinueAsNewWorkflowExecutionCommandAttributes.class);
verify(mockReplayWorkflowContext).continueAsNewOnCompletion(attributes.capture());
assertEquals(7, attributes.getValue().getBackoffStartInterval().getSeconds());
}

@Test(expected = IllegalArgumentException.class)
public void testUpsertSearchAttributesException() {
Map<String, Object> attr = new HashMap<>();
Expand Down
Loading