diff --git a/docs/docs/flink/procedures.md b/docs/docs/flink/procedures.md index 9ae1b176b0b9..b8726ebbd08c 100644 --- a/docs/docs/flink/procedures.md +++ b/docs/docs/flink/procedures.md @@ -871,15 +871,18 @@ All available procedures are listed below. compact_manifest CALL [catalog.]sys.compact_manifest(`table` => 'identifier')
- CALL [catalog.]sys.compact_manifest(`table` => 'identifier', 'options' => 'key1=value1,key2=value2') + CALL [catalog.]sys.compact_manifest(`table` => 'identifier', 'options' => 'key1=value1,key2=value2')
+ CALL [catalog.]sys.compact_manifest(`table` => 'identifier', `dry_run` => true) To compact_manifest the manifests. Arguments:
  • table: the target table identifier. Cannot be empty.
  • options: the additional dynamic options of the table. It prioritizes higher than original `tableProp` and lower than `procedureArg`.
  • +
  • dry_run (Boolean, optional): when true, returns manifest metadata statistics without actually compacting.
  • - CALL sys.compact_manifest(`table` => 'default.T') + CALL sys.compact_manifest(`table` => 'default.T')
    + CALL sys.compact_manifest(`table` => 'default.T', `dry_run` => true) diff --git a/docs/docs/spark/procedures.md b/docs/docs/spark/procedures.md index ed55504d0738..8b7cd4d6cdb1 100644 --- a/docs/docs/spark/procedures.md +++ b/docs/docs/spark/procedures.md @@ -441,9 +441,11 @@ This section introduce all available spark procedures about paimon. To compact_manifest the manifests. Arguments:
  • table: the target table identifier. Cannot be empty.
  • options: the additional dynamic options of the table. It prioritizes higher than original `tableProp` and lower than `procedureArg`.
  • +
  • dry_run (Boolean, optional): when true, logs manifest metadata statistics without actually compacting. The result is printed to the application log; the SQL return value is still `true`.
  • - CALL sys.compact_manifest(`table` => 'default.T') + CALL sys.compact_manifest(`table` => 'default.T')
    + CALL sys.compact_manifest(`table` => 'default.T', dry_run => true) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestCompactDryRun.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestCompactDryRun.java new file mode 100644 index 000000000000..98c2f01eb727 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestCompactDryRun.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.table.FileStoreTable; + +import java.util.List; + +/** Dry run for manifest compaction. Reads only existing metadata, never writes files. */ +public class ManifestCompactDryRun { + + public static String execute(FileStoreTable table) { + Snapshot latestSnapshot = table.store().snapshotManager().latestSnapshot(); + if (latestSnapshot == null) { + return "Dry run: no snapshot exists."; + } + + ManifestList manifestList = table.store().manifestListFactory().create(); + List manifests = manifestList.readDataManifests(latestSnapshot); + + if (manifests.isEmpty()) { + return "Dry run: 0 manifest files."; + } + + CoreOptions options = new CoreOptions(table.options()); + long suggestedMetaSize = options.manifestTargetSize().getBytes(); + + long totalFiles = manifests.size(); + long totalSize = 0; + long totalDeletedEntries = 0; + long filesWithDeletedEntries = 0; + long smallFiles = 0; + + for (ManifestFileMeta file : manifests) { + totalSize += file.fileSize(); + totalDeletedEntries += file.numDeletedFiles(); + if (file.numDeletedFiles() > 0) { + filesWithDeletedEntries++; + } + if (file.fileSize() < suggestedMetaSize) { + smallFiles++; + } + } + + return String.format( + "Dry run: %d manifest files (%s), " + + "%d deleted entries in %d files, " + + "%d undersized files (< %s).", + totalFiles, + MemorySize.ofBytes(totalSize), + totalDeletedEntries, + filesWithDeletedEntries, + smallFiles, + MemorySize.ofBytes(suggestedMetaSize)); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java index 488aee544e26..f1c9156e319f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.procedure; import org.apache.paimon.CoreOptions; +import org.apache.paimon.operation.ManifestCompactDryRun; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.utils.ProcedureUtils; @@ -28,6 +29,8 @@ import org.apache.flink.table.annotation.ProcedureHint; import org.apache.flink.table.procedure.ProcedureContext; +import javax.annotation.Nullable; + import java.util.HashMap; /** Compact manifest file to reduce deleted manifest entries. */ @@ -43,9 +46,14 @@ public String identifier() { @ProcedureHint( argument = { @ArgumentHint(name = "table", type = @DataTypeHint("STRING")), - @ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true) + @ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true), + @ArgumentHint(name = "dry_run", type = @DataTypeHint("BOOLEAN"), isOptional = true) }) - public String[] call(ProcedureContext procedureContext, String tableId, String options) + public String[] call( + ProcedureContext procedureContext, + String tableId, + @Nullable String options, + @Nullable Boolean dryRun) throws Exception { FileStoreTable table = (FileStoreTable) table(tableId); @@ -56,6 +64,10 @@ public String[] call(ProcedureContext procedureContext, String tableId, String o table = table.copy(dynamicOptions); + if (dryRun != null && dryRun) { + return new String[] {ManifestCompactDryRun.execute(table)}; + } + try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) { commit.compactManifests(); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java index 89cdc48d85a0..cd74fc682d6d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactManifestProcedureITCase.java @@ -130,4 +130,47 @@ public void testManifestCompactProcedureWithBranch() { .isEqualTo( "[+I[1, 101, 15, 20221208], +I[4, 1001, 16, 20221208], +I[5, 10001, 15, 20221209]]"); } + + @Test + public void testManifestCompactDryRun() { + sql( + "CREATE TABLE T (" + + " k INT," + + " v STRING," + + " hh INT," + + " dt STRING" + + ") PARTITIONED BY (dt, hh) WITH (" + + " 'write-only' = 'true'," + + " 'manifest.full-compaction-threshold-size' = '10000 T'," + + " 'bucket' = '-1'" + + ")"); + + sql( + "INSERT INTO T VALUES (1, '10', 15, '20221208'), (4, '100', 16, '20221208'), (5, '1000', 15, '20221209')"); + + sql( + "INSERT OVERWRITE T VALUES (1, '10', 15, '20221208'), (4, '100', 16, '20221208'), (5, '1000', 15, '20221209')"); + + sql( + "INSERT OVERWRITE T VALUES (1, '10', 15, '20221208'), (4, '100', 16, '20221208'), (5, '1000', 15, '20221209')"); + + Assertions.assertThat( + sql("SELECT sum(num_deleted_files) FROM T$manifests").get(0).getField(0)) + .isEqualTo(6L); + + String dryRunResult = + Objects.requireNonNull( + sql("CALL sys.compact_manifest(`table` => 'default.T', `dry_run` => true)") + .get(0) + .getField(0)) + .toString(); + + Assertions.assertThat(dryRunResult).startsWith("Dry run:"); + Assertions.assertThat(dryRunResult).contains("deleted entries in"); + + // verify dry run did not actually compact + Assertions.assertThat( + sql("SELECT sum(num_deleted_files) FROM T$manifests").get(0).getField(0)) + .isEqualTo(6L); + } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java index 5a6837f6c15f..cc1a42f9e52c 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java @@ -18,6 +18,8 @@ package org.apache.paimon.spark.procedure; +import org.apache.paimon.operation.ManifestCompactDryRun; +import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.utils.ProcedureUtils; @@ -29,9 +31,12 @@ import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; +import static org.apache.spark.sql.types.DataTypes.BooleanType; import static org.apache.spark.sql.types.DataTypes.StringType; /** @@ -39,14 +44,18 @@ * *
    
      *  CALL sys.compact_manifest(table => 'tableId')
    + *  CALL sys.compact_manifest(table => 'tableId', dry_run => true)
      * 
    */ public class CompactManifestProcedure extends BaseProcedure { + private static final Logger LOG = LoggerFactory.getLogger(CompactManifestProcedure.class); + private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { ProcedureParameter.required("table", StringType), - ProcedureParameter.optional("options", StringType) + ProcedureParameter.optional("options", StringType), + ProcedureParameter.optional("dry_run", BooleanType) }; private static final StructType OUTPUT_TYPE = @@ -74,12 +83,19 @@ public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); String options = args.isNullAt(1) ? null : args.getString(1); + boolean dryRun = !args.isNullAt(2) && args.getBoolean(2); Table table = loadSparkTable(tableIdent).getTable(); HashMap dynamicOptions = new HashMap<>(); ProcedureUtils.putAllOptions(dynamicOptions, options); table = table.copy(dynamicOptions); + if (dryRun) { + String message = ManifestCompactDryRun.execute((FileStoreTable) table); + LOG.info(message); + return new InternalRow[] {newInternalRow(true)}; + } + try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) { commit.compactManifests(); } catch (Exception e) { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala index 4425b85912d4..7ef982b085ee 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala @@ -46,4 +46,29 @@ class CompactManifestProcedureTest extends PaimonSparkTestBase with StreamTest { rows = spark.sql("SELECT sum(num_deleted_files) FROM `T$manifests`").collectAsList() Assertions.assertThat(rows.get(0).getLong(0)).isEqualTo(0L) } + + test("Paimon Procedure: compact manifest dry run") { + spark.sql(s""" + |CREATE TABLE T2 (id INT, value STRING, dt STRING, hh INT) + |TBLPROPERTIES ('bucket'='-1', 'write-only'='true', 'compaction.min.file-num'='2') + |PARTITIONED BY (dt, hh) + |""".stripMargin) + + spark.sql(s"INSERT INTO T2 VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)") + spark.sql(s"INSERT OVERWRITE T2 VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)") + spark.sql(s"INSERT OVERWRITE T2 VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)") + + var rows = spark.sql("SELECT sum(num_deleted_files) FROM `T2$manifests`").collectAsList() + val deletedBefore = rows.get(0).getLong(0) + Assertions.assertThat(deletedBefore).isGreaterThan(0L) + + val dryRunRows = spark + .sql("CALL sys.compact_manifest(table => 'T2', dry_run => true)") + .collectAsList() + Assertions.assertThat(dryRunRows.get(0).getBoolean(0)).isTrue + + // verify dry run did not actually compact + rows = spark.sql("SELECT sum(num_deleted_files) FROM `T2$manifests`").collectAsList() + Assertions.assertThat(rows.get(0).getLong(0)).isEqualTo(deletedBefore) + } }