diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala index 0d1c33be21727..03e27e1430709 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala @@ -20,10 +20,14 @@ package org.apache.spark.sql.pipelines.graph import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.atomic.AtomicBoolean +import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future} import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal +import org.json4s.JsonAST.{JArray, JString} +import org.json4s.jackson.JsonMethods.{compact, parse} + import org.apache.spark.SparkException import org.apache.spark.internal.{Logging, LogKeys} import org.apache.spark.sql.{AnalysisException, Dataset, Row} @@ -346,8 +350,6 @@ object AutoCdcAuxiliaryTable { * upstream. */ def serializeKeyColumnNames(names: Seq[String]): String = { - import org.json4s.JsonAST.{JArray, JString} - import org.json4s.jackson.JsonMethods.compact compact(JArray(names.map(JString(_)).toList)) } @@ -357,8 +359,6 @@ object AutoCdcAuxiliaryTable { * upstream. */ def parseKeyColumnNames(raw: String): Option[Seq[String]] = { - import org.json4s.JsonAST.{JArray, JString} - import org.json4s.jackson.JsonMethods.parse val parsed = try Some(parse(raw)) catch { case NonFatal(_) => None } parsed.flatMap { case JArray(elems) => @@ -406,7 +406,7 @@ trait AutoCdcMergeWriteBase { val (catalog, v2Identifier) = PipelinesCatalogUtils.resolveTableCatalog(spark, auxIdent) if (!catalog.tableExists(v2Identifier)) { - val properties = scala.collection.mutable.Map.empty[String, String] + val properties = mutable.Map.empty[String, String] // Inherit the target's format so MERGE semantics line up. When unspecified, omit the // provider so the catalog falls back to its default. @@ -440,18 +440,21 @@ trait AutoCdcMergeWriteBase { } /** - * Returns the resolved AutoCDC key column names as they appear in the auxiliary schema, in - * `changeArgs.keys` declaration order. + * Resolves each AutoCDC key in `changeArgs.keys` to its [[StructField]] in + * [[auxiliaryTableSchema]], preserving `changeArgs.keys` declaration order. This is the + * expected (flow-declared) side of drift validation, distinct from the keys recorded on an + * existing auxiliary table. + * + * [[AutoCdcMergeFlow]] should have validated that all `changeArgs.keys` exist in the deduced + * aux/target schemas by now, so a missing key is an internal error rather than a user-facing + * condition. */ - private def auxiliaryKeyColumnNames: Seq[String] = { + private def expectedAuxiliaryKeyFields: Seq[StructField] = { val resolver = spark.sessionState.conf.resolver changeArgs.keys.map { key => auxiliaryTableSchema.fields .find(field => resolver(field.name, key.name)) - .map(_.name) .getOrElse( - // This should never happen at this point, as [[AutoCdcMergeFlow]] should have validated - // all changeArgs.keys exist in the deduced aux/target table schemas by now. throw SparkException.internalError( s"AutoCDC key column '${key.name}' is missing from the auxiliary table schema " + s"for flow ${identifier.unquotedString} writing to target " + @@ -461,6 +464,12 @@ trait AutoCdcMergeWriteBase { } } + /** + * Returns the resolved AutoCDC key column names as they appear in the auxiliary schema, in + * `changeArgs.keys` declaration order. + */ + private def auxiliaryKeyColumnNames: Seq[String] = expectedAuxiliaryKeyFields.map(_.name) + /** * Validate that the target table's underlying connector implements * [[SupportsRowLevelOperations]], which is the V2 connector contract for MERGE/UPDATE/DELETE @@ -512,21 +521,10 @@ trait AutoCdcMergeWriteBase { val resolver = spark.sessionState.conf.resolver val existingAuxSchema = CatalogV2Util.v2ColumnsToStructType(existingAuxTable.columns()) - // The expected key fields are looked up in [[auxiliaryTableSchema]], which by construction - // contains every key column with its source-derived dataType. We deliberately do not look - // them up in [[existingAuxSchema]] - that's the recorded side, and conflating the two - // sides would mask drift. - val expectedKeyFields: Seq[StructField] = changeArgs.keys.map { key => - auxiliaryTableSchema.fields - .find(field => resolver(field.name, key.name)) - .getOrElse( - // Construction of [[auxiliaryTableSchema]] already enforces all of the user-specified - // keys are present, so if we don't find a key it is truly an internal error. - throw SparkException.internalError( - s"Key column '${key.name}' was not found in the AutoCDC auxiliary table schema." - ) - ) - } + // Resolve the flow-declared (expected) keys from [[auxiliaryTableSchema]]. We deliberately + // do not look them up in [[existingAuxSchema]] - that's the recorded side, and conflating + // the two sides would mask drift. See [[expectedAuxiliaryKeyFields]]. + val expectedKeyFields: Seq[StructField] = expectedAuxiliaryKeyFields val recordedKeyNames = parseRecordedKeyColumnNames(existingAuxTable, auxIdent) val recordedKeyFields: Seq[StructField] = recordedKeyNames.map { name => existingAuxSchema.fields diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala index 5ebdb4b4c86d2..52a0b9a5e6e75 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala @@ -21,7 +21,9 @@ import org.scalatest.{BeforeAndAfterEach, Suite} import org.apache.spark.SparkThrowable import org.apache.spark.sql.{Column, Row} +import org.apache.spark.sql.classic.DataFrame import org.apache.spark.sql.connector.catalog.SharedTablesInMemoryRowLevelOperationTableCatalog +import org.apache.spark.sql.functions import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.pipelines.autocdc.{ ChangeArgs, @@ -207,6 +209,35 @@ trait AutoCdcGraphExecutionTestMixin extends BeforeAndAfterEach { ) ) + /** + * Build a single-flow AutoCDC pipeline: a [[TestGraphRegistrationContext]] that registers + * `target` under [[catalog]].[[namespace]] and one [[autoCdcFlow]] writing into it from + * `sourceDf`. Covers the common single-table/single-flow shape used across the AutoCDC E2E + * suites; tests that need multiple flows or non-AutoCDC datasets build the context inline. + */ + protected def singleAutoCdcFlowPipeline( + flowName: String, + target: String, + sourceDf: DataFrame, + keys: Seq[String], + sequencing: Column = functions.col("version"), + columnSelection: Option[ColumnSelection] = None, + deleteCondition: Option[Column] = None, + scdType: ScdType = ScdType.Type1): TestGraphRegistrationContext = + new TestGraphRegistrationContext(spark) { + registerTable(target, catalog = Some(catalog), database = Some(namespace)) + registerFlow(autoCdcFlow( + name = flowName, + target = target, + query = dfFlowFunc(sourceDf), + keys = keys, + sequencing = sequencing, + columnSelection = columnSelection, + deleteCondition = deleteCondition, + scdType = scdType + )) + } + /** Build a target row's `_cdc_metadata` struct value. */ protected def cdcMeta(deleteSeq: Option[Long], upsertSeq: Option[Long]): Row = Row(deleteSeq.orNull, upsertSeq.orNull) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala index 5a9f6cb6710be..6da4d4c898188 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala @@ -52,18 +52,11 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite // resume cleanly. val changeDataFeedStream = MemoryStream[(Int, String, Long)] def buildGraphRegistrationContext(): TestGraphRegistrationContext = - new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "auto_cdc_flow", - target = "target", - query = dfFlowFunc( - changeDataFeedStream.toDF().toDF("id", "name", "version") - ), - keys = Seq("id"), - sequencing = functions.col("version") - )) - } + singleAutoCdcFlowPipeline( + "auto_cdc_flow", + "target", + changeDataFeedStream.toDF().toDF("id", "name", "version"), + Seq("id")) // Run #1: insert id=1 at seq=1. changeDataFeedStream.addData((1, "alice", 1L)) @@ -98,20 +91,17 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite // Single MemoryStream reused across both runs so the streaming checkpoint can resume. val stream = MemoryStream[(Int, String, Long, Boolean)] - def buildCtx(): TestGraphRegistrationContext = new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "auto_cdc_flow", - target = "target", - query = dfFlowFunc(stream.toDF().toDF("id", "name", "version", "is_delete")), - keys = Seq("id"), - sequencing = functions.col("version"), + def buildCtx(): TestGraphRegistrationContext = + singleAutoCdcFlowPipeline( + "auto_cdc_flow", + "target", + stream.toDF().toDF("id", "name", "version", "is_delete"), + Seq("id"), deleteCondition = Some(functions.col("is_delete") === true), columnSelection = Some(ColumnSelection.ExcludeColumns( Seq(UnqualifiedColumnName("is_delete")) )) - )) - } + ) // Run #1: delete id=1 at seq=10. Auxiliary table records seq=10 as the watermark. stream.addData((1, "alice", 10L, true)) @@ -141,17 +131,8 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite val stream = MemoryStream[(String, Int, Long)] stream.addData(("alice", 1, 1L)) - val ctx = new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "auto_cdc_flow", - target = "target", - query = dfFlowFunc(stream.toDF().toDF("name", "id", "version")), - keys = Seq("id"), - sequencing = functions.col("version") - )) - } - runPipeline(ctx) + runPipeline(singleAutoCdcFlowPipeline( + "auto_cdc_flow", "target", stream.toDF().toDF("name", "id", "version"), Seq("id"))) val auxSchema = spark.table(auxTableNameFor("target")).schema @@ -181,17 +162,9 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite val stream = MemoryStream[(String, Int, String, Long)] stream.addData(("v", 1, "us", 1L)) - val ctx = new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "auto_cdc_flow", - target = "target", - query = dfFlowFunc(stream.toDF().toDF("value", "id", "region", "version")), - keys = Seq("region", "id"), - sequencing = functions.col("version") - )) - } - runPipeline(ctx) + runPipeline(singleAutoCdcFlowPipeline( + "auto_cdc_flow", "target", stream.toDF().toDF("value", "id", "region", "version"), + Seq("region", "id"))) val auxSchema = spark.table(auxTableNameFor("target")).schema assert(auxSchema.fieldNames.toSeq == @@ -211,16 +184,9 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite // Single MemoryStream reused across both runs so the streaming checkpoint can resume. val stream = MemoryStream[(Int, Long)] - def buildCtx(): TestGraphRegistrationContext = new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "auto_cdc_flow", - target = "target", - query = dfFlowFunc(stream.toDF().toDF("id", "version")), - keys = Seq("id"), - sequencing = functions.col("version") - )) - } + def buildCtx(): TestGraphRegistrationContext = + singleAutoCdcFlowPipeline( + "auto_cdc_flow", "target", stream.toDF().toDF("id", "version"), Seq("id")) stream.addData((1, 1L)) runPipeline(buildCtx()) @@ -276,18 +242,12 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite // Single MemoryStream reused across both runs so the streaming checkpoint can resume. val stream = MemoryStream[(String, String, String, String, Long)] - def buildCtx(): TestGraphRegistrationContext = new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "auto_cdc_flow", - target = "target", - query = dfFlowFunc( - stream.toDF().toDF((keyNames :+ "version"): _*) - ), - keys = backtickQuotedKeys, - sequencing = functions.col("version") - )) - } + def buildCtx(): TestGraphRegistrationContext = + singleAutoCdcFlowPipeline( + "auto_cdc_flow", + "target", + stream.toDF().toDF((keyNames :+ "version"): _*), + backtickQuotedKeys) // Run #1: a single insert with arbitrary non-empty key values. stream.addData(("v1", "v2", "v3", "v4", 1L)) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala index 066d8afd53423..828c09b01db51 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala @@ -17,11 +17,12 @@ package org.apache.spark.sql.pipelines.graph +import org.apache.spark.sql.classic.DataFrame import org.apache.spark.sql.execution.streaming.runtime.MemoryStream -import org.apache.spark.sql.functions import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext} import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.MetadataBuilder /** * End-to-end tests covering AutoCDC SCD1 key-drift validation: the AutoCDC flow's declared @@ -39,11 +40,10 @@ class AutoCdcScd1KeyDriftSuite with SharedSparkSession with AutoCdcGraphExecutionTestMixin { + import testImplicits._ + test("a pipeline execution that adds a key column to an existing AutoCDC flow triggers " + "KEY_SCHEMA_DRIFT") { - val session = spark - import session.implicits._ - // Target table carries both candidate key columns up-front so only the AutoCDC `keys` // declaration differs between the two pipelines. spark.sql( @@ -54,31 +54,13 @@ class AutoCdcScd1KeyDriftSuite // Pipeline #1 declares one key (`id`). Aux table is created with schema (id, _cdc_metadata). val stream1 = MemoryStream[(Int, String, Long)] stream1.addData((1, "us", 1L)) - val ctx1 = new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "flow_v1", - target = "target", - query = dfFlowFunc(stream1.toDF().toDF("id", "region", "version")), - keys = Seq("id"), - sequencing = functions.col("version") - )) - } - runPipeline(ctx1) + runPipeline(buildPipeline("flow_v1", stream1.toDF().toDF("id", "region", "version"), Seq("id"))) // Pipeline #2 declares two keys (`region` + `id`) - arity drift. val stream2 = MemoryStream[(Int, String, Long)] stream2.addData((1, "us", 2L)) - val ctx2 = new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "flow_v2", - target = "target", - query = dfFlowFunc(stream2.toDF().toDF("id", "region", "version")), - keys = Seq("region", "id"), - sequencing = functions.col("version") - )) - } + val ctx2 = buildPipeline( + "flow_v2", stream2.toDF().toDF("id", "region", "version"), Seq("region", "id")) val ex = intercept[RuntimeException] { runPipeline(ctx2) } checkErrorInPipelineFailure( @@ -100,9 +82,6 @@ class AutoCdcScd1KeyDriftSuite test("a pipeline execution that drops a key column from an existing AutoCDC flow triggers " + "KEY_SCHEMA_DRIFT") { - val session = spark - import session.implicits._ - spark.sql( s"CREATE TABLE $catalog.$namespace.target " + s"(region STRING NOT NULL, id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)" @@ -112,31 +91,13 @@ class AutoCdcScd1KeyDriftSuite // would slip through with `id` silently matching at position 0 of the recorded schema. val stream1 = MemoryStream[(String, Int, Long)] stream1.addData(("us", 1, 1L)) - val ctx1 = new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "flow_v1", - target = "target", - query = dfFlowFunc(stream1.toDF().toDF("region", "id", "version")), - keys = Seq("region", "id"), - sequencing = functions.col("version") - )) - } - runPipeline(ctx1) + runPipeline(buildPipeline( + "flow_v1", stream1.toDF().toDF("region", "id", "version"), Seq("region", "id"))) // Pipeline #2 declares only [id] - arity drift. val stream2 = MemoryStream[(String, Int, Long)] stream2.addData(("us", 1, 2L)) - val ctx2 = new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "flow_v2", - target = "target", - query = dfFlowFunc(stream2.toDF().toDF("region", "id", "version")), - keys = Seq("id"), - sequencing = functions.col("version") - )) - } + val ctx2 = buildPipeline("flow_v2", stream2.toDF().toDF("region", "id", "version"), Seq("id")) val ex = intercept[RuntimeException] { runPipeline(ctx2) } checkErrorInPipelineFailure( @@ -157,9 +118,6 @@ class AutoCdcScd1KeyDriftSuite test("a pipeline execution that swaps a key in an existing AutoCDC flow for a different name " + "(same arity) triggers KEY_SCHEMA_DRIFT") { - val session = spark - import session.implicits._ - spark.sql( s"CREATE TABLE $catalog.$namespace.target " + s"(id INT NOT NULL, region STRING NOT NULL, country STRING NOT NULL, " + @@ -169,33 +127,16 @@ class AutoCdcScd1KeyDriftSuite // Pipeline #1 declares [id, region]. val stream1 = MemoryStream[(Int, String, String, Long)] stream1.addData((1, "us", "USA", 1L)) - val ctx1 = new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "flow_v1", - target = "target", - query = dfFlowFunc(stream1.toDF().toDF("id", "region", "country", "version")), - keys = Seq("id", "region"), - sequencing = functions.col("version") - )) - } - runPipeline(ctx1) + runPipeline(buildPipeline( + "flow_v1", stream1.toDF().toDF("id", "region", "country", "version"), Seq("id", "region"))) // Pipeline #2 declares [id, country] - same arity, different key set. An arity-only check // would silently match `id` at position 0 and the swapped `region`/`country` would slip // through; the by-name set comparison must catch it. val stream2 = MemoryStream[(Int, String, String, Long)] stream2.addData((1, "us", "USA", 2L)) - val ctx2 = new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "flow_v2", - target = "target", - query = dfFlowFunc(stream2.toDF().toDF("id", "region", "country", "version")), - keys = Seq("id", "country"), - sequencing = functions.col("version") - )) - } + val ctx2 = buildPipeline( + "flow_v2", stream2.toDF().toDF("id", "region", "country", "version"), Seq("id", "country")) val ex = intercept[RuntimeException] { runPipeline(ctx2) } checkErrorInPipelineFailure( @@ -225,8 +166,6 @@ class AutoCdcScd1KeyDriftSuite s"""TBLPROPERTIES ('${AutoCdcAuxiliaryTable.keyColumnNamesProperty}' = '["id"]')""" ) - val session = spark - import session.implicits._ val stream = MemoryStream[(Int, Long)] stream.addData((1, 1L)) val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), Seq("id")) @@ -247,9 +186,6 @@ class AutoCdcScd1KeyDriftSuite } test("a composite key reorder ([a,b] -> [b,a]) does NOT trigger drift validation") { - val session = spark - import session.implicits._ - spark.sql( s"CREATE TABLE $catalog.$namespace.target " + s"(a INT NOT NULL, b STRING NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)" @@ -261,39 +197,16 @@ class AutoCdcScd1KeyDriftSuite // columns and their dataTypes. val stream1 = MemoryStream[(Int, String, Long)] stream1.addData((1, "x", 1L)) - val ctx1 = new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "flow_v1", - target = "target", - query = dfFlowFunc(stream1.toDF().toDF("a", "b", "version")), - keys = Seq("a", "b"), - sequencing = functions.col("version") - )) - } - runPipeline(ctx1) + runPipeline(buildPipeline("flow_v1", stream1.toDF().toDF("a", "b", "version"), Seq("a", "b"))) // Pipeline #2 declares the same key set in the reversed order [b, a]. Must NOT throw. val stream2 = MemoryStream[(Int, String, Long)] stream2.addData((2, "y", 1L)) - val ctx2 = new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "flow_v2", - target = "target", - query = dfFlowFunc(stream2.toDF().toDF("a", "b", "version")), - keys = Seq("b", "a"), - sequencing = functions.col("version") - )) - } - runPipeline(ctx2) + runPipeline(buildPipeline("flow_v2", stream2.toDF().toDF("a", "b", "version"), Seq("b", "a"))) } test("a pipeline execution that changes a key column's nullability or metadata in an " + "existing AutoCDC flow does NOT trigger drift") { - val session = spark - import session.implicits._ - // Drift validation compares (name, dataType) pairs as a set. Nullability and column // metadata are part of [[StructField]] but not part of [[DataType]], so they do not gate // semantic equivalence: only the wire-format data type matters for merge correctness. @@ -314,7 +227,7 @@ class AutoCdcScd1KeyDriftSuite val stream2 = MemoryStream[(Option[Int], Long)] stream2.addData((Some(2), 2L)) val baseDf = stream2.toDF().toDF("id", "version") - val md = new org.apache.spark.sql.types.MetadataBuilder() + val md = new MetadataBuilder() .putString("description", "primary key") .build() val sourceDfWithMetadata = baseDf.select(baseDf("id").as("id", md), baseDf("version")) @@ -323,9 +236,6 @@ class AutoCdcScd1KeyDriftSuite test("a pipeline execution that wraps an existing AutoCDC flow's key in backticks does NOT " + "trigger drift") { - val session = spark - import session.implicits._ - // Backticks are a SQL-parse syntactic device, not part of the identifier itself. A user // adding or removing backticks around the same logical column must NOT be detected as drift. spark.sql( @@ -344,9 +254,6 @@ class AutoCdcScd1KeyDriftSuite test("a pipeline execution that drops backticks around an existing AutoCDC flow's " + "previously-backtick-quoted key does NOT trigger drift") { - val session = spark - import session.implicits._ - // The reverse direction of the previous test: drift validation must be backtick-invariant // on both the WRITE side (recorded property strips backticks when serializing the key // names in pipeline #1) and the READ side (resolver-aware lookup strips backticks when @@ -367,9 +274,6 @@ class AutoCdcScd1KeyDriftSuite test("under spark.sql.caseSensitive = true, an AutoCDC flow whose key differs only in case " + "from the recorded key triggers KEY_SCHEMA_DRIFT") { - val session = spark - import session.implicits._ - // validateNoAutoCdcKeyDrift uses spark.sessionState.conf.resolver, so its behavior on // `Id` vs `id` flips with the session conf. Pin the case-sensitive direction: pipeline #1 // seeds the aux table under the default resolver with recorded key `["id"]`, then @@ -410,9 +314,6 @@ class AutoCdcScd1KeyDriftSuite test("under the default (case-insensitive) resolver, an AutoCDC flow whose key differs only " + "in case from the recorded key does NOT trigger drift") { - val session = spark - import session.implicits._ - // Pairs with the case-sensitive test above: same recorded key, but under the default // resolver the two identifiers are equivalent so drift validation must accept pipeline // #2. This pins the negative direction so a regression that accidentally hard-codes a @@ -451,8 +352,6 @@ class AutoCdcScd1KeyDriftSuite s"CREATE TABLE ${auxTableNameFor("target")} (id INT NOT NULL, $cdcMetadataDdl)" ) - val session = spark - import session.implicits._ val stream = MemoryStream[(Int, Long)] stream.addData((1, 1L)) val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), Seq("id")) @@ -486,8 +385,6 @@ class AutoCdcScd1KeyDriftSuite s"TBLPROPERTIES ('${AutoCdcAuxiliaryTable.keyColumnNamesProperty}' = '$malformedKeysArray')" ) - val session = spark - import session.implicits._ val stream = MemoryStream[(Int, Long)] stream.addData((1, 1L)) val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), Seq("id")) @@ -523,8 +420,6 @@ class AutoCdcScd1KeyDriftSuite s"""TBLPROPERTIES ('${AutoCdcAuxiliaryTable.keyColumnNamesProperty}' = '["region"]')""" ) - val session = spark - import session.implicits._ val stream = MemoryStream[(Int, Long)] stream.addData((1, 1L)) val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), Seq("id")) @@ -546,21 +441,12 @@ class AutoCdcScd1KeyDriftSuite /** * Build a single-flow pipeline targeting `cat.ns1.target` with the given source DF and key - * column list. + * column list. Thin wrapper over [[singleAutoCdcFlowPipeline]] since every drift test targets + * the same `target` table. */ private def buildPipeline( flowName: String, - sourceDf: org.apache.spark.sql.classic.DataFrame, - keys: Seq[String]): TestGraphRegistrationContext = { - new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = flowName, - target = "target", - query = dfFlowFunc(sourceDf), - keys = keys, - sequencing = functions.col("version") - )) - } - } + sourceDf: DataFrame, + keys: Seq[String]): TestGraphRegistrationContext = + singleAutoCdcFlowPipeline(flowName, "target", sourceDf, keys) } diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala index 0d3f6e954df34..1819905316911 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.pipelines.graph import org.apache.spark.sql.Row import org.apache.spark.sql.execution.streaming.runtime.MemoryStream -import org.apache.spark.sql.functions import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext} import org.apache.spark.sql.test.SharedSparkSession @@ -58,34 +57,16 @@ class AutoCdcScd1MultiPipelineSuite // cat.ns1.__spark_autocdc_aux_state_t_a must not affect pipeline #2's `t_b`. val streamA = MemoryStream[(Int, String, Long)] streamA.addData((1, "alice", 100L)) - val ctxA = new TestGraphRegistrationContext(spark) { - registerTable("t_a", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "flow_a", - target = "t_a", - query = dfFlowFunc(streamA.toDF().toDF("id", "name", "version")), - keys = Seq("id"), - sequencing = functions.col("version") - )) - } - runPipeline(ctxA) + runPipeline(singleAutoCdcFlowPipeline( + "flow_a", "t_a", streamA.toDF().toDF("id", "name", "version"), Seq("id"))) // Pipeline #2 only knows about `t_b`. Uses a deliberately *lower* sequence to verify // the watermark from pipeline #1's auxiliary table (seq=100) does not leak into // pipeline #2. val streamB = MemoryStream[(Int, String, Long)] streamB.addData((9, "bob", 1L)) - val ctxB = new TestGraphRegistrationContext(spark) { - registerTable("t_b", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "flow_b", - target = "t_b", - query = dfFlowFunc(streamB.toDF().toDF("id", "name", "version")), - keys = Seq("id"), - sequencing = functions.col("version") - )) - } - runPipeline(ctxB) + runPipeline(singleAutoCdcFlowPipeline( + "flow_b", "t_b", streamB.toDF().toDF("id", "name", "version"), Seq("id"))) checkAnswer( spark.table(s"$catalog.$namespace.t_a"), @@ -113,17 +94,8 @@ class AutoCdcScd1MultiPipelineSuite ) val stream = MemoryStream[(Int, String, Long)] stream.addData((1, "alice", 1L), (2, "bob", 1L)) - val ctxWriter = new TestGraphRegistrationContext(spark) { - registerTable("src", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "writer", - target = "src", - query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")), - keys = Seq("id"), - sequencing = functions.col("version") - )) - } - runPipeline(ctxWriter) + runPipeline(singleAutoCdcFlowPipeline( + "writer", "src", stream.toDF().toDF("id", "name", "version"), Seq("id"))) // Pipeline #2 is a regular materialized view that selects the user-data columns from // `src` (a different graph entirely). It must observe the merged AutoCDC rows and be @@ -161,17 +133,8 @@ class AutoCdcScd1MultiPipelineSuite // Pipeline #1: inserts rows with id=1 and id=2 at version=1. val stream1 = MemoryStream[(Int, String, Long)] stream1.addData((1, "alice", 1L), (2, "bob", 1L)) - val ctx1 = new TestGraphRegistrationContext(spark) { - registerTable("shared_target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "flow_v1", - target = "shared_target", - query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version")), - keys = Seq("id"), - sequencing = functions.col("version") - )) - } - runPipeline(ctx1) + runPipeline(singleAutoCdcFlowPipeline( + "flow_v1", "shared_target", stream1.toDF().toDF("id", "name", "version"), Seq("id"))) // Sanity-check pipeline #1's effect before pipeline #2 runs. checkAnswer( @@ -186,17 +149,8 @@ class AutoCdcScd1MultiPipelineSuite // (new key). id=1 is untouched and must survive into the final target unchanged. val stream2 = MemoryStream[(Int, String, Long)] stream2.addData((2, "bob-v2", 2L), (3, "carol", 1L)) - val ctx2 = new TestGraphRegistrationContext(spark) { - registerTable("shared_target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "flow_v2", - target = "shared_target", - query = dfFlowFunc(stream2.toDF().toDF("id", "name", "version")), - keys = Seq("id"), - sequencing = functions.col("version") - )) - } - runPipeline(ctx2) + runPipeline(singleAutoCdcFlowPipeline( + "flow_v2", "shared_target", stream2.toDF().toDF("id", "name", "version"), Seq("id"))) // Final target: id=1 untouched (pipeline #1's state), id=2 updated by pipeline #2, // id=3 freshly inserted by pipeline #2. @@ -229,16 +183,8 @@ class AutoCdcScd1MultiPipelineSuite // Pipeline #1: source DF schema is (id, name, version); inserts id=1 and id=2. val stream1 = MemoryStream[(Int, String, Long)] stream1.addData((1, "alice", 1L), (2, "bob", 1L)) - val ctx1 = new TestGraphRegistrationContext(spark) { - registerTable("shared_target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "flow_v1", - target = "shared_target", - query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version")), - keys = Seq("id"), - sequencing = functions.col("version") - )) - } + val ctx1 = singleAutoCdcFlowPipeline( + "flow_v1", "shared_target", stream1.toDF().toDF("id", "name", "version"), Seq("id")) runPipeline(ctx1) // Sanity-check pipeline #1's state before schema evolution kicks in. @@ -255,17 +201,8 @@ class AutoCdcScd1MultiPipelineSuite // is backfilled to NULL. val stream2 = MemoryStream[(Int, String, Option[Int], Long)] stream2.addData((2, "bob-v2", Some(25), 2L), (3, "carol", Some(30), 1L)) - val ctx2 = new TestGraphRegistrationContext(spark) { - registerTable("shared_target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "flow_v2", - target = "shared_target", - query = dfFlowFunc(stream2.toDF().toDF("id", "name", "age", "version")), - keys = Seq("id"), - sequencing = functions.col("version") - )) - } - runPipeline(ctx2) + runPipeline(singleAutoCdcFlowPipeline( + "flow_v2", "shared_target", stream2.toDF().toDF("id", "name", "age", "version"), Seq("id"))) checkAnswer( spark.table(s"$catalog.$namespace.shared_target"), @@ -309,32 +246,15 @@ class AutoCdcScd1MultiPipelineSuite // (id, _cdc_metadata). val stream1 = MemoryStream[(Int, String, Long)] stream1.addData((1, "alice", 1L)) - val ctx1 = new TestGraphRegistrationContext(spark) { - registerTable("shared_target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "flow_v1", - target = "shared_target", - query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version")), - keys = Seq("id"), - sequencing = functions.col("version") - )) - } - runPipeline(ctx1) + runPipeline(singleAutoCdcFlowPipeline( + "flow_v1", "shared_target", stream1.toDF().toDF("id", "name", "version"), Seq("id"))) // Pipeline #2: completely separate graph, but targets the same physical `shared_target` // table with `keys = Seq("name")`. val stream2 = MemoryStream[(Int, String, Long)] stream2.addData((2, "alice", 1L)) - val ctx2 = new TestGraphRegistrationContext(spark) { - registerTable("shared_target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "flow_v2", - target = "shared_target", - query = dfFlowFunc(stream2.toDF().toDF("id", "name", "version")), - keys = Seq("name"), - sequencing = functions.col("version") - )) - } + val ctx2 = singleAutoCdcFlowPipeline( + "flow_v2", "shared_target", stream2.toDF().toDF("id", "name", "version"), Seq("name")) val ex = intercept[RuntimeException] { runPipeline(ctx2) } checkErrorInPipelineFailure( diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala index 2424dbdc4e052..99032a2b75560 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala @@ -58,16 +58,9 @@ class AutoCdcScd1SchemaEvolutionSuite ) val stream = MemoryStream[(Int, String, Option[String], Long)] - def buildCtx(): TestGraphRegistrationContext = new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "auto_cdc_flow", - target = "target", - query = dfFlowFunc(stream.toDF().toDF("id", "name", "email", "version")), - keys = Seq("id"), - sequencing = functions.col("version") - )) - } + def buildCtx(): TestGraphRegistrationContext = + singleAutoCdcFlowPipeline( + "auto_cdc_flow", "target", stream.toDF().toDF("id", "name", "email", "version"), Seq("id")) // Run #1: insert with NULL email. stream.addData((1, "alice", None, 1L)) @@ -101,31 +94,14 @@ class AutoCdcScd1SchemaEvolutionSuite val stream1 = MemoryStream[(Int, Int, Long)] stream1.addData((1, 30, 1L)) - val ctx1 = new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "auto_cdc_flow", - target = "target", - query = dfFlowFunc(stream1.toDF().toDF("id", "age", "version")), - keys = Seq("id"), - sequencing = functions.col("version") - )) - } - runPipeline(ctx1) + runPipeline(singleAutoCdcFlowPipeline( + "auto_cdc_flow", "target", stream1.toDF().toDF("id", "age", "version"), Seq("id"))) // Run #2: widen `age` from Int to Long. val stream2 = MemoryStream[(Int, Long, Long)] stream2.addData((1, 31L, 2L)) - val ctx2 = new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "auto_cdc_flow", - target = "target", - query = dfFlowFunc(stream2.toDF().toDF("id", "age", "version")), - keys = Seq("id"), - sequencing = functions.col("version") - )) - } + val ctx2 = singleAutoCdcFlowPipeline( + "auto_cdc_flow", "target", stream2.toDF().toDF("id", "age", "version"), Seq("id")) val ex = intercept[RuntimeException] { runPipeline(ctx2) } checkErrorInPipelineFailure( failure = ex, @@ -154,31 +130,14 @@ class AutoCdcScd1SchemaEvolutionSuite val stream1 = MemoryStream[(Int, Long, Long)] stream1.addData((1, 100L, 1L)) - val ctx1 = new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "auto_cdc_flow", - target = "target", - query = dfFlowFunc(stream1.toDF().toDF("id", "payload", "version")), - keys = Seq("id"), - sequencing = functions.col("version") - )) - } - runPipeline(ctx1) + runPipeline(singleAutoCdcFlowPipeline( + "auto_cdc_flow", "target", stream1.toDF().toDF("id", "payload", "version"), Seq("id"))) // Run #2: narrow `payload` from Long (BIGINT) to Int (INT). val stream2 = MemoryStream[(Int, Int, Long)] stream2.addData((1, 5, 2L)) - val ctx2 = new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "auto_cdc_flow", - target = "target", - query = dfFlowFunc(stream2.toDF().toDF("id", "payload", "version")), - keys = Seq("id"), - sequencing = functions.col("version") - )) - } + val ctx2 = singleAutoCdcFlowPipeline( + "auto_cdc_flow", "target", stream2.toDF().toDF("id", "payload", "version"), Seq("id")) val ex = intercept[RuntimeException] { runPipeline(ctx2) } checkErrorInPipelineFailure( @@ -209,19 +168,11 @@ class AutoCdcScd1SchemaEvolutionSuite // unchanged (only the downstream projection differs), so the source identity that the // OffsetSeqLog records is stable across runs. val stream = MemoryStream[(Int, String, Option[String], Long)] - def buildCtx(includeEmail: Boolean): TestGraphRegistrationContext = - new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - val sourceDf = stream.toDF().toDF("id", "name", "email", "version") - val projectedDf = if (includeEmail) sourceDf else sourceDf.drop("email") - registerFlow(autoCdcFlow( - name = "auto_cdc_flow", - target = "target", - query = dfFlowFunc(projectedDf), - keys = Seq("id"), - sequencing = functions.col("version") - )) - } + def buildCtx(includeEmail: Boolean): TestGraphRegistrationContext = { + val sourceDf = stream.toDF().toDF("id", "name", "email", "version") + val projectedDf = if (includeEmail) sourceDf else sourceDf.drop("email") + singleAutoCdcFlowPipeline("auto_cdc_flow", "target", projectedDf, Seq("id")) + } // Run #1: source projects (id, name, version). Target schema is unchanged. stream.addData((1, "alice", None, 1L)) @@ -262,17 +213,9 @@ class AutoCdcScd1SchemaEvolutionSuite val stream = MemoryStream[(Int, String, String, Long)] def buildCtx(selection: Option[ColumnSelection]): TestGraphRegistrationContext = - new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "auto_cdc_flow", - target = "target", - query = dfFlowFunc(stream.toDF().toDF("id", "name", "email", "version")), - keys = Seq("id"), - sequencing = functions.col("version"), - columnSelection = selection - )) - } + singleAutoCdcFlowPipeline( + "auto_cdc_flow", "target", stream.toDF().toDF("id", "name", "email", "version"), Seq("id"), + columnSelection = selection) // Run #1: only (id, name, version) selected; `email` is dropped before the MERGE. stream.addData((1, "alice", "ignored", 1L)) @@ -312,17 +255,9 @@ class AutoCdcScd1SchemaEvolutionSuite val stream = MemoryStream[(Int, String, String, Long)] def buildCtx(selection: Option[ColumnSelection]): TestGraphRegistrationContext = - new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "auto_cdc_flow", - target = "target", - query = dfFlowFunc(stream.toDF().toDF("id", "name", "email", "version")), - keys = Seq("id"), - sequencing = functions.col("version"), - columnSelection = selection - )) - } + singleAutoCdcFlowPipeline( + "auto_cdc_flow", "target", stream.toDF().toDF("id", "name", "email", "version"), Seq("id"), + columnSelection = selection) // Run #1: include all columns; populate `email` for key=1. stream.addData((1, "alice", "a@x.com", 1L)) @@ -365,19 +300,11 @@ class AutoCdcScd1SchemaEvolutionSuite // Same `MemoryStream[(Int, String, Option[String], Long)]` shape across runs; runs // differ in whether `email` is kept in the projected source DF. val stream = MemoryStream[(Int, String, Option[String], Long)] - def buildCtx(includeEmail: Boolean): TestGraphRegistrationContext = - new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - val sourceDf = stream.toDF().toDF("id", "name", "email", "version") - val projectedDf = if (includeEmail) sourceDf else sourceDf.drop("email") - registerFlow(autoCdcFlow( - name = "auto_cdc_flow", - target = "target", - query = dfFlowFunc(projectedDf), - keys = Seq("id"), - sequencing = functions.col("version") - )) - } + def buildCtx(includeEmail: Boolean): TestGraphRegistrationContext = { + val sourceDf = stream.toDF().toDF("id", "name", "email", "version") + val projectedDf = if (includeEmail) sourceDf else sourceDf.drop("email") + singleAutoCdcFlowPipeline("auto_cdc_flow", "target", projectedDf, Seq("id")) + } // Run #1: wide source DF (id, name, email, version). mergeSchemas appends `email` to // the target. @@ -421,28 +348,20 @@ class AutoCdcScd1SchemaEvolutionSuite // shapes; the underlying tuple shape is unchanged so the streaming source's identity // is stable across runs. val stream = MemoryStream[(Int, Long, Int, Int, Int)] - def buildCtx(includeC: Boolean): TestGraphRegistrationContext = - new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d") - val inner = if (includeC) { - functions.struct(functions.col("b_c").as("c"), functions.col("b_d").as("d")) - } else { - functions.struct(functions.col("b_d").as("d")) - } - val projected = src.select( - functions.col("key"), - functions.col("version"), - functions.struct(functions.col("a"), inner.as("b")).as("value") - ) - registerFlow(autoCdcFlow( - name = "auto_cdc_flow", - target = "target", - query = dfFlowFunc(projected), - keys = Seq("key"), - sequencing = functions.col("version") - )) + def buildCtx(includeC: Boolean): TestGraphRegistrationContext = { + val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d") + val inner = if (includeC) { + functions.struct(functions.col("b_c").as("c"), functions.col("b_d").as("d")) + } else { + functions.struct(functions.col("b_d").as("d")) } + val projected = src.select( + functions.col("key"), + functions.col("version"), + functions.struct(functions.col("a"), inner.as("b")).as("value") + ) + singleAutoCdcFlowPipeline("auto_cdc_flow", "target", projected, Seq("key")) + } stream.addData((1, 1L, 1, 1, 1), (2, 1L, 2, 2, 2)) runPipeline(buildCtx(includeC = true)) @@ -476,30 +395,22 @@ class AutoCdcScd1SchemaEvolutionSuite ) val stream = MemoryStream[(Int, Long, Int, Int, Int)] - def buildCtx(includeD: Boolean): TestGraphRegistrationContext = - new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d") - val inner = if (includeD) { - functions.struct(functions.col("b_c").as("c"), functions.col("b_d").as("d")) - } else { - functions.struct(functions.col("b_c").as("c")) - } - val projected = src.select( - functions.col("key"), - functions.col("version"), - functions.array( - functions.struct(functions.col("a"), inner.as("b")) - ).as("vals") - ) - registerFlow(autoCdcFlow( - name = "auto_cdc_flow", - target = "target", - query = dfFlowFunc(projected), - keys = Seq("key"), - sequencing = functions.col("version") - )) + def buildCtx(includeD: Boolean): TestGraphRegistrationContext = { + val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d") + val inner = if (includeD) { + functions.struct(functions.col("b_c").as("c"), functions.col("b_d").as("d")) + } else { + functions.struct(functions.col("b_c").as("c")) } + val projected = src.select( + functions.col("key"), + functions.col("version"), + functions.array( + functions.struct(functions.col("a"), inner.as("b")) + ).as("vals") + ) + singleAutoCdcFlowPipeline("auto_cdc_flow", "target", projected, Seq("key")) + } stream.addData((1, 1L, 1, 1, 99)) runPipeline(buildCtx(includeD = false)) @@ -537,30 +448,22 @@ class AutoCdcScd1SchemaEvolutionSuite ) val stream = MemoryStream[(Int, Long, Int, Int, Int)] - def buildCtx(includeD: Boolean): TestGraphRegistrationContext = - new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d") - val inner = if (includeD) { - functions.struct(functions.col("b_c").as("c"), functions.col("b_d").as("d")) - } else { - functions.struct(functions.col("b_c").as("c")) - } - val projected = src.select( - functions.col("key"), - functions.col("version"), - functions.array( - functions.struct(functions.col("a"), inner.as("b")) - ).as("vals") - ) - registerFlow(autoCdcFlow( - name = "auto_cdc_flow", - target = "target", - query = dfFlowFunc(projected), - keys = Seq("key"), - sequencing = functions.col("version") - )) + def buildCtx(includeD: Boolean): TestGraphRegistrationContext = { + val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d") + val inner = if (includeD) { + functions.struct(functions.col("b_c").as("c"), functions.col("b_d").as("d")) + } else { + functions.struct(functions.col("b_c").as("c")) } + val projected = src.select( + functions.col("key"), + functions.col("version"), + functions.array( + functions.struct(functions.col("a"), inner.as("b")) + ).as("vals") + ) + singleAutoCdcFlowPipeline("auto_cdc_flow", "target", projected, Seq("key")) + } stream.addData((1, 1L, 1, 1, 1), (2, 1L, 2, 2, 2)) runPipeline(buildCtx(includeD = true)) @@ -598,19 +501,10 @@ class AutoCdcScd1SchemaEvolutionSuite val stream = MemoryStream[(Int, Long, String)] stream.addData((1, 1L, "alice")) - val ctx = new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - // Source DF emits `Value` (capital), differing only in case from the target's - // `value` column. - val df = stream.toDF().toDF("key", "version", "Value") - registerFlow(autoCdcFlow( - name = "auto_cdc_flow", - target = "target", - query = dfFlowFunc(df), - keys = Seq("key"), - sequencing = functions.col("version") - )) - } + // Source DF emits `Value` (capital), differing only in case from the target's + // `value` column. + val df = stream.toDF().toDF("key", "version", "Value") + val ctx = singleAutoCdcFlowPipeline("auto_cdc_flow", "target", df, Seq("key")) val ex = intercept[RuntimeException] { runPipeline(ctx) } // The exact `name` and `referenceNames` parameters depend on internal merge-plan @@ -655,17 +549,8 @@ class AutoCdcScd1SchemaEvolutionSuite val stream = MemoryStream[(Int, String, Long)] stream.addData((1, "alice", 1L), (2, "bob", 1L)) - val ctx = new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "auto_cdc_flow", - target = "target", - query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")), - keys = Seq("id"), - sequencing = functions.col("version") - )) - } - runPipeline(ctx) + runPipeline(singleAutoCdcFlowPipeline( + "auto_cdc_flow", "target", stream.toDF().toDF("id", "name", "version"), Seq("id"))) checkAnswer( spark.table(s"$catalog.$namespace.target").select("id", "name", "version", "extra"), @@ -691,31 +576,14 @@ class AutoCdcScd1SchemaEvolutionSuite val stream1 = MemoryStream[(Int, Long, Timestamp)] stream1.addData((1, 1L, Timestamp.valueOf("2024-01-01 10:00:00"))) - val ctx1 = new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "auto_cdc_flow", - target = "target", - query = dfFlowFunc(stream1.toDF().toDF("key", "version", "value")), - keys = Seq("key"), - sequencing = functions.col("version") - )) - } - runPipeline(ctx1) + runPipeline(singleAutoCdcFlowPipeline( + "auto_cdc_flow", "target", stream1.toDF().toDF("key", "version", "value"), Seq("key"))) // Run #2 emits `value` as STRING. mergeSchemas rejects the type change. val stream2 = MemoryStream[(Int, Long, String)] stream2.addData((1, 2L, "2024-01-02 11:00:00")) - val ctx2 = new TestGraphRegistrationContext(spark) { - registerTable("target", catalog = Some(catalog), database = Some(namespace)) - registerFlow(autoCdcFlow( - name = "auto_cdc_flow", - target = "target", - query = dfFlowFunc(stream2.toDF().toDF("key", "version", "value")), - keys = Seq("key"), - sequencing = functions.col("version") - )) - } + val ctx2 = singleAutoCdcFlowPipeline( + "auto_cdc_flow", "target", stream2.toDF().toDF("key", "version", "value"), Seq("key")) val ex = intercept[RuntimeException] { runPipeline(ctx2) } checkErrorInPipelineFailure(