diff --git a/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt b/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt index b1a1835164c6..39d4214cf14e 100644 --- a/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt +++ b/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt @@ -213,6 +213,7 @@ EDL 1.0 com.sun.activation:jakarta.activation jakarta.activation:jakarta.activation-api jakarta.xml.bind:jakarta.xml.bind-api + org.locationtech.jts:jts-core EPL 2.0 @@ -383,6 +384,7 @@ Apache License 2.0 org.apache.hadoop:hadoop-common org.apache.hadoop:hadoop-hdfs org.apache.hadoop:hadoop-hdfs-client + org.apache.hadoop:hadoop-mapreduce-client-core org.apache.hadoop:hadoop-shaded-guava org.apache.hadoop:hadoop-shaded-protobuf_3_25 org.apache.httpcomponents:httpcore @@ -390,6 +392,8 @@ Apache License 2.0 org.apache.iceberg:iceberg-bundled-guava org.apache.iceberg:iceberg-common org.apache.iceberg:iceberg-core + org.apache.iceberg:iceberg-orc + org.apache.iceberg:iceberg-parquet org.apache.kerby:kerb-admin org.apache.kerby:kerb-client org.apache.kerby:kerb-common @@ -407,6 +411,16 @@ Apache License 2.0 org.apache.kerby:token-provider org.apache.logging.log4j:log4j-api org.apache.logging.log4j:log4j-core + org.apache.orc:orc-core + org.apache.orc:orc-shims + org.apache.parquet:parquet-avro + org.apache.parquet:parquet-column + org.apache.parquet:parquet-common + org.apache.parquet:parquet-encoding + org.apache.parquet:parquet-format-structures + org.apache.parquet:parquet-hadoop + org.apache.parquet:parquet-jackson + org.apache.parquet:parquet-variant org.apache.ranger:ranger-audit-core org.apache.ranger:ranger-authz-api org.apache.ranger:ranger-intg diff --git a/hadoop-ozone/dist/src/main/license/jar-report.txt b/hadoop-ozone/dist/src/main/license/jar-report.txt index 2daa3316986c..d80a0c2ade24 100644 --- a/hadoop-ozone/dist/src/main/license/jar-report.txt +++ b/hadoop-ozone/dist/src/main/license/jar-report.txt @@ -64,6 +64,7 @@ share/ozone/lib/hadoop-auth.jar share/ozone/lib/hadoop-common.jar share/ozone/lib/hadoop-hdfs-client.jar share/ozone/lib/hadoop-hdfs.jar +share/ozone/lib/hadoop-mapreduce-client-core.jar share/ozone/lib/hadoop-shaded-guava.jar share/ozone/lib/hadoop-shaded-protobuf_3_25.jar share/ozone/lib/hdds-cli-common.jar @@ -90,6 +91,8 @@ share/ozone/lib/iceberg-api.jar share/ozone/lib/iceberg-bundled-guava.jar share/ozone/lib/iceberg-common.jar share/ozone/lib/iceberg-core.jar +share/ozone/lib/iceberg-orc.jar +share/ozone/lib/iceberg-parquet.jar share/ozone/lib/istack-commons-runtime.jar share/ozone/lib/j2objc-annotations.jar share/ozone/lib/jackson-annotations.jar @@ -165,6 +168,7 @@ share/ozone/lib/json-simple.jar share/ozone/lib/jsp-api.jar share/ozone/lib/jspecify.jar share/ozone/lib/jsr311-api.jar +share/ozone/lib/jts-core.jar share/ozone/lib/kerb-core.jar share/ozone/lib/kerb-crypto.jar share/ozone/lib/kerb-util.jar @@ -214,6 +218,8 @@ share/ozone/lib/opentelemetry-sdk-logs.jar share/ozone/lib/opentelemetry-sdk-metrics.jar share/ozone/lib/opentelemetry-sdk-trace.jar share/ozone/lib/opentelemetry-sdk.jar +share/ozone/lib/orc-core-nohive.jar +share/ozone/lib/orc-shims.jar share/ozone/lib/osgi-resource-locator.jar share/ozone/lib/ozone-client.jar share/ozone/lib/ozone-cli-admin.jar @@ -241,6 +247,14 @@ share/ozone/lib/ozone-s3-secret-store.jar share/ozone/lib/ozone-s3gateway.jar share/ozone/lib/ozone-tools.jar share/ozone/lib/ozone-vapor.jar +share/ozone/lib/parquet-avro.jar +share/ozone/lib/parquet-column.jar +share/ozone/lib/parquet-common.jar +share/ozone/lib/parquet-encoding.jar +share/ozone/lib/parquet-format-structures.jar +share/ozone/lib/parquet-hadoop.jar +share/ozone/lib/parquet-jackson.jar +share/ozone/lib/parquet-variant.jar share/ozone/lib/perfmark-api.jar share/ozone/lib/picocli.jar share/ozone/lib/picocli-shell-jline3.jar diff --git a/hadoop-ozone/iceberg/pom.xml b/hadoop-ozone/iceberg/pom.xml index d7b822c38608..988f7bc6f66d 100644 --- a/hadoop-ozone/iceberg/pom.xml +++ b/hadoop-ozone/iceberg/pom.xml @@ -32,6 +32,11 @@ + + org.apache.avro + avro + 1.12.0 + @@ -64,11 +69,111 @@ + + org.apache.iceberg + iceberg-orc + ${iceberg.version} + + + org.apache.iceberg + iceberg-parquet + ${iceberg.version} + + + commons-pool + commons-pool + + + + + org.apache.orc + orc-core + 1.9.7 + nohive + + + org.apache.hadoop + hadoop-client-api + + + org.threeten + threeten-extra + + + + + org.apache.parquet + parquet-column + 1.16.0 + org.slf4j slf4j-api + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + runtime + + + com.github.pjfanning + jersey-json + + + + + com.sun.jersey + jersey-guice + + + com.sun.jersey + jersey-servlet + + + + + javax.xml.bind + jaxb-api + + + org.apache.avro + avro + + + + org.apache.hadoop + hadoop-yarn-api + + + org.apache.hadoop + hadoop-yarn-client + + + org.apache.hadoop + hadoop-yarn-common + + + + + org.eclipse.jetty + jetty-client + + + org.eclipse.jetty.websocket + websocket-api + + + org.eclipse.jetty.websocket + websocket-client + + + org.eclipse.jetty.websocket + websocket-common + + + org.apache.hadoop hadoop-common diff --git a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java index 0327b7a5fd66..7b3aa3da771d 100644 --- a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java +++ b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java @@ -46,18 +46,33 @@ import org.apache.iceberg.PartitionStatisticsFile; import org.apache.iceberg.RewriteTablePathUtil; import org.apache.iceberg.RewriteTablePathUtil.RewriteResult; +import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; import org.apache.iceberg.StaticTableOperations; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadata.MetadataLogEntry; import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.actions.ImmutableRewriteTablePath; import org.apache.iceberg.actions.RewriteTablePath; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.avro.DataReader; +import org.apache.iceberg.data.avro.DataWriter; +import org.apache.iceberg.data.orc.GenericOrcReader; +import org.apache.iceberg.data.orc.GenericOrcWriter; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DeleteSchemaUtil; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.util.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -228,8 +243,6 @@ private boolean versionInFilePath(String path, String version) { } private String rebuildMetadata() { - // TODO: position delete file entries in rewriteManifestResult.copyPlan() reference staging paths - // that are never written, exclude them until position delete rewriting is implemented. TableMetadata startMetadata = startVersionName != null ? new StaticTableOperations(startVersionName, table.io()).current() : null; @@ -255,6 +268,13 @@ private String rebuildMetadata() { RewriteContentFileResult rewriteManifestResult = rewriteManifests(deltaSnapshotIds, endMetadata, rewriteManifestListResult.toRewrite()); + Set deleteFiles = + rewriteManifestResult.toRewrite().stream() + .filter(e -> e instanceof DeleteFile) + .map(e -> (DeleteFile) e) + .collect(Collectors.toSet()); + rewritePositionDeletes(deleteFiles); + Set> copyPlan = new HashSet<>(); copyPlan.addAll(rewriteVersionResult.copyPlan()); copyPlan.addAll(rewriteManifestListResult.copyPlan()); @@ -675,4 +695,188 @@ private static RewriteResult writeDeleteManifest( throw new RuntimeIOException(e); } } + + static class OzonePositionDeleteReaderWriter implements RewriteTablePathUtil.PositionDeleteReaderWriter { + @Override + public CloseableIterable reader( + InputFile inputFile, FileFormat format, PartitionSpec spec) { + return positionDeletesReader(inputFile, format, spec); + } + + @Override + public PositionDeleteWriter writer( + OutputFile outputFile, + FileFormat format, + PartitionSpec spec, + StructLike partition, + Schema rowSchema) + throws IOException { + return positionDeletesWriter(outputFile, format, spec, partition, rowSchema); + } + } + + private void rewritePositionDeletes(Set toRewrite) { + /* + * NOTE: Rewriting position delete files updates embedded data file paths, which changes the + * resulting file size. This causes a metadata mismatch in the manifests: + * + * 1. Dependency: Manifests MUST be rewritten first because they are the source of truth used to identify which + * position delete files exist and need processing. + * 2. Issue: Because manifests are written before the delete files are updated, the'file_size_in_bytes' field + * in the manifest reflects the original size, not the new size. + * 3. Impact: Some catalogs (e.g., REST catalogs like Polaris) will fail to read these files as the reader uses + * the stale size from the manifest. + * + * This is a known Iceberg limitation being addressed by the Iceberg community. Once that fix is available + * in the Iceberg core, this action should be updated accordingly. + */ + if (toRewrite.isEmpty()) { + return; + } + + RewriteTablePathUtil.PositionDeleteReaderWriter posDeleteReaderWriter = new OzonePositionDeleteReaderWriter(); + int maxInFlight = parallelism * MAX_INFLIGHT_MULTIPLIER; + Semaphore semaphore = new Semaphore(maxInFlight); + ExecutorCompletionService completionService = new ExecutorCompletionService<>(executorService); + int submittedTasks = 0; + int completedTasks = 0; + + try { + for (DeleteFile deleteFile : toRewrite) { + semaphore.acquire(); + boolean taskSubmitted = false; + try { + completionService.submit(() -> { + try { + rewritePositionDelete(deleteFile, table, sourcePrefix, targetPrefix, stagingDir, posDeleteReaderWriter); + return null; + } finally { + semaphore.release(); + } + }); + taskSubmitted = true; + submittedTasks++; + } finally { + if (!taskSubmitted) { + semaphore.release(); + } + } + + Future done; + while ((done = completionService.poll()) != null) { + done.get(); + completedTasks++; + } + } + + while (completedTasks < submittedTasks) { + completionService.take().get(); + completedTasks++; + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + executorService.shutdownNow(); + throw new RuntimeException("Interrupted while rewriting position delete files", e); + + } catch (ExecutionException e) { + executorService.shutdownNow(); + throw new RuntimeException("Failed to rewrite position delete file", e.getCause()); + } + } + + private static void rewritePositionDelete( + DeleteFile deleteFile, + Table table, + String sourcePrefixArg, + String targetPrefixArg, + String stagingLocationArg, + RewriteTablePathUtil.PositionDeleteReaderWriter posDeleteReaderWriter) { + try { + FileIO io = table.io(); + String newPath = + RewriteTablePathUtil.stagingPath( + deleteFile.location(), sourcePrefixArg, stagingLocationArg); + OutputFile outputFile = io.newOutputFile(newPath); + PartitionSpec spec = table.specs().get(deleteFile.specId()); + RewriteTablePathUtil.rewritePositionDeleteFile( + deleteFile, + outputFile, + io, + spec, + sourcePrefixArg, + targetPrefixArg, + posDeleteReaderWriter); + } catch (IOException e) { + LOG.error("Failed to rewrite position delete file: {}", + deleteFile.location(), e); + throw new RuntimeIOException(e); + } + } + + static CloseableIterable positionDeletesReader( + InputFile inputFile, FileFormat format, PartitionSpec spec) { + Schema deleteSchema = DeleteSchemaUtil.posDeleteReadSchema(spec.schema()); + switch (format) { + case AVRO: + return Avro.read(inputFile) + .project(deleteSchema) + .reuseContainers() + .createReaderFunc(DataReader::create) + .build(); + + case PARQUET: + return Parquet.read(inputFile) + .project(deleteSchema) + .reuseContainers() + .createReaderFunc( + fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema)) + .build(); + + case ORC: + return ORC.read(inputFile) + .project(deleteSchema) + .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema)) + .build(); + + default: + LOG.error("Unsupported file format: {} for input file: {}", format, inputFile.location()); + throw new UnsupportedOperationException("Unsupported file format: " + format); + } + } + + static PositionDeleteWriter positionDeletesWriter( + OutputFile outputFile, + FileFormat format, + PartitionSpec spec, + StructLike partition, + Schema rowSchema) + throws IOException { + switch (format) { + case AVRO: + return Avro.writeDeletes(outputFile) + .createWriterFunc(DataWriter::create) + .withPartition(partition) + .rowSchema(rowSchema) + .withSpec(spec) + .buildPositionWriter(); + case PARQUET: + return Parquet.writeDeletes(outputFile) + .createWriterFunc(GenericParquetWriter::create) + .withPartition(partition) + .rowSchema(rowSchema) + .withSpec(spec) + .buildPositionWriter(); + case ORC: + return ORC.writeDeletes(outputFile) + .createWriterFunc(GenericOrcWriter::buildWriter) + .withPartition(partition) + .rowSchema(rowSchema) + .withSpec(spec) + .buildPositionWriter(); + default: + LOG.error("Unsupported file format: {} for output file: {}", format, outputFile.location()); + throw new UnsupportedOperationException("Unsupported file format: " + format); + } + } } diff --git a/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java b/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java index 87c9c665ff8a..7c0038f4da57 100644 --- a/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java +++ b/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.DataFile; @@ -48,7 +49,11 @@ import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.ManifestReader; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionData; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionStatisticsFile; import org.apache.iceberg.RewriteTablePathUtil; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; @@ -57,14 +62,27 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadata.MetadataLogEntry; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.actions.RewriteTablePath; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DeleteSchemaUtil; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.mockito.Mockito; /** @@ -92,7 +110,7 @@ class TestRewriteTablePathOzoneAction { private Path stagingDir; @BeforeEach - public void setupTableLocation() { + public void setupTableLocation() throws IOException { String tableLocation = tableDir.toUri().toString().replaceFirst("^file:///", "file:/") + TABLE_NAME; this.table = createTable(tableLocation + "/"); this.sourcePrefix = tableLocation; @@ -101,7 +119,7 @@ public void setupTableLocation() { @Test void fullTablePathRewrite() throws Exception { - RewriteTablePath.Result result = new RewriteTablePathOzoneAction(table) + RewriteTablePath.Result result = new RewriteTablePathOzoneAction(table, 2) .rewriteLocationPrefix(sourcePrefix, targetPrefix) .stagingLocation(stagingDir.toString() + "/") .execute(); @@ -305,9 +323,18 @@ void endVersionRejectsDeletedVersionFile() { assertThat(exception).hasMessageContaining("does not exist"); } + @Test + void usesCurrentMetadataIfEndVersionNotProvided() { + String currentMetadata = ((HasTableOperations) table).operations().current().metadataFileLocation(); + RewriteTablePathOzoneAction action = new RewriteTablePathOzoneAction(table); + action.rewriteLocationPrefix(sourcePrefix, targetPrefix).stagingLocation(stagingDir + "/"); + RewriteTablePath.Result result = action.execute(); + assertThat(result.latestVersion()).isEqualTo(RewriteTablePathUtil.fileName(currentMetadata)); + } + + @Test void defaultStagingDirIsUnderTableMetadataLocation() { String metadataLocation = RewriteTablePathOzoneUtils.getMetadataLocation(table); - RewriteTablePath.Result result = new RewriteTablePathOzoneAction(table) .rewriteLocationPrefix(sourcePrefix, targetPrefix) .execute(); @@ -366,6 +393,110 @@ void statsFileCopyPlanReturnsBeforeToAfterPathPairs() { Pair.of("before-1.stats", "after-1.stats"), Pair.of("before-2.stats", "after-2.stats")), copyPlan); } + + @Test + void rejectsTablesWithPartitionStatistics() { + TableMetadata baseMetadata = ((HasTableOperations) table).operations().current(); + long snapshotId = baseMetadata.currentSnapshot().snapshotId(); + PartitionStatisticsFile statsFile = Mockito.mock(PartitionStatisticsFile.class); + Mockito.when(statsFile.snapshotId()).thenReturn(snapshotId); + Mockito.when(statsFile.path()).thenReturn(sourcePrefix + "/metadata/dummy.stats"); + Mockito.when(statsFile.fileSizeInBytes()).thenReturn(100L); + TableMetadata metadataWithStats = TableMetadata.buildFrom(baseMetadata) + .setPartitionStatistics(statsFile) + .build(); + + TableOperations ops = ((HasTableOperations) table).operations(); + ops.commit(baseMetadata, metadataWithStats); + + RewriteTablePath action = new RewriteTablePathOzoneAction(table) + .rewriteLocationPrefix(sourcePrefix, targetPrefix) + .stagingLocation(stagingDir + "/"); + + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, action::execute); + assertThat(exception).hasMessageContaining("Partition statistics files are not supported yet."); + } + + @Test + public void positionDeletesReaderUnsupportedFormat() { + InputFile mockInput = Mockito.mock(InputFile.class); + Mockito.when(mockInput.location()).thenReturn("s3://bucket/test.txt"); + PartitionSpec spec = PartitionSpec.unpartitioned(); + FileFormat mockUnsupportedFormat = Mockito.mock(FileFormat.class); + Mockito.when(mockUnsupportedFormat.toString()).thenReturn("txt"); + + UnsupportedOperationException exception = assertThrows(UnsupportedOperationException.class, + () -> RewriteTablePathOzoneAction.positionDeletesReader(mockInput, mockUnsupportedFormat, spec)); + + assertThat(exception).hasMessageContaining("Unsupported file format: txt"); + } + + @Test + public void positionDeletesWriterUnsupportedFormat() { + OutputFile mockOutput = Mockito.mock(OutputFile.class); + Mockito.when(mockOutput.location()).thenReturn("s3://bucket/test.txt"); + PartitionSpec spec = PartitionSpec.unpartitioned(); + FileFormat mockUnsupportedFormat = Mockito.mock(FileFormat.class); + Mockito.when(mockUnsupportedFormat.toString()).thenReturn("txt"); + + UnsupportedOperationException exception = assertThrows(UnsupportedOperationException.class, + () -> RewriteTablePathOzoneAction.positionDeletesWriter( + mockOutput, mockUnsupportedFormat, spec, null, null)); + + assertThat(exception).hasMessageContaining("Unsupported file format: txt"); + } + + @ParameterizedTest + @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC"}) + void positionDeletesAvroAndOrcRoundTrip(FileFormat format, @TempDir Path temp) throws IOException { + String extension = format.name().toLowerCase(); + String path = temp.resolve("test." + extension).toUri().toString(); + OutputFile outputFile = table.io().newOutputFile(path); + PartitionSpec spec = table.spec(); + + try (PositionDeleteWriter writer = RewriteTablePathOzoneAction.positionDeletesWriter( + outputFile, format, spec, new PartitionData(spec.partitionType()), SCHEMA)) { + + GenericRecord row = GenericRecord.create(SCHEMA); + row.setField("c1", 42); + row.setField("c2", format.name() + "-test"); + + writer.write(PositionDelete.create().set("data.parquet", 100L, row)); + } + + try (CloseableIterable reader = RewriteTablePathOzoneAction.positionDeletesReader( + table.io().newInputFile(path), format, spec)) { + + List results = new ArrayList<>(); + reader.forEach(results::add); + + assertThat(results).hasSize(1); + Record record = results.get(0); + + assertThat(record.getField("file_path").toString()).isEqualTo("data.parquet"); + assertThat(record.getField("pos")).isEqualTo(100L); + + Record rowResult = (Record) record.getField("row"); + assertThat(rowResult.getField("c1")).isEqualTo(42); + assertThat(rowResult.getField("c2")).isEqualTo(format.name() + "-test"); + } + } + + @Test + void manifestsToRewriteRejectsMissingManifestList() { + Snapshot snapshot = table.currentSnapshot(); + String manifestListLocation = snapshot.manifestListLocation(); + table.io().deleteFile(manifestListLocation); + + RewriteTablePath action = new RewriteTablePathOzoneAction(table) + .rewriteLocationPrefix(sourcePrefix, targetPrefix) + .stagingLocation(stagingDir + "/"); + + RuntimeException exception = assertThrows(RuntimeException.class, action::execute); + assertThat(exception).hasMessageContaining("Failed to collect manifests to rewrite"); + assertThat(exception.getCause()).hasMessageContaining("Failed to read manifests for snapshot " + + snapshot.snapshotId()); + } /** * For every staged file in the CSV copy plan, asserts that internal paths are rewritten @@ -375,8 +506,8 @@ void statsFileCopyPlanReturnsBeforeToAfterPathPairs() { * manifest-list references all start with target. *
  • snap-*.avro (manifest-list): target path starts with target, and every * manifest entry path inside the staged file starts with target.
  • - *
  • *.avro (manifest): target path starts with target (content rewrite - * is not yet implemented).
  • + *
  • *.avro (manifest): target path starts with target and the content inside it.
  • + *
  • deletes.parquet(position delete file): target path starts with target and the content inside it.
  • * */ private void assertAllInternalPathsRewritten(Set> csvPairs, String target) throws Exception { @@ -392,6 +523,8 @@ private void assertAllInternalPathsRewritten(Set> csvPairs, } else if (RewriteTablePathUtil.fileName(stagingPath).endsWith(".avro")) { assertTrue(targetPath.startsWith(target), "Manifest file target path should start with target prefix: " + targetPath); + } else if (stagingPath.endsWith("deletes.parquet")) { + assertStagedDeleteFileInternalPathsRewritten(table, stagingPath, target); } } } @@ -538,6 +671,27 @@ private void assertDeleteManifestPathsRewritten(ManifestFile staged, ManifestFil "Rewritten delete manifest should reference the same delete files (by name) as the original"); } + private static void assertStagedDeleteFileInternalPathsRewritten( + Table tbl, String stagedPath, String targetPrefix) throws IOException { + Schema readSchema = DeleteSchemaUtil.pathPosSchema(); + String pathColumn = MetadataColumns.DELETE_FILE_PATH.name(); + int rowCount = 0; + try (CloseableIterable rows = + Parquet.read(tbl.io().newInputFile(stagedPath)) + .project(readSchema) + .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(readSchema, fileSchema)) + .build()) { + for (Record row : rows) { + Object path = row.getField(pathColumn); + assertTrue( + path.toString().startsWith(targetPrefix), + stagedPath + " row " + rowCount + ": path '" + path + + "' must start with '" + targetPrefix + "'"); + rowCount++; + } + } + } + private static List metadataLogEntryPaths(Table tbl) { TableMetadata meta = ((HasTableOperations) tbl).operations().current(); List paths = new ArrayList<>(); @@ -569,13 +723,19 @@ private static Set> readCsvPairs(Table tbl, String fileList return pairs; } - private Table createTable(String location) { + private Table createTable(String location) throws IOException { HadoopTables tables = new HadoopTables(new Configuration()); Table tbl = tables.create(SCHEMA, PartitionSpec.unpartitioned(), new HashMap<>(), location); for (int i = 0; i < COMMITS; i++) { - String dataPath = location + "/data/batch-" + i + ".parquet"; + String dataPath = location + "data/batch-" + i + ".parquet"; tbl.newAppend().appendFile(dummyDataFile(dataPath)).commit(); } + + for (int i = 0; i < 2; i++) { + String dataPath = location + "data/batch-" + i + ".parquet"; + DeleteFile df = writePositionDeleteFile(tbl, dataPath); + tbl.newRowDelta().addDeletes(df).commit(); + } return tables.load(location); } @@ -588,6 +748,26 @@ private DataFile dummyDataFile(String dataPath) { .build(); } + private DeleteFile writePositionDeleteFile(Table tbl, String referencedDataPath) + throws IOException { + String deleteUri = RewriteTablePathUtil.combinePaths( + tbl.location(), "data/" + UUID.randomUUID() + "-deletes.parquet"); + PositionDeleteWriter writer = + Parquet.writeDeletes(tbl.io().newOutputFile(deleteUri)) + .createWriterFunc(GenericParquetWriter::create) + .withSpec(tbl.spec()) + .withPartition(new PartitionData(tbl.spec().partitionType())) + .metricsConfig(MetricsConfig.forPositionDelete(tbl)) + .overwrite() + .buildPositionWriter(); + try { + writer.write(PositionDelete.create().set(referencedDataPath, 0L)); + } finally { + writer.close(); + } + return writer.toDeleteFile(); + } + private static StatisticsFile statisticsFile(String path, long fileSizeInBytes) { return new GenericStatisticsFile(1L, path, fileSizeInBytes, 0L, List.of()); }