diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala index 2b0f8e293e76b..8284441e9e2b1 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala @@ -29,4 +29,15 @@ private[pipelines] object AutoCdcReservedNames { /** Common reserved-name prefix shared by AutoCDC internal columns and internal tables. */ val prefix: String = "__spark_autocdc_" + + /** + * Reserved name of the operational metadata column AutoCDC that is projected on every AutoCDC + * microbatch, auxiliary table, and target table. + * + * Shared across all SCD strategies and across the flow resolution, batch-processor, and + * streaming-write layers. + * + * Note that the schema of the CDC metadata column however can and does differ on the SCD-type. + */ + val cdcMetadataColName: String = s"${prefix}metadata" } diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 0656a7eb91b01..35006dc4ee21f 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -143,7 +143,7 @@ case class Scd1BatchProcessor( F.when(rowDeleteSequence.isNull, changeArgs.sequencing).otherwise(F.lit(null)) validatedMicrobatch.withColumn( - Scd1BatchProcessor.cdcMetadataColName, + AutoCdcReservedNames.cdcMetadataColName, Scd1BatchProcessor.constructCdcMetadataCol( deleteSequence = rowDeleteSequence, upsertSequence = rowUpsertSequence, @@ -175,7 +175,7 @@ case class Scd1BatchProcessor( schema = microbatchWithCdcMetadataDf.schema, columnSelection = Some( ColumnSelection.ExcludeColumns( - Seq(UnqualifiedColumnName(Scd1BatchProcessor.cdcMetadataColName)) + Seq(UnqualifiedColumnName(AutoCdcReservedNames.cdcMetadataColName)) ) ), caseSensitive = caseSensitiveColumnComparison @@ -197,7 +197,7 @@ case class Scd1BatchProcessor( // select. Identifiers could have special characters such as '.'. F.col(QuotingUtils.quoteIdentifier(colName)) }) :+ F.col( - Scd1BatchProcessor.cdcMetadataColName + AutoCdcReservedNames.cdcMetadataColName ) microbatchWithCdcMetadataDf.select( @@ -223,7 +223,7 @@ case class Scd1BatchProcessor( val aliasedMicrobatchDf = microbatchDf.alias("microbatch") val aliasedAuxiliaryTableDf = auxiliaryTableDf.alias("auxiliaryTable") - val cdcMetadata = Scd1BatchProcessor.cdcMetadataColName + val cdcMetadata = AutoCdcReservedNames.cdcMetadataColName val microbatchCdcMetadata = F.col(s"microbatch.$cdcMetadata") val effectiveSeq = F.greatest( @@ -267,7 +267,7 @@ case class Scd1BatchProcessor( auxiliaryTableIdentifier: TableIdentifier ): Unit = { val auxIdentQuoted = auxiliaryTableIdentifier.quotedString - val meta = Scd1BatchProcessor.cdcMetadataColName + val meta = AutoCdcReservedNames.cdcMetadataColName // Project the reconciled microbatch down to just keys + `_cdc_metadata`; data columns are // irrelevant for the auxiliary table and should not be persisted. @@ -330,7 +330,7 @@ case class Scd1BatchProcessor( reconciledMicrobatchDf: DataFrame, targetTableIdentifier: TableIdentifier ): Unit = { - val meta = Scd1BatchProcessor.cdcMetadataColName + val meta = AutoCdcReservedNames.cdcMetadataColName val destinationTableStr = targetTableIdentifier.quotedString // (Re-)alias the reconciled microbatch DF for easy reference for the remainder of the merge. @@ -415,7 +415,6 @@ object Scd1BatchProcessor { * enforced at [[org.apache.spark.sql.pipelines.graph.AutoCdcMergeFlow]] construction. */ private[autocdc] val winningRowColName: String = s"${AutoCdcReservedNames.prefix}winning_row" - private[pipelines] val cdcMetadataColName: String = s"${AutoCdcReservedNames.prefix}metadata" private[pipelines] val cdcDeleteSequenceFieldName: String = "deleteSequence" private[pipelines] val cdcUpsertSequenceFieldName: String = "upsertSequence" diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala new file mode 100644 index 0000000000000..7f4c3e0174945 --- /dev/null +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala @@ -0,0 +1,580 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.SparkException +import org.apache.spark.sql.{functions => F} +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.util.QuotingUtils +import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType} +import org.apache.spark.util.ArrayImplicits._ + +/** + * Per-microbatch processor for SCD Type 2 AutoCDC flows, complying to the specified + * [[changeArgs]] configuration. + * + * @param changeArgs The CDC flow configuration. + * @param resolvedSequencingType The post-analysis [[DataType]] of the sequencing column, derived + * from the flow's resolved DataFrame at flow setup time. + */ +case class Scd2BatchProcessor( + changeArgs: ChangeArgs, + resolvedSequencingType: DataType) { + + /** + * Backtick-quoted key column names. Use when the name flows through an expression parser + * (e.g., [[F.col]]), which interprets dotted names as struct-field accesses. + */ + private lazy val keysQuoted: Seq[String] = changeArgs.keys.map(_.quoted) + + /** + * Raw key column names. Use when the name is matched literally against a schema field + * (e.g., DataFrame `.join(other, usingColumns)`), where backticks are NOT stripped. + */ + private lazy val keysRaw: Seq[String] = changeArgs.keys.map(_.name) + + /** + * Reconcile a CDC microbatch into the canonical form the auxiliary- and target-table merges + * consume. + * + * Step ordering is load-bearing: the row-extension steps reference user data columns that + * target-column selection is allowed to drop, so selection runs last. Unlike SCD1, no per-key + * deduplication step is performed here - SCD2 preserves every event as part of the row's + * history, including byte-identical full-event duplicates. + * + * Duplicate event elimination (e.g., collapsing two identical events at the same sequence), + * whether across microbatches or within the same microbatch, is the responsibility of + * downstream reconciliation - not preprocessing. + * + * @param microbatchDf + * the incoming CDC microbatch. + * @return + * a dataframe that retains every input row 1:1 - no rows added, dropped, reordered, or + * merged - with the following schema, in column order: + * 1. The user columns of `microbatchDf` that survive [[ChangeArgs.columnSelection]], in + * the order they appeared in the input. + * 2. [[startAtColName]], populated with the sequence value of the row. + * 3. [[endAtColName]], populated with the sequence value of the row IFF it's a delete + * event, null otherwise. + * 4. [[cdcMetadataColName]], conforming to [[targetCdcMetadataColSchema]]. + */ + private[autocdc] def preprocessMicrobatch(microbatchDf: DataFrame): DataFrame = { + microbatchDf + .transform(extendMicrobatchRowsWithStartAt) + .transform(extendMicrobatchRowsWithEndAt) + .transform(extendMicrobatchRowsWithCdcMetadata) + .transform(projectTargetColumnsOntoMicrobatch) + } + + /** + * Stamp each microbatch row with its currently known start-at (i.e active-from) using its + * sequencing. + */ + private def extendMicrobatchRowsWithStartAt(microbatchDf: DataFrame): DataFrame = { + microbatchDf.withColumn( + colName = Scd2BatchProcessor.startAtColName, + col = changeArgs.sequencing.cast(resolvedSequencingType) + ) + } + + /** + * Stamp each microbatch delete event row with its end time sequence, as they are instantaneous + * events. + * + * Non-deletes leave a null end, as do not yet know if the row reprsents an active upsert, or a + * closed upsert. This will become clear in later reconciliation against the aux/target tables. + */ + private def extendMicrobatchRowsWithEndAt(microbatchDf: DataFrame): DataFrame = { + microbatchDf.withColumn( + colName = Scd2BatchProcessor.endAtColName, + col = ( + changeArgs.deleteCondition match { + case Some(deleteCondition) => + F.when(deleteCondition, changeArgs.sequencing).otherwise(null) + case None => + F.lit(null) + } + ).cast(resolvedSequencingType) + ) + } + + /** + * Project the operational CDC metadata column carrying the literal event sequence. Downstream + * merges rely on it to preserve original event lineage regardless of how rows start/end-at are + * coalesced. + */ + private def extendMicrobatchRowsWithCdcMetadata(microbatchDf: DataFrame): DataFrame = { + microbatchDf.withColumn( + colName = AutoCdcReservedNames.cdcMetadataColName, + col = Scd2BatchProcessor.constructTargetCdcMetadataCol( + recordStartAt = changeArgs.sequencing, + sequencingType = resolvedSequencingType + ) + ) + } + + /** + * Apply the user's target column selection while preserving the SCD2 framework columns; the + * latter are required by downstream merges and persisted to both the auxiliary and target + * tables, so users cannot deselect them. + * + * Requires the framework columns to already be present on the input. + */ + private def projectTargetColumnsOntoMicrobatch( + microbatch: DataFrame + ): DataFrame = { + val dataSchema = StructType( + microbatch.schema.fields.filterNot(f => + Scd2BatchProcessor.reservedFrameworkColNames.contains(f.name) + ) + ) + val userSelectedDataSchema = + ColumnSelection.applyToSchema( + schemaName = "microbatch", + schema = dataSchema, + columnSelection = changeArgs.columnSelection, + caseSensitive = + microbatch.sparkSession.sessionState.conf.caseSensitiveAnalysis + ) + val finalColumnsToSelect: Seq[Column] = + userSelectedDataSchema.fieldNames.toSeq.map(colName => { + // Spark drops backticks in the schema, quote all identifiers for safety before executing + // select. Identifiers could have special characters such as '.'. + F.col(QuotingUtils.quoteIdentifier(colName)) + }) ++ Seq( + F.col(Scd2BatchProcessor.startAtColName), + F.col(Scd2BatchProcessor.endAtColName), + F.col(AutoCdcReservedNames.cdcMetadataColName) + ) + microbatch.select(finalColumnsToSelect: _*) + } + + /** + * For each key in the preprocessed microbatch, compute the earliest [[recordStartAtFieldName]] + * across the key's events. + * + * @param preprocessedBatchDf + * a validated and preprocessed microbatch as produced by [[preprocessMicrobatch]] - in + * particular, non-null key columns and a non-null [[recordStartAtFieldName]] on every row. + * @return + * a dataframe containing one row per distinct key. Schema, in column order: + * 1. The key columns ([[ChangeArgs.keys]]), in their declared order. + * 2. [[minSequenceColName]], carrying the min [[recordStartAtFieldName]] + * across all records within the microbatch for that key. + */ + private[autocdc] def computeMinimumSequencePerKey(preprocessedBatchDf: DataFrame): DataFrame = { + val recordStartAt = + Scd2BatchProcessor.recordStartAtOf(F.col(AutoCdcReservedNames.cdcMetadataColName)) + preprocessedBatchDf + .groupBy(keysQuoted.map(F.col): _*) + .agg(F.min(recordStartAt).alias(Scd2BatchProcessor.minSequenceColName)) + } + + /** + * Find the auxiliary-table rows whose state matters for reconciling the microbatch. + * + * @param rawAuxiliaryTableDf + * the auxiliary table in its native schema, whose CDC metadata column carries an extra + * [[deletedByBatchIdFieldName]] on top of the target/microbatch schema. + * @param perKeyMinimumSequenceInMicrobatch + * one row per distinct key as produced by [[computeMinimumSequencePerKey]], representing + * the minimum sequence for that key in the microbatch. + * @param batchId + * the underlying Spark streaming query's batchId, which serves as the idempotency key. + * @return + * a dataframe containing all the affected aux rows, but with the CDC metadata column narrowed + * to the target/microbatch schema (aux-only subfields stripped) so the result is + * union-compatible with preprocessed microbatch rows and target-table rows downstream. + */ + private[autocdc] def findAffectedRowsFromAuxiliaryTable( + rawAuxiliaryTableDf: DataFrame, + perKeyMinimumSequenceInMicrobatch: DataFrame, + batchId: Long + ): DataFrame = { + val auxTableRecordStartAtField = Scd2BatchProcessor.recordStartAtOf( + F.col(AutoCdcReservedNames.cdcMetadataColName) + ) + val auxTableDeletedByBatchIdField = Scd2BatchProcessor.deletedByBatchIdOf( + F.col(AutoCdcReservedNames.cdcMetadataColName) + ) + + val reducedAuxiliaryTableDf = rawAuxiliaryTableDf + .filter( + // Ignore any auxiliary table rows logically deleted by any microbatch other than this one + // itself. Recall this execution could be a retry attempt on the same microbatch, and + // batchId is our idempotency key. + auxTableDeletedByBatchIdField.isNull || + auxTableDeletedByBatchIdField === F.lit(batchId) + ) + // The aux table's CDC metadata column is a superset of the target's: it carries the + // additional [[deletedByBatchIdFieldName]]. Since we eventually union aux rows with + // target and microbatch rows (which use the target's narrower CDC metadata schema), strip + // the aux-only subfields here so all three sources share an identical CDC metadata column + // schema, and replace the existing CDC metadata column with it. + .withColumn( + AutoCdcReservedNames.cdcMetadataColName, + Scd2BatchProcessor.constructTargetCdcMetadataCol( + recordStartAt = auxTableRecordStartAtField, + sequencingType = resolvedSequencingType + ) + ) + + val perKeyMinimumSequenceInMicrobatchCol = F.col(Scd2BatchProcessor.minSequenceColName) + + // Per key, identify the sequence value associated with the anchor row in the aux table. + // + // The anchor row is the aux row with the largest [[recordStartAtFieldName]] strictly less + // than the min sequence in the incoming microbatch for that key. The reconciler needs this + // "left context" in two cases: + // (1) Incoming no-op upsert: without the anchor, it would look like a new run head, when in + // reality it's a part of an existing no-op run/head. + // (2) Incoming state-changing upsert that bisects two aux no-ops: the anchor surfaces + // the before-half so both halves can be promoted to target. (The after-half is + // picked up by the >= minSeq branch.) + // + // Because no-op upserts are stored only in the aux table, the anchor concept only exists when + // pulling in rows from the aux table, and is not relevant for the target table. + // + // Keys with no aux row strictly before the min sequence have no anchor; their affected set + // reduces to "all aux rows at or after the min sequence." + // + // The shape of this DataFrame is: [key1, key2, ... keyN, anchorSequence] + val perKeyAnchorSequence: DataFrame = reducedAuxiliaryTableDf + // The number of rows in [[perKeyMinimumSequenceInMicrobatch]] is bounded by the + // number of unique keys in the microbatch, which should typically be small. The + // auxiliary table should generally also be small, containing only no-op upsert runs + // and tombstones per key. Therefore this join should be cheap, and broadcast joinable. + .join(perKeyMinimumSequenceInMicrobatch, keysRaw) + .filter(auxTableRecordStartAtField < perKeyMinimumSequenceInMicrobatchCol) + .groupBy(keysQuoted.map(F.col): _*) + .agg( + F.max(auxTableRecordStartAtField).as(Scd2BatchProcessor.anchorSequenceColName) + ) + val anchorSequenceCol = F.col(Scd2BatchProcessor.anchorSequenceColName) + + // Now that we have the minimum sequence in the microbatch and the sequence of the anchor row, + // we have enough information to compute the full set of auxiliary rows that affect or are + // affected by the microbatch. + val auxRowIsAfterMinSequenceInMicrobatch = + auxTableRecordStartAtField >= perKeyMinimumSequenceInMicrobatchCol + + val auxRowIsAnchorRow = auxTableRecordStartAtField === anchorSequenceCol + + val auxRowAffectsMicrobatch = auxRowIsAfterMinSequenceInMicrobatch || auxRowIsAnchorRow + + val affectedRowsFromAuxiliaryTable = reducedAuxiliaryTableDf + // Per row, join/project the minimum microbatch sequence and anchor sequence for that row's + // key set. This join is relatively cheap, because the size of the dataframes being joined is + // bound by the number of unique keys in the microbatch. + .join(perKeyMinimumSequenceInMicrobatch, keysRaw) + .join( + perKeyAnchorSequence, + keysRaw, + joinType = "left" + ) + // Using the joined information, determine if the row is affected by the microbatch. + .filter(auxRowAffectsMicrobatch) + .drop(perKeyMinimumSequenceInMicrobatchCol, anchorSequenceCol) + + affectedRowsFromAuxiliaryTable + } + + /** + * Find the target-table rows whose state matters for reconciling the microbatch. + * + * @param targetTableDf + * the target table in its native schema. + * @param perKeyMinimumSequenceInMicrobatch + * one row per distinct key as produced by [[computeMinimumSequencePerKey]], representing + * the minimum sequence for that key in the microbatch. + * @return + * a dataframe containing the affected target rows, exactly as-is from the target table. + */ + private[autocdc] def findAffectedRowsFromTargetTable( + targetTableDf: DataFrame, + perKeyMinimumSequenceInMicrobatch: DataFrame + ): DataFrame = { + val targetEndAtCol = F.col(Scd2BatchProcessor.endAtColName) + val perKeyMinimumSequenceInMicrobatchCol = F.col(Scd2BatchProcessor.minSequenceColName) + + // Per key, identify all the rows in the target table that may be affected by the + // incoming microbatch. + // + // Unlike the auxiliary table, the target table holds visible rows only: no hidden open + // no-op upsert rows, no tombstones. Visible rows for a given key form a non-overlapping + // interval partition over the sequencing axis, and at most one row has a null [[endAtColName]] + // (the currently active row per key). + // + // Hence we can simply grab all rows that were active at some point after the min sequencing + // per key, which can be determined entirely by the row's [[endAtColName]]. + val isCurrentlyActiveRow = targetEndAtCol.isNull + val rowEndsAfterMinimumSequence = targetEndAtCol >= perKeyMinimumSequenceInMicrobatchCol + val rowMayBeAffected = isCurrentlyActiveRow || rowEndsAfterMinimumSequence + + val affectedRowsFromTargetTable = targetTableDf + .join(perKeyMinimumSequenceInMicrobatch, keysRaw) + .filter(rowMayBeAffected) + .drop(perKeyMinimumSequenceInMicrobatchCol) + + affectedRowsFromTargetTable + } +} + +/** + * Concept: run of upsert events. + * + * A run is a maximal sequence of consecutive upsert events (in sorted order by sequencing) + * for the same key whose tracked-history-column values are all identical. The transition + * from a previous run's tail to a new run's head represents a real state change; every + * subsequent event in the run is a no-op continuation that logically coalesces with the head. + * + * Runs matter because SCD2 only emits a new visible historical row when a + * tracked-history column actually changes. By convention we choose that only the tail of a + * run produces a visible row in the target table; the rest become hidden rows in the aux + * table. Selecting the tail means the latest no-op upsert is reflected in the target table. + * + * Example, with trackHistoryCols = [name], events for some key: + * (S=5, name=Alice) -> starts run head at S=5. Row lives in aux table. + * (S=10, name=Alice) -> no-op, adds to run at S=5. Row lives in aux table. + * (S=15, name=Alice) -> no-op and tail of run at S=5. Row lives in target table with + * START_AT=5. + * (S=20, name=Charlie) -> new run head/tail (run size=1) at S=20. Row lives in target + * table. + * + * Now if a new late-arriving event (S=12, name=Bob) arrives for the same key, we have: + * (S=5, name=Alice) -> starts run head at S=5. Row lives in aux table. + * (S=10, name=Alice) -> no-op but now tail of run at S=5. Row now lives in target + * table with START_AT=5. + * (S=12, name=Bob) -> new run head/tail (run size=1) at S=12. Row lives in target + * table. + * (S=15, name=Alice) -> previously-visible tail converts to a new run head at S=15. Row + * remains in target table, but now with START_AT=15. + * (S=20, name=Charlie) -> new run head at S=20. Row lives in target table. + * + * Note that if we did not track the no-op events in the aux table for the run at S=5 before the + * event (S=12, name=Bob) arrived, then we would not have correctly reconciled that the event + * (S=10, name=Alice) is now the visible tail of the Alice run before Bob. + * + * ------------- + * Concept: target table. + * + * The user-consumable output table of the CDC transformation. Every row in the target table + * represents the visible tail of a run (maybe size 1), carrying the run head's START_AT and the + * latest row values for that run. The target table in its entirety represents the SCD2 + * representation of the CDC flow's source table. + * + * ------------- + * Concept: aux table. + * + * The side state table used to track out of order events from the CDC source. Two classes + * of events are represented as rows in this table: + * 1. Early-arriving deletes, with no matching upsert; this is considered a tombstone, + * and may match with a late-arriving upsert in a future microbatch. + * 2. No-op upserts (i.e. tails of runs); hidden no-op rows that may reconcile as + * state-changing run heads in a future microbatch. + * + * The aux table is considered an internal table that users should neither tamper nor consider + * public contract. + * + * ------------- + * Concept: decomposition tail. + * + * A transient and synthetic row produced by the batch processor during reconciliation (not + * from the CDC source) when a previously-closed historical row [START_AT=X, END_AT=Y] is + * bisected by a late-arriving event. The bisected row is split into a head + * [START_AT=X, END_AT=null] - inheriting the original row's data and `__RECORD_START_AT` - + * and a tail [START_AT=null, END_AT=Y, `__RECORD_START_AT`=null] that carries the original + * row's right boundary. The tail typically becomes the closing END_AT of a bisecting upsert, + * giving it a valid right boundary in the target-table history. + * + * Decomposition tails are uniquely identified by `__RECORD_START_AT` = null - the only row + * category with that property - and are never persisted in their tail form: each is either + * absorbed by the next event in the affected window (dropped as redundant) or promoted to a + * tombstone in the aux table if it survives reconciliation unmatched. + * + * ------------- + * Concept: same-sequence tie-break between an upsert and a delete. + * + * When an upsert event and a delete event share the same `__RECORD_START_AT`, the delete wins: + * the visible upsert is dropped (as a zero-width interval) and only the tombstone is written + * to the aux table. The reverse pair (delete arriving first, then an upsert at the same + * sequence) is symmetric: the tombstone closes the upsert at the same instant, again leaving + * a zero-width visible interval that is dropped, and only the tombstone survives. + * + * This tie-break is an internal contract only - we do not publicly guarantee deterministic + * resolution when two events for the same key share a sequence value. Users who care about + * ordering should ensure their sequencing column is unique per (key, event). + */ +object Scd2BatchProcessor { + /** + * CDC metadata column field that represents the exact time (sequence) of the CDC event that + * produced this row. Null only for synthetic decomposition tails. + */ + private[autocdc] val recordStartAtFieldName: String = "__RECORD_START_AT" + + /** + * CDC metadata column field that represents the microbatch id a particular row was considered + * logically deleted by. Any future microbatches should consider that row as deleted. + * + * Logically deleted rows exist as a concept in the auxiliary to provide idempotency, should a + * microbatch fail between a MERGE executed against the auxiliary table and the MERGE executed + * against the target table. + * + * This field only exists in the CDC metadata column for the auxiliary table, not in CDC + * metadata column for the target table. + */ + private val deletedByBatchIdFieldName: String = "__DELETED_BY_BATCH_ID" + + /** + * What this column represents depends on which AutoCDC artifact table it is read from. + * + * In the target table: + * The user-visible column representing when this row is considered active from, i.e. + * this upsert run's head [[recordStartAtFieldName]]. + * In the aux table: + * If this row represents a tombstone, then the same value as [[recordStartAtFieldName]]. + * Else this row represents a coalesced no-op row that is part of an upsert run. + * Inherit the [[recordStartAtFieldName]] of the head of this upsert's run. + * + * The invariant in both tables is: startAtColName <= recordStartAtFieldName. If an event was + * generated at time X, it is active by time X, or earlier if it is not a run head. + */ + private[autocdc] val startAtColName: String = "__START_AT" + + /** + * What this column represents depends on which AutoCDC artifact table it is read from. + * + * In the target table: + * The user-visible column representing when this row became inactive. Null IFF the row + * is active: neither superseded by a state-changing upsert nor affected by a delete. + * In the aux table: + * If this row is a tombstone, then by convention the sequence of the delete event that + * produced it. Delete events are considered instantaneous in time. + * Else this row is a coalesced no-op row that is part of an upsert run, and by + * convention the value will always be null. + */ + private[autocdc] val endAtColName: String = "__END_AT" + + /** + * Column names reserved by AutoCDC, that will be projected onto the microbatch and target + * tables. If the user's source dataframe contains any of these columns, SCD2 reconciliation + * will fail. + */ + private val reservedFrameworkColNames: Set[String] = Set( + startAtColName, + endAtColName, + AutoCdcReservedNames.cdcMetadataColName + ) + + /** + * Name of temporary column projected onto microbatch to compute the min sequencing value per + * key within the microbatch. + */ + private[autocdc] val minSequenceColName: String = s"${AutoCdcReservedNames.prefix}min_sequence" + + /** + * Name of temporary column projected used to identify the sequence associated with the anchor + * row found in the auxiliary table for the incoming microbatch. Since sequences must be unique + * amongst all rows for a key (or risk undefined behavior), this sequence value uniquely + * identifies an exact row in the aux. + */ + private val anchorSequenceColName: String = s"${AutoCdcReservedNames.prefix}anchor_sequence" + + /** Project the [[recordStartAtFieldName]] out of an SCD2 CDC metadata column. */ + private def recordStartAtOf(cdcMetadataCol: Column): Column = + cdcMetadataCol.getField(recordStartAtFieldName) + + /** Project the [[deletedByBatchIdFieldName]] out of an SCD2 CDC metadata column. */ + private def deletedByBatchIdOf(cdcMetadataCol: Column): Column = + cdcMetadataCol.getField(deletedByBatchIdFieldName) + + /** + * Schema of the CDC metadata struct column for SCD2 target table rows. + */ + private[pipelines] def targetCdcMetadataColSchema(sequencingType: DataType): StructType = + StructType( + Seq( + // The sequence value of the originating CDC event for this row. Nullable because + // decomposition tails, which are transient and synthetically constructed during + // reconciliation, have a null record start at. + StructField(recordStartAtFieldName, sequencingType, nullable = true) + ) + ) + + /** + * Construct the CDC metadata struct column for SCD2 target/microbatch rows, following the + * exact schema and field ordering defined by [[targetCdcMetadataColSchema]]. + */ + private def constructTargetCdcMetadataCol( + recordStartAt: Column, + sequencingType: DataType + ): Column = { + val cdcMetadataFieldsInOrder = targetCdcMetadataColSchema(sequencingType).fields.map { field => + val value = field.name match { + case `recordStartAtFieldName` => recordStartAt + case other => + throw SparkException.internalError( + s"Unable to construct SCD2 target CDC metadata column due to unknown " + + s"`${other}` field." + ) + } + value.cast(field.dataType).as(field.name) + } + F.struct(cdcMetadataFieldsInOrder.toImmutableArraySeq: _*) + } + + /** + * Schema of the CDC metadata struct column for SCD2 aux-table rows. Strict superset of + * [[targetCdcMetadataColSchema]]: extends it with the aux-only [[deletedByBatchIdFieldName]] + * used for SCD2 idempotency. + */ + private[pipelines] def auxCdcMetadataColSchema(sequencingType: DataType): StructType = + StructType( + targetCdcMetadataColSchema(sequencingType).fields.toImmutableArraySeq ++ + Seq( + // The microbatch id by which this aux row was logically deleted, or null if the + // row is still live. + StructField(deletedByBatchIdFieldName, LongType, nullable = true) + ) + ) + + /** + * Construct the CDC metadata struct column for SCD2 aux-table rows, following the exact + * schema and field ordering defined by [[auxCdcMetadataColSchema]]. + */ + private[autocdc] def constructAuxCdcMetadataCol( + recordStartAt: Column, + deletedByBatchId: Column, + sequencingType: DataType + ): Column = { + val cdcMetadataFieldsInOrder = auxCdcMetadataColSchema(sequencingType).fields.map { field => + val value = field.name match { + case `recordStartAtFieldName` => recordStartAt + case `deletedByBatchIdFieldName` => deletedByBatchId + case other => + throw SparkException.internalError( + s"Unable to construct SCD2 aux CDC metadata column due to unknown " + + s"`${other}` field." + ) + } + value.cast(field.dataType).as(field.name) + } + F.struct(cdcMetadataFieldsInOrder.toImmutableArraySeq: _*) + } +} diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala index f88b0cd3a1cbe..dd4d1556afbf8 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala @@ -288,7 +288,7 @@ class AutoCdcMergeFlow( // CDC operational metadata column at the end. StructType( userSelectedSchema.fields :+ StructField( - Scd1BatchProcessor.cdcMetadataColName, + AutoCdcReservedNames.cdcMetadataColName, Scd1BatchProcessor.cdcMetadataColSchema(sequencingType), nullable = false ) @@ -334,7 +334,7 @@ class AutoCdcMergeFlow( deleteSequence = F.lit(null), upsertSequence = F.lit(null), sequencingType = sequencingType - ).as(Scd1BatchProcessor.cdcMetadataColName) + ).as(AutoCdcReservedNames.cdcMetadataColName) df.select(userSelectedCols :+ emptyCdcMetadataCol: _*) case ScdType.Type2 => diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala index 0d1c33be21727..3841495f01c8f 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala @@ -688,11 +688,12 @@ class Scd1MergeStreamingWrite( /** CDC metadata field resolved out of the flow's augmented schema. */ private lazy val cdcMetadataField: StructField = { val resolver = updateContext.spark.sessionState.conf.resolver + val cdcMetadataColName = AutoCdcReservedNames.cdcMetadataColName flow.schema.fields - .find(field => resolver(field.name, Scd1BatchProcessor.cdcMetadataColName)) + .find(field => resolver(field.name, cdcMetadataColName)) .getOrElse( throw SparkException.internalError( - s"CDC metadata column '${Scd1BatchProcessor.cdcMetadataColName}' was not found in the " + + s"CDC metadata column '$cdcMetadataColName' was not found in the " + s"AutoCDC flow's target table schema." ) ) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcCatalogExecutionTestBase.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcCatalogExecutionTestBase.scala index 0dc0a90276600..8688df071113b 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcCatalogExecutionTestBase.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcCatalogExecutionTestBase.scala @@ -82,7 +82,7 @@ trait AutoCdcCatalogExecutionTestBase { } /** - * Schema of the [[Scd1BatchProcessor.cdcMetadataColName]] struct column for a given + * Schema of the [[AutoCdcReservedNames.cdcMetadataColName]] struct column for a given * sequencing column type. Defaults to [[LongType]] because all current SCD1 tests use * `Long` sequencing. */ @@ -92,7 +92,7 @@ trait AutoCdcCatalogExecutionTestBase { .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, sequencingType) /** - * Build a [[Row]] matching the [[Scd1BatchProcessor.cdcMetadataColName]] struct's two fields, + * Build a [[Row]] matching the [[AutoCdcReservedNames.cdcMetadataColName]] struct's two fields, * in the order produced by [[Scd1BatchProcessor.constructCdcMetadataCol]]: */ protected def cdcMetadataRow[T](deleteSeq: Option[T], upsertSeq: Option[T]): Row = diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala index cf7c9533bee98..32374f8ecb048 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala @@ -186,7 +186,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { /** Convenience to extract the [[StructType]] of the projected `_cdc_metadata` column. */ private def cdcMetadataStruct(schema: StructType): StructType = - schema(Scd1BatchProcessor.cdcMetadataColName).dataType.asInstanceOf[StructType] + schema(AutoCdcReservedNames.cdcMetadataColName).dataType.asInstanceOf[StructType] test( "AutoCdcMergeFlow.schema appends _cdc_metadata to the source schema when no " + @@ -200,7 +200,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { .add("seq", LongType) .add( StructField( - Scd1BatchProcessor.cdcMetadataColName, + AutoCdcReservedNames.cdcMetadataColName, Scd1BatchProcessor.cdcMetadataColSchema(LongType), nullable = false ) @@ -223,7 +223,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { .add("seq", LongType) .add( StructField( - Scd1BatchProcessor.cdcMetadataColName, + AutoCdcReservedNames.cdcMetadataColName, Scd1BatchProcessor.cdcMetadataColSchema(LongType), nullable = false ) @@ -244,7 +244,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { .add("seq", LongType) .add( StructField( - Scd1BatchProcessor.cdcMetadataColName, + AutoCdcReservedNames.cdcMetadataColName, Scd1BatchProcessor.cdcMetadataColSchema(LongType), nullable = false ) @@ -270,7 +270,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { test("AutoCdcMergeFlow.schema's _cdc_metadata field is non-null with nullable inner fields") { val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf()) - val metaField = resolvedFlow.schema(Scd1BatchProcessor.cdcMetadataColName) + val metaField = resolvedFlow.schema(AutoCdcReservedNames.cdcMetadataColName) assert(!metaField.nullable, "_cdc_metadata column itself must be non-null") val metaStruct = metaField.dataType.asInstanceOf[StructType] @@ -330,7 +330,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { // The user-selected portion drops `name`; the trailing column is the SCD1 metadata. assert( loadedDf.schema.fieldNames.toSeq == - Seq("id", "seq", Scd1BatchProcessor.cdcMetadataColName) + Seq("id", "seq", AutoCdcReservedNames.cdcMetadataColName) ) } @@ -345,7 +345,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { assert(loadedDf.schema == resolvedFlow.schema) assert( loadedDf.schema.fieldNames.toSeq == - Seq("id", "seq", Scd1BatchProcessor.cdcMetadataColName) + Seq("id", "seq", AutoCdcReservedNames.cdcMetadataColName) ) } @@ -442,7 +442,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { // Locks in the previous engine-level guard at flow-construction time. Any future // regression where a user-supplied CDC stream carries the reserved metadata column name // should fail eagerly here. - val sourceDf = sourceDfWithExtraColumns(Scd1BatchProcessor.cdcMetadataColName -> StringType) + val sourceDf = sourceDfWithExtraColumns(AutoCdcReservedNames.cdcMetadataColName -> StringType) checkError( exception = intercept[AnalysisException] { @@ -452,7 +452,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession { sqlState = "42710", parameters = Map( "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive, - "columnName" -> Scd1BatchProcessor.cdcMetadataColName, + "columnName" -> AutoCdcReservedNames.cdcMetadataColName, "schemaName" -> "changeDataFeed", "reservedColumnNamePrefix" -> AutoCdcReservedNames.prefix ) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala index 475d25f5aa2cf..1aa2cbcd5417b 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala @@ -47,13 +47,13 @@ class Scd1BatchProcessorMergeSuite */ private val minimalSchema: StructType = new StructType() .add("id", IntegerType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType()) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType()) /** Minimal target-table shape: one key, one data column, and CDC metadata. */ private val targetSchema: StructType = new StructType() .add("id", IntegerType) .add("value", StringType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType()) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType()) /** * A processor with a single key column `id`. `sequencing` is irrelevant for @@ -85,7 +85,7 @@ class Scd1BatchProcessorMergeSuite val withKeys = keyColumns.foldLeft(new StructType()) { case (s, (name, dt)) => s.add(name, dt) } - withKeys.add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType()) + withKeys.add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType()) } /** @@ -116,7 +116,7 @@ class Scd1BatchProcessorMergeSuite .add("id", IntegerType) .add("value", StringType) .add( - Scd1BatchProcessor.cdcMetadataColName, + AutoCdcReservedNames.cdcMetadataColName, new StructType() .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType) .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType) @@ -446,7 +446,7 @@ class Scd1BatchProcessorMergeSuite // The schema always stores the backtick consumed column name, so unticked the raw name here. .add(rawKeyName, IntegerType) .add("value", StringType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType()) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType()) createTable( defaultTargetIdent, diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index 9432150c40167..d2c78442c4762 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -33,7 +33,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { .add("name", StringType) .add("age", IntegerType) .add( - Scd1BatchProcessor.cdcMetadataColName, + AutoCdcReservedNames.cdcMetadataColName, new StructType() .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType) .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType) @@ -596,7 +596,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { // Original columns are preserved in their original order, with CDC metadata appended at // the very end. assert(result.schema.fieldNames.toSeq == - schema.fieldNames.toSeq :+ Scd1BatchProcessor.cdcMetadataColName) + schema.fieldNames.toSeq :+ AutoCdcReservedNames.cdcMetadataColName) } test("extendMicrobatchRowsWithCdcMetadata casts delete / upsert sequence fields to " + @@ -624,7 +624,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { val resultDf = processor.extendMicrobatchRowsWithCdcMetadata(batch) val cdcMetadataDataType = - resultDf.schema(Scd1BatchProcessor.cdcMetadataColName).dataType.asInstanceOf[StructType] + resultDf.schema(AutoCdcReservedNames.cdcMetadataColName).dataType.asInstanceOf[StructType] assert(columnNamesAndDataTypes(cdcMetadataDataType) == Seq( Scd1BatchProcessor.cdcDeleteSequenceFieldName -> LongType, Scd1BatchProcessor.cdcUpsertSequenceFieldName -> LongType)) @@ -723,7 +723,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { val result = processor.projectTargetColumnsOntoMicrobatch(batch) assert(result.schema.fieldNames.toSeq == - Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName)) + Seq("id", "age", AutoCdcReservedNames.cdcMetadataColName)) checkAnswer( df = result, expectedAnswer = Row(1, 30, Row(null, 10L)) @@ -753,7 +753,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { assert( result.schema.fieldNames.toSeq == - Seq("id", "name", Scd1BatchProcessor.cdcMetadataColName) + Seq("id", "name", AutoCdcReservedNames.cdcMetadataColName) ) checkAnswer( df = result, @@ -785,7 +785,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { // in which the user listed columns in IncludeColumns. The CDC metadata column is appended // last as always. assert(result.schema.fieldNames.toSeq == - Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName)) + Seq("id", "age", AutoCdcReservedNames.cdcMetadataColName)) checkAnswer( df = result, @@ -800,7 +800,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { // Even if a column is created with backticks via DDL, those backticks are consumed by Spark // before resolving the schema; they won't show up in the schema field. .add("user.id", StringType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType) val batch = microbatchOf(schema)( Row(1, "u-100", Row(null, 10L)) @@ -826,7 +826,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { val result = processor.projectTargetColumnsOntoMicrobatch(batch) assert(result.schema.fieldNames.toSeq == - Seq("id", "user.id", Scd1BatchProcessor.cdcMetadataColName)) + Seq("id", "user.id", AutoCdcReservedNames.cdcMetadataColName)) checkAnswer( df = result, expectedAnswer = Row(1, "u-100", Row(null, 10L)) @@ -860,7 +860,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { // Output column names follow the microbatch schema's casing, not the casing in the user's // columnSelection. The CDC metadata column is appended last as always. assert(result.schema.fieldNames.toSeq == - Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName)) + Seq("id", "age", AutoCdcReservedNames.cdcMetadataColName)) checkAnswer( df = result, expectedAnswer = Row(1, 30, Row(null, 10L)) @@ -880,7 +880,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { // Data column. .add("value", StringType) // CDC metadata column. - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType) /** * Schema for the auxiliary input to [[Scd1BatchProcessor.applyTombstonesToMicrobatch]] tests. @@ -893,7 +893,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { // Key column. .add("id", IntegerType) // CDC metadata column. - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType) test("applyTombstonesToMicrobatch drops late-arriving deletes and upserts when a matching " + "tombstone exists for the same key") { @@ -1015,7 +1015,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { val schema = new StructType() .add("region", StringType) .add("customer_id", IntegerType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType) val microbatch = microbatchOf(schema)( Row("US", 1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5))), @@ -1051,7 +1051,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { test("applyTombstonesToMicrobatch supports backticked key names containing a literal dot") { val schema = new StructType() .add("user.id", IntegerType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType) val microbatch = microbatchOf(schema)( Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5))) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala index 76790847ede5c..bb8043e720c65 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala @@ -41,12 +41,12 @@ class Scd1ForeachBatchHandlerSuite private val auxiliarySchema = new StructType() .add("id", IntegerType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType()) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType()) private val targetSchema = new StructType() .add("id", IntegerType) .add("value", StringType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType()) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType()) private val processor = Scd1BatchProcessor( changeArgs = ChangeArgs( @@ -155,11 +155,11 @@ class Scd1ForeachBatchHandlerSuite val compositeAuxSchema = new StructType() .add("country", StringType) .add("city", StringType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType()) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType()) val compositeTargetSchema = new StructType() .add("country", StringType) .add("city", StringType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType()) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType()) val compositeProcessor = Scd1BatchProcessor( changeArgs = ChangeArgs( @@ -492,12 +492,12 @@ class Scd1ForeachBatchHandlerSuite val compositeAuxSchema = new StructType() .add("country", StringType) .add("city", StringType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType()) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType()) val compositeTargetSchema = new StructType() .add("country", StringType) .add("city", StringType) .add("population", LongType) - .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType()) + .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType()) val compositeProcessor = Scd1BatchProcessor( changeArgs = ChangeArgs( diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessorSuite.scala new file mode 100644 index 0000000000000..14e77fbd190ab --- /dev/null +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessorSuite.scala @@ -0,0 +1,1169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.autocdc + +import org.apache.spark.sql.{functions => F, Column, QueryTest, Row} +import org.apache.spark.sql.classic.DataFrame +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ + +class Scd2BatchProcessorSuite extends QueryTest with SharedSparkSession { + + /** Build a microbatch [[DataFrame]] from explicit rows and an explicit schema. */ + private def microbatchOf(schema: StructType)(rows: Row*): DataFrame = + spark.createDataFrame(spark.sparkContext.parallelize(rows), schema) + + /** + * Build an aux-table [[DataFrame]] from explicit user rows + framework column values. + * + * Each input [[Row]] carries the user columns followed by: + * - the row's `__START_AT` value + * - the row's `__END_AT` value (null for non-tombstone rows) + * - the row's `_cdc_metadata` struct as a [[Row]] + * (e.g., `Row(recordStartAt, deletedByBatchId)`) + */ + private def auxTableOf( + userSchema: StructType, + sequencingType: DataType = LongType + )(rows: Row*): DataFrame = { + val schema = userSchema + .add(Scd2BatchProcessor.startAtColName, sequencingType, nullable = true) + .add(Scd2BatchProcessor.endAtColName, sequencingType, nullable = true) + .add( + AutoCdcReservedNames.cdcMetadataColName, + Scd2BatchProcessor.auxCdcMetadataColSchema(sequencingType), + nullable = false + ) + spark.createDataFrame(spark.sparkContext.parallelize(rows), schema) + } + + /** + * Build a target-table [[DataFrame]] from explicit user rows + framework column values. + * + * Each input [[Row]] carries the user columns followed by: + * - the row's `__START_AT` value + * - the row's `__END_AT` value (null IFF the row is currently active) + * - the row's `_cdc_metadata` struct as a [[Row]] (e.g., `Row(recordStartAt)`) + */ + private def targetTableOf( + userSchema: StructType, + sequencingType: DataType = LongType + )(rows: Row*): DataFrame = { + val schema = userSchema + .add(Scd2BatchProcessor.startAtColName, sequencingType, nullable = true) + .add(Scd2BatchProcessor.endAtColName, sequencingType, nullable = true) + .add( + AutoCdcReservedNames.cdcMetadataColName, + Scd2BatchProcessor.targetCdcMetadataColSchema(sequencingType), + nullable = false + ) + spark.createDataFrame(spark.sparkContext.parallelize(rows), schema) + } + + /** + * Build a minimum-sequence-per-key [[DataFrame]] used by the `findAffected*` functions. + * + * Each input [[Row]] carries the key columns followed by the per-key minimum sequence. + */ + private def minSeqOf( + keySchema: StructType, + sequencingType: DataType = LongType + )(rows: Row*): DataFrame = { + val schema = keySchema.add( + Scd2BatchProcessor.minSequenceColName, + sequencingType, + nullable = false + ) + spark.createDataFrame(spark.sparkContext.parallelize(rows), schema) + } + + /** + * Build a [[Scd2BatchProcessor]] suitable for `findAffected*` and + * `computeMinimumSequencePerKey` tests. The `sequencing` is fixed to `F.col("seq")`, + * so the input microbatch must include a `seq` column. `deleteCondition` is optional + * and only needed by tests that exercise both event kinds. + */ + private def processorWithKeys( + keys: Seq[String], + deleteCondition: Option[Column] = None + ): Scd2BatchProcessor = + Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = keys.map(UnqualifiedColumnName(_)), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2, + deleteCondition = deleteCondition + ), + resolvedSequencingType = LongType + ) + + /** Key-only schema for single-key `findAffected*` tests' minSeq dataframes. */ + private val singleKeyKeySchema: StructType = new StructType() + .add("id", IntegerType) + + /** User schema for single-key `findAffected*` tests: the key column plus a `value` column. */ + private val singleKeyUserSchema: StructType = singleKeyKeySchema + .add("value", StringType) + + // =============== preprocessMicrobatch tests =============== + + test("preprocessMicrobatch appends framework columns __START_AT, __END_AT, " + + "_cdc_metadata at the end of the schema in that order") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)(Row(1, 10L, "a")) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2 + ), + resolvedSequencingType = LongType + ) + + val result = processor.preprocessMicrobatch(batch) + + assert(result.schema.fieldNames.toSeq == Seq( + "id", "seq", "value", + Scd2BatchProcessor.startAtColName, + Scd2BatchProcessor.endAtColName, + AutoCdcReservedNames.cdcMetadataColName + )) + } + + test("preprocessMicrobatch returns an empty DataFrame with the full preprocessed schema") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)() + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2 + ), + resolvedSequencingType = LongType + ) + + val result = processor.preprocessMicrobatch(batch) + + assert(result.collect().isEmpty) + assert(result.schema.fieldNames.toSeq == Seq( + "id", "seq", "value", + Scd2BatchProcessor.startAtColName, + Scd2BatchProcessor.endAtColName, + AutoCdcReservedNames.cdcMetadataColName + )) + } + + test("preprocessMicrobatch stamps __START_AT, __END_AT, and __RECORD_START_AT correctly " + + "across delete and upsert events for the same key") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + .add("is_delete", BooleanType) + + // All three events target the same key. SCD2 must preserve every event in the output - + // unlike SCD1, no per-key deduplication is performed; this also implicitly pins the + // no-dedup contract of preprocessMicrobatch. + val batch = microbatchOf(schema)( + Row(1, 10L, "first-upsert", false), + Row(1, 20L, "second-upsert", false), + Row(1, 30L, null, true) + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2, + deleteCondition = Some(F.col("is_delete")) + ), + resolvedSequencingType = LongType + ) + + // Per-row contract for the framework columns: + // - __START_AT = sequencing for every row (the active-from time) + // - __END_AT = sequencing for delete rows; null for upserts (mutual exclusion) + // - __RECORD_START_AT = sequencing for every row, regardless of delete vs upsert + // (lineage preserved into the merge step) + checkAnswer( + df = processor.preprocessMicrobatch(batch), + expectedAnswer = Seq( + Row(1, 10L, "first-upsert", false, 10L, null, Row(10L)), + Row(1, 20L, "second-upsert", false, 20L, null, Row(20L)), + Row(1, 30L, null, true, 30L, 30L, Row(30L)) + ) + ) + } + + test("preprocessMicrobatch preserves byte-identical full-event duplicates") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + .add("is_delete", BooleanType) + + // Two byte-identical events for the same key: same key, same sequencing, same data, same + // delete flag. SCD2 preprocessing intentionally preserves every event verbatim, including + // full-event duplicates. Cross-event redundancy elimination (collapsing duplicates before + // they could reconcile to a zero-width visible row) is the responsibility of downstream + // reconciliation, not preprocessing. + val batch = microbatchOf(schema)( + Row(1, 10L, "alice", false), + Row(1, 10L, "alice", false) + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2, + deleteCondition = Some(F.col("is_delete")) + ), + resolvedSequencingType = LongType + ) + + // Both rows must survive verbatim. + checkAnswer( + df = processor.preprocessMicrobatch(batch), + expectedAnswer = Seq( + Row(1, 10L, "alice", false, 10L, null, Row(10L)), + Row(1, 10L, "alice", false, 10L, null, Row(10L)) + ) + ) + } + + test("preprocessMicrobatch leaves __END_AT null on every row when deleteCondition is None") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("value", StringType) + + val batch = microbatchOf(schema)( + Row(1, 10L, "a"), + Row(2, 20L, "b") + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2, + deleteCondition = None + ), + resolvedSequencingType = LongType + ) + + checkAnswer( + df = processor.preprocessMicrobatch(batch).select( + F.col(Scd2BatchProcessor.endAtColName) + ), + expectedAnswer = Seq(Row(null), Row(null)) + ) + } + + test("preprocessMicrobatch treats null deleteCondition results as upsert " + + "(__END_AT stays null)") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("is_delete", BooleanType) + + val batch = microbatchOf(schema)( + // is_delete is null - the delete condition evaluates to null, which Spark treats as the + // otherwise branch, so the row is classified as an upsert. + Row(1, 10L, null) + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2, + deleteCondition = Some(F.col("is_delete")) + ), + resolvedSequencingType = LongType + ) + + checkAnswer( + df = processor.preprocessMicrobatch(batch).select( + F.col(Scd2BatchProcessor.endAtColName) + ), + expectedAnswer = Row(null) + ) + } + + test("preprocessMicrobatch evaluates an arbitrary sequencing expression per-row") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("alt_seq", LongType) + .add("value", StringType) + + // Sequencing is a function call referencing multiple columns, not a bare identifier. Locks + // in that the framework columns evaluate the full expression per-row rather than treating + // `sequencing` as a single column reference. + val batch = microbatchOf(schema)( + // greatest(10, 30) = 30 + Row(1, 10L, 30L, "row1"), + // greatest(40, 20) = 40 + Row(2, 40L, 20L, "row2") + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.greatest(F.col("seq"), F.col("alt_seq")), + storedAsScdType = ScdType.Type2 + ), + resolvedSequencingType = LongType + ) + + val result = processor.preprocessMicrobatch(batch) + + checkAnswer( + df = result.select( + F.col(Scd2BatchProcessor.startAtColName), + F.col(s"${AutoCdcReservedNames.cdcMetadataColName}." + + s"${Scd2BatchProcessor.recordStartAtFieldName}") + ), + expectedAnswer = Seq( + Row(30L, 30L), + Row(40L, 40L) + ) + ) + } + + /** Schema reused by columnSelection tests: id (key), name, age, seq (sequencing). */ + private val multiUserColSchema: StructType = new StructType() + .add("id", IntegerType) + .add("name", StringType) + .add("age", IntegerType) + .add("seq", LongType) + + test("preprocessMicrobatch keeps every user column when columnSelection is None") { + val batch = microbatchOf(multiUserColSchema)( + Row(1, "alice", 30, 10L) + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2, + columnSelection = None + ), + resolvedSequencingType = LongType + ) + + val result = processor.preprocessMicrobatch(batch) + + assert(result.schema.fieldNames.toSeq == Seq( + "id", "name", "age", "seq", + Scd2BatchProcessor.startAtColName, + Scd2BatchProcessor.endAtColName, + AutoCdcReservedNames.cdcMetadataColName + )) + } + + test("preprocessMicrobatch retains framework columns even when IncludeColumns omits them") { + val batch = microbatchOf(multiUserColSchema)( + Row(1, "alice", 30, 10L) + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2, + columnSelection = Some(ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName("age")) + )) + ), + resolvedSequencingType = LongType + ) + + val result = processor.preprocessMicrobatch(batch) + + assert(result.schema.fieldNames.toSeq == Seq( + "id", "age", + Scd2BatchProcessor.startAtColName, + Scd2BatchProcessor.endAtColName, + AutoCdcReservedNames.cdcMetadataColName + )) + checkAnswer( + df = result, + expectedAnswer = Row(1, 30, 10L, null, Row(10L)) + ) + } + + test("preprocessMicrobatch drops user columns listed in ExcludeColumns; " + + "framework columns survive") { + val batch = microbatchOf(multiUserColSchema)( + Row(1, "alice", 30, 10L) + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2, + columnSelection = Some(ColumnSelection.ExcludeColumns( + Seq(UnqualifiedColumnName("name")) + )) + ), + resolvedSequencingType = LongType + ) + + val result = processor.preprocessMicrobatch(batch) + + assert(result.schema.fieldNames.toSeq == Seq( + "id", "age", "seq", + Scd2BatchProcessor.startAtColName, + Scd2BatchProcessor.endAtColName, + AutoCdcReservedNames.cdcMetadataColName + )) + checkAnswer( + df = result, + expectedAnswer = Row(1, 30, 10L, 10L, null, Row(10L)) + ) + } + + test("preprocessMicrobatch preserves the microbatch schema's user-column order, " + + "ignoring the order of IncludeColumns") { + val batch = microbatchOf(multiUserColSchema)( + Row(1, "alice", 30, 10L) + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2, + // User specifies (age, id) - intentionally different from the schema order (id, age). + columnSelection = Some(ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("id")) + )) + ), + resolvedSequencingType = LongType + ) + + val result = processor.preprocessMicrobatch(batch) + + // Output column order follows the microbatch schema (id before age), not the user's listing + // order in IncludeColumns. Framework columns are always appended last. + assert(result.schema.fieldNames.toSeq == Seq( + "id", "age", + Scd2BatchProcessor.startAtColName, + Scd2BatchProcessor.endAtColName, + AutoCdcReservedNames.cdcMetadataColName + )) + } + + test("preprocessMicrobatch resolves columnSelection case-insensitively " + + "when SQLConf.CASE_SENSITIVE=false") { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val batch = microbatchOf(multiUserColSchema)( + Row(1, "alice", 30, 10L) + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2, + // User columns intentionally use a different case than the schema (id, age). + columnSelection = Some(ColumnSelection.IncludeColumns( + Seq(UnqualifiedColumnName("ID"), UnqualifiedColumnName("AGE")) + )) + ), + resolvedSequencingType = LongType + ) + + val result = processor.preprocessMicrobatch(batch) + + // Output column names follow the microbatch schema's casing, not the user's casing. + assert(result.schema.fieldNames.toSeq == Seq( + "id", "age", + Scd2BatchProcessor.startAtColName, + Scd2BatchProcessor.endAtColName, + AutoCdcReservedNames.cdcMetadataColName + )) + } + } + + test("preprocessMicrobatch handles backticked column names containing a literal dot") { + val schema = new StructType() + .add("id", IntegerType) + // Even if a column is created with backticks via DDL, those backticks are consumed by Spark + // before resolving the schema; they won't show up in the schema field. + .add("user.id", StringType) + .add("seq", LongType) + + val batch = microbatchOf(schema)( + Row(1, "u-100", 10L) + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2, + columnSelection = Some(ColumnSelection.IncludeColumns( + Seq( + UnqualifiedColumnName("id"), + UnqualifiedColumnName("`user.id`") + ) + )) + ), + resolvedSequencingType = LongType + ) + + val result = processor.preprocessMicrobatch(batch) + + assert(result.schema.fieldNames.toSeq == Seq( + "id", "user.id", + Scd2BatchProcessor.startAtColName, + Scd2BatchProcessor.endAtColName, + AutoCdcReservedNames.cdcMetadataColName + )) + checkAnswer( + df = result, + expectedAnswer = Row(1, "u-100", 10L, null, Row(10L)) + ) + } + + test("preprocessMicrobatch correctly populates framework columns even when ExcludeColumns " + + "drops the columns referenced by sequencing and deleteCondition") { + val schema = new StructType() + .add("id", IntegerType) + .add("value", StringType) + // Both seq and is_delete are referenced by the flow's sequencing / deleteCondition + // expressions, but the user wants them excluded from the target table. + .add("seq", LongType) + .add("is_delete", BooleanType) + + val batch = microbatchOf(schema)( + Row(1, "alice", 10L, false), + Row(1, null, 20L, true) + ) + + val processor = Scd2BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + sequencing = F.col("seq"), + storedAsScdType = ScdType.Type2, + deleteCondition = Some(F.col("is_delete")), + columnSelection = Some(ColumnSelection.ExcludeColumns( + Seq(UnqualifiedColumnName("seq"), UnqualifiedColumnName("is_delete")) + )) + ), + resolvedSequencingType = LongType + ) + + // The orchestrator runs row-extension steps before column selection, so the framework + // columns reference seq / is_delete fully even though the final projection drops them. + val result = processor.preprocessMicrobatch(batch) + + assert(result.schema.fieldNames.toSeq == Seq( + "id", "value", + Scd2BatchProcessor.startAtColName, + Scd2BatchProcessor.endAtColName, + AutoCdcReservedNames.cdcMetadataColName + )) + checkAnswer( + df = result, + expectedAnswer = Seq( + Row(1, "alice", 10L, null, Row(10L)), + Row(1, null, 20L, 20L, Row(20L)) + ) + ) + } + + // =============== computeMinimumSequencePerKey tests =============== + + test("computeMinimumSequencePerKey returns one row per distinct key and aggregates across " + + "both upsert and delete events") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + .add("is_delete", BooleanType) + + val processor = processorWithKeys( + keys = Seq("id"), + deleteCondition = Some(F.col("is_delete")) + ) + + // Two keys, each with multiple events including at least one delete and at least one + // out-of-order sequence. Delete events must feed into the per-key min exactly like + // upserts: `preprocessMicrobatch` stamps `__RECORD_START_AT = sequencing` on every + // row regardless of kind, so the min computation cannot legitimately ignore deletes. + // (If it did, the early-delete-bisects-late-upsert reconciliation case would silently + // lose its anchor pull-in via the find* paths.) + val raw = microbatchOf(schema)( + Row(1, 30L, false), // out-of-order: appears before lower sequences in the input + Row(1, 10L, true), // delete - smallest sequence for key=1 + Row(1, 20L, false), + Row(2, 50L, false), + Row(2, 40L, true) // delete - smallest sequence for key=2 + ) + + val preprocessed = processor.preprocessMicrobatch(raw) + val result = processor.computeMinimumSequencePerKey(preprocessed) + + assert(result.schema.fieldNames.toSeq == Seq( + "id", Scd2BatchProcessor.minSequenceColName + )) + checkAnswer( + df = result, + expectedAnswer = Seq( + Row(1, 10L), + Row(2, 40L) + ) + ) + } + + test("computeMinimumSequencePerKey is compatible with composite keys") { + val schema = new StructType() + .add("region", StringType) + .add("customer_id", IntegerType) + .add("seq", LongType) + + val processor = processorWithKeys(keys = Seq("region", "customer_id")) + + // Three composite-key tuples that share their first or second key column. If the + // function mistakenly grouped by `region` alone, (US, 1) and (US, 2) would collapse + // and we'd see only two output rows; if it grouped by `customer_id` alone, + // (US, 1) and (EU, 1) would collapse. + val raw = microbatchOf(schema)( + Row("US", 1, 100L), + Row("US", 1, 50L), // smaller sequence for (US, 1) + Row("US", 2, 200L), + Row("EU", 1, 30L) + ) + + val preprocessed = processor.preprocessMicrobatch(raw) + val result = processor.computeMinimumSequencePerKey(preprocessed) + + assert(result.schema.fieldNames.toSeq == Seq( + "region", "customer_id", Scd2BatchProcessor.minSequenceColName + )) + checkAnswer( + df = result, + expectedAnswer = Seq( + Row("US", 1, 50L), + Row("US", 2, 200L), + Row("EU", 1, 30L) + ) + ) + } + + test("computeMinimumSequencePerKey returns an empty result for an empty microbatch") { + val schema = new StructType() + .add("id", IntegerType) + .add("seq", LongType) + + val processor = processorWithKeys(keys = Seq("id")) + + val raw = microbatchOf(schema)() + val preprocessed = processor.preprocessMicrobatch(raw) + val result = processor.computeMinimumSequencePerKey(preprocessed) + + assert(result.collect().isEmpty) + } + + test("computeMinimumSequencePerKey resolves key columns containing a literal dot") { + // Symmetric to the dotted-name test for findAffectedRowsFromAuxiliaryTable: the + // `groupBy(keysQuoted.map(F.col): _*)` site relies on `keysQuoted` correctly + // backtick-quoting "a.b" so that F.col parses it as a literal column name (rather + // than struct-field access). Pins the F.col axis of the keysQuoted vs keysRaw split. + val schema = new StructType() + .add("a.b", IntegerType) + .add("seq", LongType) + + val processor = processorWithKeys(keys = Seq("`a.b`")) + + val raw = microbatchOf(schema)( + Row(1, 30L), + Row(1, 10L) + ) + val preprocessed = processor.preprocessMicrobatch(raw) + val result = processor.computeMinimumSequencePerKey(preprocessed) + + assert(result.schema.fieldNames.toSeq == Seq( + "a.b", Scd2BatchProcessor.minSequenceColName + )) + checkAnswer( + df = result, + expectedAnswer = Seq(Row(1, 10L)) + ) + } + + // =============== findAffectedRowsFromAuxiliaryTable tests =============== + + test("findAffectedRowsFromAuxiliaryTable returns the anchor row per key") { + val processor = processorWithKeys(Seq("id")) + + // Two keys to demonstrate per-key anchor isolation. + // + // Input row shape per `auxTableOf`: + // (id, value, __START_AT, __END_AT, Row(recordStartAt, deletedByBatchId)) + // + // Key 1: aux rows at recordStartAt 3, 5, 10. minSeq = 10. + // - 3 -> older than the anchor; dropped. + // - 5 -> anchor (max < 10); included. + // - 10 -> at minSeq; included via the >= branch (NOT as anchor; selection is strict <). + // Key 2: only one aux row at 7, minSeq = 7. + // - 7 -> at minSeq; included via >= branch. No anchor (no rows < 7 for this key). + val aux = auxTableOf(singleKeyUserSchema)( + Row(1, "v1.3", 3L, null, Row(3L, null)), + Row(1, "v1.5", 5L, null, Row(5L, null)), + Row(1, "v1.10", 10L, null, Row(10L, null)), + Row(2, "v2.7", 7L, null, Row(7L, null)) + ) + val minSeq = minSeqOf(singleKeyKeySchema)( + Row(1, 10L), + Row(2, 7L) + ) + + val result = processor.findAffectedRowsFromAuxiliaryTable( + rawAuxiliaryTableDf = aux, + perKeyMinimumSequenceInMicrobatch = minSeq, + batchId = 100L + ) + + checkAnswer( + df = result, + expectedAnswer = Seq( + Row(1, "v1.5", 5L, null, Row(5L)), // anchor for key=1 + Row(1, "v1.10", 10L, null, Row(10L)), // >= minSeq for key=1 + Row(2, "v2.7", 7L, null, Row(7L)) // >= minSeq for key=2 (no anchor) + ) + ) + } + + test("findAffectedRowsFromAuxiliaryTable pulls in both tombstone and no-op upsert rows") { + val processor = processorWithKeys(Seq("id")) + + // Aux carries a mix of row kinds for one key. The find function does NOT distinguish + // between them - it filters purely on `recordStartAt` - so a tombstone, a no-op upsert + // run head, and a continuation are all eligible anchor candidates and all eligible for + // the >= minSeq inclusion branch. + val aux = auxTableOf(singleKeyUserSchema)( + // Tombstone at recordStartAt = 3 (deleted at sequence 3): startAt = endAt = 3. + // Older than the anchor; dropped. + Row(1, null, 3L, 3L, Row(3L, null)), + // No-op upsert continuation at recordStartAt = 7: startAt inherits its run head's + // recordStartAt, endAt is null. Anchor for minSeq=10 (max < 10). + Row(1, "alice", 5L, null, Row(7L, null)), + // Tombstone at recordStartAt = 12: at-or-after minSeq, included via >= branch. + Row(1, null, 12L, 12L, Row(12L, null)), + // No-op upsert continuation at recordStartAt = 15: included via >= branch. + Row(1, "bob", 13L, null, Row(15L, null)) + ) + val minSeq = minSeqOf(singleKeyKeySchema)(Row(1, 10L)) + + val result = processor.findAffectedRowsFromAuxiliaryTable( + rawAuxiliaryTableDf = aux, + perKeyMinimumSequenceInMicrobatch = minSeq, + batchId = 100L + ) + + checkAnswer( + df = result, + expectedAnswer = Seq( + Row(1, "alice", 5L, null, Row(7L)), + Row(1, null, 12L, 12L, Row(12L)), + Row(1, "bob", 13L, null, Row(15L)) + ) + ) + } + + test("findAffectedRowsFromAuxiliaryTable pulls in both consecutive no-op upsert events " + + "being interleaved by incoming microbatch row") { + val processor = processorWithKeys(Seq("id")) + + val aux = auxTableOf(singleKeyUserSchema)( + Row(1, "alice", 2L, null, Row(8L, null)), + Row(1, "alice", 2L, null, Row(12L, null)) + ) + val minSeq = minSeqOf(singleKeyKeySchema)(Row(1, 10L)) + + val result = processor.findAffectedRowsFromAuxiliaryTable( + rawAuxiliaryTableDf = aux, + perKeyMinimumSequenceInMicrobatch = minSeq, + batchId = 100L + ) + + checkAnswer( + df = result, + expectedAnswer = Seq( + // Row with record start at of 8 gets pulled in as an anchor, + Row(1, "alice", 2L, null, Row(8L)), + // Row with record start at of 12 gets pulled in as a regular affected row. + Row(1, "alice", 2L, null, Row(12L)) + ) + ) + } + + test("findAffectedRowsFromAuxiliaryTable selects tombstones as anchor if applicable") { + val processor = processorWithKeys(Seq("id")) + + // Tombstone-as-anchor is incidental: the find function selects the anchor purely on + // `max recordStartAt < minSeq`, so a tombstone qualifies just like any other row kind. + // Downstream reconciliation does not actually rely on the anchor when it is a + // tombstone (a delete already closed the prior run, so any subsequent incoming event + // is necessarily a fresh run head regardless of whether the anchor is surfaced). We + // still pull it in as a harmless side effect of the range filter, and this behavior is + // documented via test. + val aux = auxTableOf(singleKeyUserSchema)( + Row(1, null, 7L, 7L, Row(7L, null)), + Row(1, null, 12L, 12L, Row(12L, null)) + ) + val minSeq = minSeqOf(singleKeyKeySchema)(Row(1, 10L)) + + val result = processor.findAffectedRowsFromAuxiliaryTable( + rawAuxiliaryTableDf = aux, + perKeyMinimumSequenceInMicrobatch = minSeq, + batchId = 100L + ) + + checkAnswer( + df = result, + expectedAnswer = Seq( + // Pulled in as anchor. + Row(1, null, 7L, 7L, Row(7L)), + // Pulled in as regular affected row. + Row(1, null, 12L, 12L, Row(12L)) + ) + ) + } + + test("findAffectedRowsFromAuxiliaryTable filters logically-deleted aux rows") { + val processor = processorWithKeys(Seq("id")) + + val currentBatchId = 100L + val differentBatchId = 99L + + // The idempotency filter retains rows deleted by `currentBatchId` (so a mid-flight + // retry sees its own prior writes) and drops rows deleted by any other batch. This + // applies uniformly to both the anchor and non-anchor affected rows. + val aux = auxTableOf(singleKeyUserSchema)( + // Anchor candidate (recordStartAt < minSeq): + Row(1, "anchor", 5L, null, Row(5L, currentBatchId)), // deleted by current -> kept + // At-or-after minSeq: + Row(1, "live", 10L, null, Row(10L, null)), // not deleted -> kept + Row(1, "retried", 11L, null, Row(11L, currentBatchId)), // deleted by current -> kept + Row(1, "ignored", 12L, null, Row(12L, differentBatchId)) // deleted by another -> dropped + ) + val minSeq = minSeqOf(singleKeyKeySchema)(Row(1, 10L)) + + val result = processor.findAffectedRowsFromAuxiliaryTable( + rawAuxiliaryTableDf = aux, + perKeyMinimumSequenceInMicrobatch = minSeq, + batchId = currentBatchId + ) + + checkAnswer( + df = result, + expectedAnswer = Seq( + Row(1, "anchor", 5L, null, Row(5L)), + Row(1, "live", 10L, null, Row(10L)), + Row(1, "retried", 11L, null, Row(11L)) + ) + ) + } + + test("findAffectedRowsFromAuxiliaryTable narrows CDC metadata column to match target's") { + val processor = processorWithKeys(Seq("id")) + + // Pre-condition: aux's `_cdc_metadata` carries __RECORD_START_AT and __DELETED_BY_BATCH_ID. + // The find function must strip the aux-only field so the result is union-compatible + // with target-table rows and preprocessed-microbatch rows downstream. + val aux = auxTableOf(singleKeyUserSchema)(Row(1, "v", 5L, null, Row(5L, null))) + val minSeq = minSeqOf(singleKeyKeySchema)(Row(1, 10L)) + + val result = processor.findAffectedRowsFromAuxiliaryTable( + rawAuxiliaryTableDf = aux, + perKeyMinimumSequenceInMicrobatch = minSeq, + batchId = 100L + ) + + val cdcMetadataField = result.schema(AutoCdcReservedNames.cdcMetadataColName) + assert(cdcMetadataField.dataType == Scd2BatchProcessor.targetCdcMetadataColSchema(LongType)) + } + + test("findAffectedRowsFromAuxiliaryTable resolves key columns containing a literal dot") { + val processor = processorWithKeys(Seq("`a.b`")) + val keySchema = new StructType().add("a.b", IntegerType) + val userSchema = keySchema.add("value", StringType) + + val aux = auxTableOf(userSchema)(Row(1, "v", 5L, null, Row(5L, null))) + val minSeq = minSeqOf(keySchema)(Row(1, 10L)) + + val result = processor.findAffectedRowsFromAuxiliaryTable( + rawAuxiliaryTableDf = aux, + perKeyMinimumSequenceInMicrobatch = minSeq, + batchId = 100L + ) + + // The lone aux row is the anchor (recordStartAt=5 < minSeq=10, no other candidates). + checkAnswer( + df = result, + expectedAnswer = Seq(Row(1, "v", 5L, null, Row(5L))) + ) + } + + test("findAffectedRowsFromAuxiliaryTable respects composite keys") { + val keySchema = new StructType() + .add("region", StringType) + .add("customer_id", IntegerType) + val userSchema = keySchema.add("name", StringType) + + val processor = processorWithKeys(Seq("region", "customer_id")) + + // Three composite keys: (US, 1), (EU, 1), (US, 2). Each is independent. + // (US, 1): anchor at 3; row at 10 included via >=. + // (EU, 1): anchor at 4; no rows at or after 12 -> only the anchor. + // (US, 2): no aux rows -> contributes nothing. + val aux = auxTableOf(userSchema)( + Row("US", 1, "us1.3", 3L, null, Row(3L, null)), + Row("US", 1, "us1.10", 10L, null, Row(10L, null)), + Row("EU", 1, "eu1.4", 4L, null, Row(4L, null)) + ) + val minSeq = minSeqOf(keySchema)( + Row("US", 1, 10L), + Row("EU", 1, 12L), + Row("US", 2, 100L) + ) + + val result = processor.findAffectedRowsFromAuxiliaryTable( + rawAuxiliaryTableDf = aux, + perKeyMinimumSequenceInMicrobatch = minSeq, + batchId = 100L + ) + + checkAnswer( + df = result, + expectedAnswer = Seq( + Row("US", 1, "us1.3", 3L, null, Row(3L)), + Row("US", 1, "us1.10", 10L, null, Row(10L)), + Row("EU", 1, "eu1.4", 4L, null, Row(4L)) + ) + ) + } + + test("findAffectedRowsFromAuxiliaryTable returns an empty result when the aux table is empty") { + val processor = processorWithKeys(Seq("id")) + + val aux = auxTableOf(singleKeyUserSchema)() + val minSeq = minSeqOf(singleKeyKeySchema)(Row(1, 10L)) + + val result = processor.findAffectedRowsFromAuxiliaryTable( + rawAuxiliaryTableDf = aux, + perKeyMinimumSequenceInMicrobatch = minSeq, + batchId = 100L + ) + + assert(result.collect().isEmpty) + } + + test("findAffectedRowsFromAuxiliaryTable returns no rows for a microbatch key that has " + + "no rows in the aux table") { + val processor = processorWithKeys(Seq("id")) + + // Aux only has rows for key=1. Microbatch only sees key=2. + val aux = auxTableOf(singleKeyUserSchema)(Row(1, "v", 5L, null, Row(5L, null))) + val minSeq = minSeqOf(singleKeyKeySchema)(Row(2, 10L)) + + val result = processor.findAffectedRowsFromAuxiliaryTable( + rawAuxiliaryTableDf = aux, + perKeyMinimumSequenceInMicrobatch = minSeq, + batchId = 100L + ) + + assert(result.collect().isEmpty) + } + + test("findAffectedRowsFromAuxiliaryTable excludes aux rows for keys not in the microbatch") { + val processor = processorWithKeys(Seq("id")) + + // Aux has rows for keys 1 and 2. Microbatch only mentions key=1, so key=2's aux rows + // must be dropped (the inner join with minSeq strips them). + val aux = auxTableOf(singleKeyUserSchema)( + Row(1, "v1", 5L, null, Row(5L, null)), + Row(2, "v2", 7L, null, Row(7L, null)) + ) + val minSeq = minSeqOf(singleKeyKeySchema)(Row(1, 10L)) + + val result = processor.findAffectedRowsFromAuxiliaryTable( + rawAuxiliaryTableDf = aux, + perKeyMinimumSequenceInMicrobatch = minSeq, + batchId = 100L + ) + + checkAnswer( + df = result, + expectedAnswer = Seq(Row(1, "v1", 5L, null, Row(5L))) + ) + } + + // =============== findAffectedRowsFromTargetTable tests =============== + + test("findAffectedRowsFromTargetTable includes both closed and active affected rows") { + val processor = processorWithKeys(Seq("id")) + + // Single key with four target rows: + // - row closed at endAt=5 -> < minSeq=10 -> excluded + // - row closed at endAt=10 -> = minSeq=10 -> included (>=) + // - row closed at endAt=15 -> > minSeq=10 -> included + // - row active (endAt=null) -> always included + val target = targetTableOf(singleKeyUserSchema)( + Row(1, "old", 1L, 5L, Row(1L)), + Row(1, "edge", 5L, 10L, Row(5L)), + Row(1, "recent", 10L, 15L, Row(10L)), + Row(1, "active", 15L, null, Row(15L)) + ) + val minSeq = minSeqOf(singleKeyKeySchema)(Row(1, 10L)) + + val result = processor.findAffectedRowsFromTargetTable( + targetTableDf = target, + perKeyMinimumSequenceInMicrobatch = minSeq + ) + + checkAnswer( + df = result, + expectedAnswer = Seq( + Row(1, "edge", 5L, 10L, Row(5L)), + Row(1, "recent", 10L, 15L, Row(10L)), + Row(1, "active", 15L, null, Row(15L)) + ) + ) + } + + test("findAffectedRowsFromTargetTable computes inclusion independently per key") { + val processor = processorWithKeys(Seq("id")) + + // Two keys with overlapping endAt ranges but different per-key minSeqs. Each key is + // reconciled independently against its own minSeq. + val target = targetTableOf(singleKeyUserSchema)( + // Key 1: minSeq=10. "active" (null) and "recent" (15) are at/after 10. + Row(1, "k1.old", 1L, 5L, Row(1L)), + Row(1, "k1.recent", 5L, 15L, Row(5L)), + Row(1, "k1.active", 15L, null, Row(15L)), + // Key 2: minSeq=20. Only "active" (null) is at/after 20. + Row(2, "k2.old", 1L, 10L, Row(1L)), + Row(2, "k2.recent", 10L, 18L, Row(10L)), + Row(2, "k2.active", 18L, null, Row(18L)) + ) + val minSeq = minSeqOf(singleKeyKeySchema)( + Row(1, 10L), + Row(2, 20L) + ) + + val result = processor.findAffectedRowsFromTargetTable( + targetTableDf = target, + perKeyMinimumSequenceInMicrobatch = minSeq + ) + + checkAnswer( + df = result, + expectedAnswer = Seq( + Row(1, "k1.recent", 5L, 15L, Row(5L)), + Row(1, "k1.active", 15L, null, Row(15L)), + Row(2, "k2.active", 18L, null, Row(18L)) + ) + ) + } + + test("findAffectedRowsFromTargetTable respects composite keys") { + val keySchema = new StructType() + .add("region", StringType) + .add("customer_id", IntegerType) + val userSchema = keySchema.add("name", StringType) + + val processor = processorWithKeys(Seq("region", "customer_id")) + + // (US, 1) and (EU, 1) are distinct composite keys. (US, 1)'s active row is included + // for minSeq=10; (EU, 1)'s active row is included for minSeq=12; (EU, 1)'s old closed + // row at endAt=5 is excluded (5 < 12). (US, 2) has no target rows. + val target = targetTableOf(userSchema)( + Row("US", 1, "us1", 1L, null, Row(1L)), + Row("EU", 1, "eu1.old", 1L, 5L, Row(1L)), + Row("EU", 1, "eu1", 5L, null, Row(5L)) + ) + val minSeq = minSeqOf(keySchema)( + Row("US", 1, 10L), + Row("EU", 1, 12L), + Row("US", 2, 100L) + ) + + val result = processor.findAffectedRowsFromTargetTable( + targetTableDf = target, + perKeyMinimumSequenceInMicrobatch = minSeq + ) + + checkAnswer( + df = result, + expectedAnswer = Seq( + Row("US", 1, "us1", 1L, null, Row(1L)), + Row("EU", 1, "eu1", 5L, null, Row(5L)) + ) + ) + } + + test("findAffectedRowsFromTargetTable returns an empty result when the target table is empty") { + val processor = processorWithKeys(Seq("id")) + + val target = targetTableOf(singleKeyUserSchema)() + val minSeq = minSeqOf(singleKeyKeySchema)(Row(1, 10L)) + + val result = processor.findAffectedRowsFromTargetTable( + targetTableDf = target, + perKeyMinimumSequenceInMicrobatch = minSeq + ) + + assert(result.collect().isEmpty) + } + + test("findAffectedRowsFromTargetTable returns no rows for a microbatch key that has " + + "no rows in the target table") { + val processor = processorWithKeys(Seq("id")) + + // Target only has rows for key=1. Microbatch only sees key=2. + val target = targetTableOf(singleKeyUserSchema)(Row(1, "v", 1L, null, Row(1L))) + val minSeq = minSeqOf(singleKeyKeySchema)(Row(2, 10L)) + + val result = processor.findAffectedRowsFromTargetTable( + targetTableDf = target, + perKeyMinimumSequenceInMicrobatch = minSeq + ) + + assert(result.collect().isEmpty) + } +} diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala index 5ebdb4b4c86d2..8538ef92a588b 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.{Column, Row} import org.apache.spark.sql.connector.catalog.SharedTablesInMemoryRowLevelOperationTableCatalog import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.pipelines.autocdc.{ + AutoCdcReservedNames, ChangeArgs, ColumnSelection, Scd1BatchProcessor, @@ -145,7 +146,7 @@ trait AutoCdcGraphExecutionTestMixin extends BeforeAndAfterEach { * Assumes sequence type is BIGINT (Long). */ protected val cdcMetadataDdl: String = { - val col = Scd1BatchProcessor.cdcMetadataColName + val col = AutoCdcReservedNames.cdcMetadataColName val del = Scd1BatchProcessor.cdcDeleteSequenceFieldName val ups = Scd1BatchProcessor.cdcUpsertSequenceFieldName s"$col STRUCT<$del:BIGINT,$ups:BIGINT> NOT NULL" diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala index 5a9f6cb6710be..78dbb70027b45 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala @@ -21,8 +21,8 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.execution.streaming.runtime.MemoryStream import org.apache.spark.sql.functions import org.apache.spark.sql.pipelines.autocdc.{ + AutoCdcReservedNames, ColumnSelection, - Scd1BatchProcessor, UnqualifiedColumnName } import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext} @@ -157,7 +157,7 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite // The auxiliary table only contains keys and the metadata column, hence "name" should not be // included. - assert(auxSchema.fieldNames.toSeq == Seq("id", Scd1BatchProcessor.cdcMetadataColName)) + assert(auxSchema.fieldNames.toSeq == Seq("id", AutoCdcReservedNames.cdcMetadataColName)) assert(getAuxTableKeyColumnNames(target = "target") == Seq("id")) } @@ -195,7 +195,7 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite val auxSchema = spark.table(auxTableNameFor("target")).schema assert(auxSchema.fieldNames.toSeq == - Seq("region", "id", Scd1BatchProcessor.cdcMetadataColName)) + Seq("region", "id", AutoCdcReservedNames.cdcMetadataColName)) assert(getAuxTableKeyColumnNames(target = "target") == Seq("region", "id")) } diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala index 46f8ee47db02f..a5f3a13a012a6 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.pipelines.graph import org.apache.spark.sql.Row import org.apache.spark.sql.execution.streaming.runtime.MemoryStream import org.apache.spark.sql.functions -import org.apache.spark.sql.pipelines.autocdc.Scd1BatchProcessor +import org.apache.spark.sql.pipelines.autocdc.AutoCdcReservedNames import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext} import org.apache.spark.sql.test.SharedSparkSession @@ -147,8 +147,8 @@ class AutoCdcScd1TargetTableDurabilitySuite val schema = spark.table(s"$catalog.$namespace.target").schema assert( - schema.fieldNames.contains(Scd1BatchProcessor.cdcMetadataColName), - s"Target must have ${Scd1BatchProcessor.cdcMetadataColName} after first AutoCDC run; " + + schema.fieldNames.contains(AutoCdcReservedNames.cdcMetadataColName), + s"Target must have ${AutoCdcReservedNames.cdcMetadataColName} after first AutoCDC run; " + s"got ${schema.fieldNames.toSeq}" ) checkAnswer(