From e5170ff5bd035daeb95e49c228513e4f3f302703 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 30 Jun 2026 14:33:24 +0800 Subject: [PATCH] feat: retry stale v3 snapshot row-lineage validation Add a retryable validation error kind and use it for add-snapshot stale sequence-number and stale first-row-id checks, matching Java Iceberg's RetryableValidationException behavior. Include the new retryable validation kind in commit retry policy, while preserving normal validation failures for mixed/non-retryable builder errors. Add focused v3 row-lineage tests for multi-file assignment, branch commits, retry reassignment, stale snapshot validation, and delete-manifest null first_row_id handling. --- src/iceberg/result.h | 2 + src/iceberg/table_metadata.cc | 22 +-- .../test/manifest_list_versions_test.cc | 9 ++ .../test/merging_snapshot_update_test.cc | 136 +++++++++++++++++- src/iceberg/test/retry_util_test.cc | 21 +++ src/iceberg/test/table_update_test.cc | 70 +++++++++ src/iceberg/util/error_collector.h | 10 +- src/iceberg/util/retry_util.h | 3 +- 8 files changed, 256 insertions(+), 17 deletions(-) diff --git a/src/iceberg/result.h b/src/iceberg/result.h index 8765f852a..4b4d0e7fb 100644 --- a/src/iceberg/result.h +++ b/src/iceberg/result.h @@ -61,6 +61,7 @@ enum class ErrorKind { kNotImplemented, kNotSupported, kRestError, + kRetryableValidationFailed, kServiceUnavailable, kTokenExpired, kUnknownError, @@ -127,6 +128,7 @@ DEFINE_ERROR_FUNCTION(NotFound) DEFINE_ERROR_FUNCTION(NotImplemented) DEFINE_ERROR_FUNCTION(NotSupported) DEFINE_ERROR_FUNCTION(RestError) +DEFINE_ERROR_FUNCTION(RetryableValidationFailed) DEFINE_ERROR_FUNCTION(ServiceUnavailable) DEFINE_ERROR_FUNCTION(TokenExpired) DEFINE_ERROR_FUNCTION(UnknownError) diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index ef2f1bf38..83ae47351 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -1071,12 +1071,13 @@ Status TableMetadataBuilder::Impl::AddSnapshot(std::shared_ptr snapsho "Attempting to add a snapshot before a sort order is added"); ICEBERG_CHECK(!snapshots_by_id_.contains(snapshot->snapshot_id), "Snapshot already exists for id: {}", snapshot->snapshot_id); - ICEBERG_CHECK( - metadata_.format_version == 1 || - snapshot->sequence_number > metadata_.last_sequence_number || - !snapshot->parent_snapshot_id.has_value(), - "Cannot add snapshot with sequence number {} older than last sequence number {}", - snapshot->sequence_number, metadata_.last_sequence_number); + if (metadata_.format_version != 1 && + snapshot->sequence_number <= metadata_.last_sequence_number && + snapshot->parent_snapshot_id.has_value()) { + return RetryableValidationFailed( + "Cannot add snapshot with sequence number {} older than last sequence number {}", + snapshot->sequence_number, metadata_.last_sequence_number); + } metadata_.last_sequence_number = snapshot->sequence_number; metadata_.snapshots.push_back(snapshot); @@ -1087,10 +1088,11 @@ Status TableMetadataBuilder::Impl::AddSnapshot(std::shared_ptr snapsho ICEBERG_ASSIGN_OR_RAISE(auto first_row_id, snapshot->FirstRowId()); ICEBERG_CHECK(first_row_id.has_value(), "Cannot add a snapshot: first-row-id is null"); - ICEBERG_CHECK( - first_row_id.value() >= metadata_.next_row_id, - "Cannot add a snapshot, first-row-id is behind table next-row-id: {} < {}", - first_row_id.value(), metadata_.next_row_id); + if (first_row_id.value() < metadata_.next_row_id) { + return RetryableValidationFailed( + "Cannot add a snapshot, first-row-id is behind table next-row-id: {} < {}", + first_row_id.value(), metadata_.next_row_id); + } ICEBERG_ASSIGN_OR_RAISE(auto add_rows, snapshot->AddedRows()); ICEBERG_CHECK(add_rows.has_value(), "Cannot add a snapshot: added-rows is null"); diff --git a/src/iceberg/test/manifest_list_versions_test.cc b/src/iceberg/test/manifest_list_versions_test.cc index b173d56e7..ce0eb9da1 100644 --- a/src/iceberg/test/manifest_list_versions_test.cc +++ b/src/iceberg/test/manifest_list_versions_test.cc @@ -328,6 +328,15 @@ TEST_F(TestManifestListVersions, TestV3WriteMixedRowIdAssignment) { std::make_optional(kSnapshotFirstRowId + kAddedRows + kExistingRows)); } +TEST_F(TestManifestListVersions, TestV3DeleteRowIdNull) { + const auto manifest_list_path = + WriteManifestList(/*format_version=*/3, kSnapshotFirstRowId, {kDeleteManifest}); + + auto manifest = ReadManifestList(manifest_list_path); + EXPECT_EQ(manifest.content, ManifestContent::kDeletes); + EXPECT_FALSE(manifest.first_row_id.has_value()); +} + TEST_F(TestManifestListVersions, TestV1ForwardCompatibility) { std::string manifest_list_path = WriteManifestList(/*format_version=*/1, kSnapshotFirstRowId, {kTestManifest}); diff --git a/src/iceberg/test/merging_snapshot_update_test.cc b/src/iceberg/test/merging_snapshot_update_test.cc index 9841d74a9..e562f43d0 100644 --- a/src/iceberg/test/merging_snapshot_update_test.cc +++ b/src/iceberg/test/merging_snapshot_update_test.cc @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -45,9 +46,12 @@ #include "iceberg/table_properties.h" #include "iceberg/test/executor.h" #include "iceberg/test/matchers.h" +#include "iceberg/test/retry.h" #include "iceberg/test/update_test_base.h" #include "iceberg/transaction.h" #include "iceberg/update/fast_append.h" +#include "iceberg/update/merge_append.h" +#include "iceberg/update/snapshot_manager.h" #include "iceberg/update/update_properties.h" #include "iceberg/util/macros.h" @@ -276,6 +280,13 @@ class MergingSnapshotUpdateTest : public MinimalUpdateTestBase { return TestOverwriteUpdate::Make(TableName(), table_); } + void UpgradeTableToV3() { + ICEBERG_UNWRAP_OR_FAIL(auto props, table_->NewUpdateProperties()); + props->Set(TableProperties::kFormatVersion.key(), "3"); + EXPECT_THAT(props->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + void SetTableFormatVersion(int8_t format_version) { table_->metadata()->format_version = format_version; } @@ -311,6 +322,22 @@ class MergingSnapshotUpdateTest : public MinimalUpdateTestBase { return result; } + Result>> DataFileFirstRowIds( + const std::shared_ptr& snapshot, const TableMetadata& metadata) { + SnapshotCache snapshot_cache(snapshot.get()); + ICEBERG_ASSIGN_OR_RAISE(auto manifest_range, snapshot_cache.DataManifests(file_io_)); + std::vector manifests(manifest_range.begin(), manifest_range.end()); + ICEBERG_ASSIGN_OR_RAISE(auto entries, ReadAllEntries(manifests, metadata)); + + std::unordered_map> first_row_ids; + for (const auto& entry : entries) { + if (entry.data_file != nullptr) { + first_row_ids.emplace(entry.data_file->file_path, entry.data_file->first_row_id); + } + } + return first_row_ids; + } + // Write a manifest file containing the given data files. // Returns a ManifestFile with added_snapshot_id = kInvalidSnapshotId so it // is eligible for snapshot ID inheritance. @@ -448,10 +475,7 @@ TEST_F(MergingSnapshotUpdateTest, CommitNewDataFile) { } TEST_F(MergingSnapshotUpdateTest, CommitV3NewDataFileAssignsRowLineage) { - ICEBERG_UNWRAP_OR_FAIL(auto props, table_->NewUpdateProperties()); - props->Set(TableProperties::kFormatVersion.key(), "3"); - EXPECT_THAT(props->Commit(), IsOk()); - EXPECT_THAT(table_->Refresh(), IsOk()); + UpgradeTableToV3(); ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend()); EXPECT_THAT(op->AddFile(file_a_), IsOk()); @@ -466,6 +490,110 @@ TEST_F(MergingSnapshotUpdateTest, CommitV3NewDataFileAssignsRowLineage) { EXPECT_EQ(table_->metadata()->next_row_id, 100); } +TEST_F(MergingSnapshotUpdateTest, V3MultiFileRowIds) { + UpgradeTableToV3(); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend()); + EXPECT_THAT(op->AddFile(file_a_), IsOk()); + EXPECT_THAT(op->AddFile(file_b_), IsOk()); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + ICEBERG_UNWRAP_OR_FAIL(auto first_row_id, snapshot->FirstRowId()); + ICEBERG_UNWRAP_OR_FAIL(auto added_rows, snapshot->AddedRows()); + EXPECT_EQ(first_row_id, std::make_optional(0)); + EXPECT_EQ(added_rows, std::make_optional(200)); + EXPECT_EQ(table_->metadata()->next_row_id, 200); + + ICEBERG_UNWRAP_OR_FAIL(auto first_row_ids, + DataFileFirstRowIds(snapshot, *table_->metadata())); + EXPECT_EQ(first_row_ids.at(file_a_->file_path), std::make_optional(0)); + EXPECT_EQ(first_row_ids.at(file_b_->file_path), std::make_optional(100)); +} + +TEST_F(MergingSnapshotUpdateTest, V3BranchRowIds) { + UpgradeTableToV3(); + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto starting_snapshot, table_->current_snapshot()); + const auto starting_snapshot_id = starting_snapshot->snapshot_id; + const auto starting_next_row_id = table_->metadata()->next_row_id; + + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("audit", starting_snapshot_id); + EXPECT_THAT(manager->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend()); + op->SetTargetBranch("audit"); + EXPECT_THAT(op->AddFile(file_b_), IsOk()); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + EXPECT_EQ(table_->metadata()->next_row_id, + starting_next_row_id + file_b_->record_count); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, table_->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, starting_snapshot_id); + + auto ref_it = table_->metadata()->refs.find("audit"); + ASSERT_NE(ref_it, table_->metadata()->refs.end()); + ICEBERG_UNWRAP_OR_FAIL(auto branch_snapshot, + table_->metadata()->SnapshotById(ref_it->second->snapshot_id)); + ICEBERG_UNWRAP_OR_FAIL(auto first_row_id, branch_snapshot->FirstRowId()); + ICEBERG_UNWRAP_OR_FAIL(auto added_rows, branch_snapshot->AddedRows()); + EXPECT_EQ(first_row_id, std::make_optional(starting_next_row_id)); + EXPECT_EQ(added_rows, std::make_optional(file_b_->record_count)); + + ICEBERG_UNWRAP_OR_FAIL(auto first_row_ids, + DataFileFirstRowIds(branch_snapshot, *table_->metadata())); + EXPECT_EQ(first_row_ids.at(file_a_->file_path), std::make_optional(0)); + EXPECT_EQ(first_row_ids.at(file_b_->file_path), + std::make_optional(starting_next_row_id)); +} + +// A staged v3 append must reassign row IDs when a concurrent commit advances next_row_id. +TEST_F(MergingSnapshotUpdateTest, V3RetryRowIds) { + UpgradeTableToV3(); + test::FakeRetryEnvironment fake_retry; + ScopedRetryTestHooks retry_hooks(fake_retry.hooks()); + + // Stage file_a_ with first_row_id = 0, but do not commit the transaction yet. + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto append, txn->NewMergeAppend()); + append->AppendFile(file_a_); + EXPECT_THAT(append->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto staged_snapshot, txn->current().Snapshot()); + ICEBERG_UNWRAP_OR_FAIL(auto staged_first_row_id, staged_snapshot->FirstRowId()); + EXPECT_EQ(staged_first_row_id, std::make_optional(0)); + + // Commit file_b_ first, advancing table next_row_id and making file_a_'s row IDs stale. + ICEBERG_UNWRAP_OR_FAIL(auto concurrent, table_->NewMergeAppend()); + concurrent->AppendFile(file_b_); + EXPECT_THAT(concurrent->Commit(), IsOk()); + auto after_concurrent = ReloadMetadata(); + EXPECT_EQ(after_concurrent->next_row_id, file_b_->record_count); + + // The original transaction must retry and reassign file_a_ after file_b_'s row IDs. + ICEBERG_UNWRAP_OR_FAIL(auto committed_table, txn->Commit()); + EXPECT_FALSE(fake_retry.sleep_durations().empty()); + EXPECT_EQ(committed_table->metadata()->next_row_id, + file_b_->record_count + file_a_->record_count); + + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, committed_table->current_snapshot()); + ICEBERG_UNWRAP_OR_FAIL(auto first_row_id, snapshot->FirstRowId()); + ICEBERG_UNWRAP_OR_FAIL(auto added_rows, snapshot->AddedRows()); + EXPECT_EQ(first_row_id, std::make_optional(file_b_->record_count)); + EXPECT_EQ(added_rows, std::make_optional(file_a_->record_count)); + + ICEBERG_UNWRAP_OR_FAIL(auto first_row_ids, + DataFileFirstRowIds(snapshot, *committed_table->metadata())); + EXPECT_EQ(first_row_ids.at(file_b_->file_path), std::make_optional(0)); + EXPECT_EQ(first_row_ids.at(file_a_->file_path), + std::make_optional(file_b_->record_count)); +} + TEST_F(MergingSnapshotUpdateTest, CommitMultipleDataFiles) { ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend()); EXPECT_THAT(op->AddFile(file_a_), IsOk()); diff --git a/src/iceberg/test/retry_util_test.cc b/src/iceberg/test/retry_util_test.cc index 1b2f35578..e9e237ef0 100644 --- a/src/iceberg/test/retry_util_test.cc +++ b/src/iceberg/test/retry_util_test.cc @@ -485,6 +485,27 @@ TEST(RetryRunnerTest, MakeCommitRetryRunnerRetriesCommitFailed) { EXPECT_EQ(attempts, 3); } +TEST(RetryRunnerTest, RetriesRetryableValidation) { + int call_count = 0; + int32_t attempts = 0; + + auto result = MakeCommitRetryRunner(3, 1, 10, 5000) + .Run( + [&]() -> Result { + ++call_count; + if (call_count <= 2) { + return RetryableValidationFailed("stale"); + } + return 99; + }, + &attempts); + + EXPECT_THAT(result, IsOk()); + EXPECT_EQ(*result, 99); + EXPECT_EQ(call_count, 3); + EXPECT_EQ(attempts, 3); +} + TEST(RetryRunnerTest, OnlyRetryOnMultipleErrorKinds) { int call_count = 0; int32_t attempts = 0; diff --git a/src/iceberg/test/table_update_test.cc b/src/iceberg/test/table_update_test.cc index be464f154..fd2de582c 100644 --- a/src/iceberg/test/table_update_test.cc +++ b/src/iceberg/test/table_update_test.cc @@ -460,4 +460,74 @@ TEST(TableUpdateTest, SetSnapshotRefRejectsTagForMainBranch) { EXPECT_THAT(result, HasErrorMessage("Cannot set main to a tag, it must be a branch")); } +TEST(TableUpdateTest, V3SnapshotRows) { + auto base = CreateBaseMetadata(); + base->format_version = 3; + base->next_row_id = 5; + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + ICEBERG_UNWRAP_OR_FAIL( + auto snapshot, + Snapshot::Make(/*sequence_number=*/1, /*snapshot_id=*/123, + /*parent_snapshot_id=*/std::nullopt, TimePointMsFromUnixMs(2000000), + DataOperation::kAppend, + /*summary=*/{}, base->current_schema_id, + "s3://bucket/manifest-list.avro", + /*first_row_id=*/5, + /*added_rows=*/7)); + table::AddSnapshot update(std::shared_ptr(std::move(snapshot))); + update.ApplyTo(*builder); + + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + EXPECT_EQ(metadata->next_row_id, 12); +} + +TEST(TableUpdateTest, V3StaleRowId) { + auto base = CreateBaseMetadata(); + base->format_version = 3; + base->next_row_id = 5; + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + ICEBERG_UNWRAP_OR_FAIL( + auto snapshot, + Snapshot::Make(/*sequence_number=*/1, /*snapshot_id=*/123, + /*parent_snapshot_id=*/std::nullopt, TimePointMsFromUnixMs(2000000), + DataOperation::kAppend, + /*summary=*/{}, base->current_schema_id, + "s3://bucket/manifest-list.avro", + /*first_row_id=*/4, + /*added_rows=*/1)); + table::AddSnapshot update(std::shared_ptr(std::move(snapshot))); + update.ApplyTo(*builder); + + auto result = builder->Build(); + ASSERT_THAT(result, IsError(ErrorKind::kRetryableValidationFailed)); + EXPECT_THAT( + result, + HasErrorMessage("Cannot add a snapshot, first-row-id is behind table next-row-id")); +} + +TEST(TableUpdateTest, V3StaleSequence) { + auto base = CreateBaseMetadata(); + base->format_version = 3; + base->last_sequence_number = 5; + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + + ICEBERG_UNWRAP_OR_FAIL( + auto snapshot, + Snapshot::Make(/*sequence_number=*/5, /*snapshot_id=*/123, + /*parent_snapshot_id=*/1, TimePointMsFromUnixMs(2000000), + DataOperation::kAppend, + /*summary=*/{}, base->current_schema_id, + "s3://bucket/manifest-list.avro", + /*first_row_id=*/0, + /*added_rows=*/1)); + table::AddSnapshot update(std::shared_ptr(std::move(snapshot))); + update.ApplyTo(*builder); + + auto result = builder->Build(); + ASSERT_THAT(result, IsError(ErrorKind::kRetryableValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Cannot add snapshot with sequence number")); +} + } // namespace iceberg diff --git a/src/iceberg/util/error_collector.h b/src/iceberg/util/error_collector.h index f07e127a9..7e8ab218b 100644 --- a/src/iceberg/util/error_collector.h +++ b/src/iceberg/util/error_collector.h @@ -159,14 +159,20 @@ class ICEBERG_EXPORT ErrorCollector { /// in Build(), Apply(), or Commit() methods) to validate that no errors /// were accumulated during the builder method calls. /// - /// \return Status indicating success if no errors, or a ValidationFailed - /// error with all accumulated error messages + /// \return Status indicating success if no errors, a RetryableValidationFailed if + /// all accumulated errors are retryable validations, or a ValidationFailed + /// error with all accumulated error messages otherwise [[nodiscard]] Status CheckErrors() const { if (!errors_.empty()) { std::string error_msg = "Validation failed due to the following errors:\n"; + bool all_retryable = true; for (const auto& [kind, message] : errors_) { + all_retryable &= kind == ErrorKind::kRetryableValidationFailed; error_msg += " - " + message + "\n"; } + if (all_retryable) { + return RetryableValidationFailed("{}", error_msg); + } return ValidationFailed("{}", error_msg); } return {}; diff --git a/src/iceberg/util/retry_util.h b/src/iceberg/util/retry_util.h index 656213976..08e3fb81d 100644 --- a/src/iceberg/util/retry_util.h +++ b/src/iceberg/util/retry_util.h @@ -198,7 +198,8 @@ class RetryRunner : private detail::RetryRunnerBase { ICEBERG_EXPORT inline auto MakeCommitRetryRunner(int32_t num_retries, int32_t min_wait_ms, int32_t max_wait_ms, int32_t total_timeout_ms) { - return RetryRunner>( + return RetryRunner>( RetryConfig{.num_retries = num_retries, .min_wait_ms = min_wait_ms, .max_wait_ms = max_wait_ms,