From 5641661e4326d3ed737fdd883dfaa77fc68a4cb2 Mon Sep 17 00:00:00 2001 From: wzx140 Date: Thu, 25 Jun 2026 16:51:13 +0800 Subject: [PATCH] [lake/paimon] Support custom Paimon path --- .../lake/paimon/utils/PaimonConversions.java | 1 - .../paimon/LakeEnabledTableCreateITCase.java | 64 +++++++++++++++++++ .../server/coordinator/MetadataManager.java | 5 +- .../utils/TableDescriptorValidation.java | 14 ++++ 4 files changed, 82 insertions(+), 2 deletions(-) diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java index 94166e2d05..87ce7b0439 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java @@ -69,7 +69,6 @@ public class PaimonConversions { static { PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET.key()); PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.BUCKET_KEY.key()); - PAIMON_UNSETTABLE_OPTIONS.add(CoreOptions.PATH.key()); PAIMON_UNSETTABLE_OPTIONS.add(PARTITION_GENERATE_LEGACY_NAME_OPTION_KEY); } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java index 0efa057b89..834433ccab 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java @@ -23,6 +23,7 @@ import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.exception.InvalidAlterTableException; import org.apache.fluss.exception.InvalidConfigException; import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.exception.LakeTableAlreadyExistException; @@ -390,6 +391,8 @@ void testCreateLakeEnabledTableWithAllTypes() throws Exception { @Test void testCreateLakeEnableTableWithUnsettablePaimonOptions() { + assertThat(PAIMON_UNSETTABLE_OPTIONS).doesNotContain(CoreOptions.PATH.key()); + Map customProperties = new HashMap<>(); for (String key : PAIMON_UNSETTABLE_OPTIONS) { @@ -418,6 +421,67 @@ void testCreateLakeEnableTableWithUnsettablePaimonOptions() { } } + @Test + void testAlterPaimonPathOnlyWhenLakeDisabled() throws Exception { + TableDescriptor lakeDisabledTable = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .build()) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, false) + .distributedBy(BUCKET_NUM, "c1", "c2") + .build(); + TablePath lakeDisabledTablePath = TablePath.of(DATABASE, "alter_paimon_path_disabled"); + admin.createTable(lakeDisabledTablePath, lakeDisabledTable, false).get(); + + String customPaimonPath = + Files.createTempDirectory("alter-paimon-path-disabled").toUri().toString(); + admin.alterTable( + lakeDisabledTablePath, + Collections.singletonList(TableChange.set("paimon.path", customPaimonPath)), + false) + .get(); + assertThat( + admin.getTableInfo(lakeDisabledTablePath) + .get() + .toTableDescriptor() + .getCustomProperties()) + .containsEntry("paimon.path", customPaimonPath); + + TableDescriptor lakeEnabledTable = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .build()) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .distributedBy(BUCKET_NUM, "c1", "c2") + .build(); + TablePath lakeEnabledTablePath = TablePath.of(DATABASE, "alter_paimon_path_enabled"); + admin.createTable(lakeEnabledTablePath, lakeEnabledTable, false).get(); + + assertThatThrownBy( + () -> + admin.alterTable( + lakeEnabledTablePath, + Collections.singletonList( + TableChange.set( + "paimon.path", + Files.createTempDirectory( + "alter-paimon-path-enabled") + .toUri() + .toString())), + false) + .get()) + .cause() + .isInstanceOf(InvalidAlterTableException.class) + .hasMessageContaining( + "'paimon.path' can only be altered when datalake is disabled"); + } + @Test void testCreateLakeEnableTableWithExistLakeTable() throws Exception { Map customProperties = new HashMap<>(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index 106d4ef66f..3719ada161 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -517,7 +517,10 @@ public void alterTableProperties( TableInfo tableInfo = tableReg.toTableInfo(tablePath, schemaInfo); // validate the changes - validateAlterTableProperties(tableInfo, tablePropertyChanges.tableKeysToChange()); + validateAlterTableProperties( + tableInfo, + tablePropertyChanges.tableKeysToChange(), + tablePropertyChanges.customKeysToChange()); TableDescriptor tableDescriptor = tableInfo.toTableDescriptor(); TableDescriptor newDescriptor = diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index 949afd8c67..db70322752 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -68,6 +68,8 @@ /** Validator of {@link TableDescriptor}. */ public class TableDescriptorValidation { + private static final String PAIMON_PATH_KEY = "paimon.path"; + private static final Set SYSTEM_COLUMNS = Collections.unmodifiableSet( new LinkedHashSet<>( @@ -155,6 +157,11 @@ private static void checkTableLakeFormatMatchesCluster( public static void validateAlterTableProperties( TableInfo currentTable, Set tableKeysToChange) { + validateAlterTableProperties(currentTable, tableKeysToChange, Collections.emptySet()); + } + + public static void validateAlterTableProperties( + TableInfo currentTable, Set tableKeysToChange, Set customKeysToChange) { TableConfig currentConfig = currentTable.getTableConfig(); List unsupportedKeys = @@ -179,6 +186,13 @@ public static void validateAlterTableProperties( ConfigOptions.TABLE_KV_STANDBY_REPLICA_ENABLED.key())); } + if (customKeysToChange.contains(PAIMON_PATH_KEY) && currentConfig.isDataLakeEnabled()) { + throw new InvalidAlterTableException( + String.format( + "'%s' can only be altered when datalake is disabled.", + PAIMON_PATH_KEY)); + } + if (!currentConfig.getDataLakeFormat().isPresent()) { List datalakeKeys = tableKeysToChange.stream()