[SPARK-57152][SDP] Implement SCD2 Batch Processor; Find Affected Aux/Target Table Rows#56283
Open
AnishMahto wants to merge 9 commits into
Open
[SPARK-57152][SDP] Implement SCD2 Batch Processor; Find Affected Aux/Target Table Rows#56283AnishMahto wants to merge 9 commits into
AnishMahto wants to merge 9 commits into
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Approved AutoCDC SPIP: https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7
This is a stacked PR. Review incremental diff here: AnishMahto/spark@SPARK-57134-SCD2-preprocess-microbatch...SPARK-57152-SCD2-find-affected-rows
What changes were proposed in this pull request?
Preamble:
The SCD type 2 flow is a foreachBatch streaming query on an input change-data-feed, and is responsible for reconciling the incoming change data onto some target table that follows SCD2 replication semantics.
SCD2 flows also maintain an "auxiliary" table to keep track of early-arriving out-of-order received events state. Each microbatch will need to reconcile against this auxiliary table as well, and update the auxiliary table's state appropriately for future microbatches.
Find Affected Aux/Target Table Rows
After preprocessing the microbatch such that we have each incoming row's startAt, endAt, and recordStartAt projected, the next step in reconciliation is determining which existing rows in the auxiliary and target tables either might be affected by the incoming rows or they might affect the incoming rows themselves.
A no-op upsert run row in the auxiliary table can be affected by the microbatch if an incoming row makes the row no longer a no-op (i.e microbatch delivers an interleaving row that does indeed change history tracked columns). A tombstone in the auxiliary table can affect an incoming row if it now matches against an upsert in the microbatch.
A row in the target table can be affected by the microbatch if an incoming upsert makes the target table's row a no-op upsert, or an incoming delete/upsert event terminates an existing row in the target table. An active row (endAt=null) in the target table could become terminated, or an existing closed row in the target table could become bisected. Conversely, existing rows in the target table can dictate when an incoming upsert row should be considered closed from.
We take a practical, conservative approach in selecting the set of rows that could possibly be affected or affect the microbatch. Per key we retrieve all existing rows whose startAt comes after the youngest sequence in the incoming microbatch, as well as the first existing row in both the target/aux that comes before the youngest sequence.
This is opposed to doing a very complex and expensive join to determine which rows are definitively affected by/affecting the microbatch. In practice its not common for events to actually receive very old events out of order, so pulling in all existing rows that come after the oldest row in the microbatch will generally be a very small result set.
Why are the changes needed?
AutoCDC SCD2 core algorithm.
Does this PR introduce any user-facing change?
No, new feature.
How was this patch tested?
Unit tested in
Scd2BatchProcessorSuite.Was this patch authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.7.