txn-wal: fix data loss in txns_progress_frontiers UP TO race#36548
Open
antiguru wants to merge 1 commit into
Open
txn-wal: fix data loss in txns_progress_frontiers UP TO race#36548antiguru wants to merge 1 commit into
antiguru wants to merge 1 commit into
Conversation
9b72364 to
237719e
Compare
The deasync rewrite in MaterializeInc#36537 reordered the per-activation logic in `txns_progress_frontiers` so the passthrough frontier was applied (dropping the output capability when it crossed `until`) before the passthrough buffer was drained. If both happened in a single activation, any buffered batch was drained into the no-cap branch and silently discarded — visible to callers as missing rows at the tail of a `SUBSCRIBE ... UP TO` (SQL-299). Fix: * Drain the passthrough buffer first, at the capability held from the previous activation. This preserves the differential invariant `send_time <= record_time` and guarantees buffered records are emitted before any until-driven cap drop. * Replace the single retained `DataRemapEntry` with a `VecDeque` plus a `latest_remap_log` for frontier-only advances. The async original walked entries one at a time off the FIFO channel and could advance the cap stepwise at every `physical_upper_i`; the sync `for_each` flattens that into one activation, so the queue is needed to recover stepwise advancement and avoid stalling when `pass_frontier` sits between two entries' `physical_upper`s. `latest_remap_log` covers the case where the remap source downgrades its cap past the last emitted entry's `logical_upper` without producing a new entry — without it the operator deadlocks after consuming the queue because `waiting_for_remap` blocks `pass_frontier` from driving cap further. * Adds a caller-facing contract on `txns_progress_frontiers` derived from the pre-MaterializeInc#36537 async semantics, since the function had none. The `as_of_until` test now asserts the set of records and the differential invariant (`stream_ts <= record_time`) rather than exact stream-timestamp matching, because with the drain-at-current-cap approach the per-batch stream timestamp is a function of scheduling cadence rather than part of the contract. Adds `passthrough_drained_before_until_drop`, a Rust regression test that uses `timely::execute_directly` and `UnorderedInput` to deterministically construct the race state — a buffered passthrough record at time 0 plus a passthrough frontier advanced to `until = 5` — before stepping the worker. The buggy ordering would drop the cap before draining and emit zero records; the fix emits one. Runs in under a second. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Member
Author
|
Heads up: #36553 reverts both #36537 (this PR's target) and #36514 (container builders for the persist source). #36514 was merged on top of #36537 and propagated the sync When this PR lands, both #36514 and #36537 will need to be re-applied on top of the fix (or rebased through it). |
antiguru
added a commit
that referenced
this pull request
May 14, 2026
…s close (#36537)" (#36553) ### Motivation Reverts #36537. Caused regression tracked in SQL-299. Forward fix in #36548 will take more time; reverting in the interim to unblock. ### Description Pure `git revert ff13539`. Restores `txns_progress_frontiers` to its pre-#36537 async implementation. ### Verification `cargo check -p mz-txn-wal` clean.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
Fixes SQL-299.
The deasync rewrite in #36537 reordered the per-activation logic in
txns_progress_frontiersso the passthrough frontier was applied (dropping the output capability when it crosseduntil) before the passthrough buffer was drained. If both happened in a single activation, any buffered batch was drained into the no-cap branch and silently discarded — visible to callers as missing rows at the tail of aSUBSCRIBE ... UP TO.Description
send_time <= record_time(cap is the prior frontier, which is<=every record subsequently received) and guarantees buffered records are emitted before any until-driven cap drop.DataRemapEntrywith aVecDequeplus alatest_remap_logfor frontier-only advances. The async original walked entries one at a time off the FIFO channel and could advance the cap stepwise at everyphysical_upper_i; the syncfor_eachflattens that into one activation, so the queue is needed to recover stepwise advancement and avoid stalling whenpass_frontiersits between two entries'physical_uppers.latest_remap_logcovers the case where the remap source downgrades its cap past the last emitted entry'slogical_upperwithout producing a new entry — without it the operator deadlocks after consuming the queue becausewaiting_for_remapblockspass_frontierfrom driving cap further.txns_progress_frontiersderived from the pre-txn-wal: deasync txns_progress_frontiers + retain remap across close #36537 async semantics, since the function had none.Verification
mz-txn-wallib tests pass (29/29), includingas_of_until,data_subscribe,subscribe_shard_finalize,subscribe_shard_register_forget, andfrontiers_advance_to_logical_after_remap_close.as_of_untiltest relaxed: now asserts the differential invariant (stream_ts <= record_time) and the set of records, instead of exact stream-timestamp matching. With the drain-at-current-cap approach the stream timestamp is a function of scheduling cadence rather than part of the contract.test-sql-299intest/cluster: drives the SQL surface with a tight commit loop in parallel withSUBSCRIBE ... UP TO mz_now()+1(withcompute_subscribe_snapshot_optimizationdisabled to force the snapshot through the operator). On the buggy commit some iterations lose rows; with the fix every iteration returns the full count.