From 0c832d3c5a31c1f8e13101a349cf41ba4319b8d1 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 18:36:41 +0800 Subject: [PATCH 01/13] [flink][spark] Support dry_run in compact_manifest procedure --- .../procedure/CompactManifestProcedure.java | 66 +++++++++++++++++- .../procedure/CompactManifestProcedure.java | 67 ++++++++++++++++++- 2 files changed, 128 insertions(+), 5 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java index 488aee544e26..10625ed005a7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java @@ -19,6 +19,12 @@ package org.apache.paimon.flink.procedure; import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.operation.ManifestFileMerger; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.utils.ProcedureUtils; @@ -28,7 +34,10 @@ import org.apache.flink.table.annotation.ProcedureHint; import org.apache.flink.table.procedure.ProcedureContext; +import javax.annotation.Nullable; + import java.util.HashMap; +import java.util.List; /** Compact manifest file to reduce deleted manifest entries. */ public class CompactManifestProcedure extends ProcedureBase { @@ -43,9 +52,14 @@ public String identifier() { @ProcedureHint( argument = { @ArgumentHint(name = "table", type = @DataTypeHint("STRING")), - @ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true) + @ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true), + @ArgumentHint(name = "dry_run", type = @DataTypeHint("BOOLEAN"), isOptional = true) }) - public String[] call(ProcedureContext procedureContext, String tableId, String options) + public String[] call( + ProcedureContext procedureContext, + String tableId, + @Nullable String options, + @Nullable Boolean dryRun) throws Exception { FileStoreTable table = (FileStoreTable) table(tableId); @@ -56,9 +70,57 @@ public String[] call(ProcedureContext procedureContext, String tableId, String o table = table.copy(dynamicOptions); + if (dryRun != null && dryRun) { + return dryRunCompactManifest(table); + } + try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) { commit.compactManifests(); } return new String[] {"success"}; } + + private String[] dryRunCompactManifest(FileStoreTable table) { + Snapshot latestSnapshot = table.store().snapshotManager().latestSnapshot(); + if (latestSnapshot == null) { + return new String[] {"Dry run: no snapshot exists, nothing to compact."}; + } + + ManifestList manifestList = table.store().manifestListFactory().create(); + List beforeManifests = manifestList.readDataManifests(latestSnapshot); + + Options compactOptions = Options.fromMap(table.options()); + compactOptions.set(CoreOptions.MANIFEST_MERGE_MIN_COUNT, 1); + compactOptions.set(CoreOptions.MANIFEST_FULL_COMPACTION_FILE_SIZE, MemorySize.ofBytes(1)); + + List afterManifests = + ManifestFileMerger.merge( + beforeManifests, + table.store().manifestFileFactory().create(), + table.schema().logicalPartitionType(), + new CoreOptions(compactOptions), + null); + + long beforeFileCount = beforeManifests.size(); + long afterFileCount = afterManifests.size(); + long beforeTotalSize = beforeManifests.stream().mapToLong(ManifestFileMeta::fileSize).sum(); + long afterTotalSize = afterManifests.stream().mapToLong(ManifestFileMeta::fileSize).sum(); + long beforeDeletedEntries = + beforeManifests.stream().mapToLong(ManifestFileMeta::numDeletedFiles).sum(); + long afterDeletedEntries = + afterManifests.stream().mapToLong(ManifestFileMeta::numDeletedFiles).sum(); + long eliminatedDeletedEntries = beforeDeletedEntries - afterDeletedEntries; + + String result = + String.format( + "Dry run: manifest compaction would reduce %d manifest files to %d, " + + "total file size from %s to %s, " + + "eliminating %d deleted entries.", + beforeFileCount, + afterFileCount, + MemorySize.ofBytes(beforeTotalSize), + MemorySize.ofBytes(afterTotalSize), + eliminatedDeletedEntries); + return new String[] {result}; + } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java index 5a6837f6c15f..42f6b56fcc63 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java @@ -18,6 +18,14 @@ package org.apache.paimon.spark.procedure; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.operation.ManifestFileMerger; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.utils.ProcedureUtils; @@ -29,9 +37,12 @@ import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; import java.util.HashMap; +import java.util.List; +import static org.apache.spark.sql.types.DataTypes.BooleanType; import static org.apache.spark.sql.types.DataTypes.StringType; /** @@ -39,6 +50,7 @@ * *

  *  CALL sys.compact_manifest(table => 'tableId')
+ *  CALL sys.compact_manifest(table => 'tableId', dry_run => true)
  * 
*/ public class CompactManifestProcedure extends BaseProcedure { @@ -46,13 +58,14 @@ public class CompactManifestProcedure extends BaseProcedure { private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { ProcedureParameter.required("table", StringType), - ProcedureParameter.optional("options", StringType) + ProcedureParameter.optional("options", StringType), + ProcedureParameter.optional("dry_run", BooleanType) }; private static final StructType OUTPUT_TYPE = new StructType( new StructField[] { - new StructField("result", DataTypes.BooleanType, true, Metadata.empty()) + new StructField("result", DataTypes.StringType, true, Metadata.empty()) }); protected CompactManifestProcedure(TableCatalog tableCatalog) { @@ -74,19 +87,67 @@ public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); String options = args.isNullAt(1) ? null : args.getString(1); + boolean dryRun = !args.isNullAt(2) && args.getBoolean(2); Table table = loadSparkTable(tableIdent).getTable(); HashMap dynamicOptions = new HashMap<>(); ProcedureUtils.putAllOptions(dynamicOptions, options); table = table.copy(dynamicOptions); + if (dryRun) { + String result = dryRunCompactManifest((FileStoreTable) table); + return new InternalRow[] {newInternalRow(UTF8String.fromString(result))}; + } + try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) { commit.compactManifests(); } catch (Exception e) { throw new RuntimeException(e); } - return new InternalRow[] {newInternalRow(true)}; + return new InternalRow[] {newInternalRow(UTF8String.fromString("success"))}; + } + + private String dryRunCompactManifest(FileStoreTable table) { + Snapshot latestSnapshot = table.store().snapshotManager().latestSnapshot(); + if (latestSnapshot == null) { + return "Dry run: no snapshot exists, nothing to compact."; + } + + ManifestList manifestList = table.store().manifestListFactory().create(); + List beforeManifests = manifestList.readDataManifests(latestSnapshot); + + Options compactOptions = Options.fromMap(table.options()); + compactOptions.set(CoreOptions.MANIFEST_MERGE_MIN_COUNT, 1); + compactOptions.set(CoreOptions.MANIFEST_FULL_COMPACTION_FILE_SIZE, MemorySize.ofBytes(1)); + + List afterManifests = + ManifestFileMerger.merge( + beforeManifests, + table.store().manifestFileFactory().create(), + table.schema().logicalPartitionType(), + new CoreOptions(compactOptions), + null); + + long beforeFileCount = beforeManifests.size(); + long afterFileCount = afterManifests.size(); + long beforeTotalSize = beforeManifests.stream().mapToLong(ManifestFileMeta::fileSize).sum(); + long afterTotalSize = afterManifests.stream().mapToLong(ManifestFileMeta::fileSize).sum(); + long beforeDeletedEntries = + beforeManifests.stream().mapToLong(ManifestFileMeta::numDeletedFiles).sum(); + long afterDeletedEntries = + afterManifests.stream().mapToLong(ManifestFileMeta::numDeletedFiles).sum(); + long eliminatedDeletedEntries = beforeDeletedEntries - afterDeletedEntries; + + return String.format( + "Dry run: manifest compaction would reduce %d manifest files to %d, " + + "total file size from %s to %s, " + + "eliminating %d deleted entries.", + beforeFileCount, + afterFileCount, + MemorySize.ofBytes(beforeTotalSize), + MemorySize.ofBytes(afterTotalSize), + eliminatedDeletedEntries); } @Override From fc61b7d6d62222fd48d71aa46f9fbea0fac8b216 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 18:46:38 +0800 Subject: [PATCH 02/13] [core][flink][spark] Extract dry_run logic, fix Spark output type compatibility, add test --- .../operation/ManifestCompactDryRun.java | 75 +++++++++++++++++++ .../procedure/CompactManifestProcedure.java | 54 +------------ .../CompactManifestProcedureITCase.java | 43 +++++++++++ .../procedure/CompactManifestProcedure.java | 60 ++------------- 4 files changed, 126 insertions(+), 106 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/operation/ManifestCompactDryRun.java diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestCompactDryRun.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestCompactDryRun.java new file mode 100644 index 000000000000..8091493b2ab4 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestCompactDryRun.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; + +import java.util.List; + +/** Dry run for manifest compaction, computing before/after statistics without committing. */ +public class ManifestCompactDryRun { + + public static String execute(FileStoreTable table) { + Snapshot latestSnapshot = table.store().snapshotManager().latestSnapshot(); + if (latestSnapshot == null) { + return "Dry run: no snapshot exists, nothing to compact."; + } + + ManifestList manifestList = table.store().manifestListFactory().create(); + List beforeManifests = manifestList.readDataManifests(latestSnapshot); + + Options compactOptions = Options.fromMap(table.options()); + compactOptions.set(CoreOptions.MANIFEST_MERGE_MIN_COUNT, 1); + compactOptions.set(CoreOptions.MANIFEST_FULL_COMPACTION_FILE_SIZE, MemorySize.ofBytes(1)); + + List afterManifests = + ManifestFileMerger.merge( + beforeManifests, + table.store().manifestFileFactory().create(), + table.schema().logicalPartitionType(), + new CoreOptions(compactOptions), + null); + + long beforeFileCount = beforeManifests.size(); + long afterFileCount = afterManifests.size(); + long beforeTotalSize = beforeManifests.stream().mapToLong(ManifestFileMeta::fileSize).sum(); + long afterTotalSize = afterManifests.stream().mapToLong(ManifestFileMeta::fileSize).sum(); + long beforeDeletedEntries = + beforeManifests.stream().mapToLong(ManifestFileMeta::numDeletedFiles).sum(); + long afterDeletedEntries = + afterManifests.stream().mapToLong(ManifestFileMeta::numDeletedFiles).sum(); + long eliminatedDeletedEntries = beforeDeletedEntries - afterDeletedEntries; + + return String.format( + "Dry run: manifest compaction would reduce %d manifest files to %d, " + + "total file size from %s to %s, " + + "eliminating %d deleted entries.", + beforeFileCount, + afterFileCount, + MemorySize.ofBytes(beforeTotalSize), + MemorySize.ofBytes(afterTotalSize), + eliminatedDeletedEntries); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java index 10625ed005a7..f1c9156e319f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java @@ -19,12 +19,7 @@ package org.apache.paimon.flink.procedure; import org.apache.paimon.CoreOptions; -import org.apache.paimon.Snapshot; -import org.apache.paimon.manifest.ManifestFileMeta; -import org.apache.paimon.manifest.ManifestList; -import org.apache.paimon.operation.ManifestFileMerger; -import org.apache.paimon.options.MemorySize; -import org.apache.paimon.options.Options; +import org.apache.paimon.operation.ManifestCompactDryRun; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.utils.ProcedureUtils; @@ -37,7 +32,6 @@ import javax.annotation.Nullable; import java.util.HashMap; -import java.util.List; /** Compact manifest file to reduce deleted manifest entries. */ public class CompactManifestProcedure extends ProcedureBase { @@ -71,7 +65,7 @@ public String[] call( table = table.copy(dynamicOptions); if (dryRun != null && dryRun) { - return dryRunCompactManifest(table); + return new String[] {ManifestCompactDryRun.execute(table)}; } try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) { @@ -79,48 +73,4 @@ public String[] call( } return new String[] {"success"}; } - - private String[] dryRunCompactManifest(FileStoreTable table) { - Snapshot latestSnapshot = table.store().snapshotManager().latestSnapshot(); - if (latestSnapshot == null) { - return new String[] {"Dry run: no snapshot exists, nothing to compact."}; - } - - ManifestList manifestList = table.store().manifestListFactory().create(); - List beforeManifests = manifestList.readDataManifests(latestSnapshot); - - Options compactOptions = Options.fromMap(table.options()); - compactOptions.set(CoreOptions.MANIFEST_MERGE_MIN_COUNT, 1); - compactOptions.set(CoreOptions.MANIFEST_FULL_COMPACTION_FILE_SIZE, MemorySize.ofBytes(1)); - - List afterManifests = - ManifestFileMerger.merge( - beforeManifests, - table.store().manifestFileFactory().create(), - table.schema().logicalPartitionType(), - new CoreOptions(compactOptions), - null); - - long beforeFileCount = beforeManifests.size(); - long afterFileCount = afterManifests.size(); - long beforeTotalSize = beforeManifests.stream().mapToLong(ManifestFileMeta::fileSize).sum(); - long afterTotalSize = afterManifests.stream().mapToLong(ManifestFileMeta::fileSize).sum(); - long beforeDeletedEntries = - beforeManifests.stream().mapToLong(ManifestFileMeta::numDeletedFiles).sum(); - long afterDeletedEntries = - afterManifests.stream().mapToLong(ManifestFileMeta::numDeletedFiles).sum(); - long eliminatedDeletedEntries = beforeDeletedEntries - afterDeletedEntries; - - String result = - String.format( - "Dry run: manifest compaction would reduce %d manifest files to %d, " - + "total file size from %s to %s, " - + "eliminating %d deleted entries.", - beforeFileCount, - afterFileCount, - MemorySize.ofBytes(beforeTotalSize), - MemorySize.ofBytes(afterTotalSize), - eliminatedDeletedEntries); - return new String[] {result}; - } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java index 89cdc48d85a0..fa2b76b7c134 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java @@ -130,4 +130,47 @@ public void testManifestCompactProcedureWithBranch() { .isEqualTo( "[+I[1, 101, 15, 20221208], +I[4, 1001, 16, 20221208], +I[5, 10001, 15, 20221209]]"); } + + @Test + public void testManifestCompactDryRun() { + sql( + "CREATE TABLE T (" + + " k INT," + + " v STRING," + + " hh INT," + + " dt STRING" + + ") PARTITIONED BY (dt, hh) WITH (" + + " 'write-only' = 'true'," + + " 'manifest.full-compaction-threshold-size' = '10000 T'," + + " 'bucket' = '-1'" + + ")"); + + sql( + "INSERT INTO T VALUES (1, '10', 15, '20221208'), (4, '100', 16, '20221208'), (5, '1000', 15, '20221209')"); + + sql( + "INSERT OVERWRITE T VALUES (1, '10', 15, '20221208'), (4, '100', 16, '20221208'), (5, '1000', 15, '20221209')"); + + sql( + "INSERT OVERWRITE T VALUES (1, '10', 15, '20221208'), (4, '100', 16, '20221208'), (5, '1000', 15, '20221209')"); + + Assertions.assertThat( + sql("SELECT sum(num_deleted_files) FROM T$manifests").get(0).getField(0)) + .isEqualTo(6L); + + String dryRunResult = + Objects.requireNonNull( + sql("CALL sys.compact_manifest(`table` => 'default.T', `dry_run` => true)") + .get(0) + .getField(0)) + .toString(); + + Assertions.assertThat(dryRunResult).startsWith("Dry run:"); + Assertions.assertThat(dryRunResult).contains("eliminating"); + + // verify dry run did not actually compact + Assertions.assertThat( + sql("SELECT sum(num_deleted_files) FROM T$manifests").get(0).getField(0)) + .isEqualTo(6L); + } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java index 42f6b56fcc63..4790d07d2ca9 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java @@ -18,13 +18,7 @@ package org.apache.paimon.spark.procedure; -import org.apache.paimon.CoreOptions; -import org.apache.paimon.Snapshot; -import org.apache.paimon.manifest.ManifestFileMeta; -import org.apache.paimon.manifest.ManifestList; -import org.apache.paimon.operation.ManifestFileMerger; -import org.apache.paimon.options.MemorySize; -import org.apache.paimon.options.Options; +import org.apache.paimon.operation.ManifestCompactDryRun; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.BatchTableCommit; @@ -40,7 +34,6 @@ import org.apache.spark.unsafe.types.UTF8String; import java.util.HashMap; -import java.util.List; import static org.apache.spark.sql.types.DataTypes.BooleanType; import static org.apache.spark.sql.types.DataTypes.StringType; @@ -65,7 +58,8 @@ public class CompactManifestProcedure extends BaseProcedure { private static final StructType OUTPUT_TYPE = new StructType( new StructField[] { - new StructField("result", DataTypes.StringType, true, Metadata.empty()) + new StructField("result", DataTypes.BooleanType, true, Metadata.empty()), + new StructField("message", DataTypes.StringType, true, Metadata.empty()) }); protected CompactManifestProcedure(TableCatalog tableCatalog) { @@ -95,8 +89,8 @@ public InternalRow[] call(InternalRow args) { table = table.copy(dynamicOptions); if (dryRun) { - String result = dryRunCompactManifest((FileStoreTable) table); - return new InternalRow[] {newInternalRow(UTF8String.fromString(result))}; + String message = ManifestCompactDryRun.execute((FileStoreTable) table); + return new InternalRow[] {newInternalRow(true, UTF8String.fromString(message))}; } try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) { @@ -105,49 +99,7 @@ public InternalRow[] call(InternalRow args) { throw new RuntimeException(e); } - return new InternalRow[] {newInternalRow(UTF8String.fromString("success"))}; - } - - private String dryRunCompactManifest(FileStoreTable table) { - Snapshot latestSnapshot = table.store().snapshotManager().latestSnapshot(); - if (latestSnapshot == null) { - return "Dry run: no snapshot exists, nothing to compact."; - } - - ManifestList manifestList = table.store().manifestListFactory().create(); - List beforeManifests = manifestList.readDataManifests(latestSnapshot); - - Options compactOptions = Options.fromMap(table.options()); - compactOptions.set(CoreOptions.MANIFEST_MERGE_MIN_COUNT, 1); - compactOptions.set(CoreOptions.MANIFEST_FULL_COMPACTION_FILE_SIZE, MemorySize.ofBytes(1)); - - List afterManifests = - ManifestFileMerger.merge( - beforeManifests, - table.store().manifestFileFactory().create(), - table.schema().logicalPartitionType(), - new CoreOptions(compactOptions), - null); - - long beforeFileCount = beforeManifests.size(); - long afterFileCount = afterManifests.size(); - long beforeTotalSize = beforeManifests.stream().mapToLong(ManifestFileMeta::fileSize).sum(); - long afterTotalSize = afterManifests.stream().mapToLong(ManifestFileMeta::fileSize).sum(); - long beforeDeletedEntries = - beforeManifests.stream().mapToLong(ManifestFileMeta::numDeletedFiles).sum(); - long afterDeletedEntries = - afterManifests.stream().mapToLong(ManifestFileMeta::numDeletedFiles).sum(); - long eliminatedDeletedEntries = beforeDeletedEntries - afterDeletedEntries; - - return String.format( - "Dry run: manifest compaction would reduce %d manifest files to %d, " - + "total file size from %s to %s, " - + "eliminating %d deleted entries.", - beforeFileCount, - afterFileCount, - MemorySize.ofBytes(beforeTotalSize), - MemorySize.ofBytes(afterTotalSize), - eliminatedDeletedEntries); + return new InternalRow[] {newInternalRow(true, null)}; } @Override From 3c9ba4e5b77a64caf34477a3b5c3090a912f83d8 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 18:56:21 +0800 Subject: [PATCH 03/13] [core] Make dry_run zero-IO: compute stats from manifest metadata only --- .../operation/ManifestCompactDryRun.java | 55 +++++++------------ 1 file changed, 21 insertions(+), 34 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestCompactDryRun.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestCompactDryRun.java index 8091493b2ab4..bd770bf3de53 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestCompactDryRun.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestCompactDryRun.java @@ -18,17 +18,15 @@ package org.apache.paimon.operation; -import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.options.MemorySize; -import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; import java.util.List; -/** Dry run for manifest compaction, computing before/after statistics without committing. */ +/** Dry run for manifest compaction. Reads only existing metadata, never writes files. */ public class ManifestCompactDryRun { public static String execute(FileStoreTable table) { @@ -38,38 +36,27 @@ public static String execute(FileStoreTable table) { } ManifestList manifestList = table.store().manifestListFactory().create(); - List beforeManifests = manifestList.readDataManifests(latestSnapshot); - - Options compactOptions = Options.fromMap(table.options()); - compactOptions.set(CoreOptions.MANIFEST_MERGE_MIN_COUNT, 1); - compactOptions.set(CoreOptions.MANIFEST_FULL_COMPACTION_FILE_SIZE, MemorySize.ofBytes(1)); - - List afterManifests = - ManifestFileMerger.merge( - beforeManifests, - table.store().manifestFileFactory().create(), - table.schema().logicalPartitionType(), - new CoreOptions(compactOptions), - null); - - long beforeFileCount = beforeManifests.size(); - long afterFileCount = afterManifests.size(); - long beforeTotalSize = beforeManifests.stream().mapToLong(ManifestFileMeta::fileSize).sum(); - long afterTotalSize = afterManifests.stream().mapToLong(ManifestFileMeta::fileSize).sum(); - long beforeDeletedEntries = - beforeManifests.stream().mapToLong(ManifestFileMeta::numDeletedFiles).sum(); - long afterDeletedEntries = - afterManifests.stream().mapToLong(ManifestFileMeta::numDeletedFiles).sum(); - long eliminatedDeletedEntries = beforeDeletedEntries - afterDeletedEntries; + List manifests = manifestList.readDataManifests(latestSnapshot); + + long manifestFileCount = manifests.size(); + long totalSize = manifests.stream().mapToLong(ManifestFileMeta::fileSize).sum(); + long totalAddedEntries = + manifests.stream().mapToLong(ManifestFileMeta::numAddedFiles).sum(); + long totalDeletedEntries = + manifests.stream().mapToLong(ManifestFileMeta::numDeletedFiles).sum(); + + if (totalDeletedEntries == 0) { + return String.format( + "Dry run: %d manifest files (%s), 0 deleted entries. Nothing to compact.", + manifestFileCount, MemorySize.ofBytes(totalSize)); + } return String.format( - "Dry run: manifest compaction would reduce %d manifest files to %d, " - + "total file size from %s to %s, " - + "eliminating %d deleted entries.", - beforeFileCount, - afterFileCount, - MemorySize.ofBytes(beforeTotalSize), - MemorySize.ofBytes(afterTotalSize), - eliminatedDeletedEntries); + "Dry run: %d manifest files (%s), " + + "%d added entries, %d deleted entries to eliminate.", + manifestFileCount, + MemorySize.ofBytes(totalSize), + totalAddedEntries, + totalDeletedEntries); } } From 593db53a7ffb6715b69904d7d9528ff055960324 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 18:58:42 +0800 Subject: [PATCH 04/13] [flink] Fix dry_run test assertion to match actual output format --- .../paimon/flink/procedure/CompactManifestProcedureITCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java index fa2b76b7c134..47690a1f84c5 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java @@ -166,7 +166,7 @@ public void testManifestCompactDryRun() { .toString(); Assertions.assertThat(dryRunResult).startsWith("Dry run:"); - Assertions.assertThat(dryRunResult).contains("eliminating"); + Assertions.assertThat(dryRunResult).contains("deleted entries to eliminate"); // verify dry run did not actually compact Assertions.assertThat( From 06826622151b4c927d5bd959f7d22abd04775eb6 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 19:08:20 +0800 Subject: [PATCH 05/13] [core] Align dry_run with tryFullCompaction trigger logic --- .../operation/ManifestCompactDryRun.java | 70 ++++++++++++++----- .../CompactManifestProcedureITCase.java | 3 +- .../CompactManifestProcedureTest.scala | 27 +++++++ 3 files changed, 82 insertions(+), 18 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestCompactDryRun.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestCompactDryRun.java index bd770bf3de53..c6198a31db0c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestCompactDryRun.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestCompactDryRun.java @@ -18,6 +18,7 @@ package org.apache.paimon.operation; +import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.manifest.ManifestList; @@ -26,37 +27,72 @@ import java.util.List; -/** Dry run for manifest compaction. Reads only existing metadata, never writes files. */ +/** + * Dry run for manifest compaction. Reads only existing metadata, never writes files. Replicates the + * trigger logic from {@link ManifestFileMerger#tryFullCompaction} to predict whether compaction + * would actually fire. + */ public class ManifestCompactDryRun { public static String execute(FileStoreTable table) { Snapshot latestSnapshot = table.store().snapshotManager().latestSnapshot(); if (latestSnapshot == null) { - return "Dry run: no snapshot exists, nothing to compact."; + return "Dry run: no snapshot exists."; } ManifestList manifestList = table.store().manifestListFactory().create(); List manifests = manifestList.readDataManifests(latestSnapshot); - long manifestFileCount = manifests.size(); - long totalSize = manifests.stream().mapToLong(ManifestFileMeta::fileSize).sum(); - long totalAddedEntries = - manifests.stream().mapToLong(ManifestFileMeta::numAddedFiles).sum(); - long totalDeletedEntries = - manifests.stream().mapToLong(ManifestFileMeta::numDeletedFiles).sum(); - - if (totalDeletedEntries == 0) { - return String.format( - "Dry run: %d manifest files (%s), 0 deleted entries. Nothing to compact.", - manifestFileCount, MemorySize.ofBytes(totalSize)); + if (manifests.isEmpty()) { + return "Dry run: 0 manifest files."; + } + + CoreOptions options = new CoreOptions(table.options()); + long suggestedMetaSize = options.manifestTargetSize().getBytes(); + + long totalFiles = manifests.size(); + long totalSize = 0; + long totalDeletedEntries = 0; + long filesWithDeletedEntries = 0; + long smallFiles = 0; + long filesToCompact = 0; + long totalDeltaFileSize = 0; + + for (ManifestFileMeta file : manifests) { + totalSize += file.fileSize(); + totalDeletedEntries += file.numDeletedFiles(); + + boolean hasDeleted = file.numDeletedFiles() > 0; + boolean isSmall = file.fileSize() < suggestedMetaSize; + + if (hasDeleted) { + filesWithDeletedEntries++; + } + if (isSmall) { + smallFiles++; + } + if (hasDeleted || isSmall) { + filesToCompact++; + totalDeltaFileSize += file.fileSize(); + } } + // compactManifestOnce forces sizeTrigger = 1 byte + boolean wouldTrigger = totalDeltaFileSize >= 1; + return String.format( "Dry run: %d manifest files (%s), " - + "%d added entries, %d deleted entries to eliminate.", - manifestFileCount, + + "%d deleted entries in %d files, " + + "%d undersized files (< %s), " + + "%d files to compact, " + + "would trigger: %s.", + totalFiles, MemorySize.ofBytes(totalSize), - totalAddedEntries, - totalDeletedEntries); + totalDeletedEntries, + filesWithDeletedEntries, + smallFiles, + MemorySize.ofBytes(suggestedMetaSize), + filesToCompact, + wouldTrigger); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java index 47690a1f84c5..fb6a7ffeed8b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java @@ -166,7 +166,8 @@ public void testManifestCompactDryRun() { .toString(); Assertions.assertThat(dryRunResult).startsWith("Dry run:"); - Assertions.assertThat(dryRunResult).contains("deleted entries to eliminate"); + Assertions.assertThat(dryRunResult).contains("deleted entries in"); + Assertions.assertThat(dryRunResult).contains("would trigger: true"); // verify dry run did not actually compact Assertions.assertThat( diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala index 4425b85912d4..0ac5fd44550b 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala @@ -46,4 +46,31 @@ class CompactManifestProcedureTest extends PaimonSparkTestBase with StreamTest { rows = spark.sql("SELECT sum(num_deleted_files) FROM `T$manifests`").collectAsList() Assertions.assertThat(rows.get(0).getLong(0)).isEqualTo(0L) } + + test("Paimon Procedure: compact manifest dry run") { + spark.sql(s""" + |CREATE TABLE T2 (id INT, value STRING, dt STRING, hh INT) + |TBLPROPERTIES ('bucket'='-1', 'write-only'='true', 'compaction.min.file-num'='2') + |PARTITIONED BY (dt, hh) + |""".stripMargin) + + spark.sql(s"INSERT INTO T2 VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)") + spark.sql(s"INSERT OVERWRITE T2 VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)") + spark.sql(s"INSERT OVERWRITE T2 VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)") + + Thread.sleep(10000) + + var rows = spark.sql("SELECT sum(num_deleted_files) FROM `T2$manifests`").collectAsList() + val deletedBefore = rows.get(0).getLong(0) + Assertions.assertThat(deletedBefore).isGreaterThan(0L) + + val dryRunRows = spark.sql("CALL sys.compact_manifest(table => 'T2', dry_run => true)") + .collectAsList() + Assertions.assertThat(dryRunRows.get(0).getBoolean(0)).isTrue + Assertions.assertThat(dryRunRows.get(0).getString(1)).startsWith("Dry run:") + + // verify dry run did not actually compact + rows = spark.sql("SELECT sum(num_deleted_files) FROM `T2$manifests`").collectAsList() + Assertions.assertThat(rows.get(0).getLong(0)).isEqualTo(deletedBefore) + } } From b1694fbd4a2f9cabfbc7726dbdc3fd6d777bd8fa Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 19:10:14 +0800 Subject: [PATCH 06/13] [spark] Remove unnecessary Thread.sleep in dry_run test --- .../paimon/spark/procedure/CompactManifestProcedureTest.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala index 0ac5fd44550b..c3fc81f533a8 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala @@ -58,8 +58,6 @@ class CompactManifestProcedureTest extends PaimonSparkTestBase with StreamTest { spark.sql(s"INSERT OVERWRITE T2 VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)") spark.sql(s"INSERT OVERWRITE T2 VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)") - Thread.sleep(10000) - var rows = spark.sql("SELECT sum(num_deleted_files) FROM `T2$manifests`").collectAsList() val deletedBefore = rows.get(0).getLong(0) Assertions.assertThat(deletedBefore).isGreaterThan(0L) From 310330d19e8b7293e772fe772f04f9de34d1803f Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 19:12:26 +0800 Subject: [PATCH 07/13] [core] Remove wouldTrigger prediction, report only facts --- .../operation/ManifestCompactDryRun.java | 31 +++---------------- .../CompactManifestProcedureITCase.java | 1 - 2 files changed, 5 insertions(+), 27 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestCompactDryRun.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestCompactDryRun.java index c6198a31db0c..98c2f01eb727 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestCompactDryRun.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestCompactDryRun.java @@ -27,11 +27,7 @@ import java.util.List; -/** - * Dry run for manifest compaction. Reads only existing metadata, never writes files. Replicates the - * trigger logic from {@link ManifestFileMerger#tryFullCompaction} to predict whether compaction - * would actually fire. - */ +/** Dry run for manifest compaction. Reads only existing metadata, never writes files. */ public class ManifestCompactDryRun { public static String execute(FileStoreTable table) { @@ -55,44 +51,27 @@ public static String execute(FileStoreTable table) { long totalDeletedEntries = 0; long filesWithDeletedEntries = 0; long smallFiles = 0; - long filesToCompact = 0; - long totalDeltaFileSize = 0; for (ManifestFileMeta file : manifests) { totalSize += file.fileSize(); totalDeletedEntries += file.numDeletedFiles(); - - boolean hasDeleted = file.numDeletedFiles() > 0; - boolean isSmall = file.fileSize() < suggestedMetaSize; - - if (hasDeleted) { + if (file.numDeletedFiles() > 0) { filesWithDeletedEntries++; } - if (isSmall) { + if (file.fileSize() < suggestedMetaSize) { smallFiles++; } - if (hasDeleted || isSmall) { - filesToCompact++; - totalDeltaFileSize += file.fileSize(); - } } - // compactManifestOnce forces sizeTrigger = 1 byte - boolean wouldTrigger = totalDeltaFileSize >= 1; - return String.format( "Dry run: %d manifest files (%s), " + "%d deleted entries in %d files, " - + "%d undersized files (< %s), " - + "%d files to compact, " - + "would trigger: %s.", + + "%d undersized files (< %s).", totalFiles, MemorySize.ofBytes(totalSize), totalDeletedEntries, filesWithDeletedEntries, smallFiles, - MemorySize.ofBytes(suggestedMetaSize), - filesToCompact, - wouldTrigger); + MemorySize.ofBytes(suggestedMetaSize)); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java index fb6a7ffeed8b..cd74fc682d6d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java @@ -167,7 +167,6 @@ public void testManifestCompactDryRun() { Assertions.assertThat(dryRunResult).startsWith("Dry run:"); Assertions.assertThat(dryRunResult).contains("deleted entries in"); - Assertions.assertThat(dryRunResult).contains("would trigger: true"); // verify dry run did not actually compact Assertions.assertThat( From 40cd5d0a3c31113ec96703cb0e438b1f7d33a19d Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 19:22:17 +0800 Subject: [PATCH 08/13] [core][spark] Neutral dry_run wording, keep Spark OUTPUT_TYPE unchanged, add Spark test --- .../spark/procedure/CompactManifestProcedure.java | 13 ++++++++----- .../procedure/CompactManifestProcedureTest.scala | 4 ++-- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java index 4790d07d2ca9..cc1a42f9e52c 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java @@ -31,7 +31,8 @@ import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; -import org.apache.spark.unsafe.types.UTF8String; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; @@ -48,6 +49,8 @@ */ public class CompactManifestProcedure extends BaseProcedure { + private static final Logger LOG = LoggerFactory.getLogger(CompactManifestProcedure.class); + private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { ProcedureParameter.required("table", StringType), @@ -58,8 +61,7 @@ public class CompactManifestProcedure extends BaseProcedure { private static final StructType OUTPUT_TYPE = new StructType( new StructField[] { - new StructField("result", DataTypes.BooleanType, true, Metadata.empty()), - new StructField("message", DataTypes.StringType, true, Metadata.empty()) + new StructField("result", DataTypes.BooleanType, true, Metadata.empty()) }); protected CompactManifestProcedure(TableCatalog tableCatalog) { @@ -90,7 +92,8 @@ public InternalRow[] call(InternalRow args) { if (dryRun) { String message = ManifestCompactDryRun.execute((FileStoreTable) table); - return new InternalRow[] {newInternalRow(true, UTF8String.fromString(message))}; + LOG.info(message); + return new InternalRow[] {newInternalRow(true)}; } try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) { @@ -99,7 +102,7 @@ public InternalRow[] call(InternalRow args) { throw new RuntimeException(e); } - return new InternalRow[] {newInternalRow(true, null)}; + return new InternalRow[] {newInternalRow(true)}; } @Override diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala index c3fc81f533a8..7ef982b085ee 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala @@ -62,10 +62,10 @@ class CompactManifestProcedureTest extends PaimonSparkTestBase with StreamTest { val deletedBefore = rows.get(0).getLong(0) Assertions.assertThat(deletedBefore).isGreaterThan(0L) - val dryRunRows = spark.sql("CALL sys.compact_manifest(table => 'T2', dry_run => true)") + val dryRunRows = spark + .sql("CALL sys.compact_manifest(table => 'T2', dry_run => true)") .collectAsList() Assertions.assertThat(dryRunRows.get(0).getBoolean(0)).isTrue - Assertions.assertThat(dryRunRows.get(0).getString(1)).startsWith("Dry run:") // verify dry run did not actually compact rows = spark.sql("SELECT sum(num_deleted_files) FROM `T2$manifests`").collectAsList() From b0e62af99b45d5256ad2d0b7db5b5470f49912d7 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 19:31:02 +0800 Subject: [PATCH 09/13] [docs] Document dry_run parameter for compact_manifest procedure --- docs/docs/flink/procedures.md | 7 +++++-- docs/docs/spark/procedures.md | 4 +++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/docs/docs/flink/procedures.md b/docs/docs/flink/procedures.md index 9ae1b176b0b9..b8726ebbd08c 100644 --- a/docs/docs/flink/procedures.md +++ b/docs/docs/flink/procedures.md @@ -871,15 +871,18 @@ All available procedures are listed below. compact_manifest CALL [catalog.]sys.compact_manifest(`table` => 'identifier')
- CALL [catalog.]sys.compact_manifest(`table` => 'identifier', 'options' => 'key1=value1,key2=value2') + CALL [catalog.]sys.compact_manifest(`table` => 'identifier', 'options' => 'key1=value1,key2=value2')
+ CALL [catalog.]sys.compact_manifest(`table` => 'identifier', `dry_run` => true) To compact_manifest the manifests. Arguments:
  • table: the target table identifier. Cannot be empty.
  • options: the additional dynamic options of the table. It prioritizes higher than original `tableProp` and lower than `procedureArg`.
  • +
  • dry_run (Boolean, optional): when true, returns manifest metadata statistics without actually compacting.
  • - CALL sys.compact_manifest(`table` => 'default.T') + CALL sys.compact_manifest(`table` => 'default.T')
    + CALL sys.compact_manifest(`table` => 'default.T', `dry_run` => true) diff --git a/docs/docs/spark/procedures.md b/docs/docs/spark/procedures.md index ed55504d0738..8b7cd4d6cdb1 100644 --- a/docs/docs/spark/procedures.md +++ b/docs/docs/spark/procedures.md @@ -441,9 +441,11 @@ This section introduce all available spark procedures about paimon. To compact_manifest the manifests. Arguments:
  • table: the target table identifier. Cannot be empty.
  • options: the additional dynamic options of the table. It prioritizes higher than original `tableProp` and lower than `procedureArg`.
  • +
  • dry_run (Boolean, optional): when true, logs manifest metadata statistics without actually compacting. The result is printed to the application log; the SQL return value is still `true`.
  • - CALL sys.compact_manifest(`table` => 'default.T') + CALL sys.compact_manifest(`table` => 'default.T')
    + CALL sys.compact_manifest(`table` => 'default.T', dry_run => true) From f00f826bb0add2154ec4e18f00625d5c7ef6a1e9 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 19:39:32 +0800 Subject: [PATCH 10/13] [core] Rewrite oversized manifest files during full compaction --- .../java/org/apache/paimon/operation/ManifestFileMerger.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java index 0313b2b12b8d..4ea14a546c45 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java @@ -195,7 +195,10 @@ public static Optional> tryFullCompaction( // 1. should trigger full compaction Filter mustChange = - file -> file.numDeletedFiles() > 0 || file.fileSize() < suggestedMetaSize; + file -> + file.numDeletedFiles() > 0 + || file.fileSize() < suggestedMetaSize + || file.fileSize() > suggestedMetaSize; long totalManifestSize = 0; long deltaDeleteFileNum = 0; long totalDeltaFileSize = 0; From 65e3c1c7fd94aa4950e945e45b85663728593798 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 19:45:06 +0800 Subject: [PATCH 11/13] [core] Use Math.max(2 * targetSize, 16MB) as oversized threshold --- .../java/org/apache/paimon/operation/ManifestFileMerger.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java index 4ea14a546c45..e55a1b717757 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java @@ -194,11 +194,12 @@ public static Optional> tryFullCompaction( // 1. should trigger full compaction + long oversizedThreshold = Math.max(2 * suggestedMetaSize, 16 * 1024 * 1024); Filter mustChange = file -> file.numDeletedFiles() > 0 || file.fileSize() < suggestedMetaSize - || file.fileSize() > suggestedMetaSize; + || file.fileSize() > oversizedThreshold; long totalManifestSize = 0; long deltaDeleteFileNum = 0; long totalDeltaFileSize = 0; From 877df3ee50010d27db908893734e8b8909a5495a Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 20:57:31 +0800 Subject: [PATCH 12/13] Revert "[core] Use Math.max(2 * targetSize, 16MB) as oversized threshold" This reverts commit 65e3c1c7fd94aa4950e945e45b85663728593798. --- .../java/org/apache/paimon/operation/ManifestFileMerger.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java index e55a1b717757..4ea14a546c45 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java @@ -194,12 +194,11 @@ public static Optional> tryFullCompaction( // 1. should trigger full compaction - long oversizedThreshold = Math.max(2 * suggestedMetaSize, 16 * 1024 * 1024); Filter mustChange = file -> file.numDeletedFiles() > 0 || file.fileSize() < suggestedMetaSize - || file.fileSize() > oversizedThreshold; + || file.fileSize() > suggestedMetaSize; long totalManifestSize = 0; long deltaDeleteFileNum = 0; long totalDeltaFileSize = 0; From 29e0275974cc418d17c7a630d7af369935456267 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 20:57:32 +0800 Subject: [PATCH 13/13] Revert "[core] Rewrite oversized manifest files during full compaction" This reverts commit f00f826bb0add2154ec4e18f00625d5c7ef6a1e9. --- .../java/org/apache/paimon/operation/ManifestFileMerger.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java index 4ea14a546c45..0313b2b12b8d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java @@ -195,10 +195,7 @@ public static Optional> tryFullCompaction( // 1. should trigger full compaction Filter mustChange = - file -> - file.numDeletedFiles() > 0 - || file.fileSize() < suggestedMetaSize - || file.fileSize() > suggestedMetaSize; + file -> file.numDeletedFiles() > 0 || file.fileSize() < suggestedMetaSize; long totalManifestSize = 0; long deltaDeleteFileNum = 0; long totalDeltaFileSize = 0;