diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java index 97f057ddd251..23265ae6447f 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java @@ -96,14 +96,19 @@ public static boolean schemaCompatible( return false; } DataType type = paimonSchema.fields().get(idx).type(); - if (UpdatedDataFieldsProcessFunction.canConvert( - field.type(), type, TypeMapping.defaultMapping()) - != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) { + UpdatedDataFieldsProcessFunction.ConvertAction sourceToPaimon = + UpdatedDataFieldsProcessFunction.canConvert( + field.type(), type, TypeMapping.defaultMapping()); + UpdatedDataFieldsProcessFunction.ConvertAction paimonToSource = + UpdatedDataFieldsProcessFunction.canConvert( + type, field.type(), TypeMapping.defaultMapping()); + if (sourceToPaimon != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT + && paimonToSource != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) { LOG.info( - "Cannot convert field '{}' from source table type '{}' to Paimon type '{}'.", + "Cannot convert field '{}': Paimon type '{}' and source table type '{}' are incompatible.", field.name(), - field.type(), - type); + type, + field.type()); return false; } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java index 8167441fb634..eca731cb0069 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java @@ -36,6 +36,7 @@ import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.TableTestBase; import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.DecimalType; import org.apache.paimon.types.DoubleType; @@ -50,6 +51,9 @@ import java.util.Arrays; import java.util.List; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + /** Used to test schema evolution related logic. */ public class SchemaEvolutionTest extends TableTestBase { @@ -201,6 +205,51 @@ public void before() throws Exception { table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema); } + @Test + public void testSchemaCompatibleTypeWidening() throws Exception { + FileIO fileIO = LocalFileIO.create(); + Path tablePath = + new Path(String.format("%s/%s.db/%s", warehouse, database, "WideningTable")); + // Paimon table has id INT, quantity INT, name STRING. + Schema baseSchema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("quantity", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .primaryKey("id") + .build(); + TableSchema tableSchema = + SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath), baseSchema); + + // Paimon INT, source BIGINT: Paimon can evolve INT -> BIGINT, compatible. + List bigintFields = + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField(1, "quantity", DataTypes.BIGINT())); + assertTrue(CdcActionCommonUtils.schemaCompatible(tableSchema, bigintFields)); + + // Paimon INT, source SMALLINT: source fits in Paimon INT as-is, compatible. + List smallintFields = + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField(1, "quantity", DataTypes.SMALLINT())); + assertTrue(CdcActionCommonUtils.schemaCompatible(tableSchema, smallintFields)); + + // Paimon STRING, source VARCHAR(20): source fits in Paimon STRING, compatible. + List varcharFields = + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField(1, "name", new VarCharType(true, 20))); + assertTrue(CdcActionCommonUtils.schemaCompatible(tableSchema, varcharFields)); + + // Paimon INT, source STRING: incompatible type families. + List incompatibleFields = + Arrays.asList( + new DataField(0, "id", DataTypes.INT()), + new DataField(1, "quantity", DataTypes.STRING())); + assertFalse(CdcActionCommonUtils.schemaCompatible(tableSchema, incompatibleFields)); + } + @Test public void testSchemaEvolution() throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();