From a323e035df1094b734f631933716584611123f10 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Mon, 29 Jun 2026 11:22:18 +0800 Subject: [PATCH 1/2] feat: write snapshot v3 row lineage fields at top level --- src/iceberg/json_serde.cc | 60 +++++++++++++++++++- src/iceberg/snapshot.cc | 19 ++++--- src/iceberg/snapshot.h | 4 ++ src/iceberg/test/json_serde_test.cc | 85 ++++++++++++++++++++++++++--- 4 files changed, 151 insertions(+), 17 deletions(-) diff --git a/src/iceberg/json_serde.cc b/src/iceberg/json_serde.cc index c9b320ffe..6f8cda989 100644 --- a/src/iceberg/json_serde.cc +++ b/src/iceberg/json_serde.cc @@ -102,6 +102,8 @@ constexpr std::string_view kSequenceNumber = "sequence-number"; constexpr std::string_view kTimestampMs = "timestamp-ms"; constexpr std::string_view kManifestList = "manifest-list"; constexpr std::string_view kSummary = "summary"; +constexpr std::string_view kFirstRowId = "first-row-id"; +constexpr std::string_view kAddedRows = "added-rows"; constexpr std::string_view kMinSnapshotsToKeep = "min-snapshots-to-keep"; constexpr std::string_view kMaxSnapshotAgeMs = "max-snapshot-age-ms"; constexpr std::string_view kMaxRefAgeMs = "max-ref-age-ms"; @@ -459,9 +461,21 @@ nlohmann::json ToJson(const Snapshot& snapshot) { json[kManifestList] = snapshot.manifest_list; // If there is an operation, write the summary map if (snapshot.Operation().has_value()) { - json[kSummary] = snapshot.summary; + nlohmann::json summary_json; + for (const auto& [key, value] : snapshot.summary) { + if (key == SnapshotSummaryFields::kFirstRowId || + key == SnapshotSummaryFields::kAddedRows) { + continue; + } + summary_json[key] = value; + } + json[kSummary] = std::move(summary_json); } SetOptionalField(json, kSchemaId, snapshot.schema_id); + SetOptionalField(json, kFirstRowId, snapshot.first_row_id); + if (snapshot.first_row_id.has_value()) { + SetOptionalField(json, kAddedRows, snapshot.added_rows); + } return json; } @@ -808,12 +822,54 @@ Result> SnapshotFromJson(const nlohmann::json& json) { } } + auto parse_summary_int64 = + [&summary](const std::string& key) -> Result> { + auto it = summary.find(key); + if (it == summary.end()) { + return std::nullopt; + } + ICEBERG_ASSIGN_OR_RAISE(auto value, StringUtils::ParseNumber(it->second)); + return value; + }; + + ICEBERG_ASSIGN_OR_RAISE(auto first_row_id, + GetJsonValueOptional(json, kFirstRowId)); + if (!first_row_id.has_value()) { + ICEBERG_ASSIGN_OR_RAISE(first_row_id, + parse_summary_int64(SnapshotSummaryFields::kFirstRowId)); + } + + ICEBERG_ASSIGN_OR_RAISE(auto added_rows, + GetJsonValueOptional(json, kAddedRows)); + if (!added_rows.has_value()) { + ICEBERG_ASSIGN_OR_RAISE(added_rows, + parse_summary_int64(SnapshotSummaryFields::kAddedRows)); + } + + summary.erase(SnapshotSummaryFields::kFirstRowId); + summary.erase(SnapshotSummaryFields::kAddedRows); + + if (first_row_id.has_value() && first_row_id.value() < 0) { + return JsonParseError("Invalid first-row-id (cannot be negative): {}", + first_row_id.value()); + } + if (added_rows.has_value() && added_rows.value() < 0) { + return JsonParseError("Invalid added-rows (cannot be negative): {}", + added_rows.value()); + } + if (first_row_id.has_value() && !added_rows.has_value()) { + return JsonParseError("Invalid added-rows (required when first-row-id is set): null"); + } + if (!first_row_id.has_value()) { + added_rows = std::nullopt; + } + ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional(json, kSchemaId)); return std::make_unique( snapshot_id, parent_snapshot_id, sequence_number.value_or(TableMetadata::kInitialSequenceNumber), timestamp_ms, - manifest_list, std::move(summary), schema_id); + manifest_list, std::move(summary), schema_id, first_row_id, added_rows); } nlohmann::json ToJson(const BlobMetadata& blob_metadata) { diff --git a/src/iceberg/snapshot.cc b/src/iceberg/snapshot.cc index 1b3182fd9..663af5191 100644 --- a/src/iceberg/snapshot.cc +++ b/src/iceberg/snapshot.cc @@ -162,6 +162,10 @@ std::optional Snapshot::Operation() const { } Result> Snapshot::FirstRowId() const { + if (first_row_id.has_value()) { + return first_row_id; + } + auto it = summary.find(SnapshotSummaryFields::kFirstRowId); if (it == summary.end()) { return std::nullopt; @@ -171,6 +175,10 @@ Result> Snapshot::FirstRowId() const { } Result> Snapshot::AddedRows() const { + if (added_rows.has_value()) { + return added_rows; + } + auto it = summary.find(SnapshotSummaryFields::kAddedRows); if (it == summary.end()) { return std::nullopt; @@ -186,7 +194,8 @@ bool Snapshot::Equals(const Snapshot& other) const { return snapshot_id == other.snapshot_id && parent_snapshot_id == other.parent_snapshot_id && sequence_number == other.sequence_number && timestamp_ms == other.timestamp_ms && - schema_id == other.schema_id; + schema_id == other.schema_id && first_row_id == other.first_row_id && + added_rows == other.added_rows; } Result> Snapshot::Make( @@ -203,12 +212,6 @@ Result> Snapshot::Make( ICEBERG_PRECHECK(!first_row_id.has_value() || added_rows.has_value(), "Missing added-rows when first-row-id is set"); summary[SnapshotSummaryFields::kOperation] = operation; - if (first_row_id.has_value()) { - summary[SnapshotSummaryFields::kFirstRowId] = std::to_string(first_row_id.value()); - } - if (added_rows.has_value()) { - summary[SnapshotSummaryFields::kAddedRows] = std::to_string(added_rows.value()); - } return std::make_unique(Snapshot{ .snapshot_id = snapshot_id, .parent_snapshot_id = parent_snapshot_id, @@ -217,6 +220,8 @@ Result> Snapshot::Make( .manifest_list = std::move(manifest_list), .summary = std::move(summary), .schema_id = schema_id, + .first_row_id = first_row_id, + .added_rows = first_row_id.has_value() ? added_rows : std::nullopt, }); } diff --git a/src/iceberg/snapshot.h b/src/iceberg/snapshot.h index f3e7ffb85..c2fd9b8a2 100644 --- a/src/iceberg/snapshot.h +++ b/src/iceberg/snapshot.h @@ -403,6 +403,10 @@ struct ICEBERG_EXPORT Snapshot { std::unordered_map summary; /// ID of the table's current schema when the snapshot was created. std::optional schema_id; + /// The row-id of the first newly added row in this snapshot. + std::optional first_row_id; + /// The upper bound of rows with assigned row IDs in this snapshot. + std::optional added_rows; /// \brief Create a new Snapshot instance with validation on the inputs. static Result> Make( diff --git a/src/iceberg/test/json_serde_test.cc b/src/iceberg/test/json_serde_test.cc index 562471608..02940b1d3 100644 --- a/src/iceberg/test/json_serde_test.cc +++ b/src/iceberg/test/json_serde_test.cc @@ -256,6 +256,73 @@ TEST(JsonInternalTest, Snapshot) { TestJsonConversion(snapshot, expected_json); } +TEST(JsonInternalTest, SnapshotRowLineageSerializesTopLevelFields) { + ICEBERG_UNWRAP_OR_FAIL( + auto snapshot, + Snapshot::Make(/*sequence_number=*/99, /*snapshot_id=*/1234567890, + /*parent_snapshot_id=*/9876543210, + TimePointMsFromUnixMs(1234567890123), DataOperation::kAppend, + {{SnapshotSummaryFields::kAddedDataFiles, "50"}}, + /*schema_id=*/42, "/path/to/manifest_list", + /*first_row_id=*/100, /*added_rows=*/25)); + + auto json = ToJson(*snapshot); + EXPECT_EQ(json["first-row-id"], 100); + EXPECT_EQ(json["added-rows"], 25); + EXPECT_FALSE(json["summary"].contains("first-row-id")); + EXPECT_FALSE(json["summary"].contains("added-rows")); +} + +TEST(JsonInternalTest, SnapshotFromJsonReadsTopLevelRowLineageFields) { + nlohmann::json snapshot_json = + R"({"snapshot-id":1234567890, + "parent-snapshot-id":9876543210, + "sequence-number":99, + "timestamp-ms":1234567890123, + "manifest-list":"/path/to/manifest_list", + "summary":{ + "operation":"append", + "added-data-files":"50" + }, + "schema-id":42, + "first-row-id":100, + "added-rows":25})"_json; + + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, SnapshotFromJson(snapshot_json)); + ICEBERG_UNWRAP_OR_FAIL(auto first_row_id, snapshot->FirstRowId()); + ICEBERG_UNWRAP_OR_FAIL(auto added_rows, snapshot->AddedRows()); + EXPECT_EQ(first_row_id, 100); + EXPECT_EQ(added_rows, 25); + + auto json = ToJson(*snapshot); + EXPECT_EQ(json["first-row-id"], 100); + EXPECT_EQ(json["added-rows"], 25); + EXPECT_FALSE(json["summary"].contains("first-row-id")); + EXPECT_FALSE(json["summary"].contains("added-rows")); +} + +TEST(JsonInternalTest, SnapshotFromJsonReadsLegacySummaryOnlyRowLineageFields) { + nlohmann::json snapshot_json = + R"({"snapshot-id":1234567890, + "parent-snapshot-id":9876543210, + "sequence-number":99, + "timestamp-ms":1234567890123, + "manifest-list":"/path/to/manifest_list", + "summary":{ + "operation":"append", + "added-data-files":"50", + "first-row-id":"100", + "added-rows":"25" + }, + "schema-id":42})"_json; + + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, SnapshotFromJson(snapshot_json)); + ICEBERG_UNWRAP_OR_FAIL(auto first_row_id, snapshot->FirstRowId()); + ICEBERG_UNWRAP_OR_FAIL(auto added_rows, snapshot->AddedRows()); + EXPECT_EQ(first_row_id, 100); + EXPECT_EQ(added_rows, 25); +} + // FIXME: disable it for now since Iceberg Spark plugin generates // custom summary keys. TEST(JsonInternalTest, DISABLED_SnapshotFromJsonWithInvalidSummary) { @@ -480,19 +547,21 @@ TEST(JsonInternalTest, TableUpdateSetDefaultSortOrder) { } TEST(JsonInternalTest, TableUpdateAddSnapshot) { - auto snapshot = std::make_shared( - Snapshot{.snapshot_id = 123456789, - .parent_snapshot_id = 987654321, - .sequence_number = 5, - .timestamp_ms = TimePointMsFromUnixMs(1234567890000), - .manifest_list = "/path/to/manifest-list.avro", - .summary = {{SnapshotSummaryFields::kOperation, DataOperation::kAppend}}, - .schema_id = 1}); + ICEBERG_UNWRAP_OR_FAIL( + auto snapshot_unique, + Snapshot::Make(/*sequence_number=*/5, /*snapshot_id=*/123456789, + /*parent_snapshot_id=*/987654321, + TimePointMsFromUnixMs(1234567890000), DataOperation::kAppend, + /*summary=*/{}, /*schema_id=*/1, "/path/to/manifest-list.avro", + /*first_row_id=*/100, /*added_rows=*/25)); + std::shared_ptr snapshot(std::move(snapshot_unique)); table::AddSnapshot update(snapshot); ICEBERG_UNWRAP_OR_FAIL(auto json, ToJson(update)); EXPECT_EQ(json["action"], "add-snapshot"); EXPECT_TRUE(json.contains("snapshot")); + EXPECT_EQ(json["snapshot"]["first-row-id"], 100); + EXPECT_EQ(json["snapshot"]["added-rows"], 25); auto parsed = TableUpdateFromJson(json); ASSERT_THAT(parsed, IsOk()); From 4e30b8a2f82221f731d143209a7def3fcb79f638 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Mon, 29 Jun 2026 23:07:19 +0800 Subject: [PATCH 2/2] address comment --- src/iceberg/json_serde.cc | 32 +---------------------------- src/iceberg/snapshot.cc | 26 ++--------------------- src/iceberg/test/json_serde_test.cc | 30 +++++---------------------- 3 files changed, 8 insertions(+), 80 deletions(-) diff --git a/src/iceberg/json_serde.cc b/src/iceberg/json_serde.cc index 6f8cda989..fcf450040 100644 --- a/src/iceberg/json_serde.cc +++ b/src/iceberg/json_serde.cc @@ -461,15 +461,7 @@ nlohmann::json ToJson(const Snapshot& snapshot) { json[kManifestList] = snapshot.manifest_list; // If there is an operation, write the summary map if (snapshot.Operation().has_value()) { - nlohmann::json summary_json; - for (const auto& [key, value] : snapshot.summary) { - if (key == SnapshotSummaryFields::kFirstRowId || - key == SnapshotSummaryFields::kAddedRows) { - continue; - } - summary_json[key] = value; - } - json[kSummary] = std::move(summary_json); + json[kSummary] = snapshot.summary; } SetOptionalField(json, kSchemaId, snapshot.schema_id); SetOptionalField(json, kFirstRowId, snapshot.first_row_id); @@ -822,32 +814,10 @@ Result> SnapshotFromJson(const nlohmann::json& json) { } } - auto parse_summary_int64 = - [&summary](const std::string& key) -> Result> { - auto it = summary.find(key); - if (it == summary.end()) { - return std::nullopt; - } - ICEBERG_ASSIGN_OR_RAISE(auto value, StringUtils::ParseNumber(it->second)); - return value; - }; - ICEBERG_ASSIGN_OR_RAISE(auto first_row_id, GetJsonValueOptional(json, kFirstRowId)); - if (!first_row_id.has_value()) { - ICEBERG_ASSIGN_OR_RAISE(first_row_id, - parse_summary_int64(SnapshotSummaryFields::kFirstRowId)); - } - ICEBERG_ASSIGN_OR_RAISE(auto added_rows, GetJsonValueOptional(json, kAddedRows)); - if (!added_rows.has_value()) { - ICEBERG_ASSIGN_OR_RAISE(added_rows, - parse_summary_int64(SnapshotSummaryFields::kAddedRows)); - } - - summary.erase(SnapshotSummaryFields::kFirstRowId); - summary.erase(SnapshotSummaryFields::kAddedRows); if (first_row_id.has_value() && first_row_id.value() < 0) { return JsonParseError("Invalid first-row-id (cannot be negative): {}", diff --git a/src/iceberg/snapshot.cc b/src/iceberg/snapshot.cc index 663af5191..421d7439b 100644 --- a/src/iceberg/snapshot.cc +++ b/src/iceberg/snapshot.cc @@ -161,31 +161,9 @@ std::optional Snapshot::Operation() const { return std::nullopt; } -Result> Snapshot::FirstRowId() const { - if (first_row_id.has_value()) { - return first_row_id; - } - - auto it = summary.find(SnapshotSummaryFields::kFirstRowId); - if (it == summary.end()) { - return std::nullopt; - } - - return StringUtils::ParseNumber(it->second); -} +Result> Snapshot::FirstRowId() const { return first_row_id; } -Result> Snapshot::AddedRows() const { - if (added_rows.has_value()) { - return added_rows; - } - - auto it = summary.find(SnapshotSummaryFields::kAddedRows); - if (it == summary.end()) { - return std::nullopt; - } - - return StringUtils::ParseNumber(it->second); -} +Result> Snapshot::AddedRows() const { return added_rows; } bool Snapshot::Equals(const Snapshot& other) const { if (this == &other) { diff --git a/src/iceberg/test/json_serde_test.cc b/src/iceberg/test/json_serde_test.cc index 02940b1d3..0802d2399 100644 --- a/src/iceberg/test/json_serde_test.cc +++ b/src/iceberg/test/json_serde_test.cc @@ -282,7 +282,9 @@ TEST(JsonInternalTest, SnapshotFromJsonReadsTopLevelRowLineageFields) { "manifest-list":"/path/to/manifest_list", "summary":{ "operation":"append", - "added-data-files":"50" + "added-data-files":"50", + "first-row-id":"101", + "added-rows":"26" }, "schema-id":42, "first-row-id":100, @@ -297,30 +299,8 @@ TEST(JsonInternalTest, SnapshotFromJsonReadsTopLevelRowLineageFields) { auto json = ToJson(*snapshot); EXPECT_EQ(json["first-row-id"], 100); EXPECT_EQ(json["added-rows"], 25); - EXPECT_FALSE(json["summary"].contains("first-row-id")); - EXPECT_FALSE(json["summary"].contains("added-rows")); -} - -TEST(JsonInternalTest, SnapshotFromJsonReadsLegacySummaryOnlyRowLineageFields) { - nlohmann::json snapshot_json = - R"({"snapshot-id":1234567890, - "parent-snapshot-id":9876543210, - "sequence-number":99, - "timestamp-ms":1234567890123, - "manifest-list":"/path/to/manifest_list", - "summary":{ - "operation":"append", - "added-data-files":"50", - "first-row-id":"100", - "added-rows":"25" - }, - "schema-id":42})"_json; - - ICEBERG_UNWRAP_OR_FAIL(auto snapshot, SnapshotFromJson(snapshot_json)); - ICEBERG_UNWRAP_OR_FAIL(auto first_row_id, snapshot->FirstRowId()); - ICEBERG_UNWRAP_OR_FAIL(auto added_rows, snapshot->AddedRows()); - EXPECT_EQ(first_row_id, 100); - EXPECT_EQ(added_rows, 25); + EXPECT_EQ(json["summary"]["first-row-id"], "101"); + EXPECT_EQ(json["summary"]["added-rows"], "26"); } // FIXME: disable it for now since Iceberg Spark plugin generates