diff --git a/src/iceberg/manifest/manifest_group.cc b/src/iceberg/manifest/manifest_group.cc index 61bb57da2..932f3bf3d 100644 --- a/src/iceberg/manifest/manifest_group.cc +++ b/src/iceberg/manifest/manifest_group.cc @@ -21,6 +21,8 @@ #include #include +#include +#include #include #include #include @@ -41,6 +43,7 @@ #include "iceberg/type.h" #include "iceberg/util/checked_cast.h" #include "iceberg/util/content_file_util.h" +#include "iceberg/util/executor_util_internal.h" #include "iceberg/util/macros.h" namespace iceberg { @@ -189,6 +192,11 @@ ManifestGroup& ManifestGroup::ColumnsToKeepStats(std::unordered_set col return *this; } +ManifestGroup& ManifestGroup::PlanWith(OptionalExecutor executor) { + executor_ = executor; + return *this; +} + Result>> ManifestGroup::PlanFiles() { auto create_file_scan_tasks = [this](std::vector&& entries, @@ -343,10 +351,23 @@ Result> ManifestGroup::MakeReader( Result>> ManifestGroup::ReadEntries() { + // TODO(zehua): Replace with a thread-safe LRU cache. + std::shared_mutex eval_cache_mutex; std::unordered_map> eval_cache; + auto get_manifest_evaluator = [&](int32_t spec_id) -> Result { - if (eval_cache.contains(spec_id)) { - return eval_cache[spec_id].get(); + { + std::shared_lock lock(eval_cache_mutex); + auto iter = eval_cache.find(spec_id); + if (iter != eval_cache.end()) { + return iter->second.get(); + } + } + + std::lock_guard lock(eval_cache_mutex); + auto iter = eval_cache.find(spec_id); + if (iter != eval_cache.end()) { + return iter->second.get(); } auto spec_iter = specs_by_id_.find(spec_id); @@ -376,61 +397,63 @@ ManifestGroup::ReadEntries() { Evaluator::Make(*DataFileFilterSchema(), file_filter_, case_sensitive_)); } - std::unordered_map> result; + return ParallelCollect( + executor_, data_manifests_, + [&](const ManifestFile& manifest) + -> Result>> { + const int32_t spec_id = manifest.partition_spec_id; - // TODO(gangwu): Parallelize reading manifests - for (const auto& manifest : data_manifests_) { - const int32_t spec_id = manifest.partition_spec_id; - - ICEBERG_ASSIGN_OR_RAISE(auto manifest_evaluator, get_manifest_evaluator(spec_id)); - ICEBERG_ASSIGN_OR_RAISE(bool should_match, manifest_evaluator->Evaluate(manifest)); - if (!should_match) { - // Skip this manifest because it doesn't match partition filter - continue; - } + ICEBERG_ASSIGN_OR_RAISE(auto manifest_evaluator, get_manifest_evaluator(spec_id)); + ICEBERG_ASSIGN_OR_RAISE(bool should_match, + manifest_evaluator->Evaluate(manifest)); + if (!should_match) { + // Skip this manifest because it doesn't match partition filter + return {}; + } - if (ignore_deleted_) { - // only scan manifests that have entries other than deletes - if (!manifest.has_added_files() && !manifest.has_existing_files()) { - continue; - } - } + if (ignore_deleted_) { + // only scan manifests that have entries other than deletes + if (!manifest.has_added_files() && !manifest.has_existing_files()) { + return {}; + } + } - if (ignore_existing_) { - // only scan manifests that have entries other than existing - if (!manifest.has_added_files() && !manifest.has_deleted_files()) { - continue; - } - } + if (ignore_existing_) { + // only scan manifests that have entries other than existing + if (!manifest.has_added_files() && !manifest.has_deleted_files()) { + return {}; + } + } - // Read manifest entries - ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeReader(manifest)); - ICEBERG_ASSIGN_OR_RAISE(auto entries, - ignore_deleted_ ? reader->LiveEntries() : reader->Entries()); + // Read manifest entries + ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeReader(manifest)); + ICEBERG_ASSIGN_OR_RAISE( + auto entries, ignore_deleted_ ? reader->LiveEntries() : reader->Entries()); - for (auto& entry : entries) { - if (ignore_existing_ && entry.status == ManifestStatus::kExisting) { - continue; - } + std::unordered_map> manifest_result; - if (data_file_evaluator != nullptr) { - DataFileStructLike data_file(*entry.data_file); - ICEBERG_ASSIGN_OR_RAISE(bool should_match, - data_file_evaluator->Evaluate(data_file)); - if (!should_match) { - continue; - } - } + for (auto& entry : entries) { + if (ignore_existing_ && entry.status == ManifestStatus::kExisting) { + continue; + } - if (!manifest_entry_predicate_(entry)) { - continue; - } + if (data_file_evaluator != nullptr) { + DataFileStructLike data_file(*entry.data_file); + ICEBERG_ASSIGN_OR_RAISE(bool should_match, + data_file_evaluator->Evaluate(data_file)); + if (!should_match) { + continue; + } + } - result[spec_id].push_back(std::move(entry)); - } - } + if (!manifest_entry_predicate_(entry)) { + continue; + } - return result; + manifest_result[spec_id].push_back(std::move(entry)); + } + return manifest_result; + }); } } // namespace iceberg diff --git a/src/iceberg/manifest/manifest_group.h b/src/iceberg/manifest/manifest_group.h index 10b552786..09ae4a503 100644 --- a/src/iceberg/manifest/manifest_group.h +++ b/src/iceberg/manifest/manifest_group.h @@ -36,6 +36,7 @@ #include "iceberg/result.h" #include "iceberg/type_fwd.h" #include "iceberg/util/error_collector.h" +#include "iceberg/util/executor.h" namespace iceberg { @@ -94,6 +95,9 @@ class ICEBERG_EXPORT ManifestGroup : public ErrorCollector { /// \brief Set a custom manifest entry filter predicate. /// + /// When an executor is configured with PlanWith(), this predicate may be called + /// concurrently. Callers must synchronize any captured mutable state. + /// /// \param predicate A function that returns true if the entry should be included. ManifestGroup& FilterManifestEntries( std::function predicate); @@ -120,6 +124,12 @@ class ICEBERG_EXPORT ManifestGroup : public ErrorCollector { /// \param column_ids Field IDs of columns whose statistics should be preserved. ManifestGroup& ColumnsToKeepStats(std::unordered_set column_ids); + /// \brief Configure an optional executor for manifest planning. + /// + /// \param executor Executor to use, or std::nullopt to plan manifests serially. + /// \return Reference to this for method chaining. + ManifestGroup& PlanWith(OptionalExecutor executor); + /// \brief Plan scan tasks for all matching data files. Result>> PlanFiles(); @@ -158,6 +168,7 @@ class ICEBERG_EXPORT ManifestGroup : public ErrorCollector { std::function manifest_entry_predicate_; std::vector columns_; std::unordered_set columns_to_keep_stats_; + OptionalExecutor executor_; bool case_sensitive_ = true; bool ignore_deleted_ = false; bool ignore_existing_ = false; diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 1d8ea472b..c8c815797 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -139,6 +139,7 @@ add_iceberg_test(util_test lazy_test.cc location_util_test.cc math_util_internal_test.cc + executor_util_test.cc roaring_position_bitmap_test.cc position_delete_index_test.cc position_delete_range_consumer_test.cc diff --git a/src/iceberg/test/executor_util_test.cc b/src/iceberg/test/executor_util_test.cc new file mode 100644 index 000000000..3bb9ad9fc --- /dev/null +++ b/src/iceberg/test/executor_util_test.cc @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "iceberg/result.h" +#include "iceberg/test/executor.h" +#include "iceberg/test/matchers.h" +#include "iceberg/util/executor_util_internal.h" + +namespace iceberg { + +using ::testing::ElementsAre; +using ::testing::Pair; +using ::testing::UnorderedElementsAre; + +namespace { + +struct IntTask { + Result> operator()(int) { + return Result>{std::vector{}}; + } +}; + +struct BoolTask { + Result> operator()(bool) { + return Result>{std::vector{}}; + } +}; + +static_assert(internal::ParallelCollectible&, IntTask>); +static_assert( + std::same_as&>(), + IntTask{})), + Result>>); +static_assert(!internal::ParallelCollectible); +static_assert(!internal::ParallelCollectible&, BoolTask>); + +} // namespace + +TEST(ParallelReduceTest, MergesSets) { + std::vector> values = {{1, 2}, {2, 3}, {}}; + + auto result = ParallelReduce>::Reduce(values); + + EXPECT_THAT(result, UnorderedElementsAre(1, 2, 3)); +} + +TEST(ParallelReduceTest, JoinsVectors) { + std::vector> values = {{1, 2}, {}, {3}}; + + auto result = ParallelReduce>::Reduce(values); + + EXPECT_THAT(result, ElementsAre(1, 2, 3)); +} + +TEST(ParallelReduceTest, MergesMapsAndAppendsDuplicateVectors) { + std::vector>> values = { + {{1, {"a"}}, {2, {"b"}}}, {{1, {"c"}}, {3, {"d"}}}}; + + auto result = + ParallelReduce>>::Reduce(values); + + EXPECT_THAT(result, + UnorderedElementsAre(Pair(1, ElementsAre("a", "c")), + Pair(2, ElementsAre("b")), Pair(3, ElementsAre("d")))); +} + +TEST(ParallelReduceTest, ReducesPairElements) { + using Value = std::pair, std::vector>; + + std::vector values = {{{1}, {"a"}}, {{2}, {"b"}}}; + + auto result = ParallelReduce::Reduce(values); + + EXPECT_THAT(result.first, UnorderedElementsAre(1, 2)); + EXPECT_THAT(result.second, ElementsAre("a", "b")); +} + +TEST(ParallelReduceTest, ReducesTupleElements) { + using Value = std::tuple, std::vector>; + + std::vector values = {{{1}, {"a"}}, {{2}, {"b"}}}; + + auto result = ParallelReduce::Reduce(values); + + EXPECT_THAT(std::get<0>(result), UnorderedElementsAre(1, 2)); + EXPECT_THAT(std::get<1>(result), ElementsAre("a", "b")); +} + +TEST(ParallelReduceTest, ReducesViewElements) { + using Value = std::tuple, std::vector>; + + std::vector values = {{{0}, {"skip"}}, {{1}, {"a"}}, {{2}, {"b"}}}; + + auto result = ParallelReduce::Reduce(values | std::views::drop(1)); + + EXPECT_THAT(std::get<0>(result), UnorderedElementsAre(1, 2)); + EXPECT_THAT(std::get<1>(result), ElementsAre("a", "b")); +} + +TEST(ParallelCollectTest, CollectsSingleRange) { + std::vector input = {1, 2, 3}; + + auto result = ParallelCollect(std::nullopt, input, [](int value) { + return Result>{{value * 2}}; + }); + + EXPECT_THAT(result, IsOk()); + EXPECT_THAT(*result, UnorderedElementsAre(2, 4, 6)); +} + +TEST(ParallelCollectTest, KeepsTupleResultFromSingleRange) { + std::vector input = {1, 2}; + + auto result = ParallelCollect( + std::nullopt, input, + [](int value) + -> Result, std::vector>> { + return {{{value}, {std::to_string(value)}}}; + }); + + EXPECT_THAT(result, IsOk()); + EXPECT_THAT(std::get<0>(*result), UnorderedElementsAre(1, 2)); + EXPECT_THAT(std::get<1>(*result), ElementsAre("1", "2")); +} + +TEST(ParallelCollectTest, CollectsMultipleRanges) { + test::ThreadExecutor executor; + std::vector left = {1, 2}; + std::vector right = {"a", "b"}; + + auto result = ParallelCollect( + std::ref(executor), left, + [](int value) { return Result>{{value}}; }, right, + [](const std::string& value) { return Result>{{value}}; }); + + EXPECT_THAT(result, IsOk()); + EXPECT_THAT(std::get<0>(*result), UnorderedElementsAre(1, 2)); + EXPECT_THAT(std::get<1>(*result), ElementsAre("a", "b")); + EXPECT_EQ(executor.submit_count(), 4); +} + +TEST(ParallelCollectTest, PropagatesTaskErrors) { + std::vector input = {1, 2, 3}; + std::atomic calls = 0; + + auto result = ParallelCollect(std::nullopt, input, [&calls](int value) { + calls.fetch_add(1, std::memory_order_relaxed); + if (value == 2) { + return Result>{ValidationFailed("bad value")}; + } + return Result>{{value}}; + }); + + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_EQ(calls.load(std::memory_order_relaxed), 3); +} + +} // namespace iceberg diff --git a/src/iceberg/test/manifest_group_test.cc b/src/iceberg/test/manifest_group_test.cc index 70e2cea99..aa2d6810d 100644 --- a/src/iceberg/test/manifest_group_test.cc +++ b/src/iceberg/test/manifest_group_test.cc @@ -39,6 +39,7 @@ #include "iceberg/partition_spec.h" #include "iceberg/schema.h" #include "iceberg/table_scan.h" +#include "iceberg/test/executor.h" #include "iceberg/test/matchers.h" #include "iceberg/transform.h" #include "iceberg/type.h" @@ -625,11 +626,15 @@ TEST_P(ManifestGroupTest, MultipleDataManifests) { auto group, ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests))); + test::ThreadExecutor executor; + group->PlanWith(std::ref(executor)); + // Plan files - should return files from both manifests ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles()); ASSERT_EQ(tasks.size(), 2); EXPECT_THAT(GetPaths(tasks), testing::UnorderedElementsAre("/path/to/data1.parquet", "/path/to/data2.parquet")); + EXPECT_EQ(executor.submit_count(), 2); } TEST_P(ManifestGroupTest, PartitionFilter) { diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index a76a15553..a17d9841a 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -93,6 +93,7 @@ iceberg_tests = { 'data_file_set_test.cc', 'decimal_test.cc', 'endian_test.cc', + 'executor_util_test.cc', 'file_io_test.cc', 'formatter_test.cc', 'lazy_test.cc', diff --git a/src/iceberg/util/executor_util_internal.h b/src/iceberg/util/executor_util_internal.h new file mode 100644 index 000000000..5eed3a8bd --- /dev/null +++ b/src/iceberg/util/executor_util_internal.h @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/result.h" +#include "iceberg/util/executor.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/task_group.h" + +namespace iceberg { + +template +struct ParallelReduce; + +namespace internal { + +template +concept ParallelReducible = requires(std::vector& values) { + typename ParallelReduce::result_type; + { + ParallelReduce::Reduce(values) + } -> std::same_as::result_type>; +}; + +template +using ParallelCollectValueT = + ResultValueT&, + std::add_lvalue_reference_t>>>>; + +template +struct ParallelCollectTraits { + using args_tuple_type = std::tuple; + using input_type = std::tuple_element_t; + using task_type = std::tuple_element_t; + using value_type = ParallelCollectValueT; +}; + +template +concept ParallelCollectible = + std::ranges::forward_range && std::ranges::sized_range && + std::is_lvalue_reference_v> && + requires(std::remove_reference_t& task, + std::ranges::range_reference_t item) { + { std::invoke(task, item) } -> AsResult; + requires(!std::same_as>); + requires std::default_initializable>; + requires ParallelReducible, Options...>; + }; + +} // namespace internal + +template +struct ParallelReduce> { + using result_type = std::unordered_set; + + template + static result_type Reduce(Values&& values) { + result_type result; + for (auto&& value : values) { + result.merge(value); + } + return result; + } +}; + +template +struct ParallelReduce> { + using result_type = std::vector; + + template + static result_type Reduce(Values&& values) { + return std::forward(values) | std::views::join | std::views::as_rvalue | + std::ranges::to(); + } +}; + +template +struct ParallelReduce, MapArgs...>> { + using result_type = std::unordered_map, MapArgs...>; + + template + static result_type Reduce(Values&& values) { + result_type result; + for (auto&& value : values) { + result.merge(value); + for (auto& [key, entries] : value) { + auto& out = result[key]; + out.insert(out.end(), std::make_move_iterator(entries.begin()), + std::make_move_iterator(entries.end())); + } + } + return result; + } +}; + +template +struct ParallelReduce> { + using result_type = std::pair::result_type, + typename ParallelReduce::result_type>; + + template + static result_type Reduce(Values&& values) { + return {ParallelReduce::Reduce(values | std::views::elements<0>), + ParallelReduce::Reduce(values | std::views::elements<1>)}; + } +}; + +template +struct ParallelReduce> { + using result_type = std::tuple::result_type...>; + + template + static result_type Reduce(Values&& values) { + return Reduce(values, std::index_sequence_for{}); + } + + private: + template + static result_type Reduce(Values&& values, std::index_sequence) { + return result_type{ParallelReduce>>::Reduce( + values | std::views::elements)...}; + } +}; + +template + requires(sizeof...(Args) >= 2 && sizeof...(Args) % 2 == 0 && + [](std::index_sequence) consteval { + return (internal::ParallelCollectible< + typename internal::ParallelCollectTraits::input_type, + typename internal::ParallelCollectTraits::task_type, + Options...> && + ...); + }(std::make_index_sequence{})) +auto ParallelCollect(OptionalExecutor executor, Args&&... args) { + constexpr std::size_t pair_count = sizeof...(Args) / 2; + using indices = std::make_index_sequence; + + auto args_tuple = std::forward_as_tuple(std::forward(args)...); + + auto values_tuple = [&](std::index_sequence) { + return std::tuple{[&] { + using traits = internal::ParallelCollectTraits; + + return std::vector( + std::ranges::size(std::get(args_tuple))); + }()...}; + }(indices{}); + + auto reduce_all = [&](std::index_sequence) { + auto reduce_one = [&] { + using traits = internal::ParallelCollectTraits; + using value_type = typename traits::value_type; + return ParallelReduce::Reduce( + std::get(values_tuple)); + }; + + if constexpr (pair_count == 1) { + return reduce_one.template operator()<0>(); + } else { + return std::tuple{reduce_one.template operator()()...}; + } + }; + + using result_type = decltype(reduce_all(indices{})); + + TaskGroup group; + group.SetExecutor(executor); + + [&](std::index_sequence) { + ( + [&] { + for (auto&& [item, value] : + std::views::zip(std::get(args_tuple), std::get(values_tuple))) { + group.Submit([&]() -> Status { + ICEBERG_ASSIGN_OR_RAISE(value, + std::invoke(std::get(args_tuple), item)); + return {}; + }); + } + }(), + ...); + }(indices{}); + + auto status = std::move(group).Run(); + if (!status.has_value()) { + return Result(std::unexpected(status.error())); + } + + return Result(reduce_all(indices{})); +} + +} // namespace iceberg