Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ message PortCompletedRequest {

message WorkerStateUpdatedRequest {
worker.WorkerState state = 1 [(scalapb.field).no_box = true];
// Monotonic per-worker version of state, for causal ordering by the controller.
int64 state_version = 2;
}

message LinkWorkersRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ message StartWorkflowResponse {

message WorkerStateResponse {
worker.WorkerState state = 1 [(scalapb.field).no_box = true];
// Monotonic per-worker version of state, for causal ordering by the controller.
int64 state_version = 2;
}

message WorkerMetricsResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,7 @@ message WorkerStatistics {
message WorkerMetrics {
WorkerState worker_state = 1 [(scalapb.field).no_box = true];
WorkerStatistics worker_statistics = 2 [(scalapb.field).no_box = true];
// Monotonic per-worker version of worker_state, used by the controller to order
// state reports causally instead of by wall-clock timestamp. See WorkerStateManager.
int64 state_version = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ class PauseWorkerHandler(ControlHandler):
async def pause_worker(self, req: EmptyRequest) -> WorkerStateResponse:
self.context.pause_manager.pause(PauseType.USER_PAUSE)
state = self.context.state_manager.get_current_state()
return WorkerStateResponse(state)
return WorkerStateResponse(
state, state_version=self.context.state_manager.get_state_version()
)
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ async def query_statistics(self, req: EmptyRequest) -> WorkerMetricsResponse:
metrics = WorkerMetrics(
worker_state=self.context.state_manager.get_current_state(),
worker_statistics=self.context.statistics_manager.get_statistics(),
state_version=self.context.state_manager.get_state_version(),
)
return WorkerMetricsResponse(metrics)
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ class ResumeWorkerHandler(ControlHandler):
async def resume_worker(self, req: EmptyRequest) -> WorkerStateResponse:
self.context.pause_manager.resume(PauseType.USER_PAUSE)
state = self.context.state_manager.get_current_state()
return WorkerStateResponse(state)
return WorkerStateResponse(
state, state_version=self.context.state_manager.get_state_version()
)
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,7 @@ async def start_worker(self, req: EmptyRequest) -> WorkerStateResponse:
elif self.context.input_manager.get_input_port_mat_reader_threads():
self.context.input_manager.start_input_port_mat_reader_threads()

return WorkerStateResponse(self.context.state_manager.get_current_state())
return WorkerStateResponse(
self.context.state_manager.get_current_state(),
state_version=self.context.state_manager.get_state_version(),
)
15 changes: 15 additions & 0 deletions amber/src/main/python/core/architecture/managers/state_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ class StateManager:
def __init__(self, state_transition_graph: Dict[T, Set[T]], initial_state: T):
self._state_transition_graph = state_transition_graph
self._current_state: T = initial_state
# Monotonically increasing version, bumped on every successful transition.
# It is the state machine's logical clock: since a single owner drives all
# transitions, the version totally orders them causally. Reporting it
# alongside the state lets the controller reject stale state reports that
# arrive out of order, without comparing wall-clock timestamps across
# processes. Must mirror the Scala StateManager.
self._state_version: int = 0

def assert_state(self, state: T) -> None:
"""
Expand Down Expand Up @@ -75,10 +82,18 @@ def transit_to(self, state: T) -> None:
)

self._current_state = state
self._state_version += 1

def get_current_state(self) -> T:
"""
Return the current state.
:return:
"""
return self._current_state

def get_state_version(self) -> int:
"""
Return the monotonic version of the current state.
:return:
"""
return self._state_version
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,16 @@ trait PauseHandler {
// send a pause message
workerInterface.pauseWorker(EmptyRequest(), mkContext(worker)).flatMap {
resp =>
workerExecution.update(System.nanoTime(), resp.state)
workerExecution.updateState(resp.stateVersion, resp.state)
workerInterface
.queryStatistics(EmptyRequest(), mkContext(worker))
// get the stats and current input tuple from the worker
.map {
case WorkerMetricsResponse(metrics) =>
workerExecution.update(System.nanoTime(), metrics.workerStatistics)
workerExecution.updateStats(
System.nanoTime(),
metrics.workerStatistics
)
}
}
}.toSeq
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ trait QueryWorkerStatisticsHandler {
processLayers(layers).map { _ =>
collectedResults.foreach {
case (wExec, resp, timestamp) =>
wExec.update(timestamp, resp.metrics.workerState, resp.metrics.workerStatistics)
// State is ordered by the worker's logical version; stats by receipt time.
wExec.updateState(resp.metrics.stateVersion, resp.metrics.workerState)
wExec.updateStats(timestamp, resp.metrics.workerStatistics)
}
forwardStats(msg.updateTarget)
// Record the completion timestamp before releasing the lock so that any timer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ trait ResumeHandler {
cp.workflowExecution
.getLatestOperatorExecution(VirtualIdentityUtils.getPhysicalOpId(workerId))
.getWorkerExecution(workerId)
.update(System.nanoTime(), resp.state)
.updateState(resp.stateVersion, resp.state)
}
}
.toSeq
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ trait WorkerStateUpdatedHandler {
.find(_.hasOperatorExecution(physicalOpId))
.map(_.getOperatorExecution(physicalOpId))
.foreach(operatorExecution =>
operatorExecution.getWorkerExecution(ctx.sender).update(System.nanoTime(), msg.state)
operatorExecution.getWorkerExecution(ctx.sender).updateState(msg.stateVersion, msg.state)
)
val stats = cp.workflowExecution.getAllRegionExecutionsStats
sendToClient(ExecutionStatsUpdate(stats))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ package org.apache.texera.amber.engine.architecture.deploysemantics.layer

import org.apache.texera.amber.core.workflow.PortIdentity
import org.apache.texera.amber.engine.architecture.controller.execution.WorkerPortExecution
import org.apache.texera.amber.engine.architecture.worker.statistics.WorkerState.UNINITIALIZED
import org.apache.texera.amber.engine.architecture.worker.statistics.WorkerState.{
COMPLETED,
TERMINATED,
UNINITIALIZED
}
import org.apache.texera.amber.engine.architecture.worker.statistics.{WorkerState, WorkerStatistics}

import scala.collection.mutable
Expand All @@ -37,50 +41,56 @@ case class WorkerExecution() extends Serializable {
private var stats: WorkerStatistics = {
WorkerStatistics(Seq.empty, Seq.empty, 0, 0, 0)
}
private var lastUpdateTimeStamp = 0L
// Logical version of the last applied state, sourced from the worker's
// WorkerStateManager. Starts below any real version so the first report applies.
private var lastStateVersion = -1L
// Wall-clock (controller-side nanoTime) of the last applied stats snapshot.
private var lastStatsTimeStamp = 0L

private def isTerminal(s: WorkerState): Boolean = s == COMPLETED || s == TERMINATED

/**
* Updates both the worker state and statistics if the provided timestamp is newer
* than the last recorded update timestamp. This ensures that only the most recent
* data is reflected in the execution state.
* Applies a worker state report, ordered causally by the worker's monotonic
* `stateVersion` rather than by wall-clock time. A report is applied only when it
* is strictly newer than the last applied one, so a stale state that arrives late
* (e.g. the RUNNING snapshot carried by a slow startWorker response) cannot clobber
* a newer state. In addition, terminal states (COMPLETED/TERMINATED) are absorbing:
* once reached, no later report can move the worker out of them.
*
* @param timeStamp the nanosecond-timestamp of this update
* @param state the new WorkerState to set
* @param stats the new WorkerStatistics to set
* @param stateVersion the worker-side monotonic version of this state
* @param newState the reported WorkerState
*/
def update(timeStamp: Long, state: WorkerState, stats: WorkerStatistics): Unit = {
if (this.lastUpdateTimeStamp < timeStamp) {
this.stats = stats
this.state = state
this.lastUpdateTimeStamp = timeStamp
def updateState(stateVersion: Long, newState: WorkerState): Unit = {
if (isTerminal(this.state)) {
return
}
if (this.lastStateVersion < stateVersion) {
this.state = newState
this.lastStateVersion = stateVersion
}
}

/**
* Updates only the worker state if the provided timestamp is newer than the
* last recorded update timestamp.
*
* @param timeStamp the nanosecond-timestamp of this update
* @param state the new WorkerState to set
* Forces the worker into TERMINATED, e.g. when the controller kills a region. This
* still respects terminal-state absorption: a worker that already COMPLETED on its
* own is left as COMPLETED. Uses the maximum version so it wins over any in-flight
* non-terminal report for a worker that had not yet reached a terminal state.
*/
def update(timeStamp: Long, state: WorkerState): Unit = {
if (this.lastUpdateTimeStamp < timeStamp) {
this.state = state
this.lastUpdateTimeStamp = timeStamp
}
}
def forceTerminate(): Unit = updateState(Long.MaxValue, TERMINATED)

/**
* Updates only the worker statistics if the provided timestamp is newer than the
* last recorded update timestamp.
* last recorded stats timestamp. Stats are monotonic snapshots, so newest-wins by
* wall-clock is sufficient (and necessary, since two snapshots taken within the same
* state share a state version).
*
* @param timeStamp the nanosecond-timestamp of this update
* @param stats the new WorkerStatistics to set
* @param newStats the new WorkerStatistics to set
*/
def update(timeStamp: Long, stats: WorkerStatistics): Unit = {
if (this.lastUpdateTimeStamp < timeStamp) {
this.stats = stats
this.lastUpdateTimeStamp = timeStamp
def updateStats(timeStamp: Long, newStats: WorkerStatistics): Unit = {
if (this.lastStatsTimeStamp < timeStamp) {
this.stats = newStats
this.lastStatsTimeStamp = timeStamp
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import org.apache.texera.amber.engine.architecture.scheduling.config.{
ResourceConfig
}
import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings.Partitioning
import org.apache.texera.amber.engine.architecture.worker.statistics.WorkerState
import org.apache.texera.amber.engine.common.AmberLogging
import org.apache.texera.amber.engine.common.FutureBijection._
import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient
Expand Down Expand Up @@ -211,7 +210,7 @@ class RegionExecutionCoordinator(
regionExecution.getAllOperatorExecutions.foreach {
case (_, opExec) =>
opExec.getWorkerIds.foreach { workerId =>
opExec.getWorkerExecution(workerId).update(System.nanoTime(), WorkerState.TERMINATED)
opExec.getWorkerExecution(workerId).forceTerminate()
}
}
Future.Unit // propagate success
Expand Down Expand Up @@ -579,12 +578,14 @@ class RegionExecutionCoordinator(
asyncRPCClient.workerInterface
.startWorker(EmptyRequest(), asyncRPCClient.mkContext(workerId))
.map(resp =>
// update worker state
// Update worker state, ordered by the worker's logical state version
// (not arrival time) so this RUNNING snapshot cannot clobber a later
// COMPLETED if the response arrives after the worker has finished.
workflowExecution
.getRegionExecution(region.id)
.getOperatorExecution(opId)
.getWorkerExecution(workerId)
.update(System.nanoTime(), resp.state)
.updateState(resp.stateVersion, resp.state)
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,10 @@ class DataProcessor(
RUNNING,
() => {
asyncRPCClient.controllerInterface.workerStateUpdated(
WorkerStateUpdatedRequest(stateManager.getCurrentState),
WorkerStateUpdatedRequest(
stateManager.getCurrentState,
stateManager.getStateVersion
),
asyncRPCClient.mkContext(CONTROLLER)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ trait PauseHandler {
dp.pauseManager.pause(UserPause)
dp.stateManager.transitTo(PAUSED)
}
WorkerStateResponse(dp.stateManager.getCurrentState)
WorkerStateResponse(dp.stateManager.getCurrentState, dp.stateManager.getStateVersion)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,13 @@ trait QueryStatisticsHandler {
request: EmptyRequest,
ctx: AsyncRPCContext
): Future[WorkerMetricsResponse] = {
WorkerMetricsResponse(WorkerMetrics(dp.stateManager.getCurrentState, dp.collectStatistics()))
WorkerMetricsResponse(
WorkerMetrics(
dp.stateManager.getCurrentState,
dp.collectStatistics(),
dp.stateManager.getStateVersion
)
)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ trait ResumeHandler {
dp.stateManager.transitTo(RUNNING)
dp.adaptiveBatchingMonitor.resumeAdaptiveBatching()
}
WorkerStateResponse(dp.stateManager.getCurrentState)
WorkerStateResponse(dp.stateManager.getCurrentState, dp.stateManager.getStateVersion)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ trait StartHandler {
dp.inputGateway.getChannel(channelId).setPortId(PortIdentity())
startChannel(request, ctx)
endChannel(request, ctx)
WorkerStateResponse(dp.stateManager.getCurrentState)
WorkerStateResponse(dp.stateManager.getCurrentState, dp.stateManager.getStateVersion)
} else if (dp.inputManager.getInputPortReaderThreads.nonEmpty) {
// This means the worker should read from materialized storage for its input ports.
// Start the reader threads
dp.inputManager.startInputPortReaderThreads()
WorkerStateResponse(dp.stateManager.getCurrentState)
WorkerStateResponse(dp.stateManager.getCurrentState, dp.stateManager.getStateVersion)
} else {
throw new WorkflowRuntimeException(
s"non-source worker $actorId received unexpected StartWorker!"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ class StateManager[T](

private var currentState: T = initialState

// Monotonically increasing version, bumped on every successful state transition.
// It is the state machine's logical clock: because a single owner drives all
// transitions, the version totally orders them in causal order. Reporting this
// version alongside the state lets remote observers (e.g. the controller) reject
// stale state reports that arrive out of order, without relying on wall-clock
// timestamps that cannot be compared across processes.
private var stateVersion: Long = 0L

def getStateVersion: Long = stateVersion

def assertState(state: T): Unit = {
if (currentState != state) {
throw InvalidStateException(
Expand Down Expand Up @@ -84,6 +94,7 @@ class StateManager[T](
throw InvalidTransitionException(s"cannot transit from $currentState to $state", actorId)
}
currentState = state
stateVersion += 1
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,30 @@ def test_it_can_transit_directly_from_ready_to_completed(self, state_manager):
state_manager.transit_to(WorkerState.READY)
state_manager.transit_to(WorkerState.COMPLETED)
state_manager.assert_state(WorkerState.COMPLETED)

def test_state_version_starts_at_zero(self, state_manager):
assert state_manager.get_state_version() == 0

def test_state_version_bumps_on_every_successful_transition(self, state_manager):
# The controller relies on this monotonic version to order Python-worker
# state reports causally; without it, RUNNING -> PAUSED -> RUNNING during
# reconfiguration would be dropped as stale. Mirrors the Scala StateManager.
assert state_manager.get_state_version() == 0
state_manager.transit_to(WorkerState.READY)
assert state_manager.get_state_version() == 1
state_manager.transit_to(WorkerState.RUNNING)
assert state_manager.get_state_version() == 2
state_manager.transit_to(WorkerState.COMPLETED)
assert state_manager.get_state_version() == 3

def test_state_version_does_not_bump_on_noop_self_transition(self, state_manager):
state_manager.transit_to(WorkerState.READY)
before = state_manager.get_state_version()
state_manager.transit_to(WorkerState.READY) # no-op
assert state_manager.get_state_version() == before

def test_state_version_does_not_bump_on_rejected_transition(self, state_manager):
# UNINITIALIZED -> RUNNING is illegal (must pass through READY).
with pytest.raises(InvalidTransitionException):
state_manager.transit_to(WorkerState.RUNNING)
assert state_manager.get_state_version() == 0
Loading
Loading