diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionBasicTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionBasicTests.scala index bb221a200b2d1..23bc5028981ef 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionBasicTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionBasicTests.scala @@ -1303,4 +1303,217 @@ trait MergeIntoSchemaEvolutionBasicTests extends MergeIntoSchemaEvolutionSuiteBa )) ) } + + for (subFieldName <- Seq("job.title", "job title")) { + testNestedStructsEvolution( + s"source struct has extra nested field with special-char name: $subFieldName")( + target = Seq( + """{ "pk": 1, "info": { "name": "Alice" }, "dep": "hr" }""", + """{ "pk": 2, "info": { "name": "Bob" }, "dep": "finance" }""" + ), + source = Seq( + s"""{ "pk": 1, "info": { "name": "Alice2", "$subFieldName": "engineer" }, "dep": "hr" }""", + s"""{ "pk": 3, "info": { "name": "Cathy", "$subFieldName": "manager" }, "dep": "sales" }""" + ), + targetSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("info", StructType(Seq( + StructField("name", StringType) + ))), + StructField("dep", StringType) + )), + sourceSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("info", StructType(Seq( + StructField("name", StringType), + StructField(subFieldName, StringType) + ))), + StructField("dep", StringType) + )), + clauses = Seq(updateAll(), insertAll()), + result = Seq( + s"""{ "pk": 1, "info": { "name": "Alice2", "$subFieldName": "engineer" }, "dep": "hr" }""", + s"""{ "pk": 2, "info": { "name": "Bob", "$subFieldName": null }, "dep": "finance" }""", + s"""{ "pk": 3, "info": { "name": "Cathy", "$subFieldName": "manager" }, "dep": "sales" }""" + ), + resultSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("info", StructType(Seq( + StructField("name", StringType), + StructField(subFieldName, StringType) + ))), + StructField("dep", StringType) + )), + expectErrorWithoutEvolutionContains = "Cannot write extra fields" + ) + } + + for (colName <- Seq("job.title", "job title")) { + testEvolution( + s"special-char column already in target gets updated with type widening: $colName")( + targetData = { + val schema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("salary", IntegerType), + StructField("dep", StringType), + StructField(colName, ShortType) + )) + spark.createDataFrame(spark.sparkContext.parallelize(Seq( + Row(1, 100, "hr", 1.toShort), + Row(2, 200, "software", 2.toShort), + Row(3, 300, "hr", 3.toShort) + )), schema) + }, + sourceData = { + val schema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("salary", IntegerType), + StructField("dep", StringType), + StructField(colName, IntegerType) + )) + spark.createDataFrame(spark.sparkContext.parallelize(Seq( + Row(2, 150, "finance", 50000), + Row(4, 400, "finance", 60000) + )), schema) + }, + clauses = Seq(updateAll(), insertAll()), + expected = Seq( + (1, 100, "hr", 1), + (2, 150, "finance", 50000), + (3, 300, "hr", 3), + (4, 400, "finance", 60000) + ).toDF("pk", "salary", "dep", colName), + expectedSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("salary", IntegerType), + StructField("dep", StringType), + StructField(colName, IntegerType) + )), + expectErrorWithoutEvolutionContains = + "Fail to assign a value of \"INT\" type to the \"SMALLINT\" type column or variable" + ) + } + + for (subFieldName <- Seq("job.title", "job title")) { + testNestedStructsEvolution( + s"nested special-char field already in target gets updated with type widening:" + + s" $subFieldName")( + target = Seq( + s"""{ "pk": 1, "info": { "name": "Alice", "$subFieldName": 1 }, "dep": "hr" }""", + s"""{ "pk": 2, "info": { "name": "Bob", "$subFieldName": 2 }, "dep": "software" }""", + s"""{ "pk": 3, "info": { "name": "Charlie", "$subFieldName": 3 }, "dep": "hr" }""" + ), + source = Seq( + s"""{ "pk": 2, "info": { "name": "Bob2", "$subFieldName": 50000 }, "dep": "finance" }""", + s"""{ "pk": 4, "info": { "name": "Diana", "$subFieldName": 60000 }, "dep": "finance" }""" + ), + targetSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("info", StructType(Seq( + StructField("name", StringType), + StructField(subFieldName, ShortType) + ))), + StructField("dep", StringType) + )), + sourceSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("info", StructType(Seq( + StructField("name", StringType), + StructField(subFieldName, IntegerType) + ))), + StructField("dep", StringType) + )), + clauses = Seq(updateAll(), insertAll()), + result = Seq( + s"""{ "pk": 1, "info": { "name": "Alice", "$subFieldName": 1 }, "dep": "hr" }""", + s"""{ "pk": 2, "info": { "name": "Bob2", "$subFieldName": 50000 }, "dep": "finance" }""", + s"""{ "pk": 3, "info": { "name": "Charlie", "$subFieldName": 3 }, "dep": "hr" }""", + s"""{ "pk": 4, "info": { "name": "Diana", "$subFieldName": 60000 }, "dep": "finance" }""" + ), + resultSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("info", StructType(Seq( + StructField("name", StringType), + StructField(subFieldName, IntegerType) + ))), + StructField("dep", StringType) + )), + expectErrorWithoutEvolutionContains = + "Fail to assign a value of \"INT\" type to the \"SMALLINT\" type column or variable" + ) + } + + for (colName <- Seq("job.title", "job title")) { + testEvolution( + s"target has special-char column missing from source: $colName")( + targetData = Seq( + (1, 100, "hr", "engineer"), + (2, 200, "finance", "manager"), + (3, 300, "hr", "analyst") + ).toDF("pk", "salary", "dep", colName), + sourceData = Seq( + (2, 150, "sales"), + (4, 400, "engineering") + ).toDF("pk", "salary", "dep"), + clauses = Seq(updateAll(), insertAll()), + expected = Seq[(Int, Int, String, String)]( + (1, 100, "hr", "engineer"), + (2, 150, "sales", "manager"), + (3, 300, "hr", "analyst"), + (4, 400, "engineering", null) + ).toDF("pk", "salary", "dep", colName), + expectedSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("salary", IntegerType, nullable = false), + StructField("dep", StringType), + StructField(colName, StringType) + )), + expectErrorWithoutEvolutionContains = "cannot be resolved" + ) + } + + for (subFieldName <- Seq("job.title", "job title")) { + testNestedStructsEvolution( + s"target struct has nested special-char field missing from source: $subFieldName")( + target = Seq( + s"""{ "pk": 1, "info": { "name": "Alice", "$subFieldName": "engineer" }, "dep": "hr" }""", + s"""{ "pk": 2, "info": { "name": "Bob", "$subFieldName": "manager" }, "dep": "finance" }""" + ), + source = Seq( + """{ "pk": 2, "info": { "name": "Bob2" }, "dep": "sales" }""", + """{ "pk": 3, "info": { "name": "Cathy" }, "dep": "engineering" }""" + ), + targetSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("info", StructType(Seq( + StructField("name", StringType), + StructField(subFieldName, StringType) + ))), + StructField("dep", StringType) + )), + sourceSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("info", StructType(Seq( + StructField("name", StringType) + ))), + StructField("dep", StringType) + )), + clauses = Seq(updateAll(), insertAll()), + result = Seq( + s"""{ "pk": 1, "info": { "name": "Alice", "$subFieldName": "engineer" }, "dep": "hr" }""", + s"""{ "pk": 2, "info": { "name": "Bob2", "$subFieldName": "manager" }, "dep": "sales" }""", + s"""{ "pk": 3, "info": { "name": "Cathy", "$subFieldName": null }, "dep": "engineering" }""" + ), + resultSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("info", StructType(Seq( + StructField("name", StringType), + StructField(subFieldName, StringType) + ))), + StructField("dep", StringType) + )), + expectErrorWithoutEvolutionContains = "Cannot find data for the output column", + requiresNestedTypeCoercion = true + ) + } }