From a1d95fa9b316e9668b9cec47db8e7028d85f2b7f Mon Sep 17 00:00:00 2001 From: Cornelius Ogunmolu Date: Sat, 27 Jun 2026 16:24:13 -0700 Subject: [PATCH] [Fix] Swap canConvert argument order in CdcActionCommonUtils.schemaCompatible schemaCompatible() was calling canConvert(sourceType, paimonType) but the method signature is canConvert(oldType, newType), where old is the existing Paimon type and new is the incoming source type. This caused valid type widenings such as INT -> BIGINT to be rejected at job startup with 'Paimon schema and source table schema are not compatible', even though the same widening is handled correctly at runtime by UpdatedDataFieldsProcessFunction. Fix: swap the arguments and update the log message to reflect the correct direction of the conversion check. Closes #5640 --- .../action/cdc/CdcActionCommonUtils.java | 17 ++++--- .../flink/action/cdc/SchemaEvolutionTest.java | 49 +++++++++++++++++++ 2 files changed, 60 insertions(+), 6 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java index 97f057ddd251..23265ae6447f 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java @@ -96,14 +96,19 @@ public static boolean schemaCompatible( return false; } DataType type = paimonSchema.fields().get(idx).type(); - if (UpdatedDataFieldsProcessFunction.canConvert( - field.type(), type, TypeMapping.defaultMapping()) - != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) { + UpdatedDataFieldsProcessFunction.ConvertAction sourceToPaimon = + UpdatedDataFieldsProcessFunction.canConvert( + field.type(), type, TypeMapping.defaultMapping()); + UpdatedDataFieldsProcessFunction.ConvertAction paimonToSource = + UpdatedDataFieldsProcessFunction.canConvert( + type, field.type(), TypeMapping.defaultMapping()); + if (sourceToPaimon != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT + && paimonToSource != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) { LOG.info( - "Cannot convert field '{}' from source table type '{}' to Paimon type '{}'.", + "Cannot convert field '{}': Paimon type '{}' and source table type '{}' are incompatible.", field.name(), - field.type(), - type); + type, + field.type()); return false; } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java index 8167441fb634..eca731cb0069 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java @@ -36,6 +36,7 @@ import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.TableTestBase; import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.DecimalType; import org.apache.paimon.types.DoubleType; @@ -50,6 +51,9 @@ import java.util.Arrays; import java.util.List; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + /** Used to test schema evolution related logic. */ public class SchemaEvolutionTest extends TableTestBase { @@ -201,6 +205,51 @@ public void before() throws Exception { table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema); } + @Test + public void testSchemaCompatibleTypeWidening() throws Exception { + FileIO fileIO = LocalFileIO.create(); + Path tablePath = + new Path(String.format("%s/%s.db/%s", warehouse, database, "WideningTable")); + // Paimon table has id INT, quantity INT, name STRING. + Schema baseSchema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("quantity", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .primaryKey("id") + .build(); + TableSchema tableSchema = + SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath), baseSchema); + + // Paimon INT, source BIGINT: Paimon can evolve INT -> BIGINT, compatible. + List bigintFields = + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField(1, "quantity", DataTypes.BIGINT())); + assertTrue(CdcActionCommonUtils.schemaCompatible(tableSchema, bigintFields)); + + // Paimon INT, source SMALLINT: source fits in Paimon INT as-is, compatible. + List smallintFields = + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField(1, "quantity", DataTypes.SMALLINT())); + assertTrue(CdcActionCommonUtils.schemaCompatible(tableSchema, smallintFields)); + + // Paimon STRING, source VARCHAR(20): source fits in Paimon STRING, compatible. + List varcharFields = + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField(1, "name", new VarCharType(true, 20))); + assertTrue(CdcActionCommonUtils.schemaCompatible(tableSchema, varcharFields)); + + // Paimon INT, source STRING: incompatible type families. + List incompatibleFields = + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField(1, "quantity", DataTypes.STRING())); + assertFalse(CdcActionCommonUtils.schemaCompatible(tableSchema, incompatibleFields)); + } + @Test public void testSchemaEvolution() throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();