From 0fe56ec04bf5dd016c55ae39e9c90ea759a3f48a Mon Sep 17 00:00:00 2001 From: Martijn Visser <2989614+MartijnVisser@users.noreply.github.com> Date: Thu, 4 Jun 2026 11:06:38 +0200 Subject: [PATCH] [FLINK-39845][tests] Fix SavepointITCase.testStopWithSavepointFailsOverToSavepoint under AdaptiveScheduler The JUnit5 migration (FLINK-39124, #27667) replaced a cause-chain search (ExceptionUtils.assertThrowable -> findThrowable) with a direct-cause assertion (assertThatThrownBy(...).hasCauseInstanceOf(StopWithSavepointStoppingException.class)). Under the AdaptiveScheduler, StopWithSavepoint.onLeave() wraps the expected StopWithSavepointStoppingException inside a FlinkException ("Stop with savepoint operation could not be completed."), so it is no longer the direct cause. This regressed the test_cron_adaptive_scheduler nightly leg on master (red every build since 2026-03-21); the default scheduler still exposes it as the direct cause and passed. Restore the chain search via FlinkAssertions.anyCauseMatches so the test passes under both the default and adaptive schedulers, matching the pre-migration behavior still present on release-2.1. Generated-by: Claude Code (Claude Opus 4.8) (cherry picked from commit c3ec5690874a55110ef3ecfb80a335c415ee1b57) --- .../flink/test/checkpointing/SavepointITCase.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 3b32135251955..d4aa038676886 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -132,6 +132,7 @@ import java.util.stream.Stream; import static java.util.concurrent.CompletableFuture.allOf; +import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning; import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult; import static org.apache.flink.test.util.TestUtils.waitUntilAllTasksAreRunning; @@ -321,11 +322,14 @@ void testStopWithSavepointFailsOverToSavepoint() throws Throwable { savepointDir.getAbsolutePath(), SavepointFormatType.CANONICAL); + // The AdaptiveScheduler wraps StopWithSavepointStoppingException in a FlinkException, + // so search the whole cause chain rather than only the direct cause. assertThatThrownBy(savepointCompleted::get) .isInstanceOf(ExecutionException.class) - .hasCauseInstanceOf(StopWithSavepointStoppingException.class) - .cause() - .hasMessageStartingWith("A savepoint has been created at: "); + .satisfies( + anyCauseMatches( + StopWithSavepointStoppingException.class, + "A savepoint has been created at: ")); assertThat(client.getJobStatus(jobGraph.getJobID()).get()) .isIn(JobStatus.FAILED, JobStatus.FAILING); } finally {