diff --git a/src/iceberg/json_serde.cc b/src/iceberg/json_serde.cc index c9b320ffe..fcf450040 100644 --- a/src/iceberg/json_serde.cc +++ b/src/iceberg/json_serde.cc @@ -102,6 +102,8 @@ constexpr std::string_view kSequenceNumber = "sequence-number"; constexpr std::string_view kTimestampMs = "timestamp-ms"; constexpr std::string_view kManifestList = "manifest-list"; constexpr std::string_view kSummary = "summary"; +constexpr std::string_view kFirstRowId = "first-row-id"; +constexpr std::string_view kAddedRows = "added-rows"; constexpr std::string_view kMinSnapshotsToKeep = "min-snapshots-to-keep"; constexpr std::string_view kMaxSnapshotAgeMs = "max-snapshot-age-ms"; constexpr std::string_view kMaxRefAgeMs = "max-ref-age-ms"; @@ -462,6 +464,10 @@ nlohmann::json ToJson(const Snapshot& snapshot) { json[kSummary] = snapshot.summary; } SetOptionalField(json, kSchemaId, snapshot.schema_id); + SetOptionalField(json, kFirstRowId, snapshot.first_row_id); + if (snapshot.first_row_id.has_value()) { + SetOptionalField(json, kAddedRows, snapshot.added_rows); + } return json; } @@ -808,12 +814,32 @@ Result> SnapshotFromJson(const nlohmann::json& json) { } } + ICEBERG_ASSIGN_OR_RAISE(auto first_row_id, + GetJsonValueOptional(json, kFirstRowId)); + ICEBERG_ASSIGN_OR_RAISE(auto added_rows, + GetJsonValueOptional(json, kAddedRows)); + + if (first_row_id.has_value() && first_row_id.value() < 0) { + return JsonParseError("Invalid first-row-id (cannot be negative): {}", + first_row_id.value()); + } + if (added_rows.has_value() && added_rows.value() < 0) { + return JsonParseError("Invalid added-rows (cannot be negative): {}", + added_rows.value()); + } + if (first_row_id.has_value() && !added_rows.has_value()) { + return JsonParseError("Invalid added-rows (required when first-row-id is set): null"); + } + if (!first_row_id.has_value()) { + added_rows = std::nullopt; + } + ICEBERG_ASSIGN_OR_RAISE(auto schema_id, GetJsonValueOptional(json, kSchemaId)); return std::make_unique( snapshot_id, 
parent_snapshot_id, sequence_number.value_or(TableMetadata::kInitialSequenceNumber), timestamp_ms, - manifest_list, std::move(summary), schema_id); + manifest_list, std::move(summary), schema_id, first_row_id, added_rows); } nlohmann::json ToJson(const BlobMetadata& blob_metadata) { diff --git a/src/iceberg/snapshot.cc b/src/iceberg/snapshot.cc index 1b3182fd9..421d7439b 100644 --- a/src/iceberg/snapshot.cc +++ b/src/iceberg/snapshot.cc @@ -161,23 +161,9 @@ std::optional Snapshot::Operation() const { return std::nullopt; } -Result> Snapshot::FirstRowId() const { - auto it = summary.find(SnapshotSummaryFields::kFirstRowId); - if (it == summary.end()) { - return std::nullopt; - } - - return StringUtils::ParseNumber(it->second); -} - -Result> Snapshot::AddedRows() const { - auto it = summary.find(SnapshotSummaryFields::kAddedRows); - if (it == summary.end()) { - return std::nullopt; - } +Result> Snapshot::FirstRowId() const { return first_row_id; } - return StringUtils::ParseNumber(it->second); -} +Result> Snapshot::AddedRows() const { return added_rows; } bool Snapshot::Equals(const Snapshot& other) const { if (this == &other) { @@ -186,7 +172,8 @@ bool Snapshot::Equals(const Snapshot& other) const { return snapshot_id == other.snapshot_id && parent_snapshot_id == other.parent_snapshot_id && sequence_number == other.sequence_number && timestamp_ms == other.timestamp_ms && - schema_id == other.schema_id; + schema_id == other.schema_id && first_row_id == other.first_row_id && + added_rows == other.added_rows; } Result> Snapshot::Make( @@ -203,12 +190,6 @@ Result> Snapshot::Make( ICEBERG_PRECHECK(!first_row_id.has_value() || added_rows.has_value(), "Missing added-rows when first-row-id is set"); summary[SnapshotSummaryFields::kOperation] = operation; - if (first_row_id.has_value()) { - summary[SnapshotSummaryFields::kFirstRowId] = std::to_string(first_row_id.value()); - } - if (added_rows.has_value()) { - summary[SnapshotSummaryFields::kAddedRows] = 
std::to_string(added_rows.value()); - } return std::make_unique(Snapshot{ .snapshot_id = snapshot_id, .parent_snapshot_id = parent_snapshot_id, @@ -217,6 +198,8 @@ Result> Snapshot::Make( .manifest_list = std::move(manifest_list), .summary = std::move(summary), .schema_id = schema_id, + .first_row_id = first_row_id, + .added_rows = first_row_id.has_value() ? added_rows : std::nullopt, }); } diff --git a/src/iceberg/snapshot.h b/src/iceberg/snapshot.h index f3e7ffb85..c2fd9b8a2 100644 --- a/src/iceberg/snapshot.h +++ b/src/iceberg/snapshot.h @@ -403,6 +403,10 @@ struct ICEBERG_EXPORT Snapshot { std::unordered_map summary; /// ID of the table's current schema when the snapshot was created. std::optional schema_id; + /// The row-id of the first newly added row in this snapshot. + std::optional first_row_id; + /// The upper bound of the number of rows with assigned row IDs in this snapshot. + std::optional added_rows; /// \brief Create a new Snapshot instance with validation on the inputs. static Result> Make( diff --git a/src/iceberg/test/json_serde_test.cc b/src/iceberg/test/json_serde_test.cc index 562471608..0802d2399 100644 --- a/src/iceberg/test/json_serde_test.cc +++ b/src/iceberg/test/json_serde_test.cc @@ -256,6 +256,53 @@ TEST(JsonInternalTest, Snapshot) { TestJsonConversion(snapshot, expected_json); } +TEST(JsonInternalTest, SnapshotRowLineageSerializesTopLevelFields) { + ICEBERG_UNWRAP_OR_FAIL( + auto snapshot, + Snapshot::Make(/*sequence_number=*/99, /*snapshot_id=*/1234567890, + /*parent_snapshot_id=*/9876543210, + TimePointMsFromUnixMs(1234567890123), DataOperation::kAppend, + {{SnapshotSummaryFields::kAddedDataFiles, "50"}}, + /*schema_id=*/42, "/path/to/manifest_list", + /*first_row_id=*/100, /*added_rows=*/25)); + + auto json = ToJson(*snapshot); + EXPECT_EQ(json["first-row-id"], 100); + EXPECT_EQ(json["added-rows"], 25); + EXPECT_FALSE(json["summary"].contains("first-row-id")); + EXPECT_FALSE(json["summary"].contains("added-rows")); +} + +TEST(JsonInternalTest, 
SnapshotFromJsonReadsTopLevelRowLineageFields) { + nlohmann::json snapshot_json = + R"({"snapshot-id":1234567890, + "parent-snapshot-id":9876543210, + "sequence-number":99, + "timestamp-ms":1234567890123, + "manifest-list":"/path/to/manifest_list", + "summary":{ + "operation":"append", + "added-data-files":"50", + "first-row-id":"101", + "added-rows":"26" + }, + "schema-id":42, + "first-row-id":100, + "added-rows":25})"_json; + + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, SnapshotFromJson(snapshot_json)); + ICEBERG_UNWRAP_OR_FAIL(auto first_row_id, snapshot->FirstRowId()); + ICEBERG_UNWRAP_OR_FAIL(auto added_rows, snapshot->AddedRows()); + EXPECT_EQ(first_row_id, 100); + EXPECT_EQ(added_rows, 25); + + auto json = ToJson(*snapshot); + EXPECT_EQ(json["first-row-id"], 100); + EXPECT_EQ(json["added-rows"], 25); + EXPECT_EQ(json["summary"]["first-row-id"], "101"); + EXPECT_EQ(json["summary"]["added-rows"], "26"); +} + // FIXME: disable it for now since Iceberg Spark plugin generates // custom summary keys. 
TEST(JsonInternalTest, DISABLED_SnapshotFromJsonWithInvalidSummary) { @@ -480,19 +527,21 @@ TEST(JsonInternalTest, TableUpdateSetDefaultSortOrder) { } TEST(JsonInternalTest, TableUpdateAddSnapshot) { - auto snapshot = std::make_shared( - Snapshot{.snapshot_id = 123456789, - .parent_snapshot_id = 987654321, - .sequence_number = 5, - .timestamp_ms = TimePointMsFromUnixMs(1234567890000), - .manifest_list = "/path/to/manifest-list.avro", - .summary = {{SnapshotSummaryFields::kOperation, DataOperation::kAppend}}, - .schema_id = 1}); + ICEBERG_UNWRAP_OR_FAIL( + auto snapshot_unique, + Snapshot::Make(/*sequence_number=*/5, /*snapshot_id=*/123456789, + /*parent_snapshot_id=*/987654321, + TimePointMsFromUnixMs(1234567890000), DataOperation::kAppend, + /*summary=*/{}, /*schema_id=*/1, "/path/to/manifest-list.avro", + /*first_row_id=*/100, /*added_rows=*/25)); + std::shared_ptr snapshot(std::move(snapshot_unique)); table::AddSnapshot update(snapshot); ICEBERG_UNWRAP_OR_FAIL(auto json, ToJson(update)); EXPECT_EQ(json["action"], "add-snapshot"); EXPECT_TRUE(json.contains("snapshot")); + EXPECT_EQ(json["snapshot"]["first-row-id"], 100); + EXPECT_EQ(json["snapshot"]["added-rows"], 25); auto parsed = TableUpdateFromJson(json); ASSERT_THAT(parsed, IsOk());