Skip to content

fix(amber): order worker state by version, not timestamp#6011

Open
Yicong-Huang wants to merge 4 commits into
apache:mainfrom
Yicong-Huang:fix/worker-state-causal-order
Open

fix(amber): order worker state by version, not timestamp#6011
Yicong-Huang wants to merge 4 commits into
apache:mainfrom
Yicong-Huang:fix/worker-state-causal-order

Conversation

@Yicong-Huang

@Yicong-Huang Yicong-Huang commented Jun 29, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this PR?

A fast source operator stayed orange (RUNNING) in the editor after the run finished (issue #6010).

Why it's a bug. The controller reconstructs each worker's state from several unordered channels (source RUNNING via the startWorker response, non-source RUNNING via workerStateUpdated, COMPLETED/PAUSED via queryStatistics/pauseWorker responses) and reconciled them with last-System.nanoTime()-wins in WorkerExecution. Worker state, however, is single-writer and strictly ordered causally. For a tiny source the run finishes almost instantly, so the startWorker response — carrying the stale RUNNING it sampled at launch — can reach the controller after COMPLETED was recorded; its later receipt timestamp wins and clobbers COMPLETED. Results render fine (separate path), so only the border is stuck.

This PR orders worker state causally instead of by wall clock:

Before After
WorkerExecution.update(nanoTime, state) — last-write-wins by receipt time updateState(version, state) — newest logical version wins
stale late RUNNING clobbers COMPLETED RUNNING (v2) < COMPLETED (v3) ⇒ COMPLETED kept
terminal states (COMPLETED/TERMINATED) are absorbing
  • WorkerStateManager bumps a monotonic stateVersion on every transitTo (its state-machine logical clock).
  • The version rides on every state report: WorkerStateResponse, WorkerStateUpdatedRequest, WorkerMetrics (3 new proto fields).
  • The controller applies a state only when its version is newer. Stats stay timestamp-ordered (monotonic snapshots can share a state version).
  • A single per-worker counter ⇒ no cross-process clock-sync assumptions (why not worker timestamps).
  • pyamber parity: the Python worker also reports state (start/pause/resume + query-statistics). Its StateManager now bumps the same monotonic version and the four handlers include it; otherwise version-0 reports would be dropped after the first and reconfiguration (RUNNING→PAUSED→RUNNING) would hang.

Any related issues, documentation, discussions?

Closes #6010. The timestamp-based update was introduced in #3557.

How was this PR tested?

JDK 17. Scala unit + Scala/Python integration + Python unit:

sbt "WorkflowExecutionService/testOnly \
  *WorkerExecutionSpec *WorkerStateManagerSpec *OperatorExecutionSpec \
  *RegionExecutionCoordinatorSpec *WorkflowExecutionCoordinatorSpec \
  *RegionExecutionSpec *WorkflowExecutionSpec"

# Scala-Python reconfiguration end-to-end (spawns Python UDF workers):
sbt "WorkflowExecutionService/testOnly *ReconfigurationIntegrationSpec"   # 3 passed

# Python unit:
cd amber && pytest src/test/python/core/architecture/managers/test_state_manager.py   # 9 passed
  • WorkerExecutionSpec: version ordering (positive + stale/equal-version negatives), terminal-state absorption, forceTerminate, independent stats-vs-state ordering, and a named regression for Fast source operator stays orange (RUNNING) after the workflow completes #6010 (COMPLETED survives a late RUNNING). Verified the regression goes red when updateState is reverted to last-write-wins.
  • WorkerStateManagerSpec / test_state_manager.py: version starts at 0, bumps per successful transition, no bump on no-op self-transition or rejected transition.
  • ReconfigurationIntegrationSpec reproduced the failure (timeout) before the pyamber fix and passes after.

Was this PR authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.8 (1M context), via Claude Code

Worker state was reconciled at the controller by last-nanoTime-wins.
A fast source's startWorker response carries a stale RUNNING snapshot
that can arrive after COMPLETED was recorded, clobbering it and leaving
the operator stuck orange.

Order state causally instead: WorkerStateManager now stamps every
transition with a monotonic per-worker version, carried on all state
reports (WorkerStateResponse, WorkerStateUpdatedRequest, WorkerMetrics).
The controller applies a state only if its version is newer, and treats
COMPLETED/TERMINATED as absorbing. Stats keep timestamp ordering.

Closes apache#6010
@github-actions

Copy link
Copy Markdown
Contributor

Automated Reviewer Suggestions

Based on the git blame history of the changed files, we recommend the following reviewers:

  • Contributors with relevant context: @aglinxinyuan, @kunwp1, @Ma77Ball
    You can notify them by mentioning @aglinxinyuan, @kunwp1, @Ma77Ball in a comment.

@codecov-commenter

codecov-commenter commented Jun 29, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 93.54839% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 56.58%. Comparing base (d10e1a2) to head (025ce39).

Files with missing lines Patch % Lines
...hitecture/handlers/control/start_worker_handler.py 0.00% 1 Missing ⚠️
...ecture/deploysemantics/layer/WorkerExecution.scala 91.66% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #6011      +/-   ##
============================================
- Coverage     56.74%   56.58%   -0.17%     
+ Complexity     3024     3011      -13     
============================================
  Files          1124     1120       -4     
  Lines         43593    43179     -414     
  Branches       4712     4657      -55     
============================================
- Hits          24739    24432     -307     
+ Misses        17386    17307      -79     
+ Partials       1468     1440      -28     
Flag Coverage Δ *Carryforward flag
access-control-service 70.00% <ø> (ø) Carriedforward from ce23c01
agent-service 44.59% <ø> (ø) Carriedforward from ce23c01
amber 58.57% <95.83%> (-0.03%) ⬇️
computing-unit-managing-service 0.00% <ø> (ø) Carriedforward from ce23c01
config-service 51.56% <ø> (-0.75%) ⬇️ Carriedforward from ce23c01
file-service 59.02% <ø> (-3.80%) ⬇️ Carriedforward from ce23c01
frontend 49.34% <ø> (-0.45%) ⬇️ Carriedforward from ce23c01
notebook-migration-service 78.57% <ø> (ø) Carriedforward from ce23c01
pyamber 90.23% <85.71%> (+0.03%) ⬆️
python 90.78% <ø> (+0.01%) ⬆️ Carriedforward from ce23c01
workflow-compiling-service 55.14% <ø> (ø) Carriedforward from ce23c01

*This pull request uses carry forward flags. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@github-actions

github-actions Bot commented Jun 29, 2026

Copy link
Copy Markdown
Contributor

⚠️ Benchmark changes need a look

🟢 0 better · 🔴 2 worse · ⚪ 13 noise (<±5%) · 0 without baseline

Compared against main d10e1a2 benchmarked on this same runner, so the delta is largely free of cross-runner hardware noise. The "7d avg" column still reflects the gh-pages dashboard. Treat <±5% as noise unless repeated.

Dashboard · Run

config throughput MB/s latency max Δ latest / 7d
🔴 bs=10 sw=10 sl=64 380 0.232 25,398/37,589/37,589 us 🔴 +8.7% / 🔴 +149.4%
bs=100 sw=10 sl=64 802 0.49 123,400/139,584/139,584 us ⚪ within ±5% / 🔴 +28.2%
bs=1000 sw=10 sl=64 923 0.563 1,084,067/1,116,763/1,116,763 us ⚪ within ±5% / 🔴 +8.2%
Baseline details

Latest main d10e1a2 from same runner

config metric PR latest main 7d avg Δ latest Δ 7d
bs=10 sw=10 sl=64 throughput 380 tuples/sec 388 tuples/sec 770.82 tuples/sec -2.1% -50.7%
bs=10 sw=10 sl=64 MB/s 0.232 MB/s 0.237 MB/s 0.47 MB/s -2.1% -50.7%
bs=10 sw=10 sl=64 p50 25,398 us 24,455 us 12,723 us +3.9% +99.6%
bs=10 sw=10 sl=64 p95 37,589 us 34,568 us 15,070 us +8.7% +149.4%
bs=10 sw=10 sl=64 p99 37,589 us 34,568 us 18,429 us +8.7% +104.0%
bs=100 sw=10 sl=64 throughput 802 tuples/sec 808 tuples/sec 973.75 tuples/sec -0.7% -17.6%
bs=100 sw=10 sl=64 MB/s 0.49 MB/s 0.493 MB/s 0.594 MB/s -0.6% -17.6%
bs=100 sw=10 sl=64 p50 123,400 us 122,834 us 102,519 us +0.5% +20.4%
bs=100 sw=10 sl=64 p95 139,584 us 139,118 us 108,855 us +0.3% +28.2%
bs=100 sw=10 sl=64 p99 139,584 us 139,118 us 117,788 us +0.3% +18.5%
bs=1000 sw=10 sl=64 throughput 923 tuples/sec 900 tuples/sec 1,004 tuples/sec +2.6% -8.1%
bs=1000 sw=10 sl=64 MB/s 0.563 MB/s 0.549 MB/s 0.613 MB/s +2.6% -8.1%
bs=1000 sw=10 sl=64 p50 1,084,067 us 1,109,505 us 1,001,930 us -2.3% +8.2%
bs=1000 sw=10 sl=64 p95 1,116,763 us 1,156,428 us 1,042,923 us -3.4% +7.1%
bs=1000 sw=10 sl=64 p99 1,116,763 us 1,156,428 us 1,074,893 us -3.4% +3.9%
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,526.87,200,128000,380,0.232,25398.31,37589.38,37589.38
1,100,10,64,20,2492.67,2000,1280000,802,0.490,123400.40,139584.10,139584.10
2,1000,10,64,20,21671.42,20000,12800000,923,0.563,1084067.10,1116762.87,1116762.87

The pyamber worker reports its state to the controller via start/pause/
resume responses and query-statistics metrics. Without a version these
defaulted to 0, so after the first report the controller's version gate
dropped every later state change — breaking reconfiguration, which waits
to observe RUNNING -> PAUSED -> RUNNING (ReconfigurationIntegrationSpec
timed out).

Mirror the Scala change in pyamber: StateManager bumps a monotonic
state version on each transition, and the four state-reporting handlers
include it.

@aglinxinyuan aglinxinyuan left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

The pause/resume/query-statistics assertions compared full messages
against literals that defaulted state_version to 0; read the actual
version from the report (as the stats sizes already are) so the
equality holds while StateManager tests cover the version itself.
@Yicong-Huang Yicong-Huang added release/v1.2 back porting to release/v1.2 and removed release/v1.2 back porting to release/v1.2 labels Jun 29, 2026
@Yicong-Huang Yicong-Huang requested a review from Copilot June 29, 2026 05:30

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Fixes a race in Amber’s controller-side worker-state reconstruction where late-arriving RUNNING snapshots (e.g., from startWorker) could overwrite a newer terminal state (COMPLETED), causing operators to remain visually “RUNNING” after completion. The PR replaces receipt-time ordering with causal ordering via a per-worker logical stateVersion reported by the worker state machine, and makes terminal states absorbing.

Changes:

  • Introduce a monotonic per-worker stateVersion (Scala + Python) and propagate it through RPC/proto messages that report state.
  • Update controller-side reconciliation to apply worker state only when stateVersion is strictly newer; keep worker statistics ordered by timestamp independently.
  • Add/adjust Scala and Python tests to cover version ordering, stale/equal-version rejection, terminal absorption, and regression for #6010.

Reviewed changes

Copilot reviewed 26 out of 26 changed files in this pull request and generated no comments.

Show a summary per file
File Description
amber/src/main/scala/org/apache/texera/amber/engine/common/statetransition/StateManager.scala Adds a monotonic stateVersion logical clock bumped on successful transitions.
amber/src/main/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecution.scala Replaces timestamp-based state updates with updateState(stateVersion, state); adds terminal-state absorption and separate updateStats.
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala Applies startWorker state using stateVersion; switches termination path to forceTerminate().
amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala Orders pushed state updates by stateVersion instead of controller receipt time.
amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala Applies resume responses using stateVersion.
amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala Applies pause responses using stateVersion; updates stats via updateStats.
amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala Splits metrics application into updateState(stateVersion, ...) and updateStats(timestamp, ...).
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/StartHandler.scala Includes stateVersion in WorkerStateResponse for start responses.
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/ResumeHandler.scala Includes stateVersion in WorkerStateResponse for resume responses.
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/PauseHandler.scala Includes stateVersion in WorkerStateResponse for pause responses.
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/QueryStatisticsHandler.scala Includes stateVersion in WorkerMetrics returned by queryStatistics.
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala Includes stateVersion in WorkerStateUpdatedRequest push events.
amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/worker/statistics.proto Adds state_version to WorkerMetrics for causal state ordering.
amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto Adds state_version to WorkerStateResponse.
amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto Adds state_version to WorkerStateUpdatedRequest.
amber/src/main/python/core/architecture/managers/state_manager.py Adds a monotonic _state_version and get_state_version(); bumps on transitions.
amber/src/main/python/core/architecture/handlers/control/start_worker_handler.py Returns WorkerStateResponse with state_version.
amber/src/main/python/core/architecture/handlers/control/pause_worker_handler.py Returns WorkerStateResponse with state_version.
amber/src/main/python/core/architecture/handlers/control/resume_worker_handler.py Returns WorkerStateResponse with state_version.
amber/src/main/python/core/architecture/handlers/control/query_statistics_handler.py Includes state_version in WorkerMetrics.
amber/src/test/scala/org/apache/texera/amber/engine/common/statetransition/WorkerStateManagerSpec.scala Adds unit tests for initial version, bumping rules, and no-op/rejected transitions.
amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionCoordinatorTestSupport.scala Updates test stub StartWorker response to include stateVersion.
amber/src/test/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecutionSpec.scala Adds coverage for version-based ordering, terminal absorption, and #6010 regression.
amber/src/test/scala/org/apache/texera/amber/engine/architecture/controller/execution/OperatorExecutionSpec.scala Updates test helper to use updateState + updateStats with a monotonic surrogate.
amber/src/test/python/core/architecture/managers/test_state_manager.py Adds Python unit tests mirroring Scala version semantics.
amber/src/test/python/core/runnables/test_main_loop.py Updates expectations to carry through the reported state_version instead of hardcoding counts.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@xuang7 xuang7 added the release/v1.2 back porting to release/v1.2 label Jun 29, 2026
@xuang7

xuang7 commented Jun 29, 2026

Copy link
Copy Markdown
Contributor

Hi @Yicong-Huang, it seems this PR cannot be directly backported. Do we still need to backport this change? I noticed that OperatorExecutionSpec.scala does not exist on the release branch.

@xuang7 xuang7 removed the release/v1.2 back porting to release/v1.2 label Jun 29, 2026
@Yicong-Huang

Copy link
Copy Markdown
Contributor Author

I think it will be good if we can include the fix in release v1.2, I will do a separate pr to fix v1.2. But I want to first this PR reviewed.

cc @Xiao-zhen-Liu can you help take a look as well?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Fast source operator stays orange (RUNNING) after the workflow completes

5 participants