Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,15 @@ private[pipelines] object AutoCdcReservedNames {

/** Common reserved-name prefix shared by AutoCDC internal columns and internal tables. */
val prefix: String = "__spark_autocdc_"

/**
* Reserved name of the operational metadata column AutoCDC that is projected on every AutoCDC
* microbatch, auxiliary table, and target table.
*
* Shared across all SCD strategies and across the flow resolution, batch-processor, and
* streaming-write layers.
*
* Note that the schema of the CDC metadata column however can and does differ on the SCD-type.
*/
val cdcMetadataColName: String = s"${prefix}metadata"
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ case class Scd1BatchProcessor(
F.when(rowDeleteSequence.isNull, changeArgs.sequencing).otherwise(F.lit(null))

validatedMicrobatch.withColumn(
Scd1BatchProcessor.cdcMetadataColName,
AutoCdcReservedNames.cdcMetadataColName,
Scd1BatchProcessor.constructCdcMetadataCol(
deleteSequence = rowDeleteSequence,
upsertSequence = rowUpsertSequence,
Expand Down Expand Up @@ -175,7 +175,7 @@ case class Scd1BatchProcessor(
schema = microbatchWithCdcMetadataDf.schema,
columnSelection = Some(
ColumnSelection.ExcludeColumns(
Seq(UnqualifiedColumnName(Scd1BatchProcessor.cdcMetadataColName))
Seq(UnqualifiedColumnName(AutoCdcReservedNames.cdcMetadataColName))
)
),
caseSensitive = caseSensitiveColumnComparison
Expand All @@ -197,7 +197,7 @@ case class Scd1BatchProcessor(
// select. Identifiers could have special characters such as '.'.
F.col(QuotingUtils.quoteIdentifier(colName))
}) :+ F.col(
Scd1BatchProcessor.cdcMetadataColName
AutoCdcReservedNames.cdcMetadataColName
)

microbatchWithCdcMetadataDf.select(
Expand All @@ -223,7 +223,7 @@ case class Scd1BatchProcessor(
val aliasedMicrobatchDf = microbatchDf.alias("microbatch")
val aliasedAuxiliaryTableDf = auxiliaryTableDf.alias("auxiliaryTable")

val cdcMetadata = Scd1BatchProcessor.cdcMetadataColName
val cdcMetadata = AutoCdcReservedNames.cdcMetadataColName

val microbatchCdcMetadata = F.col(s"microbatch.$cdcMetadata")
val effectiveSeq = F.greatest(
Expand Down Expand Up @@ -267,7 +267,7 @@ case class Scd1BatchProcessor(
auxiliaryTableIdentifier: TableIdentifier
): Unit = {
val auxIdentQuoted = auxiliaryTableIdentifier.quotedString
val meta = Scd1BatchProcessor.cdcMetadataColName
val meta = AutoCdcReservedNames.cdcMetadataColName

// Project the reconciled microbatch down to just keys + `_cdc_metadata`; data columns are
// irrelevant for the auxiliary table and should not be persisted.
Expand Down Expand Up @@ -330,7 +330,7 @@ case class Scd1BatchProcessor(
reconciledMicrobatchDf: DataFrame,
targetTableIdentifier: TableIdentifier
): Unit = {
val meta = Scd1BatchProcessor.cdcMetadataColName
val meta = AutoCdcReservedNames.cdcMetadataColName

val destinationTableStr = targetTableIdentifier.quotedString
// (Re-)alias the reconciled microbatch DF for easy reference for the remainder of the merge.
Expand Down Expand Up @@ -415,7 +415,6 @@ object Scd1BatchProcessor {
* enforced at [[org.apache.spark.sql.pipelines.graph.AutoCdcMergeFlow]] construction.
*/
private[autocdc] val winningRowColName: String = s"${AutoCdcReservedNames.prefix}winning_row"
private[pipelines] val cdcMetadataColName: String = s"${AutoCdcReservedNames.prefix}metadata"

private[pipelines] val cdcDeleteSequenceFieldName: String = "deleteSequence"
private[pipelines] val cdcUpsertSequenceFieldName: String = "upsertSequence"
Expand Down
Loading