From d12039963b46653309109325ccb9357ea24739fc Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sun, 28 Jun 2026 18:16:16 -0700 Subject: [PATCH 1/3] fix(amber): order worker state by version, not timestamp Worker state was reconciled at the controller by last-nanoTime-wins. A fast source's startWorker response carries a stale RUNNING snapshot that can arrive after COMPLETED was recorded, clobbering it and leaving the operator stuck orange. Order state causally instead: WorkerStateManager now stamps every transition with a monotonic per-worker version, carried on all state reports (WorkerStateResponse, WorkerStateUpdatedRequest, WorkerMetrics). The controller applies a state only if its version is newer, and treats COMPLETED/TERMINATED as absorbing. Stats keep timestamp ordering. Closes #6010 --- .../architecture/rpc/controlcommands.proto | 2 + .../architecture/rpc/controlreturns.proto | 2 + .../architecture/worker/statistics.proto | 3 + .../promisehandlers/PauseHandler.scala | 7 +- .../QueryWorkerStatisticsHandler.scala | 4 +- .../promisehandlers/ResumeHandler.scala | 2 +- .../WorkerStateUpdatedHandler.scala | 2 +- .../layer/WorkerExecution.scala | 70 +++++++----- .../RegionExecutionCoordinator.scala | 9 +- .../architecture/worker/DataProcessor.scala | 5 +- .../worker/promisehandlers/PauseHandler.scala | 2 +- .../QueryStatisticsHandler.scala | 8 +- .../promisehandlers/ResumeHandler.scala | 2 +- .../worker/promisehandlers/StartHandler.scala | 4 +- .../common/statetransition/StateManager.scala | 11 ++ .../execution/OperatorExecutionSpec.scala | 13 ++- .../layer/WorkerExecutionSpec.scala | 103 ++++++++++++++---- .../RegionCoordinatorTestSupport.scala | 3 +- .../WorkerStateManagerSpec.scala | 32 ++++++ 19 files changed, 210 insertions(+), 74 deletions(-) diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto index 1f55927e4ae..b4913932a82 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto @@ -166,6 +166,8 @@ message PortCompletedRequest { message WorkerStateUpdatedRequest { worker.WorkerState state = 1 [(scalapb.field).no_box = true]; + // Monotonic per-worker version of state, for causal ordering by the controller. + int64 state_version = 2; } message LinkWorkersRequest { diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto index 43613b5cfdc..563c3ee1dd7 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto @@ -134,6 +134,8 @@ message StartWorkflowResponse { message WorkerStateResponse { worker.WorkerState state = 1 [(scalapb.field).no_box = true]; + // Monotonic per-worker version of state, for causal ordering by the controller. + int64 state_version = 2; } message WorkerMetricsResponse { diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/worker/statistics.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/worker/statistics.proto index 85d1fcf4aaa..b75a1d60fbb 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/worker/statistics.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/worker/statistics.proto @@ -60,4 +60,7 @@ message WorkerStatistics { message WorkerMetrics { WorkerState worker_state = 1 [(scalapb.field).no_box = true]; WorkerStatistics worker_statistics = 2 [(scalapb.field).no_box = true]; + // Monotonic per-worker version of worker_state, used by the controller to order + // state reports causally instead of by wall-clock timestamp. See WorkerStateManager. + int64 state_version = 3; } \ No newline at end of file diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala index 35a85f56ae9..eb13d92a0d3 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala @@ -68,13 +68,16 @@ trait PauseHandler { // send a pause message workerInterface.pauseWorker(EmptyRequest(), mkContext(worker)).flatMap { resp => - workerExecution.update(System.nanoTime(), resp.state) + workerExecution.updateState(resp.stateVersion, resp.state) workerInterface .queryStatistics(EmptyRequest(), mkContext(worker)) // get the stats and current input tuple from the worker .map { case WorkerMetricsResponse(metrics) => - workerExecution.update(System.nanoTime(), metrics.workerStatistics) + workerExecution.updateStats( + System.nanoTime(), + metrics.workerStatistics + ) } } }.toSeq diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala index 1f22fa5b368..293b78722e6 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala @@ -175,7 +175,9 @@ trait QueryWorkerStatisticsHandler { processLayers(layers).map { _ => collectedResults.foreach { case (wExec, resp, timestamp) => - wExec.update(timestamp, resp.metrics.workerState, resp.metrics.workerStatistics) + // State is ordered by the worker's logical version; stats by receipt time. + wExec.updateState(resp.metrics.stateVersion, resp.metrics.workerState) + wExec.updateStats(timestamp, resp.metrics.workerStatistics) } forwardStats(msg.updateTarget) // Record the completion timestamp before releasing the lock so that any timer diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala index c94ba91c205..f1c173b9ca1 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala @@ -52,7 +52,7 @@ trait ResumeHandler { cp.workflowExecution .getLatestOperatorExecution(VirtualIdentityUtils.getPhysicalOpId(workerId)) .getWorkerExecution(workerId) - .update(System.nanoTime(), resp.state) + .updateState(resp.stateVersion, resp.state) } } .toSeq diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala index 5ee98a4918d..7226d93e0c7 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala @@ -49,7 +49,7 @@ trait WorkerStateUpdatedHandler { .find(_.hasOperatorExecution(physicalOpId)) .map(_.getOperatorExecution(physicalOpId)) .foreach(operatorExecution => - operatorExecution.getWorkerExecution(ctx.sender).update(System.nanoTime(), msg.state) + operatorExecution.getWorkerExecution(ctx.sender).updateState(msg.stateVersion, msg.state) ) val stats = cp.workflowExecution.getAllRegionExecutionsStats sendToClient(ExecutionStatsUpdate(stats)) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecution.scala index 55e1e309181..518beae21c4 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecution.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecution.scala @@ -21,7 +21,11 @@ package org.apache.texera.amber.engine.architecture.deploysemantics.layer import org.apache.texera.amber.core.workflow.PortIdentity import org.apache.texera.amber.engine.architecture.controller.execution.WorkerPortExecution -import org.apache.texera.amber.engine.architecture.worker.statistics.WorkerState.UNINITIALIZED +import org.apache.texera.amber.engine.architecture.worker.statistics.WorkerState.{ + COMPLETED, + TERMINATED, + UNINITIALIZED +} import org.apache.texera.amber.engine.architecture.worker.statistics.{WorkerState, WorkerStatistics} import scala.collection.mutable @@ -37,50 +41,56 @@ case class WorkerExecution() extends Serializable { private var stats: WorkerStatistics = { WorkerStatistics(Seq.empty, Seq.empty, 0, 0, 0) } - private var lastUpdateTimeStamp = 0L + // Logical version of the last applied state, sourced from the worker's + // WorkerStateManager. Starts below any real version so the first report applies. + private var lastStateVersion = -1L + // Wall-clock (controller-side nanoTime) of the last applied stats snapshot. + private var lastStatsTimeStamp = 0L + + private def isTerminal(s: WorkerState): Boolean = s == COMPLETED || s == TERMINATED /** - * Updates both the worker state and statistics if the provided timestamp is newer - * than the last recorded update timestamp. This ensures that only the most recent - * data is reflected in the execution state. + * Applies a worker state report, ordered causally by the worker's monotonic + * `stateVersion` rather than by wall-clock time. A report is applied only when it + * is strictly newer than the last applied one, so a stale state that arrives late + * (e.g. the RUNNING snapshot carried by a slow startWorker response) cannot clobber + * a newer state. In addition, terminal states (COMPLETED/TERMINATED) are absorbing: + * once reached, no later report can move the worker out of them. * - * @param timeStamp the nanosecond-timestamp of this update - * @param state the new WorkerState to set - * @param stats the new WorkerStatistics to set + * @param stateVersion the worker-side monotonic version of this state + * @param newState the reported WorkerState */ - def update(timeStamp: Long, state: WorkerState, stats: WorkerStatistics): Unit = { - if (this.lastUpdateTimeStamp < timeStamp) { - this.stats = stats - this.state = state - this.lastUpdateTimeStamp = timeStamp + def updateState(stateVersion: Long, newState: WorkerState): Unit = { + if (isTerminal(this.state)) { + return + } + if (this.lastStateVersion < stateVersion) { + this.state = newState + this.lastStateVersion = stateVersion } } /** - * Updates only the worker state if the provided timestamp is newer than the - * last recorded update timestamp. - * - * @param timeStamp the nanosecond-timestamp of this update - * @param state the new WorkerState to set + * Forces the worker into TERMINATED, e.g. when the controller kills a region. This + * still respects terminal-state absorption: a worker that already COMPLETED on its + * own is left as COMPLETED. Uses the maximum version so it wins over any in-flight + * non-terminal report for a worker that had not yet reached a terminal state. */ - def update(timeStamp: Long, state: WorkerState): Unit = { - if (this.lastUpdateTimeStamp < timeStamp) { - this.state = state - this.lastUpdateTimeStamp = timeStamp - } - } + def forceTerminate(): Unit = updateState(Long.MaxValue, TERMINATED) /** * Updates only the worker statistics if the provided timestamp is newer than the - * last recorded update timestamp. + * last recorded stats timestamp. Stats are monotonic snapshots, so newest-wins by + * wall-clock is sufficient (and necessary, since two snapshots taken within the same + * state share a state version). * * @param timeStamp the nanosecond-timestamp of this update - * @param stats the new WorkerStatistics to set + * @param newStats the new WorkerStatistics to set */ - def update(timeStamp: Long, stats: WorkerStatistics): Unit = { - if (this.lastUpdateTimeStamp < timeStamp) { - this.stats = stats - this.lastUpdateTimeStamp = timeStamp + def updateStats(timeStamp: Long, newStats: WorkerStatistics): Unit = { + if (this.lastStatsTimeStamp < timeStamp) { + this.stats = newStats + this.lastStatsTimeStamp = timeStamp } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 9e84e5e80be..4424cd4224b 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -51,7 +51,6 @@ import org.apache.texera.amber.engine.architecture.scheduling.config.{ ResourceConfig } import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings.Partitioning -import org.apache.texera.amber.engine.architecture.worker.statistics.WorkerState import org.apache.texera.amber.engine.common.AmberLogging import org.apache.texera.amber.engine.common.FutureBijection._ import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient @@ -211,7 +210,7 @@ class RegionExecutionCoordinator( regionExecution.getAllOperatorExecutions.foreach { case (_, opExec) => opExec.getWorkerIds.foreach { workerId => - opExec.getWorkerExecution(workerId).update(System.nanoTime(), WorkerState.TERMINATED) + opExec.getWorkerExecution(workerId).forceTerminate() } } Future.Unit // propagate success @@ -579,12 +578,14 @@ class RegionExecutionCoordinator( asyncRPCClient.workerInterface .startWorker(EmptyRequest(), asyncRPCClient.mkContext(workerId)) .map(resp => - // update worker state + // Update worker state, ordered by the worker's logical state version + // (not arrival time) so this RUNNING snapshot cannot clobber a later + // COMPLETED if the response arrives after the worker has finished. workflowExecution .getRegionExecution(region.id) .getOperatorExecution(opId) .getWorkerExecution(workerId) - .update(System.nanoTime(), resp.state) + .updateState(resp.stateVersion, resp.state) ) } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala index 84f1e8ec659..d88bcedc752 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala @@ -213,7 +213,10 @@ class DataProcessor( RUNNING, () => { asyncRPCClient.controllerInterface.workerStateUpdated( - WorkerStateUpdatedRequest(stateManager.getCurrentState), + WorkerStateUpdatedRequest( + stateManager.getCurrentState, + stateManager.getStateVersion + ), asyncRPCClient.mkContext(CONTROLLER) ) } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/PauseHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/PauseHandler.scala index cec7ca87e63..1b4b8493e42 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/PauseHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/PauseHandler.scala @@ -46,7 +46,7 @@ trait PauseHandler { dp.pauseManager.pause(UserPause) dp.stateManager.transitTo(PAUSED) } - WorkerStateResponse(dp.stateManager.getCurrentState) + WorkerStateResponse(dp.stateManager.getCurrentState, dp.stateManager.getStateVersion) } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/QueryStatisticsHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/QueryStatisticsHandler.scala index 74d8d14faa6..d9153fb0ade 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/QueryStatisticsHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/QueryStatisticsHandler.scala @@ -35,7 +35,13 @@ trait QueryStatisticsHandler { request: EmptyRequest, ctx: AsyncRPCContext ): Future[WorkerMetricsResponse] = { - WorkerMetricsResponse(WorkerMetrics(dp.stateManager.getCurrentState, dp.collectStatistics())) + WorkerMetricsResponse( + WorkerMetrics( + dp.stateManager.getCurrentState, + dp.collectStatistics(), + dp.stateManager.getStateVersion + ) + ) } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/ResumeHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/ResumeHandler.scala index 434c50c914c..cc368a339dd 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/ResumeHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/ResumeHandler.scala @@ -43,7 +43,7 @@ trait ResumeHandler { dp.stateManager.transitTo(RUNNING) dp.adaptiveBatchingMonitor.resumeAdaptiveBatching() } - WorkerStateResponse(dp.stateManager.getCurrentState) + WorkerStateResponse(dp.stateManager.getCurrentState, dp.stateManager.getStateVersion) } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/StartHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/StartHandler.scala index 5d1bf8ccbde..46d43920f65 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/StartHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/StartHandler.scala @@ -56,12 +56,12 @@ trait StartHandler { dp.inputGateway.getChannel(channelId).setPortId(PortIdentity()) startChannel(request, ctx) endChannel(request, ctx) - WorkerStateResponse(dp.stateManager.getCurrentState) + WorkerStateResponse(dp.stateManager.getCurrentState, dp.stateManager.getStateVersion) } else if (dp.inputManager.getInputPortReaderThreads.nonEmpty) { // This means the worker should read from materialized storage for its input ports. // Start the reader threads dp.inputManager.startInputPortReaderThreads() - WorkerStateResponse(dp.stateManager.getCurrentState) + WorkerStateResponse(dp.stateManager.getCurrentState, dp.stateManager.getStateVersion) } else { throw new WorkflowRuntimeException( s"non-source worker $actorId received unexpected StartWorker!" diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/common/statetransition/StateManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/statetransition/StateManager.scala index 9bd951389ea..f516342b7e8 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/common/statetransition/StateManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/statetransition/StateManager.scala @@ -43,6 +43,16 @@ class StateManager[T]( private var currentState: T = initialState + // Monotonically increasing version, bumped on every successful state transition. + // It is the state machine's logical clock: because a single owner drives all + // transitions, the version totally orders them in causal order. Reporting this + // version alongside the state lets remote observers (e.g. the controller) reject + // stale state reports that arrive out of order, without relying on wall-clock + // timestamps that cannot be compared across processes. + private var stateVersion: Long = 0L + + def getStateVersion: Long = stateVersion + def assertState(state: T): Unit = { if (currentState != state) { throw InvalidStateException( @@ -84,6 +94,7 @@ class StateManager[T]( throw InvalidTransitionException(s"cannot transit from $currentState to $state", actorId) } currentState = state + stateVersion += 1 } } diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/OperatorExecutionSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/OperatorExecutionSpec.scala index e8b85391eab..704b4f336d7 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/OperatorExecutionSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/OperatorExecutionSpec.scala @@ -42,10 +42,10 @@ class OperatorExecutionSpec extends AnyFlatSpec { PortTupleMetricsMapping(PortIdentity(portIdx), TupleMetrics(count, size)) /** - * Push `(state, stats)` onto an existing `WorkerExecution`. Production - * code applies updates only if the timestamp is newer than the - * previously-recorded one; we use a monotonically increasing nano-clock - * surrogate so each call wins. + * Push `(state, stats)` onto an existing `WorkerExecution`. Production code + * orders state by a monotonic logical version and stats by timestamp, applying + * an update only when it is newer; we use a single monotonically increasing + * surrogate for both so each call wins. */ private var clock: Long = 0L private def applyUpdate( @@ -54,14 +54,15 @@ class OperatorExecutionSpec extends AnyFlatSpec { stats: WorkerStatistics ): Unit = { clock += 1 - worker.update(clock, state, stats) + worker.updateState(clock, state) + worker.updateStats(clock, stats) } private def setState( worker: org.apache.texera.amber.engine.architecture.deploysemantics.layer.WorkerExecution, state: WorkerState ): Unit = { clock += 1 - worker.update(clock, state) + worker.updateState(clock, state) } // --------------------------------------------------------------------------- diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecutionSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecutionSpec.scala index 2c39954d047..1c3685a5b03 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecutionSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecutionSpec.scala @@ -36,51 +36,110 @@ class WorkerExecutionSpec extends AnyFlatSpec { assert(we.getStats.controlProcessingTime == 0L) } - "WorkerExecution.update(state)" should "apply when the timestamp is newer" in { + // ---- state ordering by logical version ---- + + "WorkerExecution.updateState" should "apply a state with a newer version" in { val we = WorkerExecution() - we.update(timeStamp = 10L, state = WorkerState.RUNNING) + we.updateState(stateVersion = 1L, WorkerState.READY) + we.updateState(stateVersion = 2L, WorkerState.RUNNING) assert(we.getState == WorkerState.RUNNING) } - it should "ignore updates with a non-newer timestamp" in { + it should "apply the very first report (version 0)" in { val we = WorkerExecution() - we.update(timeStamp = 10L, state = WorkerState.RUNNING) - we.update(timeStamp = 10L, state = WorkerState.PAUSED) // not strictly newer - we.update(timeStamp = 5L, state = WorkerState.COMPLETED) // older - assert(we.getState == WorkerState.RUNNING) + we.updateState(stateVersion = 0L, WorkerState.READY) + assert(we.getState == WorkerState.READY) } - "WorkerExecution.update(state, stats)" should "update both atomically when newer" in { + it should "ignore a stale report whose version is lower (out-of-order arrival)" in { val we = WorkerExecution() - we.update(timeStamp = 10L, state = WorkerState.RUNNING, stats = stats(idle = 7L)) - assert(we.getState == WorkerState.RUNNING) - assert(we.getStats.idleTime == 7L) + we.updateState(stateVersion = 5L, WorkerState.PAUSED) + we.updateState(stateVersion = 4L, WorkerState.RUNNING) // arrives late, older version + assert(we.getState == WorkerState.PAUSED) } - it should "ignore updates with a non-newer timestamp" in { + it should "ignore a report whose version is not strictly newer" in { val we = WorkerExecution() - we.update(timeStamp = 10L, state = WorkerState.RUNNING, stats = stats(idle = 7L)) - we.update(timeStamp = 5L, state = WorkerState.COMPLETED, stats = stats(idle = 99L)) + we.updateState(stateVersion = 3L, WorkerState.RUNNING) + we.updateState(stateVersion = 3L, WorkerState.PAUSED) // same version assert(we.getState == WorkerState.RUNNING) - assert(we.getStats.idleTime == 7L) } - "WorkerExecution.update(stats)" should "update only the stats when newer" in { + // ---- terminal-state absorption ---- + + it should "treat COMPLETED as absorbing even against a higher-version report" in { + val we = WorkerExecution() + we.updateState(stateVersion = 2L, WorkerState.COMPLETED) + we.updateState(stateVersion = 99L, WorkerState.RUNNING) // higher version, but illegal + assert(we.getState == WorkerState.COMPLETED) + } + + it should "treat TERMINATED as absorbing" in { + val we = WorkerExecution() + we.updateState(stateVersion = 2L, WorkerState.TERMINATED) + we.updateState(stateVersion = 99L, WorkerState.RUNNING) + assert(we.getState == WorkerState.TERMINATED) + } + + // Regression for issue #6010: a fast source's startWorker response carries a stale + // RUNNING snapshot that can reach the controller AFTER COMPLETED was recorded. With + // wall-clock ordering the late RUNNING won and the operator was stuck orange; with + // version ordering (and terminal absorption) COMPLETED must survive. + it should "not let a late startWorker RUNNING snapshot clobber COMPLETED (#6010)" in { + val we = WorkerExecution() + we.updateState(stateVersion = 1L, WorkerState.READY) + we.updateState(stateVersion = 3L, WorkerState.COMPLETED) // via completion stats query + we.updateState(stateVersion = 2L, WorkerState.RUNNING) // late startWorker response + assert(we.getState == WorkerState.COMPLETED) + } + + // ---- forceTerminate ---- + + "WorkerExecution.forceTerminate" should "move a non-terminal worker to TERMINATED" in { + val we = WorkerExecution() + we.updateState(stateVersion = 2L, WorkerState.RUNNING) + we.forceTerminate() + assert(we.getState == WorkerState.TERMINATED) + } + + it should "leave a worker that already COMPLETED as COMPLETED" in { val we = WorkerExecution() - we.update(timeStamp = 10L, state = WorkerState.RUNNING, stats = stats(idle = 7L)) - we.update(timeStamp = 20L, stats = stats(idle = 42L)) + we.updateState(stateVersion = 3L, WorkerState.COMPLETED) + we.forceTerminate() + assert(we.getState == WorkerState.COMPLETED) + } + + // ---- stats ordering by timestamp ---- + + "WorkerExecution.updateStats" should "apply newer stats and keep state untouched" in { + val we = WorkerExecution() + we.updateState(stateVersion = 2L, WorkerState.RUNNING) + we.updateStats(timeStamp = 10L, stats(idle = 7L)) + we.updateStats(timeStamp = 20L, stats(idle = 42L)) assert(we.getState == WorkerState.RUNNING) assert(we.getStats.idleTime == 42L) } - it should "ignore stats updates with a non-newer timestamp" in { + it should "ignore stats with a non-newer timestamp" in { val we = WorkerExecution() - we.update(timeStamp = 20L, stats = stats(idle = 42L)) - we.update(timeStamp = 20L, stats = stats(idle = 99L)) // not strictly newer - we.update(timeStamp = 5L, stats = stats(idle = 0L)) // older + we.updateStats(timeStamp = 20L, stats(idle = 42L)) + we.updateStats(timeStamp = 20L, stats(idle = 99L)) // not strictly newer + we.updateStats(timeStamp = 5L, stats(idle = 0L)) // older assert(we.getStats.idleTime == 42L) } + it should "track state version and stats timestamp independently" in { + val we = WorkerExecution() + // A high stats timestamp must not block a later (higher-version) state update, + // and vice versa — the two orderings are independent. + we.updateStats(timeStamp = 1000L, stats(idle = 1L)) + we.updateState(stateVersion = 1L, WorkerState.RUNNING) + assert(we.getState == WorkerState.RUNNING) + assert(we.getStats.idleTime == 1L) + } + + // ---- port executions ---- + "WorkerExecution.getInputPortExecution" should "lazily create and reuse a port execution per port id" in { val we = WorkerExecution() val first = we.getInputPortExecution(PortIdentity(0)) diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala index 64d85972ffb..1fab5f06293 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala @@ -140,7 +140,8 @@ object RegionCoordinatorTestSupport { case InitializeExecutor | OpenExecutor => Some(EmptyReturn()) case StartWorker => - Some(WorkerStateResponse(WorkerState.RUNNING)) + // RUNNING is the worker's 2nd transition (UNINITIALIZED→READY→RUNNING). + Some(WorkerStateResponse(WorkerState.RUNNING, stateVersion = 2L)) case EndWorker => endWorkerResponse(call) case other => diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/common/statetransition/WorkerStateManagerSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/common/statetransition/WorkerStateManagerSpec.scala index 751c029ee4b..295756e93cc 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/common/statetransition/WorkerStateManagerSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/common/statetransition/WorkerStateManagerSpec.scala @@ -128,4 +128,36 @@ class WorkerStateManagerSpec extends AnyFlatSpec { sm.transitTo(COMPLETED) } } + + // -- State version (logical clock) -- + + it should "start the state version at 0" in { + assert(newManager(UNINITIALIZED).getStateVersion == 0L) + } + + it should "bump the state version on every successful transition" in { + val sm = newManager(UNINITIALIZED) + assert(sm.getStateVersion == 0L) + sm.transitTo(READY) + assert(sm.getStateVersion == 1L) + sm.transitTo(RUNNING) + assert(sm.getStateVersion == 2L) + sm.transitTo(COMPLETED) + assert(sm.getStateVersion == 3L) + } + + it should "not bump the state version on a no-op self-transition" in { + val sm = newManager(RUNNING) + val before = sm.getStateVersion + sm.transitTo(RUNNING) // no-op + assert(sm.getStateVersion == before) + } + + it should "not bump the state version when a transition is rejected" in { + val sm = newManager(UNINITIALIZED) + intercept[InvalidTransitionException] { + sm.transitTo(RUNNING) + } + assert(sm.getStateVersion == 0L) + } } From af62bb29026e3155bb141c4e593a4d942396ca0f Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sun, 28 Jun 2026 18:43:35 -0700 Subject: [PATCH 2/3] fix(amber): version Python worker state reports too MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The pyamber worker reports its state to the controller via start/pause/ resume responses and query-statistics metrics. Without a version these defaulted to 0, so after the first report the controller's version gate dropped every later state change — breaking reconfiguration, which waits to observe RUNNING -> PAUSED -> RUNNING (ReconfigurationIntegrationSpec timed out). Mirror the Scala change in pyamber: StateManager bumps a monotonic state version on each transition, and the four state-reporting handlers include it. --- .../handlers/control/pause_worker_handler.py | 4 ++- .../control/query_statistics_handler.py | 1 + .../handlers/control/resume_worker_handler.py | 4 ++- .../handlers/control/start_worker_handler.py | 5 +++- .../architecture/managers/state_manager.py | 15 +++++++++++ .../managers/test_state_manager.py | 27 +++++++++++++++++++ 6 files changed, 53 insertions(+), 3 deletions(-) diff --git a/amber/src/main/python/core/architecture/handlers/control/pause_worker_handler.py b/amber/src/main/python/core/architecture/handlers/control/pause_worker_handler.py index ef9188914e0..aee0188c45d 100644 --- a/amber/src/main/python/core/architecture/handlers/control/pause_worker_handler.py +++ b/amber/src/main/python/core/architecture/handlers/control/pause_worker_handler.py @@ -27,4 +27,6 @@ class PauseWorkerHandler(ControlHandler): async def pause_worker(self, req: EmptyRequest) -> WorkerStateResponse: self.context.pause_manager.pause(PauseType.USER_PAUSE) state = self.context.state_manager.get_current_state() - return WorkerStateResponse(state) + return WorkerStateResponse( + state, state_version=self.context.state_manager.get_state_version() + ) diff --git a/amber/src/main/python/core/architecture/handlers/control/query_statistics_handler.py b/amber/src/main/python/core/architecture/handlers/control/query_statistics_handler.py index b636249d714..c0bd0124ef2 100644 --- a/amber/src/main/python/core/architecture/handlers/control/query_statistics_handler.py +++ b/amber/src/main/python/core/architecture/handlers/control/query_statistics_handler.py @@ -30,5 +30,6 @@ async def query_statistics(self, req: EmptyRequest) -> WorkerMetricsResponse: metrics = WorkerMetrics( worker_state=self.context.state_manager.get_current_state(), worker_statistics=self.context.statistics_manager.get_statistics(), + state_version=self.context.state_manager.get_state_version(), ) return WorkerMetricsResponse(metrics) diff --git a/amber/src/main/python/core/architecture/handlers/control/resume_worker_handler.py b/amber/src/main/python/core/architecture/handlers/control/resume_worker_handler.py index 3ebaadb6611..fc2f898423a 100644 --- a/amber/src/main/python/core/architecture/handlers/control/resume_worker_handler.py +++ b/amber/src/main/python/core/architecture/handlers/control/resume_worker_handler.py @@ -27,4 +27,6 @@ class ResumeWorkerHandler(ControlHandler): async def resume_worker(self, req: EmptyRequest) -> WorkerStateResponse: self.context.pause_manager.resume(PauseType.USER_PAUSE) state = self.context.state_manager.get_current_state() - return WorkerStateResponse(state) + return WorkerStateResponse( + state, state_version=self.context.state_manager.get_state_version() + ) diff --git a/amber/src/main/python/core/architecture/handlers/control/start_worker_handler.py b/amber/src/main/python/core/architecture/handlers/control/start_worker_handler.py index bfa1556722f..7b9c58a0938 100644 --- a/amber/src/main/python/core/architecture/handlers/control/start_worker_handler.py +++ b/amber/src/main/python/core/architecture/handlers/control/start_worker_handler.py @@ -97,4 +97,7 @@ async def start_worker(self, req: EmptyRequest) -> WorkerStateResponse: elif self.context.input_manager.get_input_port_mat_reader_threads(): self.context.input_manager.start_input_port_mat_reader_threads() - return WorkerStateResponse(self.context.state_manager.get_current_state()) + return WorkerStateResponse( + self.context.state_manager.get_current_state(), + state_version=self.context.state_manager.get_state_version(), + ) diff --git a/amber/src/main/python/core/architecture/managers/state_manager.py b/amber/src/main/python/core/architecture/managers/state_manager.py index e80b6d5fc4f..bd8c75d8358 100644 --- a/amber/src/main/python/core/architecture/managers/state_manager.py +++ b/amber/src/main/python/core/architecture/managers/state_manager.py @@ -37,6 +37,13 @@ class StateManager: def __init__(self, state_transition_graph: Dict[T, Set[T]], initial_state: T): self._state_transition_graph = state_transition_graph self._current_state: T = initial_state + # Monotonically increasing version, bumped on every successful transition. + # It is the state machine's logical clock: since a single owner drives all + # transitions, the version totally orders them causally. Reporting it + # alongside the state lets the controller reject stale state reports that + # arrive out of order, without comparing wall-clock timestamps across + # processes. Must mirror the Scala StateManager. + self._state_version: int = 0 def assert_state(self, state: T) -> None: """ @@ -75,6 +82,7 @@ def transit_to(self, state: T) -> None: ) self._current_state = state + self._state_version += 1 def get_current_state(self) -> T: """ @@ -82,3 +90,10 @@ def get_current_state(self) -> T: :return: """ return self._current_state + + def get_state_version(self) -> int: + """ + Return the monotonic version of the current state. + :return: + """ + return self._state_version diff --git a/amber/src/test/python/core/architecture/managers/test_state_manager.py b/amber/src/test/python/core/architecture/managers/test_state_manager.py index 464c64194fc..fc68096c0e6 100644 --- a/amber/src/test/python/core/architecture/managers/test_state_manager.py +++ b/amber/src/test/python/core/architecture/managers/test_state_manager.py @@ -72,3 +72,30 @@ def test_it_can_transit_directly_from_ready_to_completed(self, state_manager): state_manager.transit_to(WorkerState.READY) state_manager.transit_to(WorkerState.COMPLETED) state_manager.assert_state(WorkerState.COMPLETED) + + def test_state_version_starts_at_zero(self, state_manager): + assert state_manager.get_state_version() == 0 + + def test_state_version_bumps_on_every_successful_transition(self, state_manager): + # The controller relies on this monotonic version to order Python-worker + # state reports causally; without it, RUNNING -> PAUSED -> RUNNING during + # reconfiguration would be dropped as stale. Mirrors the Scala StateManager. + assert state_manager.get_state_version() == 0 + state_manager.transit_to(WorkerState.READY) + assert state_manager.get_state_version() == 1 + state_manager.transit_to(WorkerState.RUNNING) + assert state_manager.get_state_version() == 2 + state_manager.transit_to(WorkerState.COMPLETED) + assert state_manager.get_state_version() == 3 + + def test_state_version_does_not_bump_on_noop_self_transition(self, state_manager): + state_manager.transit_to(WorkerState.READY) + before = state_manager.get_state_version() + state_manager.transit_to(WorkerState.READY) # no-op + assert state_manager.get_state_version() == before + + def test_state_version_does_not_bump_on_rejected_transition(self, state_manager): + # UNINITIALIZED -> RUNNING is illegal (must pass through READY). + with pytest.raises(InvalidTransitionException): + state_manager.transit_to(WorkerState.RUNNING) + assert state_manager.get_state_version() == 0 From ee1411a0945ac0578e1ea9d18f6cc836ea51475e Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Sun, 28 Jun 2026 19:03:19 -0700 Subject: [PATCH 3/3] test(amber): assert worker state_version in pyamber main_loop tests The pause/resume/query-statistics assertions compared full messages against literals that defaulted state_version to 0; read the actual version from the report (as the stats sizes already are) so the equality holds while StateManager tests cover the version itself. --- .../python/core/runnables/test_main_loop.py | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/amber/src/test/python/core/runnables/test_main_loop.py b/amber/src/test/python/core/runnables/test_main_loop.py index c32f45b8886..4c1c9e10564 100644 --- a/amber/src/test/python/core/runnables/test_main_loop.py +++ b/amber/src/test/python/core/runnables/test_main_loop.py @@ -692,6 +692,9 @@ def test_main_loop_thread_can_process_messages( control_processing_time=stats.control_processing_time, idle_time=stats.idle_time, ), + # version is the worker's logical state clock; read it from the actual + # report rather than pinning a brittle count (covered by StateManager tests). + state_version=worker_metrics_response.metrics.state_version, ) assert elem == DCMElement( @@ -1099,13 +1102,19 @@ def send_pause( output_queue, ): input_queue.put(mock_pause) - assert output_queue.get() == DCMElement( + elem = output_queue.get() + # version is the worker's logical state clock; read it from the actual + # report rather than pinning a brittle count (covered by StateManager tests). + state_version = elem.payload.return_invocation.return_value.worker_state_response.state_version + assert elem == DCMElement( tag=mock_control_output_channel, payload=DirectControlMessagePayloadV2( return_invocation=ReturnInvocation( command_id=command_sequence, return_value=ControlReturn( - worker_state_response=WorkerStateResponse(WorkerState.PAUSED) + worker_state_response=WorkerStateResponse( + WorkerState.PAUSED, state_version=state_version + ) ), ) ), @@ -1120,13 +1129,19 @@ def send_resume( output_queue, ): input_queue.put(mock_resume) - assert output_queue.get() == DCMElement( + elem = output_queue.get() + # version is the worker's logical state clock; read it from the actual + # report rather than pinning a brittle count (covered by StateManager tests). + state_version = elem.payload.return_invocation.return_value.worker_state_response.state_version + assert elem == DCMElement( tag=mock_control_output_channel, payload=DirectControlMessagePayloadV2( return_invocation=ReturnInvocation( command_id=command_sequence, return_value=ControlReturn( - worker_state_response=WorkerStateResponse(WorkerState.RUNNING) + worker_state_response=WorkerStateResponse( + WorkerState.RUNNING, state_version=state_version + ) ), ) ),