From 55617cfd7f406a9040b6ec2bd3b46c49de40b364 Mon Sep 17 00:00:00 2001 From: Maciej Dudkowski Date: Wed, 10 Jun 2026 15:21:05 -0400 Subject: [PATCH] Fix flaky test `NexusWorkflowTest.testNexusOperationTimeout_AfterStart` --- .github/workflows/ci.yml | 2 +- .../functional/NexusWorkflowTest.java | 61 +++++++++++++++---- 2 files changed, 50 insertions(+), 13 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f3ba1fa3b..03e0478cf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -84,7 +84,7 @@ jobs: - name: Start CLI server env: - TEMPORAL_CLI_VERSION: 1.7.0 + TEMPORAL_CLI_VERSION: 1.7.1-standalone-nexus-operations run: | wget -O temporal_cli.tar.gz https://github.com/temporalio/cli/releases/download/v${TEMPORAL_CLI_VERSION}/temporal_cli_${TEMPORAL_CLI_VERSION}_linux_amd64.tar.gz tar -xzf temporal_cli.tar.gz diff --git a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java index 558b7fcf9..b059d12a8 100644 --- a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java +++ b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java @@ -4,6 +4,8 @@ import com.google.protobuf.ByteString; import com.google.protobuf.util.Durations; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.temporal.api.command.v1.*; import io.temporal.api.common.v1.*; import io.temporal.api.common.v1.Link; @@ -31,6 +33,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.stream.Collectors; import org.junit.*; @@ -579,7 +582,28 @@ public void testNexusOperationTimeout_BeforeStart() { public void testNexusOperationTimeout_AfterStart() { String operationId = UUID.randomUUID().toString(); CompletableFuture nexusPoller = - pollNexusTask().thenCompose(task -> completeNexusTask(task, operationId)); + pollNexusTask() + .thenCompose(task -> completeNexusTask(task, operationId)) + .exceptionally( + e -> { + // If operation already timed out by the time we send response, the RPC call may + // have succeeded + // or it may have thrown NOT_FOUND status code. Both scenarios are treated as + // success. + Throwable cause = (e instanceof CompletionException) ? e.getCause() : e; + if (cause instanceof StatusRuntimeException) { + if (((StatusRuntimeException) cause).getStatus().getCode() + == Status.Code.NOT_FOUND) { + return null; + } + } + // Every other exception should fail the test. + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } else { + throw new CompletionException(cause); + } + }); try { WorkflowStub stub = newWorkflowStub("TestNexusOperationTimeoutAfterStartWorkflow"); @@ -591,22 +615,30 @@ public void testNexusOperationTimeout_AfterStart() { pollResp.getTaskToken(), newScheduleOperationCommand( defaultScheduleOperationAttributes() - .setScheduleToCloseTimeout(Durations.fromSeconds(2)))); + // needs to be at least 3 seconds due to server bug + .setScheduleToCloseTimeout(Durations.fromSeconds(3)))); testWorkflowRule.assertHistoryEvent( execution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED); // Wait for operation to be started nexusPoller.get(); - // Poll and verify started event is recorded and triggers workflow progress - pollResp = pollWorkflowTask(); - testWorkflowRule.assertHistoryEvent( - execution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED); - completeWorkflowTask(pollResp.getTaskToken()); + // Keep processing workflow tasks until operation times out, then complete the workflow. + wftPolling: + while (true) { + pollResp = pollWorkflowTask(); + for (HistoryEvent event : pollResp.getHistory().getEventsList()) { + if (event.getEventType() == EventType.EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT) { + completeWorkflow(pollResp.getTaskToken()); + break wftPolling; + } + } + completeWorkflowTask(pollResp.getTaskToken()); + } - // Poll to wait for new task after operation times out - pollResp = pollWorkflowTask(); - completeWorkflow(pollResp.getTaskToken()); + // Because Nexus operation may have timed out before it started, + // EVENT_TYPE_NEXUS_OPERATION_STARTED + // may or may not be present in history. Both scenarios are OK, so we don't check for it. List events = testWorkflowRule.getHistoryEvents( @@ -614,7 +646,8 @@ public void testNexusOperationTimeout_AfterStart() { Assert.assertEquals(1, events.size()); io.temporal.api.failure.v1.Failure failure = events.get(0).getNexusOperationTimedOutEventAttributes().getFailure(); - assertOperationFailureInfo(operationId, failure.getNexusOperationExecutionFailureInfo()); + // If operation timed out before starting, then operation ID will be missing in failure info + assertOperationFailureInfoAnyID(failure.getNexusOperationExecutionFailureInfo()); Assert.assertEquals("nexus operation completed unsuccessfully", failure.getMessage()); io.temporal.api.failure.v1.Failure cause = failure.getCause(); Assert.assertEquals("operation timed out", cause.getMessage()); @@ -1540,8 +1573,12 @@ private void assertOperationFailureInfo(NexusOperationFailureInfo info) { } private void assertOperationFailureInfo(String operationID, NexusOperationFailureInfo info) { - Assert.assertNotNull(info); + assertOperationFailureInfoAnyID(info); Assert.assertEquals(operationID, info.getOperationToken()); + } + + private void assertOperationFailureInfoAnyID(NexusOperationFailureInfo info) { + Assert.assertNotNull(info); Assert.assertEquals(testEndpoint.getSpec().getName(), info.getEndpoint()); Assert.assertEquals(testService, info.getService()); Assert.assertEquals(testOperation, info.getOperation());