From e5da646938e5f01ded45104712bb3efa7509cf32 Mon Sep 17 00:00:00 2001 From: Zehua Zou Date: Mon, 22 Jun 2026 11:20:13 +0800 Subject: [PATCH 1/2] feat: optimize some parallel comments except manifest --- src/iceberg/delete_file_index.cc | 215 +++++---- src/iceberg/delete_file_index.h | 8 + src/iceberg/manifest/manifest_group.cc | 4 +- src/iceberg/table_scan.cc | 15 +- src/iceberg/table_scan.h | 8 + src/iceberg/test/arrow_test.cc | 2 +- src/iceberg/test/expire_snapshots_test.cc | 72 ++- src/iceberg/test/fast_append_test.cc | 58 ++- src/iceberg/test/table_scan_test.cc | 6 + src/iceberg/test/task_group_test.cc | 36 +- src/iceberg/update/expire_snapshots.cc | 513 ++++++++++++---------- src/iceberg/update/expire_snapshots.h | 13 +- src/iceberg/update/snapshot_update.cc | 21 +- src/iceberg/update/snapshot_update.h | 22 +- 14 files changed, 649 insertions(+), 344 deletions(-) diff --git a/src/iceberg/delete_file_index.cc b/src/iceberg/delete_file_index.cc index 8c58e861b..9a8be06b8 100644 --- a/src/iceberg/delete_file_index.cc +++ b/src/iceberg/delete_file_index.cc @@ -22,7 +22,9 @@ #include #include #include +#include #include +#include #include #include "iceberg/expression/expression.h" @@ -37,6 +39,7 @@ #include "iceberg/schema.h" #include "iceberg/util/checked_cast.h" #include "iceberg/util/content_file_util.h" +#include "iceberg/util/executor_util_internal.h" #include "iceberg/util/macros.h" namespace iceberg { @@ -528,107 +531,153 @@ DeleteFileIndex::Builder& DeleteFileIndex::Builder::IgnoreResiduals() { return *this; } +DeleteFileIndex::Builder& DeleteFileIndex::Builder::PlanWith(OptionalExecutor executor) { + executor_ = executor; + return *this; +} + Result> DeleteFileIndex::Builder::LoadDeleteFiles() { - // Build expression caches per spec ID - std::unordered_map> part_expr_cache; + // TODO(zehua): Replace with a thread-safe LRU cache. + std::shared_mutex projected_expr_cache_mutex; + std::unordered_map> projected_expr_cache; + std::shared_mutex eval_cache_mutex; std::unordered_map> eval_cache; auto data_filter = ignore_residuals_ ? True::Instance() : data_filter_; - // Filter and read manifests into manifest entries - std::vector files; - for (const auto& manifest : delete_manifests_) { - if (manifest.content != ManifestContent::kDeletes) { - continue; + auto and_filters = + [](std::shared_ptr left, + std::shared_ptr right) -> Result> { + if (left && right) { + return And::MakeFolded(std::move(left), std::move(right)); } - if (!manifest.has_added_files() && !manifest.has_existing_files()) { - continue; + return right ? std::move(right) : std::move(left); + }; + + auto get_projected_expr = [&](int32_t spec_id, + const std::shared_ptr& spec) + -> Result> { + if (!data_filter_) { + return std::shared_ptr(); } - const int32_t spec_id = manifest.partition_spec_id; - auto spec_iter = specs_by_id_.find(spec_id); - ICEBERG_CHECK(spec_iter != specs_by_id_.cend(), - "Partition spec ID {} not found when loading delete files", spec_id); - - const auto& spec = spec_iter->second; - - // Get or compute projected partition expression - if (!part_expr_cache.contains(spec_id) && data_filter_) { - auto projector = Projections::Inclusive(*spec, *schema_, case_sensitive_); - ICEBERG_ASSIGN_OR_RAISE(auto projected, projector->Project(data_filter_)); - part_expr_cache[spec_id] = std::move(projected); + { + std::shared_lock lock(projected_expr_cache_mutex); + auto iter = projected_expr_cache.find(spec_id); + if (iter != projected_expr_cache.end()) { + return iter->second; + } } - // Get or create manifest evaluator - if (!eval_cache.contains(spec_id)) { - auto filter = partition_filter_; - if (auto it = part_expr_cache.find(spec_id); it != part_expr_cache.cend()) { - if (filter) { - ICEBERG_ASSIGN_OR_RAISE(filter, And::Make(filter, it->second)); - } else { - filter = it->second; - } - } - if (filter) { - ICEBERG_ASSIGN_OR_RAISE(auto evaluator, - ManifestEvaluator::MakePartitionFilter( - std::move(filter), spec, *schema_, case_sensitive_)); - eval_cache[spec_id] = std::move(evaluator); - } + std::lock_guard lock(projected_expr_cache_mutex); + auto iter = projected_expr_cache.find(spec_id); + if (iter != projected_expr_cache.end()) { + return iter->second; } - // Evaluate manifest against filter - if (auto it = eval_cache.find(spec_id); it != eval_cache.end()) { - ICEBERG_ASSIGN_OR_RAISE(auto should_match, it->second->Evaluate(manifest)); - if (!should_match) { - continue; // Manifest doesn't match filter - } + auto projector = Projections::Inclusive(*spec, *schema_, case_sensitive_); + ICEBERG_ASSIGN_OR_RAISE(auto projected, projector->Project(data_filter_)); + auto [inserted_iter, _] = projected_expr_cache.emplace(spec_id, std::move(projected)); + return inserted_iter->second; + }; + + auto get_manifest_evaluator = + [&](int32_t spec_id, const std::shared_ptr& spec, + const std::shared_ptr& filter) -> Result { + if (!filter) { + return nullptr; } - // Read manifest entries - ICEBERG_ASSIGN_OR_RAISE(auto reader, - ManifestReader::Make(manifest, io_, schema_, spec)); - - auto partition_filter = partition_filter_; - if (auto it = part_expr_cache.find(spec_id); it != part_expr_cache.cend()) { - if (partition_filter) { - ICEBERG_ASSIGN_OR_RAISE(partition_filter, - And::Make(partition_filter, it->second)); - } else { - partition_filter = it->second; + { + std::shared_lock lock(eval_cache_mutex); + auto iter = eval_cache.find(spec_id); + if (iter != eval_cache.end()) { + return iter->second.get(); } } - if (partition_filter) { - reader->FilterPartitions(std::move(partition_filter)); - } - if (partition_set_) { - reader->FilterPartitions(partition_set_); - } - reader->FilterRows(data_filter).CaseSensitive(case_sensitive_).TryDropStats(); - - ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries()); - files.reserve(files.size() + entries.size()); - - for (auto& entry : entries) { - ICEBERG_CHECK(entry.data_file != nullptr, "ManifestEntry must have a data file"); - ICEBERG_CHECK(entry.sequence_number.has_value(), - "Missing sequence number from delete file: {}", - entry.data_file->file_path); - if (entry.sequence_number.value() > min_sequence_number_) { - auto& file = *entry.data_file; - // keep minimum stats to avoid memory pressure - std::unordered_set columns = - file.content == DataFile::Content::kPositionDeletes - ? std::unordered_set{MetadataColumns::kDeleteFilePathColumnId} - : std::unordered_set(file.equality_ids.begin(), - file.equality_ids.end()); - ContentFileUtil::DropUnselectedStats(*entry.data_file, columns); - files.emplace_back(std::move(entry)); - } + + std::lock_guard lock(eval_cache_mutex); + auto iter = eval_cache.find(spec_id); + if (iter != eval_cache.end()) { + return iter->second.get(); } - } - return files; + ICEBERG_ASSIGN_OR_RAISE(auto evaluator, ManifestEvaluator::MakePartitionFilter( + filter, spec, *schema_, case_sensitive_)); + auto [inserted_iter, _] = eval_cache.emplace(spec_id, std::move(evaluator)); + return inserted_iter->second.get(); + }; + + return ParallelCollect( + executor_, delete_manifests_, + [&](const ManifestFile& manifest) -> Result> { + std::vector manifest_result; + if (manifest.content != ManifestContent::kDeletes) { + return manifest_result; + } + if (!manifest.has_added_files() && !manifest.has_existing_files()) { + return manifest_result; + } + + const int32_t spec_id = manifest.partition_spec_id; + auto spec_iter = specs_by_id_.find(spec_id); + ICEBERG_CHECK(spec_iter != specs_by_id_.cend(), + "Partition spec ID {} not found when loading delete files", + spec_id); + + const auto& spec = spec_iter->second; + + ICEBERG_ASSIGN_OR_RAISE(auto projected_data_filter, + get_projected_expr(spec_id, spec)); + ICEBERG_ASSIGN_OR_RAISE(auto delete_partition_filter, + and_filters(partition_filter_, projected_data_filter)); + ICEBERG_ASSIGN_OR_RAISE( + auto manifest_evaluator, + get_manifest_evaluator(spec_id, spec, delete_partition_filter)); + if (manifest_evaluator != nullptr) { + ICEBERG_ASSIGN_OR_RAISE(auto should_match, + manifest_evaluator->Evaluate(manifest)); + if (!should_match) { + return manifest_result; + } + } + + // Read manifest entries + ICEBERG_ASSIGN_OR_RAISE(auto reader, + ManifestReader::Make(manifest, io_, schema_, spec)); + + if (delete_partition_filter) { + reader->FilterPartitions(std::move(delete_partition_filter)); + } + if (partition_set_) { + reader->FilterPartitions(partition_set_); + } + reader->FilterRows(data_filter).CaseSensitive(case_sensitive_).TryDropStats(); + + ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries()); + manifest_result.reserve(entries.size()); + + for (auto& entry : entries) { + ICEBERG_CHECK(entry.data_file != nullptr, + "ManifestEntry must have a data file"); + ICEBERG_CHECK(entry.sequence_number.has_value(), + "Missing sequence number from delete file: {}", + entry.data_file->file_path); + if (entry.sequence_number.value() > min_sequence_number_) { + auto& file = *entry.data_file; + // keep minimum stats to avoid memory pressure + std::unordered_set columns = + file.content == DataFile::Content::kPositionDeletes + ? std::unordered_set< + int32_t>{MetadataColumns::kDeleteFilePathColumnId} + : std::unordered_set(file.equality_ids.begin(), + file.equality_ids.end()); + ContentFileUtil::DropUnselectedStats(*entry.data_file, columns); + manifest_result.emplace_back(std::move(entry)); + } + } + return manifest_result; + }); } Status DeleteFileIndex::Builder::AddDV( diff --git a/src/iceberg/delete_file_index.h b/src/iceberg/delete_file_index.h index 5444281a0..555114a23 100644 --- a/src/iceberg/delete_file_index.h +++ b/src/iceberg/delete_file_index.h @@ -35,6 +35,7 @@ #include "iceberg/result.h" #include "iceberg/type_fwd.h" #include "iceberg/util/error_collector.h" +#include "iceberg/util/executor.h" #include "iceberg/util/partition_value_util.h" namespace iceberg { @@ -356,6 +357,12 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector { /// \brief Ignore residual expressions after partition filtering. Builder& IgnoreResiduals(); + /// \brief Configure an optional executor for reading delete manifests. + /// + /// \param executor Executor to use, or std::nullopt to read manifests serially. + /// \return Reference to this for method chaining. + Builder& PlanWith(OptionalExecutor executor); + /// \brief Build the DeleteFileIndex. Result> Build(); @@ -388,6 +395,7 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector { std::shared_ptr data_filter_; std::shared_ptr partition_filter_; std::shared_ptr partition_set_; + OptionalExecutor executor_; bool case_sensitive_ = true; bool ignore_residuals_ = false; }; diff --git a/src/iceberg/manifest/manifest_group.cc b/src/iceberg/manifest/manifest_group.cc index 932f3bf3d..ec5eb66bc 100644 --- a/src/iceberg/manifest/manifest_group.cc +++ b/src/iceberg/manifest/manifest_group.cc @@ -194,6 +194,7 @@ ManifestGroup& ManifestGroup::ColumnsToKeepStats(std::unordered_set col ManifestGroup& ManifestGroup::PlanWith(OptionalExecutor executor) { executor_ = executor; + delete_index_builder_.PlanWith(executor); return *this; } @@ -314,8 +315,7 @@ Result> ManifestGroup::MakeReader( auto columns = columns_; if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue && - !columns.empty() && - std::ranges::find(columns, Schema::kAllColumns) == columns.end()) { + !columns.empty() && !std::ranges::contains(columns, Schema::kAllColumns)) { auto data_file_schema = DataFileFilterSchema(); ICEBERG_ASSIGN_OR_RAISE( auto bound_file_filter, diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 6881d34fb..fb4fbf3c5 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -288,6 +288,12 @@ TableScanBuilder& TableScanBuilder::MinRowsRequested( return *this; } +template +TableScanBuilder& TableScanBuilder::PlanWith(Executor& executor) { + context_.plan_executor = std::ref(executor); + return *this; +} + template TableScanBuilder& TableScanBuilder::UseSnapshot(int64_t snapshot_id) { ICEBERG_BUILDER_CHECK(!context_.snapshot_id.has_value(), @@ -538,7 +544,8 @@ Result>> DataTableScan::PlanFiles() co .Select(ScanColumns()) .FilterData(filter()) .IgnoreDeleted() - .ColumnsToKeepStats(context_.columns_to_keep_stats); + .ColumnsToKeepStats(context_.columns_to_keep_stats) + .PlanWith(context_.plan_executor); if (context_.ignore_residuals) { manifest_group->IgnoreResiduals(); } @@ -641,7 +648,8 @@ Result>> IncrementalAppendScan::PlanFi entry.status == ManifestStatus::kAdded; }) .IgnoreDeleted() - .ColumnsToKeepStats(context_.columns_to_keep_stats); + .ColumnsToKeepStats(context_.columns_to_keep_stats) + .PlanWith(context_.plan_executor); if (context_.ignore_residuals) { manifest_group->IgnoreResiduals(); @@ -737,7 +745,8 @@ IncrementalChangelogScan::PlanFiles(std::optional from_snapshot_id_excl snapshot_ids.contains(entry.snapshot_id.value()); }) .IgnoreExisting() - .ColumnsToKeepStats(context_.columns_to_keep_stats); + .ColumnsToKeepStats(context_.columns_to_keep_stats) + .PlanWith(context_.plan_executor); if (context_.ignore_residuals) { manifest_group->IgnoreResiduals(); diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 64fb3ffd1..3e4f14d55 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -32,6 +32,7 @@ #include "iceberg/table_metadata.h" #include "iceberg/type_fwd.h" #include "iceberg/util/error_collector.h" +#include "iceberg/util/executor.h" namespace iceberg { @@ -228,6 +229,7 @@ struct TableScanContext { std::optional to_snapshot_id; std::string branch{}; std::optional min_rows_requested; + OptionalExecutor plan_executor; // Validate the context parameters to see if they have conflicts. [[nodiscard]] Status Validate() const; @@ -302,6 +304,12 @@ class ICEBERG_TEMPLATE_CLASS_EXPORT TableScanBuilder : public ErrorCollector { /// \param num_rows The minimum number of rows requested TableScanBuilder& MinRowsRequested(int64_t num_rows); + /// \brief Configure an executor for manifest planning. + /// + /// \param executor Executor to use while planning manifests. + /// \return Reference to this for method chaining. + TableScanBuilder& PlanWith(Executor& executor); + /// \brief Request this scan to use the given snapshot by ID. /// \param snapshot_id a snapshot ID /// \note InvalidArgument will be returned if the snapshot cannot be found diff --git a/src/iceberg/test/arrow_test.cc b/src/iceberg/test/arrow_test.cc index 2a7242e71..d18a6eaf9 100644 --- a/src/iceberg/test/arrow_test.cc +++ b/src/iceberg/test/arrow_test.cc @@ -638,7 +638,7 @@ TEST(ArrowExecutorAdapterTest, RunsTaskGroupOnThreadPool) { std::mutex mutex; std::vector thread_ids; - auto status = TaskGroup<>() + auto status = TaskGroup() .SetExecutor(std::ref(executor)) .Submit([&]() -> Status { std::lock_guard lock(mutex); diff --git a/src/iceberg/test/expire_snapshots_test.cc b/src/iceberg/test/expire_snapshots_test.cc index 91a49bbbd..1216952e7 100644 --- a/src/iceberg/test/expire_snapshots_test.cc +++ b/src/iceberg/test/expire_snapshots_test.cc @@ -474,7 +474,7 @@ TEST_F(ExpireSnapshotsCleanupTest, ExecutorDispatchesDeletesConcurrently) { ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); update->ExpireSnapshotId(kExpiredSnapshotId); - update->ExecuteDeleteWith(std::ref(executor)); + update->ExecuteDeleteWith(executor); update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) { std::lock_guard lock(deleted_files_mu); deleted_files.push_back(path); @@ -510,12 +510,80 @@ TEST_F(ExpireSnapshotsCleanupTest, ExecuteDeleteWithWithoutDeleteWithDoesNotUseE ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); update->ExpireSnapshotId(kExpiredSnapshotId); - update->ExecuteDeleteWith(std::ref(executor)); + update->ExecuteDeleteWith(executor); EXPECT_THAT(update->Commit(), IsOk()); EXPECT_EQ(executor.submit_count(), 0); } +TEST_F(ExpireSnapshotsCleanupTest, PlanWithUsesIncrementalCleanup) { + const auto deleted_data_file_path = + table_location_ + "/data/deleted-by-expired.parquet"; + const auto delete_manifest_path = + table_location_ + "/metadata/expired-delete-entry.avro"; + const auto expired_manifest_list_path = + table_location_ + "/metadata/expired-deleted-entry-ml.avro"; + const auto current_manifest_list_path = + table_location_ + "/metadata/current-deleted-entry-ml.avro"; + + auto delete_manifest = WriteDataManifest( + delete_manifest_path, kExpiredSnapshotId, + {MakeEntry(ManifestStatus::kDeleted, kExpiredSnapshotId, kExpiredSequenceNumber, + MakeDataFile(deleted_data_file_path))}); + delete_manifest = + AssignManifestSequenceNumber(std::move(delete_manifest), kExpiredSequenceNumber); + WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId, + /*parent_snapshot_id=*/0, kExpiredSequenceNumber, {delete_manifest}); + WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, kExpiredSnapshotId, + kCurrentSequenceNumber, {delete_manifest}); + RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path); + + test::ThreadExecutor plan_executor; + test::ThreadExecutor delete_executor( + ServiceUnavailable("delete executor should be unused")); + + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->PlanWith(plan_executor); + update->ExecuteDeleteWith(delete_executor); + + EXPECT_THAT(update->Commit(), IsOk()); + EXPECT_GT(plan_executor.submit_count(), 0); + EXPECT_EQ(delete_executor.submit_count(), 0); +} + +TEST_F(ExpireSnapshotsCleanupTest, PlanWithUsesReachableCleanup) { + const auto expired_data_file_path = table_location_ + "/data/expired-data.parquet"; + const auto expired_data_manifest_path = table_location_ + "/metadata/expired-data.avro"; + const auto expired_manifest_list_path = + table_location_ + "/metadata/expired-manifest-list.avro"; + const auto current_manifest_list_path = + table_location_ + "/metadata/current-manifest-list.avro"; + + auto expired_data_manifest = WriteDataManifest( + expired_data_manifest_path, kExpiredSnapshotId, + {MakeEntry(ManifestStatus::kAdded, kExpiredSnapshotId, kExpiredSequenceNumber, + MakeDataFile(expired_data_file_path))}); + WriteManifestList(expired_manifest_list_path, kExpiredSnapshotId, + /*parent_snapshot_id=*/0, kExpiredSequenceNumber, + {expired_data_manifest}); + WriteManifestList(current_manifest_list_path, kCurrentSnapshotId, kExpiredSnapshotId, + kCurrentSequenceNumber, {}); + RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path); + + test::ThreadExecutor plan_executor; + test::ThreadExecutor delete_executor( + ServiceUnavailable("delete executor should be unused")); + + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots()); + update->ExpireSnapshotId(kExpiredSnapshotId); + update->PlanWith(plan_executor); + update->ExecuteDeleteWith(delete_executor); + + EXPECT_THAT(update->Commit(), IsOk()); + EXPECT_GT(plan_executor.submit_count(), 0); + EXPECT_EQ(delete_executor.submit_count(), 0); +} + TEST_F(ExpireSnapshotsCleanupTest, DeleteWithRetriesTransientFailures) { const auto expired_data_file_path = table_location_ + "/data/expired-data.parquet"; const auto expired_data_manifest_path = table_location_ + "/metadata/expired-data.avro"; diff --git a/src/iceberg/test/fast_append_test.cc b/src/iceberg/test/fast_append_test.cc index 32224d117..8853d419c 100644 --- a/src/iceberg/test/fast_append_test.cc +++ b/src/iceberg/test/fast_append_test.cc @@ -22,6 +22,8 @@ #include #include #include +#include +#include #include #include @@ -36,12 +38,31 @@ #include "iceberg/snapshot.h" #include "iceberg/table_metadata.h" #include "iceberg/test/matchers.h" -#include "iceberg/test/test_resource.h" #include "iceberg/test/update_test_base.h" -#include "iceberg/util/uuid.h" +#include "iceberg/transaction.h" namespace iceberg { +namespace { + +class TestSnapshotUpdate : public SnapshotUpdate { + public: + explicit TestSnapshotUpdate(std::shared_ptr ctx) + : SnapshotUpdate(std::move(ctx)) {} + + using SnapshotUpdate::ManifestPath; + + Status CleanUncommitted(const std::unordered_set&) override { return {}; } + std::string operation() override { return "test"; } + Result> Apply(const TableMetadata&, + const std::shared_ptr&) override { + return std::vector{}; + } + std::unordered_map Summary() override { return {}; } +}; + +} // namespace + class FastAppendTest : public UpdateTestBase { protected: static void SetUpTestSuite() { avro::RegisterAll(); } @@ -102,6 +123,8 @@ class FastAppendTest : public UpdateTestBase { std::shared_ptr file_b_; }; +class SnapshotUpdateTest : public UpdateTestBase {}; + TEST_F(FastAppendTest, AppendDataFile) { std::shared_ptr fast_append; ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); @@ -260,4 +283,35 @@ TEST_F(FastAppendTest, SetSnapshotProperty) { EXPECT_EQ(snapshot->summary.at("custom-property"), "custom-value"); } +TEST_F(SnapshotUpdateTest, ConcurrentManifestPaths) { + ICEBERG_UNWRAP_OR_FAIL(auto ctx, + TransactionContext::Make(table_, TransactionKind::kUpdate)); + TestSnapshotUpdate update(std::move(ctx)); + + constexpr int kThreadCount = 8; + constexpr int kPathsPerThread = 32; + std::vector paths(kThreadCount * kPathsPerThread); + std::vector threads; + threads.reserve(kThreadCount); + + for (int thread_index = 0; thread_index < kThreadCount; ++thread_index) { + threads.emplace_back([&, thread_index] { + for (int path_index = 0; path_index < kPathsPerThread; ++path_index) { + paths[thread_index * kPathsPerThread + path_index] = update.ManifestPath(); + } + }); + } + + for (auto& thread : threads) { + thread.join(); + } + + std::unordered_set unique_paths(paths.begin(), paths.end()); + ASSERT_EQ(unique_paths.size(), paths.size()); + for (const auto& path : paths) { + EXPECT_THAT(path, ::testing::HasSubstr("/metadata/")); + EXPECT_THAT(path, ::testing::HasSubstr("-m")); + } +} + } // namespace iceberg diff --git a/src/iceberg/test/table_scan_test.cc b/src/iceberg/test/table_scan_test.cc index 11905a870..34591a438 100644 --- a/src/iceberg/test/table_scan_test.cc +++ b/src/iceberg/test/table_scan_test.cc @@ -30,6 +30,7 @@ #include "iceberg/expression/expressions.h" #include "iceberg/snapshot.h" #include "iceberg/table_metadata.h" +#include "iceberg/test/executor.h" #include "iceberg/test/scan_test_base.h" namespace iceberg { @@ -394,11 +395,16 @@ TEST_P(TableScanTest, PlanFilesWithMultipleManifests) { ICEBERG_UNWRAP_OR_FAIL(auto builder, DataTableScanBuilder::Make(metadata_with_manifests, file_io_)); + + test::ThreadExecutor executor; + builder->PlanWith(executor); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); ASSERT_EQ(tasks.size(), 2); EXPECT_THAT(GetPaths(tasks), testing::UnorderedElementsAre("/path/to/data1.parquet", "/path/to/data2.parquet")); + EXPECT_EQ(executor.submit_count(), 2); } TEST_P(TableScanTest, PlanFilesWithFilter) { diff --git a/src/iceberg/test/task_group_test.cc b/src/iceberg/test/task_group_test.cc index 2f0da3de2..0b04a333d 100644 --- a/src/iceberg/test/task_group_test.cc +++ b/src/iceberg/test/task_group_test.cc @@ -95,7 +95,7 @@ TEST(FnOnceTest, SupportsMoveOnlyCapture) { TEST(TaskGroupTest, UsesExecutor) { test::ThreadExecutor executor; - TaskGroup<> group; + TaskGroup group; bool ran = false; group.SetExecutor(std::ref(executor)); @@ -111,7 +111,7 @@ TEST(TaskGroupTest, UsesExecutor) { TEST(TaskGroupTest, ReturnsSubmitError) { test::ThreadExecutor executor(ServiceUnavailable("executor busy")); - TaskGroup<> group; + TaskGroup group; group.SetExecutor(std::ref(executor)); group.Submit([]() -> Status { return {}; }); @@ -121,7 +121,7 @@ TEST(TaskGroupTest, ReturnsSubmitError) { } TEST(TaskGroupTest, DirectMoveOnlyTask) { - TaskGroup<> group; + TaskGroup group; auto value = std::make_unique(7); int observed = 0; @@ -136,7 +136,7 @@ TEST(TaskGroupTest, DirectMoveOnlyTask) { TEST(TaskGroupTest, ClearsExecutor) { test::ThreadExecutor executor; - TaskGroup<> group; + TaskGroup group; int call_count = 0; group.SetExecutor(std::ref(executor)); @@ -152,25 +152,27 @@ TEST(TaskGroupTest, ClearsExecutor) { } TEST(TaskGroupTest, FluentSubmit) { - int call_count = 0; + test::ThreadExecutor executor; + std::atomic call_count = 0; - auto status = TaskGroup<>() + auto status = TaskGroup() + .SetExecutor(std::ref(executor)) .Submit([&]() -> Status { - ++call_count; + call_count.fetch_add(1, std::memory_order_relaxed); return {}; }) .Submit([&]() -> Status { - ++call_count; + call_count.fetch_add(1, std::memory_order_relaxed); return {}; }) .Run(); EXPECT_THAT(status, IsOk()); - EXPECT_EQ(call_count, 2); + EXPECT_EQ(call_count.load(std::memory_order_relaxed), 2); } TEST(TaskGroupTest, DirectAggregatesErrors) { - TaskGroup<> group; + TaskGroup group; int call_count = 0; group.Submit([&]() -> Status { @@ -192,7 +194,7 @@ TEST(TaskGroupTest, DirectAggregatesErrors) { TEST(TaskGroupTest, ParallelSubmitsAll) { test::ThreadExecutor executor; - TaskGroup<> group; + TaskGroup group; std::atomic call_count = 0; group.SetExecutor(std::ref(executor)); @@ -212,7 +214,7 @@ TEST(TaskGroupTest, ParallelSubmitsAll) { TEST(TaskGroupTest, ParallelAggregatesErrors) { test::ThreadExecutor executor; - TaskGroup<> group; + TaskGroup group; std::atomic call_count = 0; group.SetExecutor(std::ref(executor)); @@ -236,16 +238,16 @@ TEST(TaskGroupTest, ParallelAggregatesErrors) { TEST(TaskGroupTest, ParallelSubmitErrors) { test::ThreadExecutor executor(ServiceUnavailable("executor busy")); - TaskGroup<> group; - int call_count = 0; + TaskGroup group; + std::atomic call_count = 0; group.SetExecutor(std::ref(executor)); group.Submit([&]() -> Status { - ++call_count; + call_count.fetch_add(1, std::memory_order_relaxed); return {}; }); group.Submit([&]() -> Status { - ++call_count; + call_count.fetch_add(1, std::memory_order_relaxed); return {}; }); @@ -253,7 +255,7 @@ TEST(TaskGroupTest, ParallelSubmitErrors) { EXPECT_THAT(status, IsError(ErrorKind::kServiceUnavailable)); EXPECT_THAT(status, HasErrorMessage("Task group failed with 2 errors")); EXPECT_THAT(status, HasErrorMessage("executor busy")); - EXPECT_EQ(call_count, 0); + EXPECT_EQ(call_count.load(std::memory_order_relaxed), 0); EXPECT_EQ(executor.submit_count(), 2); } diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc index 4de2c50b5..d14a7331f 100644 --- a/src/iceberg/update/expire_snapshots.cc +++ b/src/iceberg/update/expire_snapshots.cc @@ -22,8 +22,8 @@ #include #include #include -#include #include +#include #include #include #include @@ -41,6 +41,7 @@ #include "iceberg/table_metadata.h" #include "iceberg/transaction.h" #include "iceberg/util/error_collector.h" +#include "iceberg/util/executor_util_internal.h" #include "iceberg/util/macros.h" #include "iceberg/util/retry_util.h" #include "iceberg/util/snapshot_util_internal.h" @@ -65,10 +66,11 @@ class FileCleanupStrategy { public: FileCleanupStrategy(std::shared_ptr file_io, std::function delete_func, - OptionalExecutor executor) + OptionalExecutor delete_executor, OptionalExecutor plan_executor) : file_io_(std::move(file_io)), delete_func_(std::move(delete_func)), - executor_(std::move(executor)) {} + delete_executor_(std::move(delete_executor)), + plan_executor_(std::move(plan_executor)) {} virtual ~FileCleanupStrategy() = default; @@ -85,24 +87,26 @@ class FileCleanupStrategy { /// \brief Snapshot IDs present in `before` but not in `after`. static std::unordered_set ExpiredSnapshotIds(const TableMetadata& before, const TableMetadata& after) { - std::unordered_set after_ids; - after_ids.reserve(after.snapshots.size()); - for (const auto& s : after.snapshots) { - if (s) after_ids.insert(s->snapshot_id); - } - std::unordered_set expired; - expired.reserve(before.snapshots.size()); - for (const auto& s : before.snapshots) { - if (s && !after_ids.contains(s->snapshot_id)) { - expired.insert(s->snapshot_id); - } - } + auto after_ids = + after.snapshots | + std::views::filter([](const auto& snapshot) { return snapshot != nullptr; }) | + std::views::transform(&iceberg::Snapshot::snapshot_id) | + std::ranges::to>(); + + auto expired = + before.snapshots | std::views::filter([&](const auto& snapshot) { + return snapshot != nullptr && !after_ids.contains(snapshot->snapshot_id); + }) | + std::views::transform(&iceberg::Snapshot::snapshot_id) | + std::ranges::to>(); return expired; } /// \brief Best-effort delete with bounded retry. void DeleteFiles(const std::unordered_set& paths) { - if (paths.empty()) return; + if (paths.empty()) { + return; + } if (!delete_func_) { std::vector path_list(paths.begin(), paths.end()); @@ -116,7 +120,7 @@ class FileCleanupStrategy { } TaskGroup> group(kDeleteRetryConfig); - group.SetExecutor(executor_); + group.SetExecutor(delete_executor_); for (const auto& path : paths) { group.Submit([this, path]() -> Status { try { @@ -171,7 +175,8 @@ class FileCleanupStrategy { std::shared_ptr file_io_; std::function delete_func_; - OptionalExecutor executor_; + OptionalExecutor delete_executor_; + OptionalExecutor plan_executor_; private: /// Retry budget for the FileIO bulk `DeleteFiles` path. Tight on purpose: file @@ -263,23 +268,17 @@ class ReachableFileCleanup : public FileCleanupStrategy { SnapshotCache snapshot_cache(snapshot.get()); ICEBERG_ASSIGN_OR_RAISE(auto snapshot_manifests, snapshot_cache.Manifests(file_io_)); - std::unordered_set manifests; - for (const auto& manifest : snapshot_manifests) { - manifests.insert(manifest); - } - return manifests; + return snapshot_manifests | std::views::as_rvalue | + std::ranges::to>(); } /// \brief Collect manifests for a set of snapshots. Result> ReadManifests( const TableMetadata& metadata, const std::unordered_set& snapshot_ids) { - std::unordered_set manifests; - for (int64_t snapshot_id : snapshot_ids) { - ICEBERG_ASSIGN_OR_RAISE(auto snapshot_manifests, - ReadManifestsForSnapshot(metadata, snapshot_id)); - manifests.insert(snapshot_manifests.begin(), snapshot_manifests.end()); - } - return manifests; + return ParallelCollect(plan_executor_, snapshot_ids, + [this, &metadata](int64_t snapshot_id) { + return ReadManifestsForSnapshot(metadata, snapshot_id); + }); } /// \brief Remove manifests still referenced by retained snapshots. @@ -288,21 +287,16 @@ class ReachableFileCleanup : public FileCleanupStrategy { const std::unordered_set& retained_snapshot_ids, std::unordered_set manifests_to_delete, std::unordered_set& current_manifests) { - for (int64_t snapshot_id : retained_snapshot_ids) { - ICEBERG_ASSIGN_OR_RAISE(auto snapshot_manifests, - ReadManifestsForSnapshot(metadata, snapshot_id)); - - for (const auto& manifest : snapshot_manifests) { - manifests_to_delete.erase(manifest); - - if (manifests_to_delete.empty()) { - return manifests_to_delete; - } - - current_manifests.insert(manifest); + ICEBERG_ASSIGN_OR_RAISE(auto retained_manifests, + ReadManifests(metadata, retained_snapshot_ids)); + for (auto it = retained_manifests.begin(); it != retained_manifests.end();) { + auto current = it++; + manifests_to_delete.erase(*current); + if (manifests_to_delete.empty()) { + return manifests_to_delete; } + current_manifests.insert(retained_manifests.extract(current)); } - return manifests_to_delete; } @@ -343,38 +337,38 @@ class ReachableFileCleanup : public FileCleanupStrategy { const TableMetadata& metadata, const std::unordered_set& manifests_to_delete, const std::unordered_set& current_manifests) { - std::unordered_set data_files_to_delete; - // Collect live file paths from manifests being deleted. - for (const auto& manifest : manifests_to_delete) { - auto live_data_files = ReadLiveDataFilePaths(metadata, manifest); - // Ignore expired-manifest read failures and keep scanning candidates. - if (!live_data_files.has_value()) { - continue; - } - - data_files_to_delete.insert(live_data_files->begin(), live_data_files->end()); + auto live_file_results = ParallelCollect( + plan_executor_, manifests_to_delete, + [this, &metadata]( + const ManifestFile& manifest) -> Result> { + auto result = ReadLiveDataFilePaths(metadata, manifest); + // Ignore expired-manifest read failures and keep scanning candidates. + if (!result.has_value()) { + return std::unordered_set{}; + } + return std::move(result).value(); + }); + if (!live_file_results.has_value()) { + return {}; } - + auto data_files_to_delete = std::move(live_file_results).value(); if (data_files_to_delete.empty()) { return data_files_to_delete; } // Remove files still referenced by current manifests. - for (const auto& manifest : current_manifests) { - if (data_files_to_delete.empty()) { - return data_files_to_delete; - } - - auto live_data_files = ReadLiveDataFilePaths(metadata, manifest); - // Fail closed if any retained manifest cannot be read safely. - if (!live_data_files.has_value()) { - return std::unordered_set{}; - } - - for (const auto& file_path : live_data_files.value()) { - data_files_to_delete.erase(file_path); - } + auto current_live_results = + ParallelCollect(plan_executor_, current_manifests, + [this, &metadata](const ManifestFile& manifest) { + return ReadLiveDataFilePaths(metadata, manifest); + }); + // Fail closed if any retained manifest cannot be read safely. + if (!current_live_results.has_value()) { + return std::unordered_set{}; + } + for (const auto& file_path : current_live_results.value()) { + data_files_to_delete.erase(file_path); } return data_files_to_delete; @@ -435,17 +429,23 @@ class IncrementalFileCleanup : public FileCleanupStrategy { std::unordered_set ancestor_ids; ancestor_ids.reserve(ancestors_result.value().size()); for (const auto& ancestor : ancestors_result.value()) { - if (ancestor) ancestor_ids.insert(ancestor->snapshot_id); + if (ancestor) { + ancestor_ids.insert(ancestor->snapshot_id); + } } // Protect snapshots whose changes were picked into the current ancestry. std::unordered_set picked_ancestor_snapshot_ids; picked_ancestor_snapshot_ids.reserve(ancestor_ids.size()); for (const auto& ancestor : ancestors_result.value()) { - if (!ancestor) continue; + if (!ancestor) { + continue; + } const auto& summary = ancestor->summary; auto it = summary.find(SnapshotSummaryFields::kSourceSnapshotId); - if (it == summary.end()) continue; + if (it == summary.end()) { + continue; + } ICEBERG_ASSIGN_OR_RAISE(auto source_id, StringUtils::ParseNumber(it->second)); picked_ancestor_snapshot_ids.insert(source_id); @@ -454,28 +454,41 @@ class IncrementalFileCleanup : public FileCleanupStrategy { // Find manifests still referenced by a valid snapshot but written by an // expired snapshot. Their deleted entries point at data files now safe to // remove and become candidates for manifests_to_scan below. - std::unordered_set valid_manifests; - std::unordered_set manifests_to_scan; - manifests_to_scan.reserve(expired_snapshot_ids.size()); - for (const auto& snapshot : metadata_after_expiration.snapshots) { - if (!snapshot) continue; - SnapshotCache snapshot_cache(snapshot.get()); - auto manifests_result = snapshot_cache.Manifests(file_io_); - if (!manifests_result.has_value()) continue; // best-effort - auto manifests = std::move(manifests_result).value(); - for (auto& manifest : manifests) { - valid_manifests.insert(manifest.manifest_path); - - int64_t writer_id = manifest.added_snapshot_id; - bool from_valid_snapshots = valid_ids.contains(writer_id); - bool is_from_ancestor = ancestor_ids.contains(writer_id); - bool is_picked = picked_ancestor_snapshot_ids.contains(writer_id); - if (!from_valid_snapshots && (is_from_ancestor || is_picked) && - manifest.has_deleted_files()) { - manifests_to_scan.insert(std::move(manifest)); - } - } - } + ICEBERG_ASSIGN_OR_RAISE( + auto valid_manifest_results, + ParallelCollect( + plan_executor_, metadata_after_expiration.snapshots, + [&](const std::shared_ptr& snapshot) + -> Result, + std::unordered_set>> { + if (!snapshot) { + return {}; + } + SnapshotCache snapshot_cache(snapshot.get()); + auto manifests_result = snapshot_cache.Manifests(file_io_); + if (!manifests_result.has_value()) { + // best-effort + return {}; + } + auto manifests = std::move(manifests_result).value(); + std::unordered_set valid_manifest_result; + std::unordered_set scan_manifest_result; + for (auto& manifest : manifests) { + valid_manifest_result.insert(manifest.manifest_path); + + int64_t writer_id = manifest.added_snapshot_id; + bool from_valid_snapshots = valid_ids.contains(writer_id); + bool is_from_ancestor = ancestor_ids.contains(writer_id); + bool is_picked = picked_ancestor_snapshot_ids.contains(writer_id); + if (!from_valid_snapshots && (is_from_ancestor || is_picked) && + manifest.has_deleted_files()) { + scan_manifest_result.insert(std::move(manifest)); + } + } + return std::pair(std::move(valid_manifest_result), + std::move(scan_manifest_result)); + })); + auto [valid_manifests, manifests_to_scan] = std::move(valid_manifest_results); // Find manifests that were only referenced by snapshots that have expired, // and split them by what kind of cleanup they need: @@ -484,66 +497,84 @@ class IncrementalFileCleanup : public FileCleanupStrategy { // entries (data files now safe to drop); // - manifests_to_revert: written by an expiring non-ancestor snapshot // and contains added entries -- those data files were never adopted. - std::unordered_set manifest_lists_to_delete; - manifest_lists_to_delete.reserve(expired_snapshot_ids.size()); - std::unordered_set manifests_to_delete; - manifests_to_delete.reserve(expired_snapshot_ids.size()); - std::unordered_set manifests_to_revert; - manifests_to_revert.reserve(expired_snapshot_ids.size()); - for (const auto& snapshot : metadata_before_expiration.snapshots) { - if (!snapshot) continue; - int64_t snapshot_id = snapshot->snapshot_id; - if (valid_ids.contains(snapshot_id)) continue; - - // Skip cherry-picked snapshots; the picked snapshot owns its cleanup. - if (picked_ancestor_snapshot_ids.contains(snapshot_id)) { - continue; - } - - int64_t source_snapshot_id = -1; - auto src_it = snapshot->summary.find(SnapshotSummaryFields::kSourceSnapshotId); - if (src_it != snapshot->summary.end()) { - auto source_snapshot_id_result = - StringUtils::ParseNumber(src_it->second); - if (!source_snapshot_id_result.has_value()) { - continue; - } - source_snapshot_id = source_snapshot_id_result.value(); - } - // If this commit was cherry-picked from a still-live snapshot, skip it. - if (ancestor_ids.contains(source_snapshot_id) || - picked_ancestor_snapshot_ids.contains(source_snapshot_id)) { - continue; - } - - SnapshotCache snapshot_cache(snapshot.get()); - auto manifests_result = snapshot_cache.Manifests(file_io_); - if (!manifests_result.has_value()) { - continue; - } - - auto manifests = std::move(manifests_result).value(); - for (auto& manifest : manifests) { - if (valid_manifests.contains(manifest.manifest_path)) continue; - manifests_to_delete.insert(manifest.manifest_path); - - int64_t writer_id = manifest.added_snapshot_id; - bool is_from_ancestor = ancestor_ids.contains(writer_id); - bool is_from_expiring_snapshot = expired_snapshot_ids.contains(writer_id); - - if (is_from_ancestor && manifest.has_deleted_files()) { - manifests_to_scan.insert(std::move(manifest)); - } else if (!is_from_ancestor && is_from_expiring_snapshot && - manifest.has_added_files()) { - // The writer must be known-expired so missing history cannot make - // an ancestor look like a reverted snapshot. - manifests_to_revert.insert(std::move(manifest)); - } - } - if (!snapshot->manifest_list.empty()) { - manifest_lists_to_delete.insert(snapshot->manifest_list); - } - } + ICEBERG_ASSIGN_OR_RAISE( + auto expired_manifest_results, + ParallelCollect( + plan_executor_, metadata_before_expiration.snapshots, + [&](const std::shared_ptr& snapshot) + -> Result, std::unordered_set, + std::unordered_set, std::unordered_set>> { + if (!snapshot) { + return {}; + } + int64_t snapshot_id = snapshot->snapshot_id; + if (valid_ids.contains(snapshot_id)) { + return {}; + } + + // Skip cherry-picked snapshots; the picked snapshot owns its cleanup. + if (picked_ancestor_snapshot_ids.contains(snapshot_id)) { + return {}; + } + + int64_t source_snapshot_id = -1; + auto src_it = + snapshot->summary.find(SnapshotSummaryFields::kSourceSnapshotId); + if (src_it != snapshot->summary.end()) { + auto source_snapshot_id_result = + StringUtils::ParseNumber(src_it->second); + if (!source_snapshot_id_result.has_value()) { + return {}; + } + source_snapshot_id = source_snapshot_id_result.value(); + } + // If this commit was cherry-picked from a still-live snapshot, skip it. + if (ancestor_ids.contains(source_snapshot_id) || + picked_ancestor_snapshot_ids.contains(source_snapshot_id)) { + return {}; + } + + SnapshotCache snapshot_cache(snapshot.get()); + auto manifests_result = snapshot_cache.Manifests(file_io_); + if (!manifests_result.has_value()) { + return {}; + } + + auto manifests = std::move(manifests_result).value(); + std::unordered_set manifest_lists_to_delete; + std::unordered_set manifests_to_delete; + std::unordered_set manifests_to_scan; + std::unordered_set manifests_to_revert; + for (auto& manifest : manifests) { + if (valid_manifests.contains(manifest.manifest_path)) { + continue; + } + manifests_to_delete.insert(manifest.manifest_path); + + int64_t writer_id = manifest.added_snapshot_id; + bool is_from_ancestor = ancestor_ids.contains(writer_id); + bool is_from_expiring_snapshot = expired_snapshot_ids.contains(writer_id); + + if (is_from_ancestor && manifest.has_deleted_files()) { + manifests_to_scan.insert(std::move(manifest)); + } else if (!is_from_ancestor && is_from_expiring_snapshot && + manifest.has_added_files()) { + // The writer must be known-expired so missing history cannot make + // an ancestor look like a reverted snapshot. + manifests_to_revert.insert(std::move(manifest)); + } + } + if (!snapshot->manifest_list.empty()) { + manifest_lists_to_delete.insert(snapshot->manifest_list); + } + return std::tuple( + std::move(manifest_lists_to_delete), std::move(manifests_to_delete), + std::move(manifests_to_scan), std::move(manifests_to_revert)); + })); + auto [manifest_lists_to_delete, manifests_to_delete, expired_manifests_to_scan, + manifests_to_revert] = std::move(expired_manifest_results); + manifests_to_scan.merge(expired_manifests_to_scan); // Deleting data files if (level == CleanupLevel::kAll) { @@ -580,34 +611,53 @@ class IncrementalFileCleanup : public FileCleanupStrategy { const std::unordered_set& manifests_to_scan, const std::unordered_set& manifests_to_revert, const std::unordered_set& valid_ids) { - std::unordered_set files_to_delete; - - for (const auto& manifest : manifests_to_scan) { - auto reader_result = MakeManifestReader(manifest, file_io_, metadata); - if (!reader_result.has_value()) continue; - auto entries_result = reader_result.value()->Entries(); - if (!entries_result.has_value()) continue; - for (const auto& entry : entries_result.value()) { - if (entry.status == ManifestStatus::kDeleted && entry.snapshot_id.has_value() && - !valid_ids.contains(entry.snapshot_id.value()) && entry.data_file) { - files_to_delete.insert(entry.data_file->file_path); - } - } - } - - for (const auto& manifest : manifests_to_revert) { - auto reader_result = MakeManifestReader(manifest, file_io_, metadata); - if (!reader_result.has_value()) continue; - auto entries_result = reader_result.value()->Entries(); - if (!entries_result.has_value()) continue; - for (const auto& entry : entries_result.value()) { - if (entry.status == ManifestStatus::kAdded && entry.data_file) { - files_to_delete.insert(entry.data_file->file_path); - } - } + auto files_to_delete = ParallelCollect( + plan_executor_, manifests_to_scan, + [this, &metadata, &valid_ids]( + const ManifestFile& manifest) -> Result> { + auto reader_result = MakeManifestReader(manifest, file_io_, metadata); + if (!reader_result.has_value()) { + return {}; + } + auto entries_result = reader_result.value()->Entries(); + if (!entries_result.has_value()) { + return {}; + } + std::unordered_set result; + for (const auto& entry : entries_result.value()) { + if (entry.status == ManifestStatus::kDeleted && + entry.snapshot_id.has_value() && + !valid_ids.contains(entry.snapshot_id.value()) && entry.data_file) { + result.insert(entry.data_file->file_path); + } + } + return result; + }, + manifests_to_revert, + [this, &metadata]( + const ManifestFile& manifest) -> Result> { + auto reader_result = MakeManifestReader(manifest, file_io_, metadata); + if (!reader_result.has_value()) { + return {}; + } + auto entries_result = reader_result.value()->Entries(); + if (!entries_result.has_value()) { + return {}; + } + std::unordered_set result; + for (const auto& entry : entries_result.value()) { + if (entry.status == ManifestStatus::kAdded && entry.data_file) { + result.insert(entry.data_file->file_path); + } + } + return result; + }); + if (!files_to_delete.has_value()) { + return {}; } - - return files_to_delete; + auto [scan_results, revert_results] = std::move(files_to_delete).value(); + scan_results.merge(revert_results); + return scan_results; } }; @@ -625,7 +675,9 @@ bool HasNonMainSnapshots(const TableMetadata& metadata) { } std::unordered_set main_ancestors; for (const auto& a : ancestors_result.value()) { - if (a) main_ancestors.insert(a->snapshot_id); + if (a) { + main_ancestors.insert(a->snapshot_id); + } } for (const auto& snapshot : metadata.snapshots) { if (snapshot && !main_ancestors.contains(snapshot->snapshot_id)) { @@ -652,16 +704,22 @@ bool HasRemovedNonMainAncestors(const TableMetadata& before, const TableMetadata return true; } for (const auto& a : ancestors_result.value()) { - if (a) main_ancestors.insert(a->snapshot_id); + if (a) { + main_ancestors.insert(a->snapshot_id); + } } } std::unordered_set after_ids; after_ids.reserve(after.snapshots.size()); for (const auto& s : after.snapshots) { - if (s) after_ids.insert(s->snapshot_id); + if (s) { + after_ids.insert(s->snapshot_id); + } } for (const auto& snapshot : before.snapshots) { - if (!snapshot) continue; + if (!snapshot) { + continue; + } bool removed = !after_ids.contains(snapshot->snapshot_id); bool in_main = main_ancestors.contains(snapshot->snapshot_id); if (removed && !in_main) { @@ -723,6 +781,11 @@ ExpireSnapshots& ExpireSnapshots::DeleteWith( return *this; } +ExpireSnapshots& ExpireSnapshots::PlanWith(Executor& executor) { + plan_executor_ = std::ref(executor); + return *this; +} + ExpireSnapshots& ExpireSnapshots::CleanupLevel(enum CleanupLevel level) { cleanup_level_ = level; return *this; @@ -733,8 +796,8 @@ ExpireSnapshots& ExpireSnapshots::CleanExpiredMetadata(bool clean) { return *this; } -ExpireSnapshots& ExpireSnapshots::ExecuteDeleteWith(OptionalExecutor executor) { - executor_ = std::move(executor); +ExpireSnapshots& ExpireSnapshots::ExecuteDeleteWith(Executor& executor) { + delete_executor_ = std::ref(executor); return *this; } @@ -781,8 +844,7 @@ Result> ExpireSnapshots::ComputeAllBranchSnapshotIds auto to_retain, ComputeBranchSnapshotsToRetain(ref->snapshot_id, expire_snapshot_older_than, min_snapshots_to_keep)); - snapshot_ids_to_retain.insert(std::make_move_iterator(to_retain.begin()), - std::make_move_iterator(to_retain.end())); + snapshot_ids_to_retain.merge(to_retain); } return snapshot_ids_to_retain; } @@ -879,9 +941,8 @@ Result ExpireSnapshots::Apply() { ComputeAllBranchSnapshotIdsToRetain(retained_refs)); ICEBERG_ASSIGN_OR_RAISE(auto unreferenced_snapshot_ids, UnreferencedSnapshotIdsToRetain(retained_refs)); - ids_to_retain.insert(all_branch_snapshot_ids.begin(), all_branch_snapshot_ids.end()); - ids_to_retain.insert(unreferenced_snapshot_ids.begin(), - unreferenced_snapshot_ids.end()); + ids_to_retain.merge(all_branch_snapshot_ids); + ids_to_retain.merge(unreferenced_snapshot_ids); ApplyResult result; result.metadata_before_expiration = std::make_shared(base); @@ -900,35 +961,45 @@ Result ExpireSnapshots::Apply() { }); if (clean_expired_metadata_) { + ICEBERG_ASSIGN_OR_RAISE( + auto reachable_ids, + ParallelCollect( + plan_executor_, ids_to_retain, + [this, &base](int64_t snapshot_id) + -> Result< + std::pair, std::unordered_set>> { + std::unordered_set spec_ids; + std::unordered_set schema_ids; + ICEBERG_ASSIGN_OR_RAISE(auto snapshot, base.SnapshotById(snapshot_id)); + SnapshotCache snapshot_cache(snapshot.get()); + ICEBERG_ASSIGN_OR_RAISE(auto manifests, + snapshot_cache.Manifests(ctx_->table->io())); + for (const auto& manifest : manifests) { + spec_ids.insert(manifest.partition_spec_id); + } + if (snapshot->schema_id.has_value()) { + schema_ids.insert(snapshot->schema_id.value()); + } + return std::pair(std::move(spec_ids), std::move(schema_ids)); + })); + std::unordered_set reachable_specs = {base.default_spec_id}; std::unordered_set reachable_schemas = {base.current_schema_id}; - - // TODO(xiao.dong) parallel processing - for (int64_t snapshot_id : ids_to_retain) { - ICEBERG_ASSIGN_OR_RAISE(auto snapshot, base.SnapshotById(snapshot_id)); - SnapshotCache snapshot_cache(snapshot.get()); - ICEBERG_ASSIGN_OR_RAISE(auto manifests, - snapshot_cache.Manifests(ctx_->table->io())); - for (const auto& manifest : manifests) { - reachable_specs.insert(manifest.partition_spec_id); - } - if (snapshot->schema_id.has_value()) { - reachable_schemas.insert(snapshot->schema_id.value()); - } - } - - std::ranges::for_each( - base.partition_specs, [&reachable_specs, &result](const auto& spec) { - if (!reachable_specs.contains(spec->spec_id())) { - result.partition_spec_ids_to_remove.emplace_back(spec->spec_id()); - } - }); - std::ranges::for_each(base.schemas, - [&reachable_schemas, &result](const auto& schema) { - if (!reachable_schemas.contains(schema->schema_id())) { - result.schema_ids_to_remove.insert(schema->schema_id()); - } - }); + reachable_specs.merge(reachable_ids.first); + reachable_schemas.merge(reachable_ids.second); + + result.partition_spec_ids_to_remove = + base.partition_specs | std::views::filter([&reachable_specs](const auto& spec) { + return !reachable_specs.contains(spec->spec_id()); + }) | + std::views::transform(&PartitionSpec::spec_id) | + std::ranges::to>(); + result.schema_ids_to_remove = + base.schemas | std::views::filter([&reachable_schemas](const auto& schema) { + return !reachable_schemas.contains(schema->schema_id()); + }) | + std::views::transform(&Schema::schema_id) | + std::ranges::to>(); } // Cache the result for use during Finalize() @@ -969,11 +1040,13 @@ Status ExpireSnapshots::Finalize(Result commit_result) { !HasNonMainSnapshots(metadata_after_expiration); if (can_use_incremental) { - return IncrementalFileCleanup(ctx_->table->io(), delete_func_, executor_) + return IncrementalFileCleanup(ctx_->table->io(), delete_func_, delete_executor_, + plan_executor_) .CleanFiles(metadata_before_expiration, metadata_after_expiration, cleanup_level_); } - return ReachableFileCleanup(ctx_->table->io(), delete_func_, executor_) + return ReachableFileCleanup(ctx_->table->io(), delete_func_, delete_executor_, + plan_executor_) .CleanFiles(metadata_before_expiration, metadata_after_expiration, cleanup_level_); } diff --git a/src/iceberg/update/expire_snapshots.h b/src/iceberg/update/expire_snapshots.h index bb28b5a58..214692614 100644 --- a/src/iceberg/update/expire_snapshots.h +++ b/src/iceberg/update/expire_snapshots.h @@ -122,6 +122,12 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { /// \return Reference to this for method chaining. ExpireSnapshots& DeleteWith(std::function delete_func); + /// \brief Configure an executor for planning expired snapshot metadata. + /// + /// \param executor Executor to use while planning expired snapshot metadata. + /// \return Reference to this for method chaining. + ExpireSnapshots& PlanWith(Executor& executor); + /// \brief Configures the cleanup level for expired files. /// /// This method provides fine-grained control over which files are cleaned up during @@ -148,9 +154,9 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { /// Only used with DeleteWith(). The caller must keep the executor alive until /// Finalize() returns. /// - /// \param executor An executor reference, or std::nullopt for serial deletion. + /// \param executor An executor reference. /// \return Reference to this for method chaining. - ExpireSnapshots& ExecuteDeleteWith(OptionalExecutor executor); + ExpireSnapshots& ExecuteDeleteWith(Executor& executor); Kind kind() const final { return Kind::kExpireSnapshots; } bool IsRetryable() const override { return true; } @@ -194,9 +200,10 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { std::function delete_func_; std::vector snapshot_ids_to_expire_; enum CleanupLevel cleanup_level_ { CleanupLevel::kAll }; + OptionalExecutor plan_executor_; bool clean_expired_metadata_{false}; bool specified_snapshot_id_{false}; - OptionalExecutor executor_; + OptionalExecutor delete_executor_; /// Cached result from Apply(), consumed by Finalize() and cleared after use. std::optional apply_result_; diff --git a/src/iceberg/update/snapshot_update.cc b/src/iceberg/update/snapshot_update.cc index 03f68cc94..40669a35f 100644 --- a/src/iceberg/update/snapshot_update.cc +++ b/src/iceberg/update/snapshot_update.cc @@ -30,11 +30,12 @@ #include "iceberg/manifest/manifest_writer.h" #include "iceberg/manifest/rolling_manifest_writer.h" #include "iceberg/partition_summary_internal.h" -#include "iceberg/table.h" +#include "iceberg/table.h" // IWYU pragma: keep #include "iceberg/transaction.h" #include "iceberg/util/macros.h" #include "iceberg/util/snapshot_util_internal.h" #include "iceberg/util/string_util.h" +#include "iceberg/util/task_group.h" #include "iceberg/util/uuid.h" namespace iceberg { @@ -174,7 +175,7 @@ void SnapshotUpdate::SetSummaryProperty(const std::string& property, summary_.Set(property, value); } -// TODO(xxx): write manifests in parallel +// TODO(xxx): Split files into independent rolling-writer groups before parallelizing. Result> SnapshotUpdate::WriteDataManifests( std::span> files, const std::shared_ptr& spec, @@ -200,7 +201,7 @@ Result> SnapshotUpdate::WriteDataManifests( return rolling_writer.ToManifestFiles(); } -// TODO(xxx): write manifests in parallel +// TODO(xxx): Split files into independent rolling-writer groups before parallelizing. Result> SnapshotUpdate::WriteDeleteManifests( std::span files, const std::shared_ptr& spec) { @@ -257,13 +258,17 @@ Result SnapshotUpdate::Apply() { ICEBERG_RETURN_UNEXPECTED(Validate(base(), parent_snapshot)); ICEBERG_ASSIGN_OR_RAISE(auto manifests, Apply(base(), parent_snapshot)); + auto metadata_tasks = TaskGroup().SetExecutor(plan_executor_); for (auto& manifest : manifests) { if (manifest.added_snapshot_id != kInvalidSnapshotId) { continue; } - // TODO(xxx): read in parallel and cache enriched manifests for retries - ICEBERG_ASSIGN_OR_RAISE(manifest, AddMetadata(manifest, ctx_->table->io(), base())); + metadata_tasks.Submit([&manifest, this]() -> Status { + ICEBERG_ASSIGN_OR_RAISE(manifest, AddMetadata(manifest, ctx_->table->io(), base())); + return {}; + }); } + ICEBERG_RETURN_UNEXPECTED(std::move(metadata_tasks).Run()); std::string manifest_list_path = ManifestListPath(); manifest_lists_.push_back(manifest_list_path); @@ -419,8 +424,9 @@ std::string SnapshotUpdate::ManifestListPath() { // Generate manifest list path // Format: {metadata_location}/snap-{snapshot_id}-{attempt}-{uuid}.avro int64_t snapshot_id = SnapshotId(); + auto attempt = attempt_.fetch_add(1, std::memory_order_relaxed) + 1; std::string filename = - std::format("snap-{}-{}-{}.avro", snapshot_id, ++attempt_, commit_uuid_); + std::format("snap-{}-{}-{}.avro", snapshot_id, attempt, commit_uuid_); return ctx_->MetadataFileLocation(filename); } @@ -449,7 +455,8 @@ SnapshotSummaryBuilder SnapshotUpdate::BuildManifestCountSummary( std::string SnapshotUpdate::ManifestPath() { // Generate manifest path // Format: {metadata_location}/{uuid}-m{manifest_count}.avro - std::string filename = std::format("{}-m{}.avro", commit_uuid_, manifest_count_++); + auto manifest_count = manifest_count_.fetch_add(1, std::memory_order_relaxed); + std::string filename = std::format("{}-m{}.avro", commit_uuid_, manifest_count); return ctx_->MetadataFileLocation(filename); } diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index b2c92bfb1..fa8dcbf12 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -19,6 +19,7 @@ #pragma once +#include #include #include #include @@ -33,6 +34,7 @@ #include "iceberg/snapshot.h" #include "iceberg/type_fwd.h" #include "iceberg/update/pending_update.h" +#include "iceberg/util/executor.h" namespace iceberg { @@ -81,6 +83,15 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { return self; } + /// \brief Configure an executor for manifest planning work. + /// + /// \param executor Executor to use while planning manifests. + /// \return Reference to this for method chaining. + auto& ScanManifestsWith(this auto& self, Executor& executor) { + self.plan_executor_ = std::ref(executor); + return self; + } + /// \brief Perform operations on a particular branch. /// /// \param branch The name of a SnapshotRef of type branch. @@ -152,8 +163,10 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { const std::string& target_branch() const { return target_branch_; } bool can_inherit_snapshot_id() const { return can_inherit_snapshot_id_; } const std::string& commit_uuid() const { return commit_uuid_; } - int32_t manifest_count() const { return manifest_count_; } - int32_t attempt() const { return attempt_; } + int32_t manifest_count() const { + return manifest_count_.load(std::memory_order_relaxed); + } + int32_t attempt() const { return attempt_.load(std::memory_order_relaxed); } int64_t target_manifest_size_bytes() const { return target_manifest_size_bytes_; } /// \brief Clean up any uncommitted manifests that were created. @@ -238,11 +251,12 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { private: const bool can_inherit_snapshot_id_{true}; const std::string commit_uuid_; - int32_t manifest_count_{0}; - int32_t attempt_{0}; + std::atomic manifest_count_{0}; + std::atomic attempt_{0}; std::vector manifest_lists_; const int64_t target_manifest_size_bytes_; std::optional snapshot_id_; + OptionalExecutor plan_executor_; bool stage_only_{false}; std::function delete_func_; std::string target_branch_{SnapshotRef::kMainBranch}; From 643bdb8edf74c94f075981a7733e6774ce8231d6 Mon Sep 17 00:00:00 2001 From: Zehua Zou Date: Wed, 24 Jun 2026 16:41:08 +0800 Subject: [PATCH 2/2] address review --- src/iceberg/update/expire_snapshots.h | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/iceberg/update/expire_snapshots.h b/src/iceberg/update/expire_snapshots.h index 214692614..215ad85fd 100644 --- a/src/iceberg/update/expire_snapshots.h +++ b/src/iceberg/update/expire_snapshots.h @@ -151,9 +151,6 @@ class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { /// \brief Configure an executor for DeleteWith() callbacks. /// - /// Only used with DeleteWith(). The caller must keep the executor alive until - /// Finalize() returns. - /// /// \param executor An executor reference. /// \return Reference to this for method chaining. ExpireSnapshots& ExecuteDeleteWith(Executor& executor);