SCHEDULER_RESCALE_TRIGGER_ACTIVE_CHECKPOINT_ENABLED =
+ key("jobmanager.adaptive-scheduler.rescale-trigger.active-checkpoint.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ Description.builder()
+ .text(
+ "When enabled, the Adaptive Scheduler actively triggers a checkpoint when resources change and rescaling is desired, "
+ + "rather than waiting for the next periodic checkpoint. "
+ + "This reduces rescaling latency, especially when checkpoint intervals are large. "
+ + "The active trigger respects the configured minimum pause between checkpoints and "
+ + "will not fire if a checkpoint is already in progress or being triggered.")
+ .build());
+
/**
* @deprecated Use {@link
* JobManagerOptions#SCHEDULER_SUBMISSION_RESOURCE_STABILIZATION_TIMEOUT}.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 2a86a566b1f00..7750543be4100 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -61,6 +61,7 @@
import javax.annotation.concurrent.GuardedBy;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
@@ -1999,6 +2000,27 @@ public long getCheckpointTimeout() {
return checkpointTimeout;
}
+    /**
+     * Computes how long an actively triggered checkpoint still has to wait until {@code
+     * minPauseBetweenCheckpoints} is satisfied.
+     *
+     * <p>The wait is computed from the time elapsed since the last completed checkpoint (or from
+     * the coordinator clock's epoch when no checkpoint has completed yet — which is normally far
+     * in the past in production).
+     *
+     * @return {@link Duration#ZERO} if the trigger can fire immediately, a positive {@link
+     *     Duration} if the minimum pause has not elapsed yet, or {@link Optional#empty()} if a
+     *     checkpoint is already in flight (triggering or pending), in which case no active
+     *     trigger should be scheduled
+     */
+    public Optional<Duration> getActiveCheckpointTriggerDelay() {
+        synchronized (lock) {
+            // An in-flight checkpoint (being triggered or still pending) rules out an active one.
+            if (isTriggering || !pendingCheckpoints.isEmpty()) {
+                return Optional.empty();
+            }
+            final long elapsed = clock.relativeTimeMillis() - lastCheckpointCompletionRelativeTime;
+            final long remaining = minPauseBetweenCheckpoints - elapsed;
+            // Never report a negative wait: an already satisfied pause means "fire now".
+            return Optional.of(remaining > 0 ? Duration.ofMillis(remaining) : Duration.ZERO);
+        }
+    }
+
/**
* @deprecated use {@link #getNumQueuedRequests()}
*/
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 70c5b62ce90ad..b75f0d09f29e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -314,7 +314,9 @@ public static Settings of(
SCHEDULER_RESCALE_TRIGGER_MAX_DELAY,
maximumDelayForRescaleTriggerDefault),
rescaleOnFailedCheckpointsCount,
- configuration.get(WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE));
+ configuration.get(WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE),
+ configuration.get(
+ JobManagerOptions.SCHEDULER_RESCALE_TRIGGER_ACTIVE_CHECKPOINT_ENABLED));
}
private final SchedulerExecutionMode executionMode;
@@ -326,6 +328,7 @@ public static Settings of(
private final Duration maximumDelayForTriggeringRescale;
private final int rescaleOnFailedCheckpointCount;
private final int rescaleHistoryMax;
+ private final boolean activeCheckpointTriggerEnabled;
private Settings(
SchedulerExecutionMode executionMode,
@@ -336,7 +339,8 @@ private Settings(
Duration executingResourceStabilizationTimeout,
Duration maximumDelayForTriggeringRescale,
int rescaleOnFailedCheckpointCount,
- int rescaleHistoryMax) {
+ int rescaleHistoryMax,
+ boolean activeCheckpointTriggerEnabled) {
this.executionMode = executionMode;
this.submissionResourceWaitTimeout = submissionResourceWaitTimeout;
this.submissionResourceStabilizationTimeout = submissionResourceStabilizationTimeout;
@@ -346,6 +350,7 @@ private Settings(
this.maximumDelayForTriggeringRescale = maximumDelayForTriggeringRescale;
this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
this.rescaleHistoryMax = rescaleHistoryMax;
+ this.activeCheckpointTriggerEnabled = activeCheckpointTriggerEnabled;
}
public SchedulerExecutionMode getExecutionMode() {
@@ -384,6 +389,10 @@ public int getRescaleHistoryMax() {
return rescaleHistoryMax;
}
+ /**
+ * Returns whether actively triggering a checkpoint for rescaling is enabled, as read from
+ * {@code JobManagerOptions.SCHEDULER_RESCALE_TRIGGER_ACTIVE_CHECKPOINT_ENABLED} when these
+ * settings were created.
+ */
+ public boolean isActiveCheckpointTriggerEnabled() {
+ return activeCheckpointTriggerEnabled;
+ }
+
public JobRescaleConfigInfo toJobRescaleConfigInfo() {
return new JobRescaleConfigInfo(
rescaleHistoryMax,
@@ -1311,7 +1320,8 @@ public void goToExecuting(
userCodeClassLoader,
failureCollection,
this::createExecutingStateTransitionManager,
- settings.getRescaleOnFailedCheckpointCount()));
+ settings.getRescaleOnFailedCheckpointCount(),
+ settings.isActiveCheckpointTriggerEnabled()));
}
private StateTransitionManager createExecutingStateTransitionManager(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
index 87f810ae784d3..a7945e361669b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java
@@ -164,6 +164,21 @@ private void progressToStabilized(Temporal firstChangeEventTimestamp) {
progressToPhase(new Stabilized(clock, this, firstChangeEventTimestamp, maxTriggerDelay));
}
+ /**
+ * Requests the active checkpoint trigger from the context. Called from within phase lifecycle
+ * methods:
+ *
+ * <ul>
+ *   <li>On entering {@link Stabilizing} (to overlap checkpoint with the stabilization wait)
+ *   <li>On each {@link Stabilizing#onChange} event (retry if a previous trigger was skipped)
+ *   <li>On entering {@link Stabilized} (fallback if no checkpoint completed during
+ *       stabilization)
+ * </ul>
+ */
+ private void requestActiveCheckpointTrigger() {
+ transitionContext.requestActiveCheckpointTrigger();
+ }
+
private void triggerTransitionToSubsequentState() {
progressToPhase(new Transitioning(clock, this));
transitionContext.transitionToSubsequentState();
@@ -362,6 +377,7 @@ private Stabilizing(
resourceStabilizationTimeout);
scheduleTransitionEvaluation();
+ context().requestActiveCheckpointTrigger();
}
@Override
@@ -370,6 +386,7 @@ void onChange(boolean newResourceDriven) {
// event was already handled by a onTrigger callback with a no-op
onChangeEventTimestamp = now();
scheduleTransitionEvaluation();
+ context().requestActiveCheckpointTrigger();
}
@Override
@@ -427,6 +444,7 @@ private Stabilized(
},
firstChangeEventTimestamp,
maxTriggerDelay);
+ context().requestActiveCheckpointTrigger();
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
index deae52856dc41..d0ecf284b7914 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
@@ -18,9 +18,11 @@
package org.apache.flink.runtime.scheduler.adaptive;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
import org.apache.flink.runtime.checkpoint.CheckpointStatsListener;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
@@ -40,6 +42,7 @@
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
@@ -64,6 +67,9 @@ class Executing extends StateWithExecutionGraph
private final StateTransitionManager stateTransitionManager;
private final int rescaleOnFailedCheckpointCount;
+ private final boolean activeCheckpointTriggerEnabled;
+ // only modifiable from the main thread
+ private boolean activeCheckpointTriggerScheduled;
// null indicates that there was no change event observed, yet
@Nullable private AtomicInteger failedCheckpointCountdown;
@@ -77,7 +83,8 @@ class Executing extends StateWithExecutionGraph
List failureCollection,
Function
stateTransitionManagerFactory,
- int rescaleOnFailedCheckpointCount) {
+ int rescaleOnFailedCheckpointCount,
+ boolean activeCheckpointTriggerEnabled) {
super(
context,
executionGraph,
@@ -96,6 +103,7 @@ class Executing extends StateWithExecutionGraph
rescaleOnFailedCheckpointCount > 0,
"The rescaleOnFailedCheckpointCount should be larger than 0.");
this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
+ this.activeCheckpointTriggerEnabled = activeCheckpointTriggerEnabled;
this.failedCheckpointCountdown = null;
recordRescaleForJobIntoExecuting(logger, context);
@@ -182,6 +190,105 @@ public ScheduledFuture> scheduleOperation(Runnable callback, Duration delay) {
return context.runIfState(this, callback, delay);
}
+    /**
+     * Asks the checkpoint coordinator whether an extra checkpoint may be triggered to speed up
+     * rescaling and, if so, schedules the trigger attempt.
+     *
+     * <p>Nothing is scheduled when the feature is disabled, when there is no checkpoint
+     * coordinator or periodic checkpointing is not configured, or when the coordinator reports
+     * that a checkpoint is already in flight. The parallelism check is deferred to {@link
+     * #tryFireActiveCheckpointAfterRetry()}.
+     */
+    @Override
+    public void requestActiveCheckpointTrigger() {
+        if (!activeCheckpointTriggerEnabled) {
+            return;
+        }
+        final CheckpointCoordinator checkpointCoordinator =
+                getExecutionGraph().getCheckpointCoordinator();
+        if (checkpointCoordinator == null
+                || !checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
+            getLogger()
+                    .debug(
+                            "Skipping active checkpoint trigger for rescale: checkpointing not configured.");
+            return;
+        }
+
+        // Empty means a checkpoint is in flight; otherwise the value is the remaining min-pause.
+        final Optional<Duration> triggerDelay =
+                checkpointCoordinator.getActiveCheckpointTriggerDelay();
+        if (triggerDelay.isEmpty()) {
+            getLogger()
+                    .debug(
+                            "Skipping active checkpoint trigger for rescale: checkpoint already in progress.");
+            return;
+        }
+        scheduleActiveCheckpointTriggerRetry(triggerDelay.get());
+    }
+
+ private void scheduleActiveCheckpointTriggerRetry(Duration delay) {
+ if (activeCheckpointTriggerScheduled) {
+ return;
+ }
+ activeCheckpointTriggerScheduled = true;
+ if (!delay.isZero()) {
+ getLogger()
+ .debug(
+ "Min pause not satisfied, scheduling active checkpoint trigger retry in {} ms.",
+ delay.toMillis());
+ }
+ context.runIfState(this, this::tryFireActiveCheckpointAfterRetry, delay);
+ }
+
+    /**
+     * Runs when the scheduled trigger attempt is due: clears the scheduled flag and fires the
+     * checkpoint only if the parallelism still differs and the coordinator reports that the
+     * trigger may fire right now. In every other case the attempt is dropped with a debug log; no
+     * further retry is scheduled from here.
+     */
+    private void tryFireActiveCheckpointAfterRetry() {
+        activeCheckpointTriggerScheduled = false;
+
+        // Parallelism is the only guard re-evaluated here: it can change between the request
+        // and the scheduled fire (e.g. resources changed again, or the parallelism was reverted
+        // back to the current value while we waited for min-pause). The null check and
+        // periodic-checkpoint config are invariants validated at request time.
+        if (!parallelismChanged()) {
+            getLogger()
+                    .debug("Active checkpoint trigger for rescale dropped: parallelism unchanged.");
+            return;
+        }
+        final CheckpointCoordinator checkpointCoordinator =
+                Preconditions.checkNotNull(
+                        getExecutionGraph().getCheckpointCoordinator(),
+                        "Checkpoint coordinator was non-null when the trigger was scheduled; "
+                                + "an Executing state never drops its coordinator.");
+        // Re-query the coordinator: its state may have moved on while this task was waiting.
+        final Optional<Duration> triggerDelay =
+                checkpointCoordinator.getActiveCheckpointTriggerDelay();
+        if (triggerDelay.isEmpty()) {
+            getLogger()
+                    .debug(
+                            "Active checkpoint trigger for rescale dropped: checkpoint already in progress after retry.");
+        } else if (triggerDelay.get().isZero()) {
+            fireActiveCheckpointTrigger(checkpointCoordinator);
+        } else {
+            getLogger()
+                    .debug(
+                            "Active checkpoint trigger for rescale silently dropped: a periodic checkpoint completed while the trigger was scheduled.");
+        }
+    }
+
+    /**
+     * Triggers a non-periodic checkpoint on the given coordinator and logs how it ended. A failed
+     * checkpoint is only logged as a warning; the handler itself is wrapped in {@code
+     * FutureUtils.assertNoException}.
+     *
+     * @param checkpointCoordinator coordinator on which the checkpoint is triggered
+     */
+    private void fireActiveCheckpointTrigger(CheckpointCoordinator checkpointCoordinator) {
+        Preconditions.checkState(
+                activeCheckpointTriggerEnabled,
+                "Active checkpoint trigger fired while the feature is disabled.");
+        final JobID currentJobId = getExecutionGraph().getJobID();
+        getLogger()
+                .info("Actively triggering checkpoint to expedite rescaling, job {}.", currentJobId);
+        // isPeriodic=false: min-pause is enforced above via getActiveCheckpointTriggerDelay.
+        FutureUtils.assertNoException(
+                checkpointCoordinator
+                        .triggerCheckpoint(false)
+                        .handle(
+                                (checkpoint, failure) -> {
+                                    if (failure == null) {
+                                        getLogger()
+                                                .info(
+                                                        "Active checkpoint for rescale completed successfully: {}.",
+                                                        checkpoint.getCheckpointID());
+                                    } else {
+                                        getLogger()
+                                                .warn(
+                                                        "Active checkpoint trigger for rescale failed.",
+                                                        failure);
+                                    }
+                                    return null;
+                                }));
+    }
+
@Override
public void transitionToSubsequentState() {
Optional availableVertexParallelism =
@@ -399,6 +506,7 @@ static class Factory implements StateFactory {
private final Function
stateTransitionManagerFactory;
private final int rescaleOnFailedCheckpointCount;
+ private final boolean activeCheckpointTriggerEnabled;
Factory(
ExecutionGraph executionGraph,
@@ -410,7 +518,8 @@ static class Factory implements StateFactory {
List failureCollection,
Function
stateTransitionManagerFactory,
- int rescaleOnFailedCheckpointCount) {
+ int rescaleOnFailedCheckpointCount,
+ boolean activeCheckpointTriggerEnabled) {
this.context = context;
this.log = log;
this.executionGraph = executionGraph;
@@ -420,6 +529,7 @@ static class Factory implements StateFactory {
this.failureCollection = failureCollection;
this.stateTransitionManagerFactory = stateTransitionManagerFactory;
this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
+ this.activeCheckpointTriggerEnabled = activeCheckpointTriggerEnabled;
}
public Class getStateClass() {
@@ -436,7 +546,8 @@ public Executing getState() {
userCodeClassLoader,
failureCollection,
stateTransitionManagerFactory,
- rescaleOnFailedCheckpointCount);
+ rescaleOnFailedCheckpointCount,
+ activeCheckpointTriggerEnabled);
}
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java
index 98229a9afd3e3..5b2ca1b928d85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java
@@ -89,5 +89,12 @@ interface Context extends RescaleContext {
* @return the {@link JobID} of the job
*/
JobID getJobId();
+
+ /**
+ * Requests the context to actively trigger a checkpoint to expedite rescaling. The
+ * implementation decides whether to actually trigger based on its own guard conditions.
+ * Multiple calls are safe; guards prevent redundant triggers.
+ *
+ * <p>The default implementation does nothing.
+ */
+ default void requestActiveCheckpointTrigger() {}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index 0a505ab07f2c6..5493e05667c68 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -63,6 +63,7 @@
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
@@ -791,6 +792,8 @@ public static class CheckpointCoordinatorBuilder {
VertexFinishedStateChecker>
vertexFinishedStateCheckerFactory = VertexFinishedStateChecker::new;
+ private Clock clock = SystemClock.getInstance();
+
public CheckpointCoordinatorBuilder setCheckpointCoordinatorConfiguration(
CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration) {
this.checkpointCoordinatorConfiguration = checkpointCoordinatorConfiguration;
@@ -870,6 +873,11 @@ public CheckpointCoordinatorBuilder setVertexFinishedStateCheckerFactory(
return this;
}
+ /**
+ * Sets the {@link Clock} that is passed to the {@link CheckpointCoordinator} created by this
+ * builder. If this is never called, {@link SystemClock#getInstance()} is used.
+ *
+ * @param clock the clock to hand to the coordinator
+ * @return this builder
+ */
+ public CheckpointCoordinatorBuilder setClock(Clock clock) {
+ this.clock = clock;
+ return this;
+ }
+
public CheckpointCoordinator build(ScheduledExecutorService executorService)
throws Exception {
return build(
@@ -899,7 +907,7 @@ public CheckpointCoordinator build(ExecutionGraph executionGraph) throws Excepti
timer,
failureManager,
checkpointPlanCalculator,
- SystemClock.getInstance(),
+ clock,
checkpointStatsTracker,
vertexFinishedStateCheckerFactory);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java
index cf6051a90b315..8453f5c5a6f6b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java
@@ -400,6 +400,66 @@ private static void assertFinalStateTransitionHappened(
assertThat(testInstance.getPhase()).isInstanceOf(Transitioning.class);
}
+    /** Entering {@link Stabilizing} from {@link Idling} must request the active trigger once. */
+    @Test
+    void testActiveCheckpointTriggerCalledOnEnteringStabilizing() {
+        final TestingStateTransitionManagerContext context =
+                TestingStateTransitionManagerContext.stableContext();
+        context.withSufficientResources();
+        final DefaultStateTransitionManager manager =
+                context.createTestInstanceThatPassedCooldownPhase();
+
+        // Baseline: the manager idles and no trigger has been requested so far.
+        assertThat(manager.getPhase()).isInstanceOf(Idling.class);
+        assertThat(context.getActiveCheckpointTriggerCount())
+                .as("Neither Cooldown nor Idling should fire the active trigger")
+                .isZero();
+
+        manager.onChange(true);
+
+        assertThat(manager.getPhase()).isInstanceOf(Stabilizing.class);
+        assertThat(context.getActiveCheckpointTriggerCount())
+                .as("Entering Stabilizing must fire the active trigger exactly once")
+                .isEqualTo(1);
+    }
+
+    /** A further change event while in {@link Stabilizing} must request the trigger again. */
+    @Test
+    void testActiveCheckpointTriggerCalledOnChangeInStabilizing() {
+        final TestingStateTransitionManagerContext context =
+                TestingStateTransitionManagerContext.stableContext();
+        context.withSufficientResources();
+        final DefaultStateTransitionManager manager =
+                context.createTestInstanceThatPassedCooldownPhase();
+
+        // First change event: moves the manager into Stabilizing.
+        manager.onChange(true);
+        assertThat(manager.getPhase()).isInstanceOf(Stabilizing.class);
+
+        // Second change event: handled inside Stabilizing.
+        manager.onChange(true);
+
+        assertThat(manager.getPhase()).isInstanceOf(Stabilizing.class);
+        assertThat(context.getActiveCheckpointTriggerCount())
+                .as(
+                        "Two onChange calls. One entering Stabilizing, one onChange while in Stabilizing — must fire the trigger twice")
+                .isEqualTo(2);
+    }
+
+    /** Entering {@link Stabilized} after {@link Stabilizing} must request the trigger once more. */
+    @Test
+    void testActiveCheckpointTriggerCalledOnEnteringStabilized() {
+        final TestingStateTransitionManagerContext context =
+                TestingStateTransitionManagerContext.stableContext();
+        context.withSufficientResources();
+        final DefaultStateTransitionManager manager =
+                context.createTestInstanceThatPassedCooldownPhase();
+
+        manager.onChange(true);
+        assertThat(manager.getPhase()).isInstanceOf(Stabilizing.class);
+
+        // Let the stabilization timeout elapse so that the manager progresses to Stabilized.
+        context.passResourceStabilizationTimeout();
+
+        assertThat(manager.getPhase()).isInstanceOf(Stabilized.class);
+        assertThat(context.getActiveCheckpointTriggerCount())
+                .as("Entering Stabilizing then Stabilized must fire the trigger twice in total")
+                .isEqualTo(2);
+    }
+
private static void changeWithoutPhaseMove(
TestingStateTransitionManagerContext ctx,
DefaultStateTransitionManager testInstance,
@@ -460,6 +520,7 @@ private static class TestingStateTransitionManagerContext
// internal state used for assertions
private final AtomicBoolean transitionTriggered = new AtomicBoolean();
+ private int activeCheckpointTriggerCount = 0;
private final SortedMap>> scheduledTasks =
new TreeMap<>();
@@ -537,6 +598,11 @@ public void transitionToSubsequentState() {
transitionTriggered.set(true);
}
+        /** Only counts the request so that tests can assert how often it was made. */
+        @Override
+        public void requestActiveCheckpointTrigger() {
+            activeCheckpointTriggerCount += 1;
+        }
+
@Override
public ScheduledFuture> scheduleOperation(Runnable callback, Duration delay) {
final Instant triggerTime =
@@ -703,5 +769,9 @@ public boolean stateTransitionWasTriggered() {
+ /** Resets the flag that records whether a state transition was triggered. */
public void clearStateTransition() {
transitionTriggered.set(false);
}
+
+ /** Returns how many times {@link #requestActiveCheckpointTrigger()} was called on this context. */
+ public int getActiveCheckpointTriggerCount() {
+ return activeCheckpointTriggerCount;
+ }
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
index f593082f6c75b..2d7bf94f4481c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
@@ -29,6 +29,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -57,6 +58,7 @@
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl;
import org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo;
@@ -75,6 +77,8 @@
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.clock.ManualClock;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -160,7 +164,8 @@ void testNoDeploymentCallOnEnterWhenVertexRunning() throws Exception {
ClassLoader.getSystemClassLoader(),
new ArrayList<>(),
(context) -> TestingStateTransitionManager.withNoOp(),
- 1);
+ 1,
+ false);
assertThat(mockExecutionVertex.isDeployCalled()).isFalse();
}
}
@@ -186,7 +191,8 @@ void testIllegalStateExceptionOnNotRunningExecutionGraph() {
ClassLoader.getSystemClassLoader(),
new ArrayList<>(),
context -> TestingStateTransitionManager.withNoOp(),
- 1);
+ 1,
+ false);
}
})
.isInstanceOf(IllegalStateException.class);
@@ -556,6 +562,564 @@ public CheckpointCoordinator getCheckpointCoordinator() {
}
}
+ /**
+ * Verifies that a trigger retry scheduled by {@code requestActiveCheckpointTrigger} does not
+ * add a pending checkpoint once {@code stopWithSavepoint} has been called on the state.
+ */
+ @Test
+ void testActiveCheckpointTriggerRetryIsNoopAfterStopWithSavepoint() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ MockExecutionJobVertex mejv = new MockExecutionJobVertex(MockExecutionVertex::new);
+
+ ManuallyTriggeredScheduledExecutor checkpointTimer =
+ new ManuallyTriggeredScheduledExecutor();
+ ManualClock clock = new ManualClock();
+ // Periodic checkpointing with a min pause as long as the interval; the manual clock is
+ // never advanced in this test.
+ CheckpointCoordinatorConfiguration coordConfig =
+ new CheckpointCoordinatorConfiguration
+ .CheckpointCoordinatorConfigurationBuilder()
+ .setCheckpointInterval(10_000L)
+ .setMinPauseBetweenCheckpoints(10_000L)
+ .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
+ .build();
+ CheckpointCoordinator coordinator =
+ new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .setCheckpointCoordinatorConfiguration(coordConfig)
+ .setTimer(checkpointTimer)
+ .setClock(clock)
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator getCheckpointCoordinator() {
+ return coordinator;
+ }
+
+ @Override
+ public Iterable getVerticesTopologically() {
+ return Collections.singletonList(mejv);
+ }
+ };
+ // Report a parallelism that is one higher than the current one for every vertex.
+ ctx.setVertexParallelism(
+ new VertexParallelism(
+ graph.getAllVertices().values().stream()
+ .collect(
+ Collectors.toMap(
+ AccessExecutionJobVertex::getJobVertexId,
+ v -> v.getParallelism() + 1))));
+
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(true)
+ .build(ctx);
+ exec.requestActiveCheckpointTrigger();
+ ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+ (ManuallyTriggeredComponentMainThreadExecutor) ctx.getMainThreadExecutor();
+ assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
+ .as("A retry should be scheduled while min-pause is not satisfied")
+ .isNotEmpty();
+
+ ctx.setExpectStopWithSavepoint(assertNonNull());
+ exec.stopWithSavepoint("file:///tmp/target", true, SavepointFormatType.CANONICAL);
+
+ // Drain both executors first so that the pending-checkpoint count taken below already
+ // includes whatever stopWithSavepoint itself caused.
+ ctxExecutor.triggerAll();
+ checkpointTimer.triggerAll();
+ int pendingAfterSavepoint = coordinator.getNumberOfPendingCheckpoints();
+
+ // Trigger the previously scheduled retry; it must be a no-op because the state
+ // transitioned to StopWithSavepoint (runIfState gates the action on
+ // hadStateTransition).
+ ctxExecutor.triggerNonPeriodicScheduledTasks();
+ checkpointTimer.triggerAll();
+ assertThat(coordinator.getNumberOfPendingCheckpoints())
+ .as("Active checkpoint trigger retry must not fire after stopWithSavepoint")
+ .isEqualTo(pendingAfterSavepoint);
+ }
+ }
+
+ /**
+ * Verifies that with the feature disabled {@code requestActiveCheckpointTrigger} does not
+ * even ask the execution graph for its checkpoint coordinator.
+ */
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenDisabled() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ // Flipped by the graph below whenever its checkpoint coordinator is requested.
+ final AtomicBoolean coordinatorAccessed = new AtomicBoolean(false);
+ CheckpointCoordinator coordinator =
+ new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator getCheckpointCoordinator() {
+ coordinatorAccessed.set(true);
+ return coordinator;
+ }
+ };
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(false)
+ .build(ctx);
+
+ exec.requestActiveCheckpointTrigger();
+ assertThat(coordinatorAccessed.get()).isFalse();
+ }
+ }
+
+ /**
+ * Verifies that no trigger task is scheduled on the main thread executor when the execution
+ * graph has no checkpoint coordinator.
+ */
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenNoCoordinator() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ MockExecutionJobVertex mejv = new MockExecutionJobVertex(MockExecutionVertex::new);
+
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator getCheckpointCoordinator() {
+ return null;
+ }
+
+ @Override
+ public Iterable getVerticesTopologically() {
+ return Collections.singletonList(mejv);
+ }
+ };
+ // Set parallelism change so the only blocking guard is the null coordinator check;
+ // if the null check is ever removed from requestActiveCheckpointTrigger this test
+ // will NPE on `checkpointCoordinator.isPeriodicCheckpointingConfigured()` and fail
+ // loudly.
+ ctx.setVertexParallelism(
+ new VertexParallelism(
+ graph.getAllVertices().values().stream()
+ .collect(
+ Collectors.toMap(
+ AccessExecutionJobVertex::getJobVertexId,
+ v -> v.getParallelism() + 1))));
+
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(true)
+ .build(ctx);
+ ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+ (ManuallyTriggeredComponentMainThreadExecutor) ctx.getMainThreadExecutor();
+ // Compare against the number of tasks scheduled before the request instead of
+ // assuming that the executor is empty at this point.
+ int baseline = ctxExecutor.getActiveNonPeriodicScheduledTask().size();
+ exec.requestActiveCheckpointTrigger();
+ assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
+ .as("No checkpoint trigger should be scheduled when coordinator is null")
+ .hasSize(baseline);
+ }
+ }
+
+ /**
+ * Verifies that neither a retry nor a checkpoint is triggered when the coordinator reports
+ * that periodic checkpointing is not configured.
+ */
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenPeriodicCheckpointingNotConfigured() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ MockExecutionJobVertex mejv = new MockExecutionJobVertex(MockExecutionVertex::new);
+
+ ManuallyTriggeredScheduledExecutor checkpointTimer =
+ new ManuallyTriggeredScheduledExecutor();
+ // An interval of Long.MAX_VALUE is what makes isPeriodicCheckpointingConfigured()
+ // return false here; that precondition is asserted explicitly further down.
+ CheckpointCoordinator coordinator =
+ new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .setCheckpointCoordinatorConfiguration(
+ new CheckpointCoordinatorConfiguration
+ .CheckpointCoordinatorConfigurationBuilder()
+ .setCheckpointInterval(Long.MAX_VALUE)
+ .build())
+ .setTimer(checkpointTimer)
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator getCheckpointCoordinator() {
+ return coordinator;
+ }
+
+ @Override
+ public Iterable getVerticesTopologically() {
+ return Collections.singletonList(mejv);
+ }
+ };
+ // Set parallelism change so the only blocking guard is periodic-checkpointing-not-
+ // configured. If the periodic check is removed, the request will schedule a retry —
+ // which the assertion below catches.
+ ctx.setVertexParallelism(
+ new VertexParallelism(
+ graph.getAllVertices().values().stream()
+ .collect(
+ Collectors.toMap(
+ AccessExecutionJobVertex::getJobVertexId,
+ v -> v.getParallelism() + 1))));
+
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(true)
+ .build(ctx);
+
+ assertThat(coordinator.isPeriodicCheckpointingConfigured()).isFalse();
+ ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+ (ManuallyTriggeredComponentMainThreadExecutor) ctx.getMainThreadExecutor();
+ int baseline = ctxExecutor.getActiveNonPeriodicScheduledTask().size();
+ exec.requestActiveCheckpointTrigger();
+ assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
+ .as(
+ "No retry should be scheduled when periodic checkpointing is not configured")
+ .hasSize(baseline);
+ // Run whatever sits on the checkpoint timer before counting pending checkpoints.
+ checkpointTimer.triggerAll();
+ assertThat(coordinator.getNumberOfPendingCheckpoints())
+ .as(
+ "No checkpoint should be triggered when periodic checkpointing is not configured")
+ .isEqualTo(0);
+ }
+ }
+
+ /**
+ * Verifies that no checkpoint becomes pending when the scheduled trigger attempt runs while
+ * the reported parallelism equals the current parallelism of every vertex.
+ */
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenParallelismUnchanged() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ MockExecutionJobVertex mejv = new MockExecutionJobVertex(MockExecutionVertex::new);
+
+ ManuallyTriggeredScheduledExecutor checkpointTimer =
+ new ManuallyTriggeredScheduledExecutor();
+ CheckpointCoordinator coordinator =
+ new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .setTimer(checkpointTimer)
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator getCheckpointCoordinator() {
+ return coordinator;
+ }
+
+ @Override
+ public Iterable getVerticesTopologically() {
+ return Collections.singletonList(mejv);
+ }
+ };
+ // Parallelism is intentionally UNCHANGED — the parallelism guard in
+ // tryFireActiveCheckpointAfterRetry must drop the trigger before it fires.
+ ctx.setVertexParallelism(
+ new VertexParallelism(
+ graph.getAllVertices().values().stream()
+ .collect(
+ Collectors.toMap(
+ AccessExecutionJobVertex::getJobVertexId,
+ AccessExecutionJobVertex::getParallelism))));
+
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(true)
+ .build(ctx);
+ ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+ (ManuallyTriggeredComponentMainThreadExecutor) ctx.getMainThreadExecutor();
+ exec.requestActiveCheckpointTrigger();
+ // The request path schedules a retry (parallelism check lives in the retry, not in
+ // the request). Run the scheduled retry to exercise the parallelism check; without
+ // this drain the test would pass for the wrong reason (retry never ran).
+ ctxExecutor.triggerNonPeriodicScheduledTasks();
+ checkpointTimer.triggerAll();
+ assertThat(coordinator.getNumberOfPendingCheckpoints())
+ .as("No checkpoint should be triggered when parallelism is unchanged")
+ .isEqualTo(0);
+ }
+ }
+
+    /**
+     * Verifies that a trigger request made while a checkpoint is already pending neither
+     * schedules a retry on the main-thread executor nor adds another pending checkpoint.
+     */
+    @Test
+    void testActiveCheckpointTriggerSkipsWhenCheckpointInProgress() throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            MockExecutionJobVertex mejv = new MockExecutionJobVertex(MockExecutionVertex::new);
+
+            ManuallyTriggeredScheduledExecutor checkpointTimer =
+                    new ManuallyTriggeredScheduledExecutor();
+            CheckpointCoordinator coordinator =
+                    new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .setTimer(checkpointTimer)
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator getCheckpointCoordinator() {
+                            return coordinator;
+                        }
+
+                        @Override
+                        public Iterable<ExecutionJobVertex> getVerticesTopologically() {
+                            return Collections.singletonList(mejv);
+                        }
+                    };
+
+            // Parallelism +1 and periodic configured so the only blocker is the in-progress
+            // check at request time.
+            ctx.setVertexParallelism(
+                    new VertexParallelism(
+                            graph.getAllVertices().values().stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    AccessExecutionJobVertex::getJobVertexId,
+                                                    v -> v.getParallelism() + 1))));
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+            // Put a checkpoint in flight before the active trigger is requested.
+            coordinator.triggerCheckpoint(false);
+            checkpointTimer.triggerAll();
+
+            int pendingBefore = coordinator.getNumberOfPendingCheckpoints();
+            assertThat(pendingBefore).isGreaterThan(0);
+
+            ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+                    (ManuallyTriggeredComponentMainThreadExecutor) ctx.getMainThreadExecutor();
+            int scheduledBefore = ctxExecutor.getActiveNonPeriodicScheduledTask().size();
+            exec.requestActiveCheckpointTrigger();
+            // The in-progress check runs in the request path and must short-circuit before
+            // scheduling a retry — otherwise the test passes for the wrong reason (retry would
+            // also see in-progress and drop).
+            assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask())
+                    .as(
+                            "No retry should be scheduled when a checkpoint is already in progress at request time")
+                    .hasSize(scheduledBefore);
+            checkpointTimer.triggerAll();
+            assertThat(coordinator.getNumberOfPendingCheckpoints())
+                    .as(
+                            "No additional checkpoint should be triggered when a checkpoint is already in progress")
+                    .isEqualTo(pendingBefore);
+        }
+    }
+
+    /**
+     * Verifies the positive path: with the active trigger enabled, a changed target parallelism
+     * and no pending checkpoint, a trigger request results in a pending checkpoint once the
+     * scheduled main-thread task and the checkpoint timer have been drained.
+     */
+    @Test
+    void testActiveCheckpointTriggerFiresWhenAllGuardsPass() throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            MockExecutionJobVertex mejv = new MockExecutionJobVertex(MockExecutionVertex::new);
+
+            ManuallyTriggeredScheduledExecutor checkpointTimer =
+                    new ManuallyTriggeredScheduledExecutor();
+            CheckpointCoordinator coordinator =
+                    new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .setTimer(checkpointTimer)
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator getCheckpointCoordinator() {
+                            return coordinator;
+                        }
+
+                        @Override
+                        public Iterable<ExecutionJobVertex> getVerticesTopologically() {
+                            return Collections.singletonList(mejv);
+                        }
+                    };
+            // Target parallelism differs from the current one (+1 per vertex).
+            ctx.setVertexParallelism(
+                    new VertexParallelism(
+                            graph.getAllVertices().values().stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    AccessExecutionJobVertex::getJobVertexId,
+                                                    v -> v.getParallelism() + 1))));
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+
+            assertThat(coordinator.getNumberOfPendingCheckpoints()).isEqualTo(0);
+
+            ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+                    (ManuallyTriggeredComponentMainThreadExecutor) ctx.getMainThreadExecutor();
+            exec.requestActiveCheckpointTrigger();
+            ctxExecutor.triggerNonPeriodicScheduledTasks();
+            checkpointTimer.triggerAll();
+            assertThat(coordinator.getNumberOfPendingCheckpoints()).isGreaterThan(0);
+        }
+    }
+
+    /**
+     * Verifies that the active trigger honours {@code minPauseBetweenCheckpoints}: with a manual
+     * clock and a 10s minimum pause, a request made before the clock advances yields no pending
+     * checkpoint, while a request made after advancing the clock by 10s yields exactly one.
+     */
+    @Test
+    void testActiveCheckpointTriggerRespectsMinPauseBetweenCheckpoints() throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            MockExecutionJobVertex mejv = new MockExecutionJobVertex(MockExecutionVertex::new);
+
+            ManuallyTriggeredScheduledExecutor checkpointTimer =
+                    new ManuallyTriggeredScheduledExecutor();
+            ManualClock clock = new ManualClock();
+            CheckpointCoordinatorConfiguration coordConfig =
+                    new CheckpointCoordinatorConfiguration
+                                    .CheckpointCoordinatorConfigurationBuilder()
+                            .setCheckpointInterval(10_000L)
+                            .setMinPauseBetweenCheckpoints(10_000L)
+                            .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
+                            .build();
+
+            CheckpointCoordinator coordinator =
+                    new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .setCheckpointCoordinatorConfiguration(coordConfig)
+                            .setTimer(checkpointTimer)
+                            .setClock(clock)
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator getCheckpointCoordinator() {
+                            return coordinator;
+                        }
+
+                        @Override
+                        public Iterable<ExecutionJobVertex> getVerticesTopologically() {
+                            return Collections.singletonList(mejv);
+                        }
+                    };
+            ctx.setVertexParallelism(
+                    new VertexParallelism(
+                            graph.getAllVertices().values().stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    AccessExecutionJobVertex::getJobVertexId,
+                                                    v -> v.getParallelism() + 1))));
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+            ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+                    (ManuallyTriggeredComponentMainThreadExecutor) ctx.getMainThreadExecutor();
+            // Phase 1: the manual clock has not advanced, so the min pause is not yet satisfied
+            // and no checkpoint may be triggered.
+            exec.requestActiveCheckpointTrigger();
+            ctxExecutor.triggerNonPeriodicScheduledTasks();
+            checkpointTimer.triggerAll();
+            assertThat(coordinator.getNumberOfPendingCheckpoints()).isEqualTo(0);
+            // Phase 2: after advancing the clock by the full min pause, a fresh request must
+            // trigger exactly one checkpoint.
+            clock.advanceTime(10_000L, java.util.concurrent.TimeUnit.MILLISECONDS);
+            exec.requestActiveCheckpointTrigger();
+            ctxExecutor.triggerNonPeriodicScheduledTasks();
+            checkpointTimer.triggerAll();
+            assertThat(coordinator.getNumberOfPendingCheckpoints()).isEqualTo(1);
+        }
+    }
+
+    /**
+     * Verifies that repeated trigger requests made while the min pause is unsatisfied schedule
+     * only a single delayed retry, and that running this retry after the clock has advanced
+     * past the min pause produces exactly one pending checkpoint.
+     */
+    @Test
+    void testActiveCheckpointTriggerDeduplicatesScheduledRetries() throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            MockExecutionJobVertex mejv = new MockExecutionJobVertex(MockExecutionVertex::new);
+
+            ManuallyTriggeredScheduledExecutor checkpointTimer =
+                    new ManuallyTriggeredScheduledExecutor();
+            ManualClock clock = new ManualClock();
+            CheckpointCoordinatorConfiguration coordConfig =
+                    new CheckpointCoordinatorConfiguration
+                                    .CheckpointCoordinatorConfigurationBuilder()
+                            .setCheckpointInterval(10_000L)
+                            .setMinPauseBetweenCheckpoints(10_000L)
+                            .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
+                            .build();
+
+            CheckpointCoordinator coordinator =
+                    new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .setCheckpointCoordinatorConfiguration(coordConfig)
+                            .setTimer(checkpointTimer)
+                            .setClock(clock)
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator getCheckpointCoordinator() {
+                            return coordinator;
+                        }
+
+                        @Override
+                        public Iterable<ExecutionJobVertex> getVerticesTopologically() {
+                            return Collections.singletonList(mejv);
+                        }
+                    };
+            ctx.setVertexParallelism(
+                    new VertexParallelism(
+                            graph.getAllVertices().values().stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    AccessExecutionJobVertex::getJobVertexId,
+                                                    v -> v.getParallelism() + 1))));
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+            ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+                    (ManuallyTriggeredComponentMainThreadExecutor) ctx.getMainThreadExecutor();
+            int baseline = ctxExecutor.getActiveNonPeriodicScheduledTask().size();
+
+            // Call requestActiveCheckpointTrigger multiple times while min-pause is not satisfied.
+            // Only one delayed retry should be scheduled (dedup via the
+            // activeCheckpointTriggerScheduled flag).
+            exec.requestActiveCheckpointTrigger();
+            exec.requestActiveCheckpointTrigger();
+            exec.requestActiveCheckpointTrigger();
+
+            assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask()).hasSize(baseline + 1);
+
+            // Advance clock past the min-pause and fire the single scheduled retry.
+            clock.advanceTime(10_000L, java.util.concurrent.TimeUnit.MILLISECONDS);
+            ctxExecutor.triggerNonPeriodicScheduledTasks();
+            checkpointTimer.triggerAll();
+            assertThat(coordinator.getNumberOfPendingCheckpoints()).isEqualTo(1);
+        }
+    }
+
+    /**
+     * Verifies that a trigger request against a checkpoint coordinator that has already been
+     * shut down leaves no pending checkpoint and no additional scheduled main-thread task
+     * behind.
+     */
+    @Test
+    void testActiveCheckpointTriggerHandlesFailureGracefully() throws Exception {
+        try (MockExecutingContext ctx = new MockExecutingContext()) {
+            MockExecutionJobVertex mejv = new MockExecutionJobVertex(MockExecutionVertex::new);
+            ManuallyTriggeredScheduledExecutor checkpointTimer =
+                    new ManuallyTriggeredScheduledExecutor();
+            CheckpointCoordinator coordinator =
+                    new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                            .setTimer(checkpointTimer)
+                            .build(EXECUTOR_EXTENSION.getExecutor());
+            // Shut the coordinator down up front so the trigger attempt hits the failure path.
+            coordinator.shutdown();
+
+            StateTrackingMockExecutionGraph graph =
+                    new StateTrackingMockExecutionGraph() {
+                        @Nullable
+                        @Override
+                        public CheckpointCoordinator getCheckpointCoordinator() {
+                            return coordinator;
+                        }
+
+                        @Override
+                        public Iterable<ExecutionJobVertex> getVerticesTopologically() {
+                            return Collections.singletonList(mejv);
+                        }
+                    };
+            ctx.setVertexParallelism(
+                    new VertexParallelism(
+                            graph.getAllVertices().values().stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    AccessExecutionJobVertex::getJobVertexId,
+                                                    v -> v.getParallelism() + 1))));
+
+            Executing exec =
+                    new ExecutingStateBuilder()
+                            .setExecutionGraph(graph)
+                            .setActiveCheckpointTriggerEnabled(true)
+                            .build(ctx);
+            ManuallyTriggeredComponentMainThreadExecutor ctxExecutor =
+                    (ManuallyTriggeredComponentMainThreadExecutor) ctx.getMainThreadExecutor();
+            // Drain tasks scheduled during state construction so the baseline only reflects
+            // what is left before the request.
+            ctxExecutor.triggerNonPeriodicScheduledTasks();
+            int baseline = ctxExecutor.getActiveNonPeriodicScheduledTask().size();
+            exec.requestActiveCheckpointTrigger();
+            ctxExecutor.triggerNonPeriodicScheduledTasks();
+            checkpointTimer.triggerAll();
+
+            assertThat(coordinator.getNumberOfPendingCheckpoints()).isEqualTo(0);
+            assertThat(ctxExecutor.getActiveNonPeriodicScheduledTask()).hasSize(baseline);
+        }
+    }
+
@Test
void testJobInformationMethods() throws Exception {
try (MockExecutingContext ctx = new MockExecutingContext()) {
@@ -691,6 +1255,7 @@ private final class ExecutingStateBuilder {
private Function
stateTransitionManagerFactory = context -> TestingStateTransitionManager.withNoOp();
private int rescaleOnFailedCheckpointCount = 1;
+ private boolean activeCheckpointTriggerEnabled = false;
private ExecutingStateBuilder() throws JobException, JobExecutionException {
operatorCoordinatorHandler = new TestingOperatorCoordinatorHandler();
@@ -720,6 +1285,12 @@ public ExecutingStateBuilder setRescaleOnFailedCheckpointCount(
return this;
}
+        /**
+         * Sets the active-checkpoint-trigger flag that {@code build} passes on when creating the
+         * {@code Executing} state; the builder's default is {@code false}.
+         *
+         * @param activeCheckpointTriggerEnabled value of the flag to use for the built state
+         * @return this builder, for call chaining
+         */
+        public ExecutingStateBuilder setActiveCheckpointTriggerEnabled(
+                boolean activeCheckpointTriggerEnabled) {
+            this.activeCheckpointTriggerEnabled = activeCheckpointTriggerEnabled;
+            return this;
+        }
+
private Executing build(MockExecutingContext ctx) {
executionGraph.transitionToRunning();
@@ -733,7 +1304,8 @@ private Executing build(MockExecutingContext ctx) {
ClassLoader.getSystemClassLoader(),
new ArrayList<>(),
stateTransitionManagerFactory::apply,
- rescaleOnFailedCheckpointCount);
+ rescaleOnFailedCheckpointCount,
+ activeCheckpointTriggerEnabled);
} finally {
Preconditions.checkState(
!ctx.hadStateTransition,
@@ -1029,6 +1601,12 @@ public boolean updateState(TaskExecutionStateTransition state) {
public Iterable getVerticesTopologically() {
return getVerticesTopologicallySupplier.get();
}
+
+        // This mock graph exposes no checkpoint coordinator by default. Tests that need one
+        // override this method in an anonymous subclass and return their own coordinator.
+        @Nullable
+        @Override
+        public CheckpointCoordinator getCheckpointCoordinator() {
+            return null;
+        }
}
private static class FinishingMockExecutionGraph extends StateTrackingMockExecutionGraph {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java
index 6a24600f1ace1..dcc9ceefba776 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java
@@ -175,4 +175,93 @@ void testRescaleOnCheckpoint(
restClusterClient.cancel(jobGraph.getJobID()).join();
}
}
+
+    /**
+     * Verifies that a job rescales after a resource requirements update when the active
+     * checkpoint trigger is enabled. The checkpoint interval is set to 24 hours, so the test
+     * does not rely on a periodic checkpoint occurring while it waits for the new parallelism.
+     */
+    @Test
+    void testRescaleWithActiveCheckpointTrigger(
+            @InjectMiniCluster MiniCluster miniCluster,
+            @InjectClusterClient RestClusterClient<?> restClusterClient)
+            throws Exception {
+        final Configuration config = new Configuration();
+        config.set(JobManagerOptions.SCHEDULER_RESCALE_TRIGGER_ACTIVE_CHECKPOINT_ENABLED, true);
+
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(BEFORE_RESCALE_PARALLELISM);
+        env.enableCheckpointing(Duration.ofHours(24).toMillis());
+        env.fromSequence(0, Integer.MAX_VALUE).sinkTo(new DiscardingSink<>());
+
+        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+        final Iterator<JobVertex> jobVertexIterator = jobGraph.getVertices().iterator();
+        assertThat(jobVertexIterator.hasNext()).isTrue();
+        final JobVertexID jobVertexId = jobVertexIterator.next().getID();
+
+        final JobResourceRequirements jobResourceRequirements =
+                JobResourceRequirements.newBuilder()
+                        .setParallelismForJobVertex(jobVertexId, 1, AFTER_RESCALE_PARALLELISM)
+                        .build();
+
+        restClusterClient.submitJob(jobGraph).join();
+
+        final JobID jobId = jobGraph.getJobID();
+        try {
+            LOG.info(
+                    "Waiting for job {} to reach parallelism of {} for vertex {}.",
+                    jobId,
+                    BEFORE_RESCALE_PARALLELISM,
+                    jobVertexId);
+            waitForRunningTasks(restClusterClient, jobId, BEFORE_RESCALE_PARALLELISM);
+
+            LOG.info(
+                    "Updating job {} resource requirements: parallelism {} -> {}.",
+                    jobId,
+                    BEFORE_RESCALE_PARALLELISM,
+                    AFTER_RESCALE_PARALLELISM);
+            restClusterClient.updateJobResourceRequirements(jobId, jobResourceRequirements).join();
+            LOG.info(
+                    "Waiting for job {} to rescale to parallelism {} via active checkpoint trigger.",
+                    jobId,
+                    AFTER_RESCALE_PARALLELISM);
+            waitForRunningTasks(restClusterClient, jobId, AFTER_RESCALE_PARALLELISM);
+            final int expectedFreeSlotCount = NUMBER_OF_SLOTS - AFTER_RESCALE_PARALLELISM;
+            LOG.info(
+                    "Waiting for {} slot(s) to become available after scale down.",
+                    expectedFreeSlotCount);
+            waitForAvailableSlots(restClusterClient, expectedFreeSlotCount);
+        } finally {
+            // Always cancel the job so it does not keep running after the test.
+            restClusterClient.cancel(jobId).join();
+        }
+    }
+
+    /**
+     * Verifies that a job without checkpointing enabled keeps its original parallelism after a
+     * resource requirements update: the test waits for {@code
+     * REQUIREMENT_UPDATE_TO_CHECKPOINT_GAP} and then checks that the running task count still
+     * matches {@code BEFORE_RESCALE_PARALLELISM}.
+     */
+    @Test
+    void testNoRescaleWithoutCheckpointingConfigured(
+            @InjectMiniCluster MiniCluster miniCluster,
+            @InjectClusterClient RestClusterClient<?> restClusterClient)
+            throws Exception {
+        final Configuration config = new Configuration();
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(BEFORE_RESCALE_PARALLELISM);
+        // Checkpointing is deliberately not enabled on this environment.
+        env.fromSequence(0, Integer.MAX_VALUE).sinkTo(new DiscardingSink<>());
+
+        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+        final Iterator<JobVertex> jobVertexIterator = jobGraph.getVertices().iterator();
+        assertThat(jobVertexIterator.hasNext()).isTrue();
+        final JobVertexID jobVertexId = jobVertexIterator.next().getID();
+
+        final JobResourceRequirements jobResourceRequirements =
+                JobResourceRequirements.newBuilder()
+                        .setParallelismForJobVertex(jobVertexId, 1, AFTER_RESCALE_PARALLELISM)
+                        .build();
+        restClusterClient.submitJob(jobGraph).join();
+        final JobID jobId = jobGraph.getJobID();
+        try {
+            waitForRunningTasks(restClusterClient, jobId, BEFORE_RESCALE_PARALLELISM);
+            restClusterClient.updateJobResourceRequirements(jobId, jobResourceRequirements).join();
+            // Give a rescale the chance to happen before asserting that it did not.
+            Thread.sleep(REQUIREMENT_UPDATE_TO_CHECKPOINT_GAP.toMillis());
+            waitForRunningTasks(restClusterClient, jobId, BEFORE_RESCALE_PARALLELISM);
+            LOG.info("Verified: job {} did not rescale without checkpointing configured.", jobId);
+        } finally {
+            // Always cancel the job so it does not keep running after the test.
+            restClusterClient.cancel(jobId).join();
+        }
+    }
}