Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions docs/docs/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -871,15 +871,18 @@ All available procedures are listed below.
<td>compact_manifest</td>
<td>
CALL [catalog.]sys.compact_manifest(`table` => 'identifier')<br/>
CALL [catalog.]sys.compact_manifest(`table` => 'identifier', 'options' => 'key1=value1,key2=value2')
CALL [catalog.]sys.compact_manifest(`table` => 'identifier', 'options' => 'key1=value1,key2=value2')<br/>
CALL [catalog.]sys.compact_manifest(`table` => 'identifier', `dry_run` => true)
</td>
<td>
To compact the manifest files. Arguments:
<li>table: the target table identifier. Cannot be empty.</li>
<li>options: the additional dynamic options of the table. They take precedence over the original `tableProp` but are overridden by `procedureArg`.</li>
<li>dry_run (Boolean, optional): when true, returns manifest metadata statistics without actually compacting.</li>
</td>
<td>
CALL sys.compact_manifest(`table` => 'default.T')
CALL sys.compact_manifest(`table` => 'default.T')<br/>
CALL sys.compact_manifest(`table` => 'default.T', `dry_run` => true)
</td>
</tr>
<tr>
Expand Down
4 changes: 3 additions & 1 deletion docs/docs/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -441,9 +441,11 @@ This section introduce all available spark procedures about paimon.
To compact the manifest files. Arguments:
<li>table: the target table identifier. Cannot be empty.</li>
<li>options: the additional dynamic options of the table. They take precedence over the original `tableProp` but are overridden by `procedureArg`.</li>
<li>dry_run (Boolean, optional): when true, logs manifest metadata statistics without actually compacting. The result is printed to the application log; the SQL return value is still `true`.</li>
</td>
<td>
CALL sys.compact_manifest(`table` => 'default.T')
CALL sys.compact_manifest(`table` => 'default.T')<br/>
CALL sys.compact_manifest(`table` => 'default.T', dry_run => true)
</td>
</tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.operation;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.table.FileStoreTable;

import java.util.List;

/**
 * Dry run for manifest compaction. Reads only existing metadata, never writes files.
 *
 * <p>The report is derived from the data manifests referenced by the latest snapshot of the
 * table and is returned as a single human-readable line.
 */
public class ManifestCompactDryRun {

    /** Static utility holder; not meant to be instantiated. */
    private ManifestCompactDryRun() {}

    /**
     * Summarizes the data manifests of the latest snapshot of {@code table}.
     *
     * <p>The summary contains the number of manifest files and their accumulated size, the
     * number of deleted entries together with the number of manifest files carrying them, and
     * the number of manifest files smaller than the size returned by {@link
     * CoreOptions#manifestTargetSize()}.
     *
     * @param table the table whose manifest metadata is inspected
     * @return a one-line report starting with {@code "Dry run:"}; a short fixed message is
     *     returned when the table has no snapshot or the latest snapshot has no data manifests
     */
    public static String execute(FileStoreTable table) {
        Snapshot latestSnapshot = table.store().snapshotManager().latestSnapshot();
        if (latestSnapshot == null) {
            return "Dry run: no snapshot exists.";
        }

        ManifestList manifestList = table.store().manifestListFactory().create();
        List<ManifestFileMeta> manifests = manifestList.readDataManifests(latestSnapshot);
        if (manifests.isEmpty()) {
            return "Dry run: 0 manifest files.";
        }

        // Manifests strictly below this size are reported as "undersized".
        long suggestedMetaSize = new CoreOptions(table.options()).manifestTargetSize().getBytes();

        long totalFiles = manifests.size();
        long totalSize = 0;
        long totalDeletedEntries = 0;
        long filesWithDeletedEntries = 0;
        long smallFiles = 0;

        for (ManifestFileMeta file : manifests) {
            // Read each accessor once per manifest instead of once per use.
            long size = file.fileSize();
            long deleted = file.numDeletedFiles();

            totalSize += size;
            totalDeletedEntries += deleted;
            if (deleted > 0) {
                filesWithDeletedEntries++;
            }
            if (size < suggestedMetaSize) {
                smallFiles++;
            }
        }

        return String.format(
                "Dry run: %d manifest files (%s), "
                        + "%d deleted entries in %d files, "
                        + "%d undersized files (< %s).",
                totalFiles,
                MemorySize.ofBytes(totalSize),
                totalDeletedEntries,
                filesWithDeletedEntries,
                smallFiles,
                MemorySize.ofBytes(suggestedMetaSize));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.flink.procedure;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.operation.ManifestCompactDryRun;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.utils.ProcedureUtils;
Expand All @@ -28,6 +29,8 @@
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;

import javax.annotation.Nullable;

import java.util.HashMap;

/** Compact manifest file to reduce deleted manifest entries. */
Expand All @@ -43,9 +46,14 @@ public String identifier() {
@ProcedureHint(
argument = {
@ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
@ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true)
@ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true),
@ArgumentHint(name = "dry_run", type = @DataTypeHint("BOOLEAN"), isOptional = true)
})
public String[] call(ProcedureContext procedureContext, String tableId, String options)
public String[] call(
ProcedureContext procedureContext,
String tableId,
@Nullable String options,
@Nullable Boolean dryRun)
throws Exception {

FileStoreTable table = (FileStoreTable) table(tableId);
Expand All @@ -56,6 +64,10 @@ public String[] call(ProcedureContext procedureContext, String tableId, String o

table = table.copy(dynamicOptions);

if (dryRun != null && dryRun) {
return new String[] {ManifestCompactDryRun.execute(table)};
}

try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
commit.compactManifests();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,47 @@ public void testManifestCompactProcedureWithBranch() {
.isEqualTo(
"[+I[1, 101, 15, 20221208], +I[4, 1001, 16, 20221208], +I[5, 10001, 15, 20221209]]");
}

@Test
public void testManifestCompactDryRun() {
    // Statements reused below, named once so the scenario reads top to bottom.
    final String deletedFilesQuery = "SELECT sum(num_deleted_files) FROM T$manifests";
    final String overwriteStatement =
            "INSERT OVERWRITE T VALUES (1, '10', 15, '20221208'), (4, '100', 16, '20221208'), (5, '1000', 15, '20221209')";

    // Partitioned, write-only table with dynamic buckets.
    sql(
            "CREATE TABLE T ("
                    + " k INT,"
                    + " v STRING,"
                    + " hh INT,"
                    + " dt STRING"
                    + ") PARTITIONED BY (dt, hh) WITH ("
                    + " 'write-only' = 'true',"
                    + " 'manifest.full-compaction-threshold-size' = '10000 T',"
                    + " 'bucket' = '-1'"
                    + ")");

    // One plain insert followed by two overwrites of the same rows.
    sql(
            "INSERT INTO T VALUES (1, '10', 15, '20221208'), (4, '100', 16, '20221208'), (5, '1000', 15, '20221209')");
    sql(overwriteStatement);
    sql(overwriteStatement);

    // Precondition: the manifests record six deleted files before the procedure runs.
    Object deletedBefore = sql(deletedFilesQuery).get(0).getField(0);
    Assertions.assertThat(deletedBefore).isEqualTo(6L);

    Object reportField =
            sql("CALL sys.compact_manifest(`table` => 'default.T', `dry_run` => true)")
                    .get(0)
                    .getField(0);
    String dryRunResult = Objects.requireNonNull(reportField).toString();

    Assertions.assertThat(dryRunResult).startsWith("Dry run:").contains("deleted entries in");

    // The dry run must leave the manifests untouched: the deleted-file count is unchanged.
    Object deletedAfter = sql(deletedFilesQuery).get(0).getField(0);
    Assertions.assertThat(deletedAfter).isEqualTo(6L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.paimon.spark.procedure;

import org.apache.paimon.operation.ManifestCompactDryRun;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.utils.ProcedureUtils;
Expand All @@ -29,24 +31,31 @@
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;

import static org.apache.spark.sql.types.DataTypes.BooleanType;
import static org.apache.spark.sql.types.DataTypes.StringType;

/**
* Compact manifest procedure. Usage:
*
* <pre><code>
* CALL sys.compact_manifest(table => 'tableId')
* CALL sys.compact_manifest(table => 'tableId', dry_run => true)
* </code></pre>
*/
public class CompactManifestProcedure extends BaseProcedure {

private static final Logger LOG = LoggerFactory.getLogger(CompactManifestProcedure.class);

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", StringType),
ProcedureParameter.optional("options", StringType)
ProcedureParameter.optional("options", StringType),
ProcedureParameter.optional("dry_run", BooleanType)
};

private static final StructType OUTPUT_TYPE =
Expand Down Expand Up @@ -74,12 +83,19 @@ public InternalRow[] call(InternalRow args) {

Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String options = args.isNullAt(1) ? null : args.getString(1);
boolean dryRun = !args.isNullAt(2) && args.getBoolean(2);

Table table = loadSparkTable(tableIdent).getTable();
HashMap<String, String> dynamicOptions = new HashMap<>();
ProcedureUtils.putAllOptions(dynamicOptions, options);
table = table.copy(dynamicOptions);

if (dryRun) {
String message = ManifestCompactDryRun.execute((FileStoreTable) table);
LOG.info(message);
return new InternalRow[] {newInternalRow(true)};
}

try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
commit.compactManifests();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,29 @@ class CompactManifestProcedureTest extends PaimonSparkTestBase with StreamTest {
rows = spark.sql("SELECT sum(num_deleted_files) FROM `T$manifests`").collectAsList()
Assertions.assertThat(rows.get(0).getLong(0)).isEqualTo(0L)
}

test("Paimon Procedure: compact manifest dry run") {
  // Sum of num_deleted_files over the manifests system table of T2.
  def deletedFileCount(): Long =
    spark
      .sql("SELECT sum(num_deleted_files) FROM `T2$manifests`")
      .collectAsList()
      .get(0)
      .getLong(0)

  spark.sql(s"""
               |CREATE TABLE T2 (id INT, value STRING, dt STRING, hh INT)
               |TBLPROPERTIES ('bucket'='-1', 'write-only'='true', 'compaction.min.file-num'='2')
               |PARTITIONED BY (dt, hh)
               |""".stripMargin)

  // One plain insert followed by two overwrites of the same rows.
  spark.sql(s"INSERT INTO T2 VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)")
  spark.sql(s"INSERT OVERWRITE T2 VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)")
  spark.sql(s"INSERT OVERWRITE T2 VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)")

  // Precondition: there are deleted entries before the procedure is called.
  val deletedBefore = deletedFileCount()
  Assertions.assertThat(deletedBefore).isGreaterThan(0L)

  // In dry-run mode the procedure still reports `true` as its single result column.
  val dryRunRows = spark
    .sql("CALL sys.compact_manifest(table => 'T2', dry_run => true)")
    .collectAsList()
  Assertions.assertThat(dryRunRows.get(0).getBoolean(0)).isTrue

  // verify dry run did not actually compact
  val deletedAfter = deletedFileCount()
  Assertions.assertThat(deletedAfter).isEqualTo(deletedBefore)
}
}
Loading