From 3a37c2480b344dd4ac3e6e1f5d9d5e4a6c4b5851 Mon Sep 17 00:00:00 2001 From: Manu Zhang Date: Tue, 9 Jun 2026 23:03:18 +0800 Subject: [PATCH 1/4] feat: add row delta update Implements the RowDelta update builder, table and transaction factory methods, and focused tests for row-level add/delete flows. Co-authored-by: Codex --- src/iceberg/CMakeLists.txt | 1 + src/iceberg/meson.build | 1 + src/iceberg/table.cc | 15 + src/iceberg/table.h | 7 + src/iceberg/test/CMakeLists.txt | 1 + src/iceberg/test/row_delta_test.cc | 388 +++++++++++++++++++ src/iceberg/test/table_test.cc | 2 + src/iceberg/transaction.cc | 8 + src/iceberg/transaction.h | 3 + src/iceberg/type_fwd.h | 1 + src/iceberg/update/merging_snapshot_update.h | 14 +- src/iceberg/update/meson.build | 1 + src/iceberg/update/row_delta.cc | 191 +++++++++ src/iceberg/update/row_delta.h | 104 +++++ 14 files changed, 731 insertions(+), 6 deletions(-) create mode 100644 src/iceberg/test/row_delta_test.cc create mode 100644 src/iceberg/update/row_delta.cc create mode 100644 src/iceberg/update/row_delta.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index ea76c641a..9a0dc68b7 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -106,6 +106,7 @@ set(ICEBERG_SOURCES update/merge_append.cc update/merging_snapshot_update.cc update/pending_update.cc + update/row_delta.cc update/set_snapshot.cc update/snapshot_manager.cc update/snapshot_update.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 7bd2e052c..ab514be87 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -131,6 +131,7 @@ iceberg_sources = files( 'update/merge_append.cc', 'update/merging_snapshot_update.cc', 'update/pending_update.cc', + 'update/row_delta.cc', 'update/set_snapshot.cc', 'update/snapshot_manager.cc', 'update/snapshot_update.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 817e5917c..afa626964 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -35,6 +35,7 @@ #include "iceberg/update/expire_snapshots.h" #include "iceberg/update/fast_append.h" #include "iceberg/update/merge_append.h" +#include "iceberg/update/row_delta.h" #include "iceberg/update/set_snapshot.h" #include "iceberg/update/snapshot_manager.h" #include "iceberg/update/update_location.h" @@ -231,6 +232,12 @@ Result> Table::NewDeleteFiles() { return DeleteFiles::Make(name().name, std::move(ctx)); } +Result> Table::NewRowDelta() { + ICEBERG_ASSIGN_OR_RAISE( + auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); + return RowDelta::Make(name().name, std::move(ctx)); +} + Result> Table::NewUpdateStatistics() { ICEBERG_ASSIGN_OR_RAISE( auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); @@ -334,6 +341,14 @@ Result> StaticTable::NewMergeAppend() { return NotSupported("Cannot create a merge append for a static table"); } +Result> StaticTable::NewDeleteFiles() { + return NotSupported("Cannot create delete files for a static table"); +} + +Result> StaticTable::NewRowDelta() { + return NotSupported("Cannot create a row delta for a static table"); +} + Result> StaticTable::NewSnapshotManager() { return NotSupported("Cannot create a snapshot manager for a static table"); } diff --git a/src/iceberg/table.h b/src/iceberg/table.h index b71a1ddbc..c8f6ded08 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -182,6 +182,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// \brief Create a new DeleteFiles to delete data files and commit the changes. virtual Result> NewDeleteFiles(); + /// \brief Create a new RowDelta to add rows and row-level deletes. + virtual Result> NewRowDelta(); + /// \brief Create a new SnapshotManager to manage snapshots and snapshot references. virtual Result> NewSnapshotManager(); @@ -251,6 +254,10 @@ class ICEBERG_EXPORT StaticTable : public Table { Result> NewMergeAppend() override; + Result> NewDeleteFiles() override; + + Result> NewRowDelta() override; + Result> NewSnapshotManager() override; private: diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index c681de8c9..e528b1333 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -230,6 +230,7 @@ if(ICEBERG_BUILD_BUNDLE) merge_append_test.cc merging_snapshot_update_test.cc name_mapping_update_test.cc + row_delta_test.cc snapshot_manager_test.cc transaction_test.cc update_location_test.cc diff --git a/src/iceberg/test/row_delta_test.cc b/src/iceberg/test/row_delta_test.cc new file mode 100644 index 000000000..40453d345 --- /dev/null +++ b/src/iceberg/test/row_delta_test.cc @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/row_delta.h" + +#include +#include +#include +#include + +#include +#include + +#include "iceberg/avro/avro_register.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/partition_spec.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/update_test_base.h" +#include "iceberg/update/delete_files.h" +#include "iceberg/update/fast_append.h" + +namespace iceberg { + +class RowDeltaTest : public MinimalUpdateTestBase { + protected: + static void SetUpTestSuite() { avro::RegisterAll(); } + + void SetUp() override { + MinimalUpdateTestBase::SetUp(); + + ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec()); + ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema()); + + file_a_ = MakeDataFile("/data/file_a.parquet", /*partition_x=*/1L); + file_b_ = MakeDataFile("/data/file_b.parquet", /*partition_x=*/2L); + } + + std::shared_ptr MakeDataFile(const std::string& path, int64_t partition_x) { + auto file = std::make_shared(); + file->content = DataFile::Content::kData; + file->file_path = table_location_ + path; + file->file_format = FileFormatType::kParquet; + file->partition = PartitionValues(std::vector{Literal::Long(partition_x)}); + file->file_size_in_bytes = 1024; + file->record_count = 100; + file->partition_spec_id = spec_->spec_id(); + return file; + } + + std::shared_ptr MakeDeleteFile(const std::string& path, int64_t partition_x) { + auto file = MakeDataFile(path, partition_x); + file->content = DataFile::Content::kPositionDeletes; + file->file_size_in_bytes = 256; + file->record_count = 7; + return file; + } + + std::shared_ptr MakeDeletionVector(const std::string& path, + const std::string& referenced_data_file, + int64_t partition_x, + int64_t content_offset = 0) { + auto file = MakeDeleteFile(path, partition_x); + file->file_format = FileFormatType::kPuffin; + file->referenced_data_file = referenced_data_file; + file->content_offset = content_offset; + file->content_size_in_bytes = 10; + return file; + } + + void CommitFileA() { + ICEBERG_UNWRAP_OR_FAIL(auto fast_append, table_->NewFastAppend()); + fast_append->AppendFile(file_a_); + EXPECT_THAT(fast_append->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + void SetTableFormatVersion(int8_t format_version) { + table_->metadata()->format_version = format_version; + } + + std::shared_ptr spec_; + std::shared_ptr schema_; + std::shared_ptr file_a_; + std::shared_ptr file_b_; +}; + +TEST_F(RowDeltaTest, AddRowsCommitsAppendOperation) { + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->AddRows(file_a_); + + EXPECT_THAT(row_delta->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->Operation(), std::make_optional(DataOperation::kAppend)); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedRecords), "100"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedFileSize), "1024"); +} + +TEST_F(RowDeltaTest, AddDeletesCommitsDeleteOperation) { + auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet", + /*partition_x=*/1L); + + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->AddDeletes(delete_file); + + EXPECT_THAT(row_delta->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->Operation(), std::make_optional(DataOperation::kDelete)); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDeleteFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedPosDeleteFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedPosDeletes), "7"); +} + +TEST_F(RowDeltaTest, RemoveRowsCommitsOverwriteOperation) { + CommitFileA(); + + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->RemoveRows(file_a_); + + EXPECT_THAT(row_delta->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->Operation(), std::make_optional(DataOperation::kOverwrite)); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedRecords), "100"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kRemovedFileSize), "1024"); +} + +TEST_F(RowDeltaTest, RemoveRowsAndAddDeletesCommitsDeleteOperation) { + CommitFileA(); + + auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet", + /*partition_x=*/1L); + + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->RemoveRows(file_a_); + row_delta->AddDeletes(delete_file); + + EXPECT_THAT(row_delta->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->Operation(), std::make_optional(DataOperation::kDelete)); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDeleteFiles), "1"); +} + +TEST_F(RowDeltaTest, AddRowsAndRemoveDeletesCommitsAppendOperation) { + auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet", + /*partition_x=*/1L); + { + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->AddDeletes(delete_file); + EXPECT_THAT(row_delta->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->AddRows(file_a_); + row_delta->RemoveDeletes(delete_file); + + EXPECT_THAT(row_delta->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->Operation(), std::make_optional(DataOperation::kAppend)); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kRemovedDeleteFiles), "1"); +} + +TEST_F(RowDeltaTest, AddDeletesAndRemoveDeletesCommitsDeleteOperation) { + auto old_delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet", + /*partition_x=*/1L); + { + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->AddDeletes(old_delete_file); + EXPECT_THAT(row_delta->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + auto new_delete_file = MakeDeleteFile("/delete/file_b_pos_deletes.parquet", + /*partition_x=*/2L); + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->AddDeletes(new_delete_file); + row_delta->RemoveDeletes(old_delete_file); + + EXPECT_THAT(row_delta->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->Operation(), std::make_optional(DataOperation::kDelete)); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDeleteFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kRemovedDeleteFiles), "1"); +} + +TEST_F(RowDeltaTest, ValidateNoConflictingDataFilesFailsForConcurrentAppend) { + CommitFileA(); + ICEBERG_UNWRAP_OR_FAIL(auto starting_snapshot, table_->current_snapshot()); + + ICEBERG_UNWRAP_OR_FAIL(auto concurrent_append, table_->NewFastAppend()); + concurrent_append->AppendFile(file_b_); + EXPECT_THAT(concurrent_append->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + auto file_c = MakeDataFile("/data/file_c.parquet", /*partition_x=*/3L); + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->ValidateFromSnapshot(starting_snapshot->snapshot_id); + row_delta->ValidateNoConflictingDataFiles(); + row_delta->AddRows(file_c); + + auto result = row_delta->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Found conflicting files")); + EXPECT_THAT(result, HasErrorMessage(file_b_->file_path)); +} + +TEST_F(RowDeltaTest, ValidateNoConflictingDeleteFilesFailsForConcurrentDelete) { + CommitFileA(); + ICEBERG_UNWRAP_OR_FAIL(auto starting_snapshot, table_->current_snapshot()); + + auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet", + /*partition_x=*/1L); + std::shared_ptr concurrent_delta; + ICEBERG_UNWRAP_OR_FAIL(concurrent_delta, table_->NewRowDelta()); + concurrent_delta->AddDeletes(delete_file); + EXPECT_THAT(concurrent_delta->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + auto file_c = MakeDataFile("/data/file_c.parquet", /*partition_x=*/3L); + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->ValidateFromSnapshot(starting_snapshot->snapshot_id); + row_delta->ValidateNoConflictingDeleteFiles(); + row_delta->AddRows(file_c); + + auto result = row_delta->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Found new conflicting delete files")); + EXPECT_THAT(result, HasErrorMessage(delete_file->file_path)); +} + +TEST_F(RowDeltaTest, ValidateDataFilesExistFailsForConcurrentDelete) { + CommitFileA(); + ICEBERG_UNWRAP_OR_FAIL(auto starting_snapshot, table_->current_snapshot()); + + ICEBERG_UNWRAP_OR_FAIL(auto delete_files, table_->NewDeleteFiles()); + delete_files->DeleteFile(file_a_); + EXPECT_THAT(delete_files->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet", + /*partition_x=*/1L); + delete_file->referenced_data_file = file_a_->file_path; + + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->ValidateFromSnapshot(starting_snapshot->snapshot_id); + std::vector referenced_files{file_a_->file_path}; + row_delta->ValidateDataFilesExist(referenced_files); + row_delta->AddDeletes(delete_file); + + auto result = row_delta->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Cannot commit, missing data files")); + EXPECT_THAT(result, HasErrorMessage(file_a_->file_path)); +} + +TEST_F(RowDeltaTest, CannotRemoveReferencedDataFile) { + CommitFileA(); + + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + std::vector referenced_files{file_a_->file_path}; + row_delta->ValidateDataFilesExist(referenced_files); + row_delta->RemoveRows(file_a_); + + auto result = row_delta->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Cannot delete data files")); + EXPECT_THAT(result, HasErrorMessage(file_a_->file_path)); +} + +TEST_F(RowDeltaTest, AddDeleteFileForRemovedDataFileCommitsDeleteOperation) { + CommitFileA(); + + auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet", + /*partition_x=*/1L); + delete_file->referenced_data_file = file_a_->file_path; + + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->RemoveRows(file_a_); + row_delta->AddDeletes(delete_file); + + EXPECT_THAT(row_delta->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->Operation(), std::make_optional(DataOperation::kDelete)); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDeleteFiles), "1"); +} + +TEST_F(RowDeltaTest, ValidateDeletedFilesAllowsMissingRowsOnEmptyTable) { + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->ValidateDeletedFiles(); + row_delta->RemoveRows(file_a_); + + EXPECT_THAT(row_delta->Commit(), IsOk()); +} + +TEST_F(RowDeltaTest, ValidateDeletedFilesAllowsMissingDeletesOnEmptyTable) { + auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet", + /*partition_x=*/1L); + + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->ValidateDeletedFiles(); + row_delta->RemoveDeletes(delete_file); + + EXPECT_THAT(row_delta->Commit(), IsOk()); +} + +TEST_F(RowDeltaTest, AddDeletionVectorValidatesConcurrentDVs) { + CommitFileA(); + ICEBERG_UNWRAP_OR_FAIL(auto starting_snapshot, table_->current_snapshot()); + SetTableFormatVersion(3); + + auto concurrent_dv = + MakeDeletionVector("/delete/concurrent-dv-a.puffin", file_a_->file_path, + /*partition_x=*/1L, /*content_offset=*/0); + std::shared_ptr concurrent_delta; + ICEBERG_UNWRAP_OR_FAIL(concurrent_delta, table_->NewRowDelta()); + concurrent_delta->AddDeletes(concurrent_dv); + EXPECT_THAT(concurrent_delta->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + SetTableFormatVersion(3); + + auto dv = MakeDeletionVector("/delete/dv-a.puffin", file_a_->file_path, + /*partition_x=*/1L, /*content_offset=*/10); + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->ValidateFromSnapshot(starting_snapshot->snapshot_id); + row_delta->AddDeletes(dv); + + auto result = row_delta->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Found concurrently added DV")); + EXPECT_THAT(result, HasErrorMessage(file_a_->file_path)); +} + +} // namespace iceberg diff --git a/src/iceberg/test/table_test.cc b/src/iceberg/test/table_test.cc index 881c4fdd0..d7ebe4a0a 100644 --- a/src/iceberg/test/table_test.cc +++ b/src/iceberg/test/table_test.cc @@ -161,6 +161,8 @@ TEST(StaticTableTest, NewMutatingOperationsAreNotSupported) { EXPECT_THAT(table->NewUpdatePartitionStatistics(), IsError(ErrorKind::kNotSupported)); EXPECT_THAT(table->NewFastAppend(), IsError(ErrorKind::kNotSupported)); EXPECT_THAT(table->NewMergeAppend(), IsError(ErrorKind::kNotSupported)); + EXPECT_THAT(table->NewDeleteFiles(), IsError(ErrorKind::kNotSupported)); + EXPECT_THAT(table->NewRowDelta(), IsError(ErrorKind::kNotSupported)); EXPECT_THAT(table->NewSnapshotManager(), IsError(ErrorKind::kNotSupported)); } diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index ac1f08241..e911a61dc 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -37,6 +37,7 @@ #include "iceberg/update/fast_append.h" #include "iceberg/update/merge_append.h" #include "iceberg/update/pending_update.h" +#include "iceberg/update/row_delta.h" #include "iceberg/update/set_snapshot.h" #include "iceberg/update/snapshot_manager.h" #include "iceberg/update/snapshot_update.h" @@ -505,6 +506,13 @@ Result> Transaction::NewDeleteFiles() { return delete_files; } +Result> Transaction::NewRowDelta() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr row_delta, + RowDelta::Make(ctx_->table->name().name, ctx_)); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(row_delta)); + return row_delta; +} + Result> Transaction::NewUpdateStatistics() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_statistics, UpdateStatistics::Make(ctx_)); diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 52a0605c6..34ca78bd7 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -112,6 +112,9 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewDeleteFiles(); + /// \brief Create a new RowDelta to add rows and row-level deletes. + Result> NewRowDelta(); + /// \brief Create a new SnapshotManager to manage snapshots. Result> NewSnapshotManager(); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 0320f24ea..f29bc4a1a 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -243,6 +243,7 @@ class ExpireSnapshots; class FastAppend; class MergeAppend; class PendingUpdate; +class RowDelta; class SetSnapshot; class SnapshotManager; class SnapshotUpdate; diff --git a/src/iceberg/update/merging_snapshot_update.h b/src/iceberg/update/merging_snapshot_update.h index 879403222..fc3987ee1 100644 --- a/src/iceberg/update/merging_snapshot_update.h +++ b/src/iceberg/update/merging_snapshot_update.h @@ -288,6 +288,14 @@ class ICEBERG_EXPORT MergingSnapshotUpdate : public SnapshotUpdate { const std::shared_ptr& parent, std::shared_ptr io, bool case_sensitive = true); + /// \brief Return an error if a staged deletion vector conflicts with a deletion + /// vector added since starting_snapshot_id. + Status ValidateAddedDVs(const TableMetadata& metadata, + std::optional starting_snapshot_id, + std::shared_ptr conflict_filter, + const std::shared_ptr& parent, + std::shared_ptr io) const; + private: struct PendingDeleteFile { std::shared_ptr file; @@ -324,12 +332,6 @@ class ICEBERG_EXPORT MergingSnapshotUpdate : public SnapshotUpdate { Status AddDeleteFile(std::shared_ptr file, std::optional data_sequence_number); - Status ValidateAddedDVs(const TableMetadata& metadata, - std::optional starting_snapshot_id, - std::shared_ptr conflict_filter, - const std::shared_ptr& parent, - std::shared_ptr io) const; - Status ManagersReady() const; void SetSummaryProperty(const std::string& property, const std::string& value) override; diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 9f950e8d0..4f594a06e 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -23,6 +23,7 @@ install_headers( 'merge_append.h', 'merging_snapshot_update.h', 'pending_update.h', + 'row_delta.h', 'set_snapshot.h', 'snapshot_manager.h', 'snapshot_update.h', diff --git a/src/iceberg/update/row_delta.cc b/src/iceberg/update/row_delta.cc new file mode 100644 index 000000000..0d3fdf5ec --- /dev/null +++ b/src/iceberg/update/row_delta.cc @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/row_delta.h" + +#include +#include +#include +#include +#include + +#include "iceberg/expression/expressions.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/formatter_internal.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" + +namespace iceberg { + +Result> RowDelta::Make( + std::string table_name, std::shared_ptr ctx) { + ICEBERG_PRECHECK(!table_name.empty(), "Table name cannot be empty"); + ICEBERG_PRECHECK(ctx != nullptr, "Cannot create RowDelta without a context"); + return std::unique_ptr(new RowDelta(std::move(table_name), std::move(ctx))); +} + +RowDelta::RowDelta(std::string table_name, std::shared_ptr ctx) + : MergingSnapshotUpdate(std::move(table_name), std::move(ctx)), + conflict_detection_filter_(Expressions::AlwaysTrue()) {} + +RowDelta& RowDelta::AddRows(const std::shared_ptr& inserts) { + ICEBERG_BUILDER_RETURN_IF_ERROR(AddDataFile(inserts)); + return *this; +} + +RowDelta& RowDelta::AddDeletes(const std::shared_ptr& deletes) { + ICEBERG_BUILDER_RETURN_IF_ERROR(AddDeleteFile(deletes)); + return *this; +} + +RowDelta& RowDelta::RemoveRows(const std::shared_ptr& file) { + ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteDataFile(file)); + removed_data_files_.insert(file); + return *this; +} + +RowDelta& RowDelta::RemoveDeletes(const std::shared_ptr& deletes) { + ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteDeleteFile(deletes)); + return *this; +} + +RowDelta& RowDelta::ValidateFromSnapshot(int64_t snapshot_id) { + starting_snapshot_id_ = snapshot_id; + return *this; +} + +RowDelta& RowDelta::CaseSensitive(bool case_sensitive) { + MergingSnapshotUpdate::CaseSensitive(case_sensitive); + return *this; +} + +RowDelta& RowDelta::ValidateDataFilesExist( + std::span referenced_files) { + for (const auto& file : referenced_files) { + referenced_data_files_.insert(file); + } + return *this; +} + +RowDelta& RowDelta::ValidateDeletedFiles() { + validate_deletes_ = true; + return *this; +} + +RowDelta& RowDelta::ConflictDetectionFilter(std::shared_ptr filter) { + ICEBERG_BUILDER_CHECK(filter != nullptr, "Conflict detection filter cannot be null"); + conflict_detection_filter_ = std::move(filter); + return *this; +} + +RowDelta& RowDelta::ValidateNoConflictingDataFiles() { + validate_new_data_files_ = true; + return *this; +} + +RowDelta& RowDelta::ValidateNoConflictingDeleteFiles() { + validate_new_delete_files_ = true; + return *this; +} + +std::string RowDelta::operation() { + if (AddsDataFiles() && !AddsDeleteFiles() && !DeletesDataFiles()) { + return DataOperation::kAppend; + } + + if (AddsDeleteFiles() && !AddsDataFiles()) { + return DataOperation::kDelete; + } + + return DataOperation::kOverwrite; +} + +Status RowDelta::Validate(const TableMetadata& current_metadata, + const std::shared_ptr& snapshot) { + if (snapshot == nullptr) { + return {}; + } + + if (validate_deletes_) { + FailMissingDeletePaths(); + } + + if (starting_snapshot_id_.has_value()) { + ICEBERG_ASSIGN_OR_RAISE(bool is_ancestor, SnapshotUtil::IsAncestorOf( + current_metadata, snapshot->snapshot_id, + starting_snapshot_id_.value())); + ICEBERG_CHECK(is_ancestor, "Snapshot {} is not an ancestor of {}", + starting_snapshot_id_.value(), snapshot->snapshot_id); + } + + auto io = ctx_->table->io(); + if (!referenced_data_files_.empty()) { + ICEBERG_RETURN_UNEXPECTED(MergingSnapshotUpdate::ValidateDataFilesExist( + current_metadata, starting_snapshot_id_, referenced_data_files_, + /*skip_deletes=*/false, conflict_detection_filter_, snapshot, io, + IsCaseSensitive())); + } + + if (validate_new_data_files_) { + ICEBERG_RETURN_UNEXPECTED(MergingSnapshotUpdate::ValidateAddedDataFiles( + current_metadata, starting_snapshot_id_, conflict_detection_filter_, snapshot, io, + IsCaseSensitive())); + } + + if (validate_new_delete_files_) { + if (!removed_data_files_.empty()) { + ICEBERG_RETURN_UNEXPECTED(MergingSnapshotUpdate::ValidateNoNewDeletesForDataFiles( + current_metadata, starting_snapshot_id_, conflict_detection_filter_, + removed_data_files_, snapshot, io, IsCaseSensitive())); + } + + ICEBERG_RETURN_UNEXPECTED(MergingSnapshotUpdate::ValidateNoNewDeleteFiles( + current_metadata, starting_snapshot_id_, conflict_detection_filter_, snapshot, io, + IsCaseSensitive())); + } + + ICEBERG_RETURN_UNEXPECTED(ValidateNoConflictingFileAndPositionDeletes()); + + return MergingSnapshotUpdate::ValidateAddedDVs( + current_metadata, starting_snapshot_id_, conflict_detection_filter_, snapshot, io); +} + +Status RowDelta::ValidateNoConflictingFileAndPositionDeletes() const { + std::vector conflicting_files; + for (const auto& file : removed_data_files_) { + if (file != nullptr && referenced_data_files_.contains(file->file_path)) { + conflicting_files.push_back(file->file_path); + } + } + + if (!conflicting_files.empty()) { + return ValidationFailed( + "Cannot delete data files {} that are referenced by new delete files", + FormatRange(conflicting_files, ", ", "[", "]")); + } + + return {}; +} + +} // namespace iceberg diff --git a/src/iceberg/update/row_delta.h b/src/iceberg/update/row_delta.h new file mode 100644 index 000000000..5d859edd5 --- /dev/null +++ b/src/iceberg/update/row_delta.h @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/update/row_delta.h + +#include +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/merging_snapshot_update.h" +#include "iceberg/util/data_file_set.h" + +namespace iceberg { + +/// \brief Row-level delta operation for adding rows and delete files. +/// +/// RowDelta is the C++ counterpart of Java BaseRowDelta. It can add data files, +/// add delete files, remove data/delete files, and validate conflicts against +/// snapshots committed after a configured starting snapshot. +class ICEBERG_EXPORT RowDelta : public MergingSnapshotUpdate { + public: + /// \brief Create a new RowDelta instance. + static Result> Make(std::string table_name, + std::shared_ptr ctx); + + /// \brief Add a data file containing inserted rows. + RowDelta& AddRows(const std::shared_ptr& inserts); + + /// \brief Add a delete file. + RowDelta& AddDeletes(const std::shared_ptr& deletes); + + /// \brief Remove a data file from the table. + RowDelta& RemoveRows(const std::shared_ptr& file); + + /// \brief Remove a delete file from the table. + RowDelta& RemoveDeletes(const std::shared_ptr& deletes); + + /// \brief Validate against snapshots committed after snapshot_id. + RowDelta& ValidateFromSnapshot(int64_t snapshot_id); + + /// \brief Set case sensitivity for conflict detection. + RowDelta& CaseSensitive(bool case_sensitive); + + /// \brief Validate that referenced data files still exist. + RowDelta& ValidateDataFilesExist(std::span referenced_files); + + /// \brief Fail if any requested data/delete-file removal is missing from + /// manifests when the table has a current snapshot. + RowDelta& ValidateDeletedFiles(); + + /// \brief Set the conflict detection filter used by validation methods. + RowDelta& ConflictDetectionFilter(std::shared_ptr filter); + + /// \brief Validate that no matching data files were concurrently added. + RowDelta& ValidateNoConflictingDataFiles(); + + /// \brief Validate that no matching delete files were concurrently added. + RowDelta& ValidateNoConflictingDeleteFiles(); + + std::string operation() override; + + protected: + Status Validate(const TableMetadata& current_metadata, + const std::shared_ptr& snapshot) override; + + private: + explicit RowDelta(std::string table_name, std::shared_ptr ctx); + + Status ValidateNoConflictingFileAndPositionDeletes() const; + + std::optional starting_snapshot_id_; + std::unordered_set referenced_data_files_; + DataFileSet removed_data_files_; + bool validate_deletes_ = false; + std::shared_ptr conflict_detection_filter_; + bool validate_new_data_files_ = false; + bool validate_new_delete_files_ = false; +}; + +} // namespace iceberg From 842d725f72d8a6cbc9ff23c9e3168c45182be586 Mon Sep 17 00:00:00 2001 From: manuzhang Date: Tue, 23 Jun 2026 17:53:45 +0800 Subject: [PATCH 2/4] fix row delta delete validation Co-authored-by: Codex --- src/iceberg/test/row_delta_test.cc | 26 +++++++++++++++++++++++++- src/iceberg/update/row_delta.cc | 2 +- src/iceberg/update/row_delta.h | 10 ++++++++-- 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/src/iceberg/test/row_delta_test.cc b/src/iceberg/test/row_delta_test.cc index 40453d345..5d64f079c 100644 --- a/src/iceberg/test/row_delta_test.cc +++ b/src/iceberg/test/row_delta_test.cc @@ -274,7 +274,7 @@ TEST_F(RowDeltaTest, ValidateNoConflictingDeleteFilesFailsForConcurrentDelete) { EXPECT_THAT(result, HasErrorMessage(delete_file->file_path)); } -TEST_F(RowDeltaTest, ValidateDataFilesExistFailsForConcurrentDelete) { +TEST_F(RowDeltaTest, ValidateDataFilesExistSkipsConcurrentDeleteByDefault) { CommitFileA(); ICEBERG_UNWRAP_OR_FAIL(auto starting_snapshot, table_->current_snapshot()); @@ -294,6 +294,30 @@ TEST_F(RowDeltaTest, ValidateDataFilesExistFailsForConcurrentDelete) { row_delta->ValidateDataFilesExist(referenced_files); row_delta->AddDeletes(delete_file); + EXPECT_THAT(row_delta->Commit(), IsOk()); +} + +TEST_F(RowDeltaTest, ValidateDataFilesExistFailsForConcurrentDeleteWithValidateDeletedFiles) { + CommitFileA(); + ICEBERG_UNWRAP_OR_FAIL(auto starting_snapshot, table_->current_snapshot()); + + ICEBERG_UNWRAP_OR_FAIL(auto delete_files, table_->NewDeleteFiles()); + delete_files->DeleteFile(file_a_); + EXPECT_THAT(delete_files->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet", + /*partition_x=*/1L); + delete_file->referenced_data_file = file_a_->file_path; + + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->ValidateFromSnapshot(starting_snapshot->snapshot_id); + std::vector referenced_files{file_a_->file_path}; + row_delta->ValidateDataFilesExist(referenced_files); + row_delta->ValidateDeletedFiles(); + row_delta->AddDeletes(delete_file); + auto result = row_delta->Commit(); EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); EXPECT_THAT(result, HasErrorMessage("Cannot commit, missing data files")); diff --git a/src/iceberg/update/row_delta.cc b/src/iceberg/update/row_delta.cc index 0d3fdf5ec..d2b331572 100644 --- a/src/iceberg/update/row_delta.cc +++ b/src/iceberg/update/row_delta.cc @@ -143,7 +143,7 @@ Status RowDelta::Validate(const TableMetadata& current_metadata, if (!referenced_data_files_.empty()) { ICEBERG_RETURN_UNEXPECTED(MergingSnapshotUpdate::ValidateDataFilesExist( current_metadata, starting_snapshot_id_, referenced_data_files_, - /*skip_deletes=*/false, conflict_detection_filter_, snapshot, io, + /*skip_deletes=*/!validate_deletes_, conflict_detection_filter_, snapshot, io, IsCaseSensitive())); } diff --git a/src/iceberg/update/row_delta.h b/src/iceberg/update/row_delta.h index 5d859edd5..6d50deb7d 100644 --- a/src/iceberg/update/row_delta.h +++ b/src/iceberg/update/row_delta.h @@ -66,10 +66,16 @@ class ICEBERG_EXPORT RowDelta : public MergingSnapshotUpdate { RowDelta& CaseSensitive(bool case_sensitive); /// \brief Validate that referenced data files still exist. + /// + /// By default, this validation checks overwrite and replace commits. To apply + /// validation to delete commits, call ValidateDeletedFiles(). RowDelta& ValidateDataFilesExist(std::span referenced_files); - /// \brief Fail if any requested data/delete-file removal is missing from - /// manifests when the table has a current snapshot. + /// \brief Enable validation for missing delete paths and delete-operation conflicts. + /// + /// This fails if any requested data/delete-file removal is missing from + /// manifests when the table has a current snapshot. It also makes + /// ValidateDataFilesExist() check delete-operation snapshots. RowDelta& ValidateDeletedFiles(); /// \brief Set the conflict detection filter used by validation methods. From ec5d51ef99f8a55542fb94033823e3782b0e538c Mon Sep 17 00:00:00 2001 From: manuzhang Date: Tue, 23 Jun 2026 18:23:09 +0800 Subject: [PATCH 3/4] fix row delta test formatting Co-authored-by: Codex --- src/iceberg/test/row_delta_test.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/iceberg/test/row_delta_test.cc b/src/iceberg/test/row_delta_test.cc index 5d64f079c..ce5721255 100644 --- a/src/iceberg/test/row_delta_test.cc +++ b/src/iceberg/test/row_delta_test.cc @@ -297,7 +297,8 @@ TEST_F(RowDeltaTest, ValidateDataFilesExistSkipsConcurrentDeleteByDefault) { EXPECT_THAT(row_delta->Commit(), IsOk()); } -TEST_F(RowDeltaTest, ValidateDataFilesExistFailsForConcurrentDeleteWithValidateDeletedFiles) { +TEST_F(RowDeltaTest, + ValidateDataFilesExistFailsForConcurrentDeleteWithValidateDeletedFiles) { CommitFileA(); ICEBERG_UNWRAP_OR_FAIL(auto starting_snapshot, table_->current_snapshot()); From d184b955617fe4a7385825fc75523049e01e10b4 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 23 Jun 2026 22:28:52 +0800 Subject: [PATCH 4/4] add more comments --- src/iceberg/test/row_delta_test.cc | 20 +++--- src/iceberg/update/row_delta.cc | 2 + src/iceberg/update/row_delta.h | 104 +++++++++++++++++++++++------ 3 files changed, 97 insertions(+), 29 deletions(-) diff --git a/src/iceberg/test/row_delta_test.cc b/src/iceberg/test/row_delta_test.cc index ce5721255..5906e7e39 100644 --- a/src/iceberg/test/row_delta_test.cc +++ b/src/iceberg/test/row_delta_test.cc @@ -88,7 +88,7 @@ class RowDeltaTest : public MinimalUpdateTestBase { return file; } - void CommitFileA() { + void AppendFileAToTable() { ICEBERG_UNWRAP_OR_FAIL(auto fast_append, table_->NewFastAppend()); fast_append->AppendFile(file_a_); EXPECT_THAT(fast_append->Commit(), IsOk()); @@ -139,7 +139,7 @@ TEST_F(RowDeltaTest, AddDeletesCommitsDeleteOperation) { } TEST_F(RowDeltaTest, RemoveRowsCommitsOverwriteOperation) { - CommitFileA(); + AppendFileAToTable(); std::shared_ptr row_delta; ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); @@ -156,7 +156,7 @@ TEST_F(RowDeltaTest, RemoveRowsCommitsOverwriteOperation) { } TEST_F(RowDeltaTest, RemoveRowsAndAddDeletesCommitsDeleteOperation) { - CommitFileA(); + AppendFileAToTable(); auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet", /*partition_x=*/1L); @@ -228,7 +228,7 @@ TEST_F(RowDeltaTest, AddDeletesAndRemoveDeletesCommitsDeleteOperation) { } TEST_F(RowDeltaTest, ValidateNoConflictingDataFilesFailsForConcurrentAppend) { - CommitFileA(); + AppendFileAToTable(); ICEBERG_UNWRAP_OR_FAIL(auto starting_snapshot, table_->current_snapshot()); ICEBERG_UNWRAP_OR_FAIL(auto concurrent_append, table_->NewFastAppend()); @@ -250,7 +250,7 @@ TEST_F(RowDeltaTest, ValidateNoConflictingDataFilesFailsForConcurrentAppend) { } TEST_F(RowDeltaTest, ValidateNoConflictingDeleteFilesFailsForConcurrentDelete) { - CommitFileA(); + AppendFileAToTable(); ICEBERG_UNWRAP_OR_FAIL(auto starting_snapshot, table_->current_snapshot()); auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet", @@ -275,7 +275,7 @@ TEST_F(RowDeltaTest, ValidateNoConflictingDeleteFilesFailsForConcurrentDelete) { } TEST_F(RowDeltaTest, ValidateDataFilesExistSkipsConcurrentDeleteByDefault) { - CommitFileA(); + AppendFileAToTable(); ICEBERG_UNWRAP_OR_FAIL(auto starting_snapshot, table_->current_snapshot()); ICEBERG_UNWRAP_OR_FAIL(auto delete_files, table_->NewDeleteFiles()); @@ -299,7 +299,7 @@ TEST_F(RowDeltaTest, ValidateDataFilesExistSkipsConcurrentDeleteByDefault) { TEST_F(RowDeltaTest, ValidateDataFilesExistFailsForConcurrentDeleteWithValidateDeletedFiles) { - CommitFileA(); + AppendFileAToTable(); ICEBERG_UNWRAP_OR_FAIL(auto starting_snapshot, table_->current_snapshot()); ICEBERG_UNWRAP_OR_FAIL(auto delete_files, table_->NewDeleteFiles()); @@ -326,7 +326,7 @@ TEST_F(RowDeltaTest, } TEST_F(RowDeltaTest, CannotRemoveReferencedDataFile) { - CommitFileA(); + AppendFileAToTable(); std::shared_ptr row_delta; ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); @@ -341,7 +341,7 @@ TEST_F(RowDeltaTest, CannotRemoveReferencedDataFile) { } TEST_F(RowDeltaTest, AddDeleteFileForRemovedDataFileCommitsDeleteOperation) { - CommitFileA(); + AppendFileAToTable(); auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet", /*partition_x=*/1L); @@ -383,7 +383,7 @@ TEST_F(RowDeltaTest, ValidateDeletedFilesAllowsMissingDeletesOnEmptyTable) { } TEST_F(RowDeltaTest, AddDeletionVectorValidatesConcurrentDVs) { - CommitFileA(); + AppendFileAToTable(); ICEBERG_UNWRAP_OR_FAIL(auto starting_snapshot, table_->current_snapshot()); SetTableFormatVersion(3); diff --git a/src/iceberg/update/row_delta.cc b/src/iceberg/update/row_delta.cc index d2b331572..dd3f50c58 100644 --- a/src/iceberg/update/row_delta.cc +++ b/src/iceberg/update/row_delta.cc @@ -154,12 +154,14 @@ Status RowDelta::Validate(const TableMetadata& current_metadata, } if (validate_new_delete_files_) { + // validate that explicitly deleted files have not had added deletes if (!removed_data_files_.empty()) { ICEBERG_RETURN_UNEXPECTED(MergingSnapshotUpdate::ValidateNoNewDeletesForDataFiles( current_metadata, starting_snapshot_id_, conflict_detection_filter_, removed_data_files_, snapshot, io, IsCaseSensitive())); } + // validate that previous deletes do not conflict with added deletes ICEBERG_RETURN_UNEXPECTED(MergingSnapshotUpdate::ValidateNoNewDeleteFiles( current_metadata, starting_snapshot_id_, conflict_detection_filter_, snapshot, io, IsCaseSensitive())); diff --git a/src/iceberg/update/row_delta.h b/src/iceberg/update/row_delta.h index 6d50deb7d..ddb54d836 100644 --- a/src/iceberg/update/row_delta.h +++ b/src/iceberg/update/row_delta.h @@ -36,55 +36,121 @@ namespace iceberg { -/// \brief Row-level delta operation for adding rows and delete files. +/// \brief API for encoding row-level changes to a table. /// -/// RowDelta is the C++ counterpart of Java BaseRowDelta. It can add data files, -/// add delete files, remove data/delete files, and validate conflicts against -/// snapshots committed after a configured starting snapshot. +/// This API accumulates data and delete file changes, produces a new Snapshot +/// of the table, and commits that snapshot as current. +/// +/// When committing, these changes are applied to the latest table snapshot. +/// Commit conflicts are resolved by applying the changes to the new latest +/// snapshot and reattempting the commit. class ICEBERG_EXPORT RowDelta : public MergingSnapshotUpdate { public: /// \brief Create a new RowDelta instance. static Result> Make(std::string table_name, std::shared_ptr ctx); - /// \brief Add a data file containing inserted rows. + /// \brief Add a data file to the table. + /// + /// \param inserts A data file of rows to insert. + /// \return This RowDelta for method chaining. RowDelta& AddRows(const std::shared_ptr& inserts); - /// \brief Add a delete file. + /// \brief Add a delete file to the table. + /// + /// \param deletes A delete file of rows to delete. + /// \return This RowDelta for method chaining. RowDelta& AddDeletes(const std::shared_ptr& deletes); /// \brief Remove a data file from the table. + /// + /// \param file A data file. + /// \return This RowDelta for method chaining. RowDelta& RemoveRows(const std::shared_ptr& file); - /// \brief Remove a delete file from the table. + /// \brief Remove a rewritten delete file from the table. + /// + /// \param deletes A delete file that can be removed from the table. + /// \return This RowDelta for method chaining. RowDelta& RemoveDeletes(const std::shared_ptr& deletes); - /// \brief Validate against snapshots committed after snapshot_id. + /// \brief Set the snapshot ID used in any reads for this operation. + /// + /// Validations check changes after this snapshot ID. If the from snapshot is + /// not set, all ancestor snapshots through the table's initial snapshot are + /// validated. + /// + /// \param snapshot_id A snapshot ID. + /// \return This RowDelta for method chaining. RowDelta& ValidateFromSnapshot(int64_t snapshot_id); - /// \brief Set case sensitivity for conflict detection. + /// \brief Enable or disable case-sensitive expression binding for validations. + /// + /// \param case_sensitive Whether expression binding should be case sensitive. + /// \return This RowDelta for method chaining. RowDelta& CaseSensitive(bool case_sensitive); - /// \brief Validate that referenced data files still exist. + /// \brief Add data file paths that must not be removed by conflicting commits. + /// + /// If any path has been removed by a conflicting commit in the table since + /// the snapshot passed to ValidateFromSnapshot(), the operation fails. + /// + /// By default, this validation checks only rewrite and overwrite commits. To + /// apply validation to delete commits, call ValidateDeletedFiles(). /// - /// By default, this validation checks overwrite and replace commits. To apply - /// validation to delete commits, call ValidateDeletedFiles(). + /// \param referenced_files File paths that are referenced by a position + /// delete file. + /// \return This RowDelta for method chaining. RowDelta& ValidateDataFilesExist(std::span referenced_files); - /// \brief Enable validation for missing delete paths and delete-operation conflicts. + /// \brief Enable validation that referenced data files were not deleted. /// - /// This fails if any requested data/delete-file removal is missing from - /// manifests when the table has a current snapshot. It also makes - /// ValidateDataFilesExist() check delete-operation snapshots. + /// If a data file has a row deleted using a position delete file, rewriting + /// or overwriting the data file concurrently would un-delete the row. Deleting + /// the data file is normally allowed, but a delete may be part of a + /// transaction that reads and re-appends a row. This method is used to + /// validate deletes for the transaction case. + /// + /// \return This RowDelta for method chaining. RowDelta& ValidateDeletedFiles(); - /// \brief Set the conflict detection filter used by validation methods. + /// \brief Set a conflict detection filter used to validate added files. + /// + /// If not called, a true literal is used as the conflict detection filter. + /// + /// \param filter An expression on rows in the table. + /// \return This RowDelta for method chaining. RowDelta& ConflictDetectionFilter(std::shared_ptr filter); - /// \brief Validate that no matching data files were concurrently added. + /// \brief Enable validation that concurrent data files do not conflict. + /// + /// This method should be called when the table is queried to determine which + /// files to delete or append. If a concurrent operation commits a new file + /// after the data was read and that file might contain rows matching the + /// conflict detection filter, this operation detects that during retries and + /// fails. + /// + /// Calling this method is required to maintain serializable isolation for + /// update/delete operations. Otherwise, the isolation level is snapshot + /// isolation. + /// + /// Validation uses the filter passed to ConflictDetectionFilter() and applies + /// to operations after the snapshot passed to ValidateFromSnapshot(). + /// + /// \return This RowDelta for method chaining. RowDelta& ValidateNoConflictingDataFiles(); - /// \brief Validate that no matching delete files were concurrently added. + /// \brief Enable validation that concurrent delete files do not conflict. + /// + /// This method must be called when the table is queried to produce a row + /// delta for UPDATE and MERGE operations independently of the isolation level. + /// Calling this method is not required for DELETE operations because it is OK + /// to delete a record that is also deleted concurrently. + /// + /// Validation uses the filter passed to ConflictDetectionFilter() and applies + /// to operations after the snapshot passed to ValidateFromSnapshot(). + /// + /// \return This RowDelta for method chaining. RowDelta& ValidateNoConflictingDeleteFiles(); std::string operation() override;