Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/iceberg/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ enum class ErrorKind {
kNotImplemented,
kNotSupported,
kRestError,
kRetryableValidationFailed,
kServiceUnavailable,
kTokenExpired,
kUnknownError,
Expand Down Expand Up @@ -127,6 +128,7 @@ DEFINE_ERROR_FUNCTION(NotFound)
DEFINE_ERROR_FUNCTION(NotImplemented)
DEFINE_ERROR_FUNCTION(NotSupported)
DEFINE_ERROR_FUNCTION(RestError)
DEFINE_ERROR_FUNCTION(RetryableValidationFailed)
DEFINE_ERROR_FUNCTION(ServiceUnavailable)
DEFINE_ERROR_FUNCTION(TokenExpired)
DEFINE_ERROR_FUNCTION(UnknownError)
Expand Down
22 changes: 12 additions & 10 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1071,12 +1071,13 @@ Status TableMetadataBuilder::Impl::AddSnapshot(std::shared_ptr<Snapshot> snapsho
"Attempting to add a snapshot before a sort order is added");
ICEBERG_CHECK(!snapshots_by_id_.contains(snapshot->snapshot_id),
"Snapshot already exists for id: {}", snapshot->snapshot_id);
ICEBERG_CHECK(
metadata_.format_version == 1 ||
snapshot->sequence_number > metadata_.last_sequence_number ||
!snapshot->parent_snapshot_id.has_value(),
"Cannot add snapshot with sequence number {} older than last sequence number {}",
snapshot->sequence_number, metadata_.last_sequence_number);
if (metadata_.format_version != 1 &&
snapshot->sequence_number <= metadata_.last_sequence_number &&
snapshot->parent_snapshot_id.has_value()) {
return RetryableValidationFailed(
"Cannot add snapshot with sequence number {} older than last sequence number {}",
snapshot->sequence_number, metadata_.last_sequence_number);
}

metadata_.last_sequence_number = snapshot->sequence_number;
metadata_.snapshots.push_back(snapshot);
Expand All @@ -1087,10 +1088,11 @@ Status TableMetadataBuilder::Impl::AddSnapshot(std::shared_ptr<Snapshot> snapsho
ICEBERG_ASSIGN_OR_RAISE(auto first_row_id, snapshot->FirstRowId());
ICEBERG_CHECK(first_row_id.has_value(),
"Cannot add a snapshot: first-row-id is null");
ICEBERG_CHECK(
first_row_id.value() >= metadata_.next_row_id,
"Cannot add a snapshot, first-row-id is behind table next-row-id: {} < {}",
first_row_id.value(), metadata_.next_row_id);
if (first_row_id.value() < metadata_.next_row_id) {
return RetryableValidationFailed(
"Cannot add a snapshot, first-row-id is behind table next-row-id: {} < {}",
first_row_id.value(), metadata_.next_row_id);
}

ICEBERG_ASSIGN_OR_RAISE(auto add_rows, snapshot->AddedRows());
ICEBERG_CHECK(add_rows.has_value(), "Cannot add a snapshot: added-rows is null");
Expand Down
9 changes: 9 additions & 0 deletions src/iceberg/test/manifest_list_versions_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,15 @@ TEST_F(TestManifestListVersions, TestV3WriteMixedRowIdAssignment) {
std::make_optional(kSnapshotFirstRowId + kAddedRows + kExistingRows));
}

TEST_F(TestManifestListVersions, TestV3DeleteRowIdNull) {
const auto manifest_list_path =
WriteManifestList(/*format_version=*/3, kSnapshotFirstRowId, {kDeleteManifest});

auto manifest = ReadManifestList(manifest_list_path);
EXPECT_EQ(manifest.content, ManifestContent::kDeletes);
EXPECT_FALSE(manifest.first_row_id.has_value());
}

TEST_F(TestManifestListVersions, TestV1ForwardCompatibility) {
std::string manifest_list_path =
WriteManifestList(/*format_version=*/1, kSnapshotFirstRowId, {kTestManifest});
Expand Down
136 changes: 132 additions & 4 deletions src/iceberg/test/merging_snapshot_update_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <optional>
#include <ranges>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

Expand All @@ -45,9 +46,12 @@
#include "iceberg/table_properties.h"
#include "iceberg/test/executor.h"
#include "iceberg/test/matchers.h"
#include "iceberg/test/retry.h"
#include "iceberg/test/update_test_base.h"
#include "iceberg/transaction.h"
#include "iceberg/update/fast_append.h"
#include "iceberg/update/merge_append.h"
#include "iceberg/update/snapshot_manager.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/util/macros.h"

Expand Down Expand Up @@ -276,6 +280,13 @@ class MergingSnapshotUpdateTest : public MinimalUpdateTestBase {
return TestOverwriteUpdate::Make(TableName(), table_);
}

void UpgradeTableToV3() {
ICEBERG_UNWRAP_OR_FAIL(auto props, table_->NewUpdateProperties());
props->Set(TableProperties::kFormatVersion.key(), "3");
EXPECT_THAT(props->Commit(), IsOk());
EXPECT_THAT(table_->Refresh(), IsOk());
}

void SetTableFormatVersion(int8_t format_version) {
table_->metadata()->format_version = format_version;
}
Expand Down Expand Up @@ -311,6 +322,22 @@ class MergingSnapshotUpdateTest : public MinimalUpdateTestBase {
return result;
}

Result<std::unordered_map<std::string, std::optional<int64_t>>> DataFileFirstRowIds(
const std::shared_ptr<Snapshot>& snapshot, const TableMetadata& metadata) {
SnapshotCache snapshot_cache(snapshot.get());
ICEBERG_ASSIGN_OR_RAISE(auto manifest_range, snapshot_cache.DataManifests(file_io_));
std::vector<ManifestFile> manifests(manifest_range.begin(), manifest_range.end());
ICEBERG_ASSIGN_OR_RAISE(auto entries, ReadAllEntries(manifests, metadata));

std::unordered_map<std::string, std::optional<int64_t>> first_row_ids;
for (const auto& entry : entries) {
if (entry.data_file != nullptr) {
first_row_ids.emplace(entry.data_file->file_path, entry.data_file->first_row_id);
}
}
return first_row_ids;
}

// Write a manifest file containing the given data files.
// Returns a ManifestFile with added_snapshot_id = kInvalidSnapshotId so it
// is eligible for snapshot ID inheritance.
Expand Down Expand Up @@ -448,10 +475,7 @@ TEST_F(MergingSnapshotUpdateTest, CommitNewDataFile) {
}

TEST_F(MergingSnapshotUpdateTest, CommitV3NewDataFileAssignsRowLineage) {
ICEBERG_UNWRAP_OR_FAIL(auto props, table_->NewUpdateProperties());
props->Set(TableProperties::kFormatVersion.key(), "3");
EXPECT_THAT(props->Commit(), IsOk());
EXPECT_THAT(table_->Refresh(), IsOk());
UpgradeTableToV3();

ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
EXPECT_THAT(op->AddFile(file_a_), IsOk());
Expand All @@ -466,6 +490,110 @@ TEST_F(MergingSnapshotUpdateTest, CommitV3NewDataFileAssignsRowLineage) {
EXPECT_EQ(table_->metadata()->next_row_id, 100);
}

TEST_F(MergingSnapshotUpdateTest, V3MultiFileRowIds) {
UpgradeTableToV3();

ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
EXPECT_THAT(op->AddFile(file_a_), IsOk());
EXPECT_THAT(op->AddFile(file_b_), IsOk());
EXPECT_THAT(op->Commit(), IsOk());

EXPECT_THAT(table_->Refresh(), IsOk());
ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
ICEBERG_UNWRAP_OR_FAIL(auto first_row_id, snapshot->FirstRowId());
ICEBERG_UNWRAP_OR_FAIL(auto added_rows, snapshot->AddedRows());
EXPECT_EQ(first_row_id, std::make_optional<int64_t>(0));
EXPECT_EQ(added_rows, std::make_optional<int64_t>(200));
EXPECT_EQ(table_->metadata()->next_row_id, 200);

ICEBERG_UNWRAP_OR_FAIL(auto first_row_ids,
DataFileFirstRowIds(snapshot, *table_->metadata()));
EXPECT_EQ(first_row_ids.at(file_a_->file_path), std::make_optional<int64_t>(0));
EXPECT_EQ(first_row_ids.at(file_b_->file_path), std::make_optional<int64_t>(100));
}

TEST_F(MergingSnapshotUpdateTest, V3BranchRowIds) {
UpgradeTableToV3();
CommitFileA();

ICEBERG_UNWRAP_OR_FAIL(auto starting_snapshot, table_->current_snapshot());
const auto starting_snapshot_id = starting_snapshot->snapshot_id;
const auto starting_next_row_id = table_->metadata()->next_row_id;

ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
manager->CreateBranch("audit", starting_snapshot_id);
EXPECT_THAT(manager->Commit(), IsOk());
EXPECT_THAT(table_->Refresh(), IsOk());

ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
op->SetTargetBranch("audit");
EXPECT_THAT(op->AddFile(file_b_), IsOk());
EXPECT_THAT(op->Commit(), IsOk());

EXPECT_THAT(table_->Refresh(), IsOk());
EXPECT_EQ(table_->metadata()->next_row_id,
starting_next_row_id + file_b_->record_count);
ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, table_->current_snapshot());
EXPECT_EQ(current_snapshot->snapshot_id, starting_snapshot_id);

auto ref_it = table_->metadata()->refs.find("audit");
ASSERT_NE(ref_it, table_->metadata()->refs.end());
ICEBERG_UNWRAP_OR_FAIL(auto branch_snapshot,
table_->metadata()->SnapshotById(ref_it->second->snapshot_id));
ICEBERG_UNWRAP_OR_FAIL(auto first_row_id, branch_snapshot->FirstRowId());
ICEBERG_UNWRAP_OR_FAIL(auto added_rows, branch_snapshot->AddedRows());
EXPECT_EQ(first_row_id, std::make_optional(starting_next_row_id));
EXPECT_EQ(added_rows, std::make_optional(file_b_->record_count));

ICEBERG_UNWRAP_OR_FAIL(auto first_row_ids,
DataFileFirstRowIds(branch_snapshot, *table_->metadata()));
EXPECT_EQ(first_row_ids.at(file_a_->file_path), std::make_optional<int64_t>(0));
EXPECT_EQ(first_row_ids.at(file_b_->file_path),
std::make_optional(starting_next_row_id));
}

// A staged v3 append must reassign row IDs when a concurrent commit advances next_row_id.
TEST_F(MergingSnapshotUpdateTest, V3RetryRowIds) {
UpgradeTableToV3();
test::FakeRetryEnvironment fake_retry;
ScopedRetryTestHooks retry_hooks(fake_retry.hooks());

// Stage file_a_ with first_row_id = 0, but do not commit the transaction yet.
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
ICEBERG_UNWRAP_OR_FAIL(auto append, txn->NewMergeAppend());
append->AppendFile(file_a_);
EXPECT_THAT(append->Commit(), IsOk());

ICEBERG_UNWRAP_OR_FAIL(auto staged_snapshot, txn->current().Snapshot());
ICEBERG_UNWRAP_OR_FAIL(auto staged_first_row_id, staged_snapshot->FirstRowId());
EXPECT_EQ(staged_first_row_id, std::make_optional<int64_t>(0));

// Commit file_b_ first, advancing table next_row_id and making file_a_'s row IDs stale.
ICEBERG_UNWRAP_OR_FAIL(auto concurrent, table_->NewMergeAppend());
concurrent->AppendFile(file_b_);
EXPECT_THAT(concurrent->Commit(), IsOk());
auto after_concurrent = ReloadMetadata();
EXPECT_EQ(after_concurrent->next_row_id, file_b_->record_count);

// The original transaction must retry and reassign file_a_ after file_b_'s row IDs.
ICEBERG_UNWRAP_OR_FAIL(auto committed_table, txn->Commit());
EXPECT_FALSE(fake_retry.sleep_durations().empty());
EXPECT_EQ(committed_table->metadata()->next_row_id,
file_b_->record_count + file_a_->record_count);

ICEBERG_UNWRAP_OR_FAIL(auto snapshot, committed_table->current_snapshot());
ICEBERG_UNWRAP_OR_FAIL(auto first_row_id, snapshot->FirstRowId());
ICEBERG_UNWRAP_OR_FAIL(auto added_rows, snapshot->AddedRows());
EXPECT_EQ(first_row_id, std::make_optional(file_b_->record_count));
EXPECT_EQ(added_rows, std::make_optional(file_a_->record_count));

ICEBERG_UNWRAP_OR_FAIL(auto first_row_ids,
DataFileFirstRowIds(snapshot, *committed_table->metadata()));
EXPECT_EQ(first_row_ids.at(file_b_->file_path), std::make_optional<int64_t>(0));
EXPECT_EQ(first_row_ids.at(file_a_->file_path),
std::make_optional(file_b_->record_count));
}

TEST_F(MergingSnapshotUpdateTest, CommitMultipleDataFiles) {
ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
EXPECT_THAT(op->AddFile(file_a_), IsOk());
Expand Down
21 changes: 21 additions & 0 deletions src/iceberg/test/retry_util_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,27 @@ TEST(RetryRunnerTest, MakeCommitRetryRunnerRetriesCommitFailed) {
EXPECT_EQ(attempts, 3);
}

TEST(RetryRunnerTest, RetriesRetryableValidation) {
int call_count = 0;
int32_t attempts = 0;

auto result = MakeCommitRetryRunner(3, 1, 10, 5000)
.Run(
[&]() -> Result<int> {
++call_count;
if (call_count <= 2) {
return RetryableValidationFailed("stale");
}
return 99;
},
&attempts);

EXPECT_THAT(result, IsOk());
EXPECT_EQ(*result, 99);
EXPECT_EQ(call_count, 3);
EXPECT_EQ(attempts, 3);
}

TEST(RetryRunnerTest, OnlyRetryOnMultipleErrorKinds) {
int call_count = 0;
int32_t attempts = 0;
Expand Down
70 changes: 70 additions & 0 deletions src/iceberg/test/table_update_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -460,4 +460,74 @@ TEST(TableUpdateTest, SetSnapshotRefRejectsTagForMainBranch) {
EXPECT_THAT(result, HasErrorMessage("Cannot set main to a tag, it must be a branch"));
}

TEST(TableUpdateTest, V3SnapshotRows) {
auto base = CreateBaseMetadata();
base->format_version = 3;
base->next_row_id = 5;
auto builder = TableMetadataBuilder::BuildFrom(base.get());

ICEBERG_UNWRAP_OR_FAIL(
auto snapshot,
Snapshot::Make(/*sequence_number=*/1, /*snapshot_id=*/123,
/*parent_snapshot_id=*/std::nullopt, TimePointMsFromUnixMs(2000000),
DataOperation::kAppend,
/*summary=*/{}, base->current_schema_id,
"s3://bucket/manifest-list.avro",
/*first_row_id=*/5,
/*added_rows=*/7));
table::AddSnapshot update(std::shared_ptr<Snapshot>(std::move(snapshot)));
update.ApplyTo(*builder);

ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
EXPECT_EQ(metadata->next_row_id, 12);
}

TEST(TableUpdateTest, V3StaleRowId) {
auto base = CreateBaseMetadata();
base->format_version = 3;
base->next_row_id = 5;
auto builder = TableMetadataBuilder::BuildFrom(base.get());

ICEBERG_UNWRAP_OR_FAIL(
auto snapshot,
Snapshot::Make(/*sequence_number=*/1, /*snapshot_id=*/123,
/*parent_snapshot_id=*/std::nullopt, TimePointMsFromUnixMs(2000000),
DataOperation::kAppend,
/*summary=*/{}, base->current_schema_id,
"s3://bucket/manifest-list.avro",
/*first_row_id=*/4,
/*added_rows=*/1));
table::AddSnapshot update(std::shared_ptr<Snapshot>(std::move(snapshot)));
update.ApplyTo(*builder);

auto result = builder->Build();
ASSERT_THAT(result, IsError(ErrorKind::kRetryableValidationFailed));
EXPECT_THAT(
result,
HasErrorMessage("Cannot add a snapshot, first-row-id is behind table next-row-id"));
}

TEST(TableUpdateTest, V3StaleSequence) {
auto base = CreateBaseMetadata();
base->format_version = 3;
base->last_sequence_number = 5;
auto builder = TableMetadataBuilder::BuildFrom(base.get());

ICEBERG_UNWRAP_OR_FAIL(
auto snapshot,
Snapshot::Make(/*sequence_number=*/5, /*snapshot_id=*/123,
/*parent_snapshot_id=*/1, TimePointMsFromUnixMs(2000000),
DataOperation::kAppend,
/*summary=*/{}, base->current_schema_id,
"s3://bucket/manifest-list.avro",
/*first_row_id=*/0,
/*added_rows=*/1));
table::AddSnapshot update(std::shared_ptr<Snapshot>(std::move(snapshot)));
update.ApplyTo(*builder);

auto result = builder->Build();
ASSERT_THAT(result, IsError(ErrorKind::kRetryableValidationFailed));
EXPECT_THAT(result, HasErrorMessage("Cannot add snapshot with sequence number"));
}

} // namespace iceberg
10 changes: 8 additions & 2 deletions src/iceberg/util/error_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,20 @@ class ICEBERG_EXPORT ErrorCollector {
/// in Build(), Apply(), or Commit() methods) to validate that no errors
/// were accumulated during the builder method calls.
///
/// \return Status indicating success if no errors, or a ValidationFailed
/// error with all accumulated error messages
/// \return Status indicating success if no errors, a RetryableValidationFailed if
/// all accumulated errors are retryable validations, or a ValidationFailed
/// error with all accumulated error messages otherwise
[[nodiscard]] Status CheckErrors() const {
if (!errors_.empty()) {
std::string error_msg = "Validation failed due to the following errors:\n";
bool all_retryable = true;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering why we require all the errors to be retryable before treating the result as RetryableValidationFailed. Java impl throws a RetryableValidationException immediately when it encounters a retryable validation error, will this affect the behavior of lib users?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The all_retryable check is intentional because C++ builders accumulate errors, while Java is fail-fast. In Java, a RetryableValidationException stops evaluation immediately, so there is no mixed-error result to classify.

In C++, if we treated the result as retryable when only the first error is retryable, a chained builder call could retry even when another collected error is deterministic and cannot be fixed by refreshing metadata. Requiring all collected errors to be retryable keeps retry limited to cases where refreshing can plausibly fix the whole update.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I get it.

for (const auto& [kind, message] : errors_) {
all_retryable &= kind == ErrorKind::kRetryableValidationFailed;
error_msg += " - " + message + "\n";
}
if (all_retryable) {
return RetryableValidationFailed("{}", error_msg);
}
return ValidationFailed("{}", error_msg);
}
return {};
Expand Down
3 changes: 2 additions & 1 deletion src/iceberg/util/retry_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ class RetryRunner : private detail::RetryRunnerBase {
ICEBERG_EXPORT inline auto MakeCommitRetryRunner(int32_t num_retries, int32_t min_wait_ms,
int32_t max_wait_ms,
int32_t total_timeout_ms) {
return RetryRunner<retry::OnlyRetryOn<ErrorKind::kCommitFailed>>(
return RetryRunner<retry::OnlyRetryOn<ErrorKind::kCommitFailed,
ErrorKind::kRetryableValidationFailed>>(
RetryConfig{.num_retries = num_retries,
.min_wait_ms = min_wait_ms,
.max_wait_ms = max_wait_ms,
Expand Down
Loading