Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ jobs:

- name: Start CLI server
env:
TEMPORAL_CLI_VERSION: 1.7.0
TEMPORAL_CLI_VERSION: 1.7.1-standalone-nexus-operations
run: |
wget -O temporal_cli.tar.gz https://github.com/temporalio/cli/releases/download/v${TEMPORAL_CLI_VERSION}/temporal_cli_${TEMPORAL_CLI_VERSION}_linux_amd64.tar.gz
tar -xzf temporal_cli.tar.gz
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import com.google.protobuf.ByteString;
import com.google.protobuf.util.Durations;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.api.command.v1.*;
import io.temporal.api.common.v1.*;
import io.temporal.api.common.v1.Link;
Expand Down Expand Up @@ -31,6 +33,7 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import org.junit.*;

Expand Down Expand Up @@ -579,7 +582,28 @@ public void testNexusOperationTimeout_BeforeStart() {
public void testNexusOperationTimeout_AfterStart() {
String operationId = UUID.randomUUID().toString();
CompletableFuture<?> nexusPoller =
pollNexusTask().thenCompose(task -> completeNexusTask(task, operationId));
pollNexusTask()
.thenCompose(task -> completeNexusTask(task, operationId))
.exceptionally(
e -> {
// If operation already timed out by the time we send response, the RPC call may
// have succeeded
// or it may have thrown NOT_FOUND status code. Both scenarios are treated as
// success.
Throwable cause = (e instanceof CompletionException) ? e.getCause() : e;
if (cause instanceof StatusRuntimeException) {
if (((StatusRuntimeException) cause).getStatus().getCode()
== Status.Code.NOT_FOUND) {
return null;
}
}
// Every other exception should fail the test.
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
} else {
throw new CompletionException(cause);
}
});

try {
WorkflowStub stub = newWorkflowStub("TestNexusOperationTimeoutAfterStartWorkflow");
Expand All @@ -591,30 +615,39 @@ public void testNexusOperationTimeout_AfterStart() {
pollResp.getTaskToken(),
newScheduleOperationCommand(
defaultScheduleOperationAttributes()
.setScheduleToCloseTimeout(Durations.fromSeconds(2))));
// needs to be at least 3 seconds due to server bug
.setScheduleToCloseTimeout(Durations.fromSeconds(3))));
testWorkflowRule.assertHistoryEvent(
execution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED);

// Wait for operation to be started
nexusPoller.get();

// Poll and verify started event is recorded and triggers workflow progress
pollResp = pollWorkflowTask();
testWorkflowRule.assertHistoryEvent(
execution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED);
completeWorkflowTask(pollResp.getTaskToken());
// Keep processing workflow tasks until operation times out, then complete the workflow.
wftPolling:
while (true) {
pollResp = pollWorkflowTask();
for (HistoryEvent event : pollResp.getHistory().getEventsList()) {
if (event.getEventType() == EventType.EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT) {
completeWorkflow(pollResp.getTaskToken());
break wftPolling;
}
}
completeWorkflowTask(pollResp.getTaskToken());
}

// Poll to wait for new task after operation times out
pollResp = pollWorkflowTask();
completeWorkflow(pollResp.getTaskToken());
// Because Nexus operation may have timed out before it started,
// EVENT_TYPE_NEXUS_OPERATION_STARTED
// may or may not be present in history. Both scenarios are OK, so we don't check for it.

List<HistoryEvent> events =
testWorkflowRule.getHistoryEvents(
execution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT);
Assert.assertEquals(1, events.size());
io.temporal.api.failure.v1.Failure failure =
events.get(0).getNexusOperationTimedOutEventAttributes().getFailure();
assertOperationFailureInfo(operationId, failure.getNexusOperationExecutionFailureInfo());
// If operation timed out before starting, then operation ID will be missing in failure info
assertOperationFailureInfoAnyID(failure.getNexusOperationExecutionFailureInfo());
Assert.assertEquals("nexus operation completed unsuccessfully", failure.getMessage());
io.temporal.api.failure.v1.Failure cause = failure.getCause();
Assert.assertEquals("operation timed out", cause.getMessage());
Expand Down Expand Up @@ -1540,8 +1573,12 @@ private void assertOperationFailureInfo(NexusOperationFailureInfo info) {
}

private void assertOperationFailureInfo(String operationID, NexusOperationFailureInfo info) {
Assert.assertNotNull(info);
assertOperationFailureInfoAnyID(info);
Assert.assertEquals(operationID, info.getOperationToken());
}

private void assertOperationFailureInfoAnyID(NexusOperationFailureInfo info) {
Assert.assertNotNull(info);
Assert.assertEquals(testEndpoint.getSpec().getName(), info.getEndpoint());
Assert.assertEquals(testService, info.getService());
Assert.assertEquals(testOperation, info.getOperation());
Expand Down
Loading