From 4b28991fa66cc1b1fb6b70bac4688105af2258ee Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Tue, 2 Jun 2026 20:03:50 +0000 Subject: [PATCH 1/2] [SPARK-56632][CONNECT][TESTS][4.2] Add E2E test for self-join reusing a DataFrame ### What changes were proposed in this pull request? Adds an end-to-end Spark Connect test that joins two independently constructed DataFrames over the same data and selects columns from both sides. ### Why are the changes needed? To verify column resolution from both sides of a self-join reusing a DataFrame works (post SPARK-56632). ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New test in `ClientE2ETestSuite`. --- .../sql/connect/ClientE2ETestSuite.scala | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala index 0e8cb9348c7f8..de82c557ff6cf 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala @@ -1013,6 +1013,25 @@ class ClientE2ETestSuite df1.join(df1_filter).join(df2, df1_filter("i") === 1).select(df1_filter("j"))) } + test("SPARK-56632: self-join reusing a DataFrame resolves columns from both sides") { + val session = spark + import session.implicits._ + + // Two independently constructed DataFrames over the same data, so they have + // distinct plan ids, mirroring `val df1 = spark.table(t); val df2 = spark.table(t)`. + val df1 = Seq((1, 100), (2, 200)).toDF("id", "salary") + val df2 = Seq((1, 100), (2, 200)).toDF("id", "salary") + + // The query under test. Succeeds on master (post SPARK-56632); on 4.1 (has + // SPARK-55070 but missing the fix) this throws AMBIGUOUS_COLUMN_REFERENCE. + checkSameResult( + Seq(Row(1, 100, 1, 100), Row(2, 200, 2, 200)), + df1 + .join(df2, df1("id") === df2("id")) + .select(df1("id"), df1("salary"), df2("id"), df2("salary")) + .orderBy(df1("id"))) + } + test("broadcast join") { withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "-1") { val left = spark.range(100).select(col("id"), rand(10).as("a")) From 2789c859226007afe742f0db45511f1aa29afb6c Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Tue, 2 Jun 2026 20:37:58 +0000 Subject: [PATCH 2/2] [SPARK-56632][CONNECT][TESTS][4.2] Use spark.table self-join with external write between resolutions Rewrite the test to resolve the same table twice (version X and version X+1 after an external write) and self-join the two DataFrames, matching the reported scenario. --- .../sql/connect/ClientE2ETestSuite.scala | 39 +++++++++++-------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala index de82c557ff6cf..cc3495ed4d518 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala @@ -1014,22 +1014,29 @@ class ClientE2ETestSuite } test("SPARK-56632: self-join reusing a DataFrame resolves columns from both sides") { - val session = spark - import session.implicits._ - - // Two independently constructed DataFrames over the same data, so they have - // distinct plan ids, mirroring `val df1 = spark.table(t); val df2 = spark.table(t)`. - val df1 = Seq((1, 100), (2, 200)).toDF("id", "salary") - val df2 = Seq((1, 100), (2, 200)).toDF("id", "salary") - - // The query under test. Succeeds on master (post SPARK-56632); on 4.1 (has - // SPARK-55070 but missing the fix) this throws AMBIGUOUS_COLUMN_REFERENCE. - checkSameResult( - Seq(Row(1, 100, 1, 100), Row(2, 200, 2, 200)), - df1 - .join(df2, df1("id") === df2("id")) - .select(df1("id"), df1("salary"), df2("id"), df2("salary")) - .orderBy(df1("id"))) + withTable("testcat.t") { + spark.sql("CREATE TABLE testcat.t (id INT, salary INT)") + + // Version X. + spark.sql("INSERT INTO testcat.t VALUES (1, 100)") + // Resolves to table version X (QueryExecution #1). + val df1 = spark.table("testcat.t") + + // External write produces version X + 1. + spark.sql("INSERT INTO testcat.t VALUES (2, 200)") + // Resolves to table version X + 1 (QueryExecution #2). + val df2 = spark.table("testcat.t") + + // Self-join reusing the same table on both sides (QueryExecution #3). + // Succeeds on master (post SPARK-56632); on 4.1 (has SPARK-55070 but + // missing the fix) this throws AMBIGUOUS_COLUMN_REFERENCE. + checkSameResult( + Seq(Row(1, 100, 1, 100), Row(2, 200, 2, 200)), + df1 + .join(df2, df1("id") === df2("id")) + .select(df1("id"), df1("salary"), df2("id"), df2("salary")) + .orderBy(df1("id"))) + } } test("broadcast join") {