diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 9a0dc68b7..aa051cd14 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -105,6 +105,7 @@ set(ICEBERG_SOURCES update/fast_append.cc update/merge_append.cc update/merging_snapshot_update.cc + update/overwrite_files.cc update/pending_update.cc update/row_delta.cc update/set_snapshot.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index ab514be87..58d79a402 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -130,6 +130,7 @@ iceberg_sources = files( 'update/fast_append.cc', 'update/merge_append.cc', 'update/merging_snapshot_update.cc', + 'update/overwrite_files.cc', 'update/pending_update.cc', 'update/row_delta.cc', 'update/set_snapshot.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index afa626964..9dbc5acf7 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -35,6 +35,7 @@ #include "iceberg/update/expire_snapshots.h" #include "iceberg/update/fast_append.h" #include "iceberg/update/merge_append.h" +#include "iceberg/update/overwrite_files.h" #include "iceberg/update/row_delta.h" #include "iceberg/update/set_snapshot.h" #include "iceberg/update/snapshot_manager.h" @@ -238,6 +239,12 @@ Result> Table::NewRowDelta() { return RowDelta::Make(name().name, std::move(ctx)); } +Result> Table::NewOverwrite() { /* Builds an OverwriteFiles for this table on a new TransactionContext of kind kUpdate, mirroring the neighbouring NewRowDelta and NewUpdateStatistics factories. */ + ICEBERG_ASSIGN_OR_RAISE( + auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); + return OverwriteFiles::Make(name().name, std::move(ctx)); +} + Result> Table::NewUpdateStatistics() { ICEBERG_ASSIGN_OR_RAISE( auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); @@ -349,6 +356,10 @@ Result> StaticTable::NewRowDelta() { return NotSupported("Cannot create a row delta for a static table"); } +Result> StaticTable::NewOverwrite() { /* Overwrites are rejected for static tables: this always returns a NotSupported error. */ + return NotSupported("Cannot create an overwrite for a static table"); +} + Result> StaticTable::NewSnapshotManager() { return 
NotSupported("Cannot create a snapshot manager for a static table"); } diff --git a/src/iceberg/table.h b/src/iceberg/table.h index c8f6ded08..64ed21ef8 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -185,6 +185,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// \brief Create a new RowDelta to add rows and row-level deletes. virtual Result> NewRowDelta(); + /// \brief Create a new OverwriteFiles to overwrite data files and commit the changes. + virtual Result> NewOverwrite(); + /// \brief Create a new SnapshotManager to manage snapshots and snapshot references. virtual Result> NewSnapshotManager(); @@ -258,6 +261,8 @@ class ICEBERG_EXPORT StaticTable : public Table { Result> NewRowDelta() override; + Result> NewOverwrite() override; /* always returns a NotSupported error for a static table */ + Result> NewSnapshotManager() override; private: diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index e528b1333..0756c1eef 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -239,7 +239,8 @@ if(ICEBERG_BUILD_BUNDLE) update_properties_test.cc update_schema_test.cc update_sort_order_test.cc - update_statistics_test.cc) + update_statistics_test.cc + overwrite_files_test.cc) add_iceberg_test(data_test USE_BUNDLE diff --git a/src/iceberg/test/overwrite_files_test.cc b/src/iceberg/test/overwrite_files_test.cc new file mode 100644 index 000000000..5d4d851f1 --- /dev/null +++ b/src/iceberg/test/overwrite_files_test.cc @@ -0,0 +1,692 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/overwrite_files.h" + +#include +#include +#include + +#include +#include + +#include "iceberg/avro/avro_register.h" +#include "iceberg/constants.h" +#include "iceberg/expression/expressions.h" +#include "iceberg/partition_field.h" +#include "iceberg/partition_spec.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/update_test_base.h" +#include "iceberg/transaction.h" +#include "iceberg/transform.h" +#include "iceberg/update/fast_append.h" +#include "iceberg/update/row_delta.h" +#include "iceberg/util/data_file_set.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +// The base table (TableMetadataV2ValidMinimal.json) has schema {x: long (id 1), +// y: long (id 2), z: long (id 3)} and partitions by identity(x). 
+class OverwriteFilesTest : public UpdateTestBase { /* Fixture for the OverwriteFiles tests: caches the table spec and schema and prepares file_a_ (x=1) and file_b_ (x=2). */ + protected: + static void SetUpTestSuite() { avro::RegisterAll(); } + + std::string MetadataResource() const override { + return "TableMetadataV2ValidMinimal.json"; + } + + void SetUp() override { /* Runs the base fixture setup, then fills spec_, schema_, file_a_ and file_b_. */ + UpdateTestBase::SetUp(); + + ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec()); + ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema()); + + file_a_ = MakeDataFile("/data/file_a.parquet", /*partition_x=*/1L); + file_b_ = MakeDataFile("/data/file_b.parquet", /*partition_x=*/2L); + } + + std::shared_ptr MakeDataFile(const std::string& path, int64_t partition_x, + int64_t record_count = 100) { /* Builds a kData Parquet file entry located at table_location_ followed by path, with partition value partition_x, a fixed size of 1024 bytes and the spec id of spec_. */ + auto f = std::make_shared(); + f->content = DataFile::Content::kData; + f->file_path = table_location_ + path; + f->file_format = FileFormatType::kParquet; + f->partition = PartitionValues(std::vector{Literal::Long(partition_x)}); + f->file_size_in_bytes = 1024; + f->record_count = record_count; + f->partition_spec_id = spec_->spec_id(); + return f; + } + + // Add y metrics so StrictMetricsEvaluator can prove row-filter containment. 
+ std::shared_ptr MakeDataFileWithYBounds(const std::string& path, + int64_t partition_x, int64_t y_lower, + int64_t y_upper) { /* All four metric maps are keyed by field id 2, which is column y in the base table schema; the file reports zero nulls for y. */ + auto f = MakeDataFile(path, partition_x); + f->lower_bounds = {{2, Literal::Long(y_lower).Serialize().value()}}; + f->upper_bounds = {{2, Literal::Long(y_upper).Serialize().value()}}; + f->value_counts = {{2, f->record_count}}; + f->null_value_counts = {{2, 0}}; + return f; + } + + std::shared_ptr MakeDeleteFile(const std::string& path, int64_t partition_x) { /* Reuses MakeDataFile and relabels the content as position deletes. */ + auto f = MakeDataFile(path, partition_x); + f->content = DataFile::Content::kPositionDeletes; + return f; + } + + std::shared_ptr MakeEqualityDeleteFile(const std::string& path, + int64_t partition_x) { /* Equality-delete variant of MakeDeleteFile with equality_ids = {1}. */ + auto f = MakeDeleteFile(path, partition_x); + f->content = DataFile::Content::kEqualityDeletes; + f->equality_ids = {1}; + return f; + } + + void CommitEqualityDelete(const std::string& delete_path, int64_t partition_x) { /* Commits one equality delete file for partition_x through RowDelta and refreshes table_. */ + auto del_file = MakeEqualityDeleteFile(delete_path, partition_x); + ICEBERG_UNWRAP_OR_FAIL(auto row_delta, table_->NewRowDelta()); + row_delta->AddDeletes(del_file); + EXPECT_THAT(row_delta->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + Result> NewOverwrite() { /* Shorthand for table_->NewOverwrite(). */ + return table_->NewOverwrite(); + } + + int64_t CommitFileA() { /* Fast-appends file_a_, refreshes table_, and returns the id of the resulting current snapshot. */ + auto fa = table_->NewFastAppend(); + EXPECT_TRUE(fa.has_value()); + fa.value()->AppendFile(file_a_); + EXPECT_THAT(fa.value()->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + auto snap = table_->current_snapshot(); + EXPECT_TRUE(snap.has_value()); + return snap.value()->snapshot_id; + } + + std::shared_ptr CommitFastAppend(const std::shared_ptr& file) { /* Same flow as CommitFileA for an arbitrary file; returns the current snapshot itself rather than its id. */ + auto fa = table_->NewFastAppend(); + EXPECT_TRUE(fa.has_value()); + fa.value()->AppendFile(file); + EXPECT_THAT(fa.value()->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + auto snap = table_->current_snapshot(); + EXPECT_TRUE(snap.has_value()); + return snap.value(); + } + + std::shared_ptr spec_; + std::shared_ptr schema_; + 
std::shared_ptr file_a_; + std::shared_ptr file_b_; +}; + +TEST_F(OverwriteFilesTest, TxnNewOverwrite) { /* An overwrite created from a Transaction shows up as an overwrite snapshot after the transaction commits. */ + ICEBERG_UNWRAP_OR_FAIL(auto txn, Transaction::Make(table_, TransactionKind::kUpdate)); + ICEBERG_UNWRAP_OR_FAIL(auto op, txn->NewOverwrite()); + ASSERT_NE(op, nullptr); + + (*op).OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))).AddFile(file_a_); + + EXPECT_THAT(op->Commit(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto committed, txn->Commit()); + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), + DataOperation::kOverwrite); +} + +TEST_F(OverwriteFilesTest, DeleteAndAddCommit) { /* Replacing file_a_ with file_b_ records one added and one deleted data file under the overwrite operation. */ + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFile(file_a_); + op->AddFile(file_b_); + const std::string expected_operation = op->operation(); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), expected_operation); + EXPECT_EQ(expected_operation, DataOperation::kOverwrite); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "1"); +} + +TEST_F(OverwriteFilesTest, RowFilterAndAddCommit) { /* A row filter combined with an added file commits as an overwrite. */ + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/new_x1.parquet", 1L)); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), + DataOperation::kOverwrite); +} + +TEST_F(OverwriteFilesTest, EmptyCommit) { /* An overwrite with nothing staged still reports and commits the overwrite operation. */ + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + 
EXPECT_EQ(op->operation(), DataOperation::kOverwrite); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), + DataOperation::kOverwrite); +} + +TEST_F(OverwriteFilesTest, DeduplicatesFiles) { /* Repeating the same delete or add must not inflate the summary counts. */ + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + auto add = MakeDataFile("/data/dup_add.parquet", 1L); + op->DeleteFile(file_a_); + op->DeleteFile(file_a_); // duplicate delete + op->AddFile(add); + op->AddFile(add); // duplicate add + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "1"); +} + +TEST_F(OverwriteFilesTest, StageOnly) { + const int64_t base_snapshot_id = CommitFileA(); + const size_t base_snapshot_count = table_->metadata()->snapshots.size(); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->StageOnly(); + op->AddFile(file_b_); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + // The staged snapshot is recorded but the main branch still points at file_a's + // snapshot. 
+ ICEBERG_UNWRAP_OR_FAIL(auto current, table_->current_snapshot()); + EXPECT_EQ(current->snapshot_id, base_snapshot_id); + EXPECT_GT(table_->metadata()->snapshots.size(), base_snapshot_count); +} + +TEST_F(OverwriteFilesTest, TargetBranch) { /* Committing with SetTargetBranch leaves an audit ref in the table metadata. */ + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->SetTargetBranch("audit"); + op->AddFile(file_b_); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + EXPECT_TRUE(table_->metadata()->refs.contains("audit")); +} + +TEST_F(OverwriteFilesTest, CustomSummary) { /* A property passed to Set() appears in the committed snapshot summary. */ + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->Set("custom-prop", "custom-value"); + op->AddFile(file_b_); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at("custom-prop"), "custom-value"); +} + +// With no matching committed delete file, deleting `del_file` is a harmless no-op. +TEST_F(OverwriteFilesTest, BulkDeleteCommit) { + { + ICEBERG_UNWRAP_OR_FAIL(auto seed, NewOverwrite()); + seed->AddFile(file_a_); + EXPECT_THAT(seed->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + auto del_file = MakeDeleteFile("/delete/del_a.parquet", 1L); + + DataFileSet data_files; + data_files.insert(file_a_); + DeleteFileSet delete_files; + delete_files.insert(del_file); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFiles(data_files, delete_files); + op->AddFile(file_b_); + EXPECT_EQ(op->operation(), DataOperation::kOverwrite); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles), "1"); +} + +TEST_F(OverwriteFilesTest, BulkDeleteData) { /* Bulk-deleting both seeded data files leaves a total of zero data files. */ + { + ICEBERG_UNWRAP_OR_FAIL(auto seed, 
NewOverwrite()); + seed->AddFile(file_a_); + seed->AddFile(file_b_); + EXPECT_THAT(seed->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + DataFileSet data_files; + data_files.insert(file_a_); + data_files.insert(file_b_); + op->DeleteFiles(data_files, DeleteFileSet{}); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "2"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kTotalDataFiles), "0"); +} + +// OverwriteFiles validates content because the C++ API stores data and delete files in +// DataFile pointers, while Java uses separate DataFile/DeleteFile types. +TEST_F(OverwriteFilesTest, AddRejectsDeleteContent) { + auto del_file = MakeDeleteFile("/delete/del_as_data.parquet", 1L); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->AddFile(del_file); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Invalid data file to add")); + EXPECT_THAT(result, HasErrorMessage("has delete-file content")); +} + +TEST_F(OverwriteFilesTest, DeleteRejectsDeleteContent) { /* DeleteFile() likewise rejects a file with delete content; the error is reported by Commit(). */ + auto del_file = MakeDeleteFile("/delete/del_as_delete.parquet", 1L); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFile(del_file); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Invalid data file to delete")); + EXPECT_THAT(result, HasErrorMessage("has delete-file content")); +} + +TEST_F(OverwriteFilesTest, BulkRejectsDeleteAsData) { + auto del_file = + MakeDeleteFile("/delete/del_a.parquet", 1L); // content = kPositionDeletes + DataFileSet data_files; + data_files.insert(del_file); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFiles(data_files, 
DeleteFileSet{}); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("has delete-file content")); +} + +TEST_F(OverwriteFilesTest, BulkRejectsDataAsDelete) { + DeleteFileSet delete_files; + delete_files.insert(file_a_); // content = kData + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFiles(DataFileSet{}, delete_files); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("has data-file content")); +} + +TEST_F(OverwriteFilesTest, BulkAcceptsEqualityDelete) { /* Equality-delete content is accepted in the delete-file set of DeleteFiles(). */ + auto eq_delete = MakeEqualityDeleteFile("/delete/eq_a.parquet", 1L); + DeleteFileSet delete_files; + delete_files.insert(eq_delete); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFiles(DataFileSet{}, delete_files); + op->AddFile(file_b_); + EXPECT_THAT(op->Commit(), IsOk()); +} + +TEST_F(OverwriteFilesTest, NoConflictingDeletesFails) { /* Removing file_a_ after first_id makes an x == 1 overwrite validated from first_id fail. */ + const int64_t first_id = CommitFileA(); + + { + ICEBERG_UNWRAP_OR_FAIL(auto competing, NewOverwrite()); + competing->DeleteFile(file_a_); + EXPECT_THAT(competing->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/replacement_after_delete.parquet", 1L)); + op->ValidateFromSnapshot(first_id); + op->ValidateNoConflictingDeletes(); + EXPECT_THAT(op->Commit(), IsError(ErrorKind::kValidationFailed)); +} + +TEST_F(OverwriteFilesTest, NoConflictingDeletesPasses) { /* Only an append of file_b_ happened after first_id, so delete-conflict validation passes. */ + const int64_t first_id = CommitFileA(); + CommitFastAppend(file_b_); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/replacement_no_conflict.parquet", 1L)); + op->ValidateFromSnapshot(first_id); + op->ValidateNoConflictingDeletes(); + 
EXPECT_THAT(op->Commit(), IsOk()); +} + +// A concurrent delete that covers an explicitly replaced file fails validation. +TEST_F(OverwriteFilesTest, ExplicitDeleteConflict) { + CommitFileA(); + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot()); + CommitEqualityDelete("/delete/concurrent_x1.parquet", /*partition_x=*/1L); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFile(file_a_); + op->AddFile(MakeDataFile("/data/rewrite_x1.parquet", 1L)); + op->ValidateFromSnapshot(first_snapshot->snapshot_id); + op->ValidateNoConflictingDeletes(); + EXPECT_THAT(op->Commit(), IsError(ErrorKind::kValidationFailed)); +} + +// A conflict filter on x == 2 excludes the concurrent delete in partition x == 1. +TEST_F(OverwriteFilesTest, ExplicitDeleteFilterScope) { + CommitFileA(); + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot()); + CommitEqualityDelete("/delete/concurrent_x1.parquet", /*partition_x=*/1L); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFile(file_a_); + op->AddFile(MakeDataFile("/data/rewrite_x1.parquet", 1L)); + op->ValidateFromSnapshot(first_snapshot->snapshot_id); + op->ConflictDetectionFilter(Expressions::Equal("x", Literal::Long(2L))); + op->ValidateNoConflictingDeletes(); + EXPECT_THAT(op->Commit(), IsOk()); +} + +TEST_F(OverwriteFilesTest, StrictRangeByProjection) { /* A file in partition x=1 passes the added-files check against the x == 1 row filter. */ + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/in_partition.parquet", 1L)); + op->ValidateAddedFilesMatchOverwriteFilter(); + EXPECT_THAT(op->Commit(), IsOk()); +} + +TEST_F(OverwriteFilesTest, StrictRangeByMetrics) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("y", Literal::Long(5L))); + // y bounds [5, 5] prove every row has y == 5. 
+ op->AddFile(MakeDataFileWithYBounds("/data/y_eq_5.parquet", 1L, 5L, 5L)); + op->ValidateAddedFilesMatchOverwriteFilter(); + EXPECT_THAT(op->Commit(), IsOk()); +} + +TEST_F(OverwriteFilesTest, StrictRangeRejectsPartialMetrics) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("y", Literal::Long(5L))); + // y bounds [1, 10] do not prove every row has y == 5. + op->AddFile(MakeDataFileWithYBounds("/data/y_range.parquet", 1L, 1L, 10L)); + op->ValidateAddedFilesMatchOverwriteFilter(); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, + HasErrorMessage("Cannot append file with rows that do not match filter")); +} + +TEST_F(OverwriteFilesTest, StrictRangeRejectsOutsidePartition) { /* A file in partition x=2 is rejected against the x == 1 row filter. */ + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/wrong_partition.parquet", /*partition_x=*/2L)); + op->ValidateAddedFilesMatchOverwriteFilter(); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, + HasErrorMessage("Cannot append file with rows that do not match filter")); +} + +TEST_F(OverwriteFilesTest, StrictRangeRequiresFilter) { /* With no row filter set, the added-files check rejects the file. */ + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->AddFile(MakeDataFile("/data/no_filter.parquet", 1L)); + op->ValidateAddedFilesMatchOverwriteFilter(); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, + HasErrorMessage("Cannot append file with rows that do not match filter")); +} + +TEST_F(OverwriteFilesTest, StrictRangeRejectsMultipleSpecs) { + // Add the second spec before creating the builder so staged files can resolve it. 
+ ICEBERG_UNWRAP_OR_FAIL( + auto spec1, PartitionSpec::Make(*schema_, /*spec_id=*/1, + {PartitionField(/*source_id=*/1, /*field_id=*/1001, + "x_v1", Transform::Identity())}, + /*allow_missing_fields=*/false)); /* The new spec is appended directly to the metadata object held by table_. */ + table_->metadata()->partition_specs.push_back( + std::shared_ptr(std::move(spec1))); + ASSERT_THAT(table_->metadata()->PartitionSpecById(0), IsOk()); + ASSERT_THAT(table_->metadata()->PartitionSpecById(1), IsOk()); + + auto file_spec0 = MakeDataFile("/data/spec0_x1.parquet", 1L); // partition_spec_id 0 + auto file_spec1 = MakeDataFile("/data/spec1_x1.parquet", 1L); + file_spec1->partition_spec_id = 1; + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(file_spec0); + op->AddFile(file_spec1); + op->ValidateAddedFilesMatchOverwriteFilter(); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(result, HasErrorMessage("Cannot return a single partition spec")); +} + +TEST_F(OverwriteFilesTest, StrictRangeRejectsNoAdds) { /* With a row filter but no added files, the check fails with kInvalidArgument. */ + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->ValidateAddedFilesMatchOverwriteFilter(); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(result, HasErrorMessage("Cannot determine partition specs")); +} + +// Strict-range validation binds the row filter with the configured case sensitivity. 
+TEST_F(OverwriteFilesTest, StrictRangeCaseSensitivity) { + { /* With the default setting, a row filter on column name X makes the commit fail. */ + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("X", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/cs.parquet", 1L)); + op->ValidateAddedFilesMatchOverwriteFilter(); + auto result = op->Commit(); + EXPECT_FALSE(result.has_value()); + } + { /* After CaseSensitive(false), the same filter on column name X commits. */ + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->CaseSensitive(false); + op->OverwriteByRowFilter(Expressions::Equal("X", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/ci.parquet", 1L)); + op->ValidateAddedFilesMatchOverwriteFilter(); + EXPECT_THAT(op->Commit(), IsOk()); + } +} + +TEST_F(OverwriteFilesTest, NullAddFileRejected) { /* In this and the next three tests a null argument is reported as kValidationFailed by Commit(). */ + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->AddFile(nullptr); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Invalid data file: null")); +} + +TEST_F(OverwriteFilesTest, NullDeleteFileRejected) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFile(nullptr); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Invalid data file: null")); +} + +TEST_F(OverwriteFilesTest, NullRowFilterRejected) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(nullptr); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Invalid row filter expression: null")); +} + +TEST_F(OverwriteFilesTest, NullConflictFilterRejected) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->ConflictDetectionFilter(nullptr); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Invalid conflict detection filter: null")); +} + +TEST_F(OverwriteFilesTest, RejectsNegativeSnapshotId) { /* A negative id passed to ValidateFromSnapshot makes the commit fail. */ + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + 
op->AddFile(file_a_).ValidateFromSnapshot(-1); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Invalid snapshot id")); +} + +TEST_F(OverwriteFilesTest, AcceptsZeroSnapshotId) { /* Zero is accepted as a snapshot id by ValidateFromSnapshot. */ + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->AddFile(file_a_).ValidateFromSnapshot(0); + EXPECT_THAT(op->Commit(), IsOk()); +} + +TEST_F(OverwriteFilesTest, OperationMatrix) { /* Checks operation() for every staged combination of add, explicit delete and row filter; nothing is committed. */ + struct Case { + bool add; + bool delete_file; + bool row_filter; + std::string expected; + }; + const std::vector cases = { + {.add = true, + .delete_file = false, + .row_filter = false, + .expected = DataOperation::kAppend}, + {.add = false, + .delete_file = true, + .row_filter = false, + .expected = DataOperation::kDelete}, + {.add = false, + .delete_file = false, + .row_filter = true, + .expected = DataOperation::kDelete}, // row filter counts as a delete + {.add = true, + .delete_file = true, + .row_filter = false, + .expected = DataOperation::kOverwrite}, + {.add = true, + .delete_file = false, + .row_filter = true, + .expected = DataOperation::kOverwrite}, + {.add = false, + .delete_file = true, + .row_filter = true, + .expected = DataOperation::kDelete}, // deletes only + {.add = false, + .delete_file = false, + .row_filter = false, + .expected = DataOperation::kOverwrite}, // neither adds nor deletes + }; + + int index = 0; + for (const auto& c : cases) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + if (c.add) { + op->AddFile(MakeDataFile("/data/tt_add" + std::to_string(index) + ".parquet", 1L)); + } + if (c.delete_file) { + op->DeleteFile( + MakeDataFile("/data/tt_del" + std::to_string(index) + ".parquet", 1L)); + } + if (c.row_filter) { + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + } + EXPECT_EQ(op->operation(), c.expected) << "case index " << index; + ++index; + } +} + +TEST_F(OverwriteFilesTest, DefaultConflictFilter) { /* No explicit conflict filter is set; the concurrent append of file_b_ (x=2) does not fail an overwrite filtered on x == 1. */ + const int64_t first_id = CommitFileA(); + CommitFastAppend(file_b_); + + 
ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/r2_ok.parquet", 1L)); + op->ValidateFromSnapshot(first_id); + op->ValidateNoConflictingData(); + EXPECT_THAT(op->Commit(), IsOk()); +} + +TEST_F(OverwriteFilesTest, ConflictFilterMatchesAdd) { /* The concurrent append of file_b_ (x=2) fails an overwrite filtered on x == 2. */ + const int64_t first_id = CommitFileA(); + CommitFastAppend(file_b_); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(2L))); + op->AddFile(MakeDataFile("/data/r2_conflict.parquet", 2L)); + op->ValidateFromSnapshot(first_id); + op->ValidateNoConflictingData(); + EXPECT_THAT(op->Commit(), IsError(ErrorKind::kValidationFailed)); +} + +TEST_F(OverwriteFilesTest, ConflictFilterUsesExplicitFilter) { /* An explicit conflict filter on x == 2 catches the concurrent append even though the row filter is x == 1. */ + const int64_t first_id = CommitFileA(); + CommitFastAppend(file_b_); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->ConflictDetectionFilter(Expressions::Equal("x", Literal::Long(2L))); + op->AddFile(MakeDataFile("/data/r1.parquet", 1L)); + op->ValidateFromSnapshot(first_id); + op->ValidateNoConflictingData(); + EXPECT_THAT(op->Commit(), IsError(ErrorKind::kValidationFailed)); +} + +TEST_F(OverwriteFilesTest, ExplicitReplaceConflicts) { /* With an explicit DeleteFile, the concurrent append of file_b_ fails data-conflict validation despite the x == 1 row filter. */ + const int64_t first_id = CommitFileA(); + CommitFastAppend(file_b_); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->DeleteFile(file_a_); + op->AddFile(MakeDataFile("/data/r3.parquet", 1L)); + op->ValidateFromSnapshot(first_id); + op->ValidateNoConflictingData(); + EXPECT_THAT(op->Commit(), IsError(ErrorKind::kValidationFailed)); +} + +} // namespace iceberg diff --git a/src/iceberg/test/table_test.cc b/src/iceberg/test/table_test.cc index d7ebe4a0a..ef21f12d6 100644 --- a/src/iceberg/test/table_test.cc +++ b/src/iceberg/test/table_test.cc @@ -163,6 +163,7 @@ 
TEST(StaticTableTest, NewMutatingOperationsAreNotSupported) { EXPECT_THAT(table->NewMergeAppend(), IsError(ErrorKind::kNotSupported)); EXPECT_THAT(table->NewDeleteFiles(), IsError(ErrorKind::kNotSupported)); EXPECT_THAT(table->NewRowDelta(), IsError(ErrorKind::kNotSupported)); + EXPECT_THAT(table->NewOverwrite(), IsError(ErrorKind::kNotSupported)); EXPECT_THAT(table->NewSnapshotManager(), IsError(ErrorKind::kNotSupported)); } diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index e911a61dc..169e7ec90 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -36,6 +36,7 @@ #include "iceberg/update/expire_snapshots.h" #include "iceberg/update/fast_append.h" #include "iceberg/update/merge_append.h" +#include "iceberg/update/overwrite_files.h" #include "iceberg/update/pending_update.h" #include "iceberg/update/row_delta.h" #include "iceberg/update/set_snapshot.h" @@ -513,6 +514,13 @@ Result> Transaction::NewRowDelta() { return row_delta; } +Result> Transaction::NewOverwrite() { /* Creates an OverwriteFiles on the context (ctx_) of this transaction, named after the table held by ctx_, and registers it through AddUpdate before returning it. */ + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr overwrite, + OverwriteFiles::Make(ctx_->table->name().name, ctx_)); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(overwrite)); + return overwrite; +} + Result> Transaction::NewUpdateStatistics() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_statistics, UpdateStatistics::Make(ctx_)); diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 34ca78bd7..49b607d60 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -115,6 +115,9 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewRowDelta(); + /// \brief Create a new OverwriteFiles to overwrite data files and commit the changes. + Result> NewOverwrite(); + /// \brief Create a new SnapshotManager to manage snapshots. 
Result> NewSnapshotManager(); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index f29bc4a1a..784b3e03b 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -242,6 +242,7 @@ class DeleteFiles; class ExpireSnapshots; class FastAppend; class MergeAppend; +class OverwriteFiles; class PendingUpdate; class RowDelta; class SetSnapshot; diff --git a/src/iceberg/update/delete_files.h b/src/iceberg/update/delete_files.h index 1be08e35b..7e567830e 100644 --- a/src/iceberg/update/delete_files.h +++ b/src/iceberg/update/delete_files.h @@ -34,27 +34,58 @@ namespace iceberg { /// \brief API for deleting data files from a table. /// -/// This accumulates data-file deletions, produces a new snapshot, and commits that -/// snapshot as current. File paths are matched exactly against table metadata values; -/// equivalent but differently-normalized URIs are not considered matches. +/// This API accumulates file deletions, produces a new Snapshot of the table, +/// and commits that snapshot as current. When committing, these changes are +/// applied to the latest table snapshot. Commit conflicts are resolved by +/// applying the changes to the new latest snapshot and reattempting the commit. +/// +/// File paths are matched exactly against table metadata values; equivalent but +/// differently-normalized URIs are not considered matches. class ICEBERG_EXPORT DeleteFiles : public MergingSnapshotUpdate { public: static Result> Make( std::string table_name, std::shared_ptr ctx); - /// \brief Delete a data-file path from the table. + /// \brief Delete a file by path from the underlying table. + /// + /// \param path A path to remove from the table. + /// \return This DeleteFiles for method chaining. DeleteFiles& DeleteFile(std::string_view path); - /// \brief Delete a data file tracked by object identity and path. + /// \brief Delete a file tracked by a DataFile from the underlying table. + /// + /// \param file A DataFile to remove from the table. 
+ /// \return This DeleteFiles for method chaining. DeleteFiles& DeleteFile(const std::shared_ptr& file); - /// \brief Delete files whose rows all match the given expression. + /// \brief Delete files that match an expression on data rows from the table. + /// + /// A file is selected to be deleted by the expression if it could contain any + /// rows that match the expression. Candidate files are selected using an + /// inclusive partition projection. These candidate files are deleted if all of + /// the rows in the file must match the expression, determined by the + /// expression's strict partition projection. This guarantees that files are + /// deleted if and only if all rows in the file must match the expression. + /// + /// Files that may contain some rows that match the expression and some rows + /// that do not will result in a validation error. + /// + /// \param expr An expression on rows in the table. + /// \return This DeleteFiles for method chaining. DeleteFiles& DeleteFromRowFilter(std::shared_ptr expr); - /// \brief Set case sensitivity for expression binding. + /// \brief Enable or disable case-sensitive expression binding for validations. + /// + /// \param case_sensitive Whether expression binding should be case sensitive. + /// \return This DeleteFiles for method chaining. DeleteFiles& CaseSensitive(bool case_sensitive); - /// \brief Validate that explicitly requested deleted files still exist. + /// \brief Enable validation that deleted files still exist. + /// + /// If this method is called, any files that are part of the deletion must + /// still exist when committing the operation. + /// + /// \return This DeleteFiles for method chaining. 
DeleteFiles& ValidateFilesExist(); std::string operation() override; diff --git a/src/iceberg/update/fast_append.h b/src/iceberg/update/fast_append.h index 07018989c..f5e1ceaba 100644 --- a/src/iceberg/update/fast_append.h +++ b/src/iceberg/update/fast_append.h @@ -34,36 +34,43 @@ namespace iceberg { -/// \brief Appending new files in a table. +/// \brief API for appending new files in a table. /// -/// FastAppend is optimized for appending new data files to a table, it creates new -/// manifest files for the added data without compacting or rewriting existing manifests, -/// making it faster for write-heavy workloads. +/// This API accumulates file additions, produces a new Snapshot of the table, +/// and commits that snapshot as current. When committing, these changes are +/// applied to the latest table snapshot. Commit conflicts are resolved by +/// applying the changes to the new latest snapshot and reattempting the commit. +/// +/// FastAppend is optimized for appending new data files to a table. It creates +/// new manifest files for the added data without compacting or rewriting +/// existing manifests. class ICEBERG_EXPORT FastAppend : public SnapshotUpdate { public: /// \brief Create a new FastAppend instance. /// /// \param table_name The name of the table - /// \param transaction The transaction to use for this update + /// \param ctx The transaction context to use for this update /// \return A Result containing the FastAppend instance or an error static Result> Make( std::string table_name, std::shared_ptr ctx); - /// \brief Append a data file to this update. + /// \brief Append a DataFile to the table. /// - /// \param file The data file to append - /// \return Reference to this for method chaining + /// \param file A data file. + /// \return This FastAppend for method chaining. FastAppend& AppendFile(const std::shared_ptr& file); - /// \brief Append a manifest file to this update. + /// \brief Append a ManifestFile to the table. 
+ /// + /// The manifest must contain only appended files. All files in the manifest + /// are appended to the table in the snapshot created by this update. /// - /// The manifest must only contain added files (no existing or deleted files). - /// If the manifest doesn't have a snapshot ID assigned and snapshot ID inheritance - /// is enabled, it will be used directly. Otherwise, it will be copied with the - /// new snapshot ID. + /// If the manifest doesn't have a snapshot ID assigned and snapshot ID + /// inheritance is enabled, it will be used directly. Otherwise, it will be + /// copied with the new snapshot ID. /// - /// \param manifest The manifest file to append - /// \return Reference to this for method chaining + /// \param manifest A manifest file of files to append. + /// \return This FastAppend for method chaining. FastAppend& AppendManifest(const ManifestFile& manifest); std::string operation() override; diff --git a/src/iceberg/update/merge_append.h b/src/iceberg/update/merge_append.h index 8122a262c..cd4f4acbb 100644 --- a/src/iceberg/update/merge_append.h +++ b/src/iceberg/update/merge_append.h @@ -31,10 +31,15 @@ namespace iceberg { -/// \brief Append files while merging manifests when table properties allow it. +/// \brief API for appending new files in a table while merging manifests. /// -/// MergeAppend uses the shared MergingSnapshotUpdate pipeline, so it can compact -/// newly written and existing manifests into fewer manifests during commit. +/// This API accumulates file additions, produces a new Snapshot of the table, +/// and commits that snapshot as current. When committing, these changes are +/// applied to the latest table snapshot. Commit conflicts are resolved by +/// applying the changes to the new latest snapshot and reattempting the commit. +/// +/// MergeAppend uses the shared MergingSnapshotUpdate pipeline, so it can merge +/// newly written and existing manifests according to table properties. 
class ICEBERG_EXPORT MergeAppend : public MergingSnapshotUpdate { public: /// \brief Create a new MergeAppend instance. @@ -45,19 +50,20 @@ class ICEBERG_EXPORT MergeAppend : public MergingSnapshotUpdate { static Result> Make( std::string table_name, std::shared_ptr ctx); - /// \brief Append a data file to this update. + /// \brief Append a DataFile to the table. /// - /// \param file The data file to append - /// \return Reference to this for method chaining + /// \param file A data file. + /// \return This MergeAppend for method chaining. MergeAppend& AppendFile(const std::shared_ptr& file); - /// \brief Append a data manifest to this update. + /// \brief Append a ManifestFile to the table. /// - /// The manifest must contain only added files. Snapshot ID and sequence number - /// assignment happen during commit. + /// The manifest must contain only appended files. All files in the manifest + /// are appended to the table in the snapshot created by this update. + /// Snapshot ID and sequence number assignment happen during commit. /// - /// \param manifest The manifest file to append - /// \return Reference to this for method chaining + /// \param manifest A manifest file of files to append. + /// \return This MergeAppend for method chaining. 
MergeAppend& AppendManifest(const ManifestFile& manifest); std::string operation() override; diff --git a/src/iceberg/update/merging_snapshot_update.cc b/src/iceberg/update/merging_snapshot_update.cc index c4d108dcb..8f88633ba 100644 --- a/src/iceberg/update/merging_snapshot_update.cc +++ b/src/iceberg/update/merging_snapshot_update.cc @@ -617,12 +617,13 @@ void MergingSnapshotUpdate::SetSummaryProperty(const std::string& property, Result> MergingSnapshotUpdate::DataSpec() const { if (new_data_files_by_spec_.empty()) { - return InvalidArgument("DataSpec() called before any data file was added"); + return InvalidArgument( + "Cannot determine partition specs: no data files have been added"); } if (new_data_files_by_spec_.size() > 1) { return InvalidArgument( - "DataSpec() requires exactly one partition spec; got {} different specs", - new_data_files_by_spec_.size()); + "Cannot return a single partition spec: data files with different partition " + "specs have been added"); } return base().PartitionSpecById(new_data_files_by_spec_.begin()->first); } diff --git a/src/iceberg/update/merging_snapshot_update.h b/src/iceberg/update/merging_snapshot_update.h index fc3987ee1..d11a9b1f9 100644 --- a/src/iceberg/update/merging_snapshot_update.h +++ b/src/iceberg/update/merging_snapshot_update.h @@ -40,12 +40,17 @@ namespace iceberg { -/// \brief Abstract base class for all merge-based snapshot write operations. +/// \brief Abstract base class for merge-based snapshot write operations. /// -/// Provides the complete filter → write → merge pipeline that all merge-based -/// operations (MergeAppend, OverwriteFiles, RowDelta, ReplacePartitions, -/// RewriteFiles) share. Subclasses only need to implement `operation()` and -/// call the protected primitive API to describe what changes to make. +/// This class is the C++ counterpart of Java's MergingSnapshotProducer. 
It +/// provides the shared filter, write, and merge pipeline used by operations +/// that accumulate file additions and deletions, produce a new Snapshot, and +/// commit that snapshot as current. Commit conflicts are resolved by applying +/// the same staged changes to the new latest snapshot and reattempting the +/// commit. +/// +/// Subclasses describe their changes by calling the protected primitive API and +/// implement `operation()` to set the snapshot operation summary. /// /// The Apply() pipeline: /// 1. Filter data manifests (via data_filter_manager_) @@ -98,9 +103,9 @@ class ICEBERG_EXPORT MergingSnapshotUpdate : public SnapshotUpdate { /// \brief Add all files in a pre-existing data manifest to the new snapshot. /// - /// The manifest must contain DATA content. If snapshot ID inheritance is - /// enabled and the manifest has no snapshot ID assigned, it is used directly; - /// otherwise it is copied with the current snapshot ID. + /// The manifest must contain only appended data files. If snapshot ID + /// inheritance is enabled and the manifest has no snapshot ID assigned, it is + /// used directly; otherwise it is copied with the current snapshot ID. Status AddManifest(ManifestFile manifest); /// \brief Register a data file (by object) to be deleted from the table. @@ -116,14 +121,15 @@ class ICEBERG_EXPORT MergingSnapshotUpdate : public SnapshotUpdate { /// \brief Register an expression to delete matching rows. /// - /// Both data and delete filter managers receive the expression: delete files that - /// match the row filter can also be removed because those rows will be deleted. + /// Both data and delete filter managers receive the expression: delete files + /// that match the row filter can also be removed because those rows will be + /// deleted. Status DeleteByRowFilter(std::shared_ptr expr); /// \brief Register a partition to be dropped. 
/// - /// Both data and delete filter managers receive the partition drop, since dropping - /// data in a partition also drops all delete files in that partition. + /// Both data and delete filter managers receive the partition drop, since + /// dropping data in a partition also drops all delete files in that partition. Status DropPartition(int32_t spec_id, PartitionValues partition); /// \brief Fail if any registered delete path is not found in any manifest. diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 4f594a06e..4ba4168d4 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -22,6 +22,7 @@ install_headers( 'fast_append.h', 'merge_append.h', 'merging_snapshot_update.h', + 'overwrite_files.h', 'pending_update.h', 'row_delta.h', 'set_snapshot.h', diff --git a/src/iceberg/update/overwrite_files.cc b/src/iceberg/update/overwrite_files.cc new file mode 100644 index 000000000..cba6e028c --- /dev/null +++ b/src/iceberg/update/overwrite_files.cc @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/update/overwrite_files.h" + +#include + +#include "iceberg/expression/evaluator.h" +#include "iceberg/expression/expressions.h" +#include "iceberg/expression/projections.h" +#include "iceberg/expression/strict_metrics_evaluator.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/partition_spec.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/type.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> OverwriteFiles::Make( + std::string table_name, std::shared_ptr ctx) { + ICEBERG_PRECHECK(!table_name.empty(), "Table name cannot be empty"); + ICEBERG_PRECHECK(ctx != nullptr, "Cannot create OverwriteFiles without a context"); + return std::shared_ptr( + new OverwriteFiles(std::move(table_name), std::move(ctx))); +} + +OverwriteFiles::OverwriteFiles(std::string table_name, + std::shared_ptr ctx) + : MergingSnapshotUpdate(std::move(table_name), std::move(ctx)) {} + +OverwriteFiles::~OverwriteFiles() = default; + +OverwriteFiles& OverwriteFiles::AddFile(const std::shared_ptr& file) { + ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null"); + ICEBERG_BUILDER_CHECK(file->content == DataFile::Content::kData, + "Invalid data file to add: {} has delete-file content", + file->file_path); + ICEBERG_BUILDER_RETURN_IF_ERROR(AddDataFile(file)); + return *this; +} + +OverwriteFiles& OverwriteFiles::DeleteFile(const std::shared_ptr& file) { + ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null"); + ICEBERG_BUILDER_CHECK(file->content == DataFile::Content::kData, + "Invalid data file to delete: {} has delete-file content", + file->file_path); + deleted_data_files_.insert(file); + ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteDataFile(file)); + return *this; +} + +OverwriteFiles& 
OverwriteFiles::DeleteFiles(const DataFileSet& data_files_to_delete, + const DeleteFileSet& delete_files_to_delete) { + // Both sets use DataFile pointers, so validate content before forwarding to the + // data-file and delete-file removal paths. + for (const auto& file : data_files_to_delete) { + ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null"); + ICEBERG_BUILDER_CHECK(file->content == DataFile::Content::kData, + "Invalid data file to delete: {} has delete-file content", + file->file_path); + deleted_data_files_.insert(file); + ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteDataFile(file)); + } + for (const auto& file : delete_files_to_delete) { + ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid delete file: null"); + ICEBERG_BUILDER_CHECK(file->content != DataFile::Content::kData, + "Invalid delete file to delete: {} has data-file content", + file->file_path); + ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteDeleteFile(file)); + } + return *this; +} + +OverwriteFiles& OverwriteFiles::OverwriteByRowFilter(std::shared_ptr expr) { + ICEBERG_BUILDER_CHECK(expr != nullptr, "Invalid row filter expression: null"); + ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteByRowFilter(std::move(expr))); + return *this; +} + +OverwriteFiles& OverwriteFiles::ValidateFromSnapshot(int64_t snapshot_id) { + ICEBERG_BUILDER_CHECK(snapshot_id >= 0, "Invalid snapshot id: {}", snapshot_id); + starting_snapshot_id_ = snapshot_id; + return *this; +} + +OverwriteFiles& OverwriteFiles::ConflictDetectionFilter( + std::shared_ptr expr) { + ICEBERG_BUILDER_CHECK(expr != nullptr, "Invalid conflict detection filter: null"); + conflict_detection_filter_ = std::move(expr); + return *this; +} + +OverwriteFiles& OverwriteFiles::CaseSensitive(bool case_sensitive) { + MergingSnapshotUpdate::CaseSensitive(case_sensitive); + return *this; +} + +OverwriteFiles& OverwriteFiles::ValidateNoConflictingData() { + validate_new_data_files_ = true; + FailMissingDeletePaths(); + return *this; +} + +OverwriteFiles& 
OverwriteFiles::ValidateNoConflictingDeletes() { + validate_new_deletes_ = true; + FailMissingDeletePaths(); + return *this; +} + +OverwriteFiles& OverwriteFiles::ValidateAddedFilesMatchOverwriteFilter() { + validate_added_files_match_overwrite_filter_ = true; + return *this; +} + +std::string OverwriteFiles::operation() { + if (DeletesDataFiles() && !AddsDataFiles()) { + return DataOperation::kDelete; + } + if (AddsDataFiles() && !DeletesDataFiles()) { + return DataOperation::kAppend; + } + return DataOperation::kOverwrite; +} + +std::shared_ptr OverwriteFiles::DataConflictDetectionFilter() const { + if (conflict_detection_filter_ != nullptr) { + return conflict_detection_filter_; + } + if (auto filter = RowFilter(); filter != nullptr && + filter != Expressions::AlwaysFalse() && + deleted_data_files_.empty()) { + return filter; + } + return Expressions::AlwaysTrue(); +} + +Status OverwriteFiles::Validate(const TableMetadata& current_metadata, + const std::shared_ptr& snapshot) { + auto row_filter = RowFilter(); + + if (validate_added_files_match_overwrite_filter_) { + ICEBERG_ASSIGN_OR_RAISE(auto spec, DataSpec()); + ICEBERG_ASSIGN_OR_RAISE(auto schema, current_metadata.Schema()); + ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec->PartitionType(*schema)); + auto partition_schema = partition_type->ToSchema(); + + ICEBERG_ASSIGN_OR_RAISE( + auto inclusive_expr, + Projections::Inclusive(*spec, *schema, IsCaseSensitive())->Project(row_filter)); + ICEBERG_ASSIGN_OR_RAISE( + auto inclusive_evaluator, + Evaluator::Make(*partition_schema, inclusive_expr, IsCaseSensitive())); + + ICEBERG_ASSIGN_OR_RAISE( + auto strict_expr, + Projections::Strict(*spec, *schema, IsCaseSensitive())->Project(row_filter)); + ICEBERG_ASSIGN_OR_RAISE( + auto strict_evaluator, + Evaluator::Make(*partition_schema, strict_expr, IsCaseSensitive())); + + ICEBERG_ASSIGN_OR_RAISE( + auto metrics_evaluator, + StrictMetricsEvaluator::Make(row_filter, schema, IsCaseSensitive())); + + // the real test is 
that the strict or metrics test matches the file, indicating that + // all records in the file match the filter. inclusive is used to avoid testing the + // metrics, which is more complicated + const auto file_test = [&](const DataFile& file) -> Result { + ICEBERG_ASSIGN_OR_RAISE(bool inclusive_match, + inclusive_evaluator->Evaluate(file.partition)); + if (!inclusive_match) { + return false; + } + ICEBERG_ASSIGN_OR_RAISE(bool strict_match, + strict_evaluator->Evaluate(file.partition)); + if (strict_match) { + return true; + } + return metrics_evaluator->Evaluate(file); + }; + + for (const auto& file : AddedDataFiles()) { + ICEBERG_ASSIGN_OR_RAISE(bool matches_filter, file_test(*file)); + if (!matches_filter) { + return ValidationFailed( + "Cannot append file with rows that do not match filter: {}: {}", + row_filter->ToString(), file->file_path); + } + } + } + + if (validate_new_data_files_) { + ICEBERG_RETURN_UNEXPECTED(ValidateAddedDataFiles( + current_metadata, starting_snapshot_id_, DataConflictDetectionFilter(), snapshot, + ctx_->table->io(), IsCaseSensitive())); + } + + if (validate_new_deletes_) { + if (row_filter != nullptr && row_filter != Expressions::AlwaysFalse()) { + auto filter = + conflict_detection_filter_ != nullptr ? 
conflict_detection_filter_ : row_filter; + ICEBERG_RETURN_UNEXPECTED( + ValidateNoNewDeleteFiles(current_metadata, starting_snapshot_id_, filter, + snapshot, ctx_->table->io(), IsCaseSensitive())); + ICEBERG_RETURN_UNEXPECTED( + ValidateDeletedDataFiles(current_metadata, starting_snapshot_id_, filter, + snapshot, ctx_->table->io(), IsCaseSensitive())); + } + + if (!deleted_data_files_.empty()) { + ICEBERG_RETURN_UNEXPECTED(ValidateNoNewDeletesForDataFiles( + current_metadata, starting_snapshot_id_, conflict_detection_filter_, + deleted_data_files_, snapshot, ctx_->table->io(), IsCaseSensitive())); + } + } + + return {}; +} + +} // namespace iceberg diff --git a/src/iceberg/update/overwrite_files.h b/src/iceberg/update/overwrite_files.h new file mode 100644 index 000000000..f76263cad --- /dev/null +++ b/src/iceberg/update/overwrite_files.h @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#pragma once + +/// \file iceberg/update/overwrite_files.h + +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/merging_snapshot_update.h" +#include "iceberg/util/data_file_set.h" + +namespace iceberg { + +/// \brief API for overwriting files in a table. +/// +/// This API accumulates file additions and produces a new Snapshot of the table by +/// replacing all deleted files with the set of additions. This operation is used +/// to implement idempotent writes that always replace a section of a table with +/// new data or update/delete operations that eagerly overwrite files. +/// +/// Overwrites can be validated. The default validation mode is idempotent, +/// meaning the overwrite is correct and should be committed regardless of other +/// concurrent changes to the table. For example, this can be used for replacing +/// all data for day D with query results. Alternatively, this API can be +/// configured for overwriting certain files with their filtered versions while +/// ensuring no new data that would need to be filtered has been added. +/// +/// When committing, these changes are applied to the latest table snapshot. +/// Commit conflicts are resolved by applying the changes to the new latest +/// snapshot and reattempting the commit. +class ICEBERG_EXPORT OverwriteFiles : public MergingSnapshotUpdate { + public: + /// \brief Create a new OverwriteFiles instance. + /// + /// \param table_name The name of the table + /// \param ctx The transaction context to use for this update + /// \return A Result containing the OverwriteFiles instance or an error + static Result> Make( + std::string table_name, std::shared_ptr ctx); + + ~OverwriteFiles() override; + + /// \brief Add a DataFile to the table. + /// + /// \param file A data file. + /// \return This OverwriteFiles for method chaining. 
+ OverwriteFiles& AddFile(const std::shared_ptr& file); + + /// \brief Delete a DataFile from the table. + /// + /// \param file A data file. + /// \return This OverwriteFiles for method chaining. + OverwriteFiles& DeleteFile(const std::shared_ptr& file); + + /// \brief Delete a set of data files from the table with their respective + /// delete files. + /// + /// \param data_files_to_delete The data files to be deleted from the table. + /// \param delete_files_to_delete The delete files corresponding to the data + /// files to be deleted from the table. + /// \return This OverwriteFiles for method chaining. + OverwriteFiles& DeleteFiles(const DataFileSet& data_files_to_delete, + const DeleteFileSet& delete_files_to_delete); + + /// \brief Delete files that match an expression on data rows from the table. + /// + /// A file is selected to be deleted by the expression if it could contain any + /// rows that match the expression. Candidate files are selected using an + /// inclusive partition projection. These candidate files are deleted if all of + /// the rows in the file must match the expression, determined by the + /// expression's strict partition projection. This guarantees that files are + /// deleted if and only if all rows in the file must match the expression. + /// + /// Files that may contain some rows that match the expression and some rows + /// that do not will result in a validation error. + /// + /// \param expr An expression on rows in the table. + /// \return This OverwriteFiles for method chaining. + OverwriteFiles& OverwriteByRowFilter(std::shared_ptr expr); + + /// \brief Set the snapshot ID used in any reads for this operation. + /// + /// Validations check changes after this snapshot ID. If the from snapshot is + /// not set, all ancestor snapshots through the table's initial snapshot are + /// validated. + /// + /// \param snapshot_id A snapshot ID. + /// \return This OverwriteFiles for method chaining. 
+ OverwriteFiles& ValidateFromSnapshot(int64_t snapshot_id); + + /// \brief Set a conflict detection filter used to validate concurrently added + /// data and delete files. + /// + /// \param expr An expression on rows in the table. + /// \return This OverwriteFiles for method chaining. + OverwriteFiles& ConflictDetectionFilter(std::shared_ptr expr); + + /// \brief Enable validation that data added concurrently does not conflict + /// with this commit's operation. + /// + /// This method should be called while committing non-idempotent overwrite + /// operations. If a concurrent operation commits a new file after the data was + /// read and that file might contain rows matching the specified conflict + /// detection filter, the overwrite operation detects this and fails. + /// + /// Calling this method with a correct conflict detection filter is required to + /// maintain isolation for non-idempotent overwrite operations. + /// + /// Validation uses the conflict detection filter passed to + /// ConflictDetectionFilter() and applies to operations that happened after the + /// snapshot passed to ValidateFromSnapshot(). If the conflict detection filter + /// is not set, any new data added concurrently will fail this overwrite + /// operation. + /// + /// \return This OverwriteFiles for method chaining. + OverwriteFiles& ValidateNoConflictingData(); + + /// \brief Enable validation that deletes that happened concurrently do not + /// conflict with this commit's operation. + /// + /// Validating concurrent deletes is required during non-idempotent overwrite + /// operations. If a concurrent operation deletes data in one of the files being + /// overwritten, the overwrite operation must be aborted as it may undelete rows + /// that were removed concurrently. + /// + /// Calling this method with a correct conflict detection filter is required to + /// maintain isolation for non-idempotent overwrite operations. 
+ /// + /// Validation uses the conflict detection filter passed to + /// ConflictDetectionFilter() and applies to operations that happened after the + /// snapshot passed to ValidateFromSnapshot(). If the conflict detection filter + /// is not set, this operation will use the row filter provided in + /// OverwriteByRowFilter() to check for new delete files and will ensure there + /// are no conflicting deletes for data files removed via DeleteFile(). + /// + /// \return This OverwriteFiles for method chaining. + OverwriteFiles& ValidateNoConflictingDeletes(); + + /// \brief Signal that each file added to the table must match the overwrite + /// expression. + /// + /// If this method is called, each added file is validated on commit to ensure + /// that it matches the overwrite row filter. This is used to ensure that + /// writes are idempotent: files cannot be added during a commit that would not + /// be removed if the operation were run a second time. + /// + /// \return This OverwriteFiles for method chaining. + OverwriteFiles& ValidateAddedFilesMatchOverwriteFilter(); + + /// \brief Enable or disable case-sensitive expression binding for validations + /// that accept expressions. + /// + /// \param case_sensitive Whether expression binding should be case sensitive. + /// \return This OverwriteFiles for method chaining. + OverwriteFiles& CaseSensitive(bool case_sensitive); + + std::string operation() override; + + protected: + Status Validate(const TableMetadata& current_metadata, + const std::shared_ptr& snapshot) override; + + private: + explicit OverwriteFiles(std::string table_name, + std::shared_ptr ctx); + + /// \brief Select the conflict-detection filter from the configured state. 
+ std::shared_ptr DataConflictDetectionFilter() const; + + DataFileSet deleted_data_files_; + bool validate_added_files_match_overwrite_filter_ = false; + std::optional starting_snapshot_id_; + std::shared_ptr conflict_detection_filter_; + bool validate_new_data_files_ = false; + bool validate_new_deletes_ = false; +}; + +} // namespace iceberg diff --git a/src/iceberg/update/row_delta.h b/src/iceberg/update/row_delta.h index ddb54d836..bf58a048c 100644 --- a/src/iceberg/update/row_delta.h +++ b/src/iceberg/update/row_delta.h @@ -39,7 +39,7 @@ namespace iceberg { /// \brief API for encoding row-level changes to a table. /// /// This API accumulates data and delete file changes, produces a new Snapshot -/// of the table, and commits that snapshot as current. +/// of the table, and commits that snapshot as the current snapshot. /// /// When committing, these changes are applied to the latest table snapshot. /// Commit conflicts are resolved by applying the changes to the new latest @@ -50,25 +50,25 @@ class ICEBERG_EXPORT RowDelta : public MergingSnapshotUpdate { static Result> Make(std::string table_name, std::shared_ptr ctx); - /// \brief Add a data file to the table. + /// \brief Add a DataFile to the table. /// /// \param inserts A data file of rows to insert. /// \return This RowDelta for method chaining. RowDelta& AddRows(const std::shared_ptr& inserts); - /// \brief Add a delete file to the table. + /// \brief Add a DeleteFile to the table. /// /// \param deletes A delete file of rows to delete. /// \return This RowDelta for method chaining. RowDelta& AddDeletes(const std::shared_ptr& deletes); - /// \brief Remove a data file from the table. + /// \brief Remove a DataFile from the table. /// /// \param file A data file. /// \return This RowDelta for method chaining. RowDelta& RemoveRows(const std::shared_ptr& file); - /// \brief Remove a rewritten delete file from the table. + /// \brief Remove a rewritten DeleteFile from the table.
/// /// \param deletes A delete file that can be removed from the table. /// \return This RowDelta for method chaining. @@ -114,7 +114,8 @@ class ICEBERG_EXPORT RowDelta : public MergingSnapshotUpdate { /// \return This RowDelta for method chaining. RowDelta& ValidateDeletedFiles(); - /// \brief Set a conflict detection filter used to validate added files. + /// \brief Set a conflict detection filter used to validate concurrently added + /// data and delete files. /// /// If not called, a true literal is used as the conflict detection filter. /// diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index 7397034b3..b2c92bfb1 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -36,10 +36,11 @@ namespace iceberg { -/// \brief Base class for operations that produce snapshots. +/// \brief API for table changes that produce snapshots. /// -/// This class provides common functionality for creating new snapshots, -/// including manifest list writing and cleanup. +/// This class contains common methods for all updates that create a new table +/// Snapshot. It also provides the shared implementation for snapshot ID +/// assignment, manifest list writing, retry-safe apply, and cleanup. class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { public: /// \brief Result of applying a snapshot update @@ -56,9 +57,9 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { /// \brief Set a callback to delete files instead of the table's default. /// - /// \param delete_func A function used to delete file locations - /// \return Reference to this for method chaining - /// \note Cannot be called more than once + /// \param delete_func A function used to delete file locations. + /// \return This update for method chaining. + /// \note Cannot be called more than once. 
auto& DeleteWith(this auto& self, std::function delete_func) { if (self.delete_func_) { @@ -69,18 +70,21 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { return self; } - /// \brief Stage a snapshot in table metadata, but not update the current snapshot id. + /// \brief Stage a snapshot in table metadata, but do not make it current. /// - /// \return Reference to this for method chaining + /// The snapshot is assigned an ID and added to table metadata. The table's + /// current snapshot ID is not updated. + /// + /// \return This update for method chaining. auto& StageOnly(this auto& self) { self.stage_only_ = true; return self; } - /// \brief Perform operations on a particular branch + /// \brief Perform operations on a particular branch. /// - /// \param branch Which is name of SnapshotRef of type branch - /// \return Reference to this for method chaining + /// \param branch The name of a SnapshotRef of type branch. + /// \return This update for method chaining. auto& SetTargetBranch(this auto& self, const std::string& branch) { if (branch.empty()) [[unlikely]] { return self.AddError(ErrorKind::kInvalidArgument, "Branch name cannot be empty"); @@ -99,11 +103,11 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { return self; } - /// \brief Set a summary property. + /// \brief Set a summary property in the snapshot produced by this update. /// - /// \param property The property name - /// \param value The property value - /// \return Reference to this for method chaining + /// \param property The summary property name. + /// \param value The summary property value. + /// \return This update for method chaining. auto& Set(this auto& self, const std::string& property, const std::string& value) { static_cast(self).SetSummaryProperty(property, value); return self; @@ -111,11 +115,12 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { /// \brief Apply the update's changes to create a new snapshot. 
/// - /// This method validates the changes, applies them to the metadata, - /// and creates a new snapshot without committing it. The snapshot - /// is stored internally and can be accessed after Apply() succeeds. + /// This method validates the changes, applies them to the current base + /// metadata, and creates a new snapshot without committing it. Commit retries + /// call Apply() again with refreshed metadata so the same changes can be + /// applied to the new latest snapshot. /// - /// \return A result containing the new snapshot, or an error + /// \return A result containing the new snapshot, or an error. Result Apply(); /// \brief Finalize the snapshot update, cleaning up any uncommitted files.