Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 71 additions & 48 deletions src/iceberg/manifest/manifest_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

#include <algorithm>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_set>
#include <utility>
Expand All @@ -41,6 +43,7 @@
#include "iceberg/type.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/content_file_util.h"
#include "iceberg/util/executor_util_internal.h"
#include "iceberg/util/macros.h"

namespace iceberg {
Expand Down Expand Up @@ -189,6 +192,11 @@ ManifestGroup& ManifestGroup::ColumnsToKeepStats(std::unordered_set<int32_t> col
return *this;
}

ManifestGroup& ManifestGroup::PlanWith(OptionalExecutor executor) {
executor_ = executor;
return *this;
}

Result<std::vector<std::shared_ptr<FileScanTask>>> ManifestGroup::PlanFiles() {
auto create_file_scan_tasks =
[this](std::vector<ManifestEntry>&& entries,
Expand Down Expand Up @@ -343,10 +351,23 @@ Result<std::unique_ptr<ManifestReader>> ManifestGroup::MakeReader(

Result<std::unordered_map<int32_t, std::vector<ManifestEntry>>>
ManifestGroup::ReadEntries() {
// TODO(zehua): Replace with a thread-safe LRU cache.
std::shared_mutex eval_cache_mutex;
std::unordered_map<int32_t, std::unique_ptr<ManifestEvaluator>> eval_cache;

auto get_manifest_evaluator = [&](int32_t spec_id) -> Result<ManifestEvaluator*> {
if (eval_cache.contains(spec_id)) {
return eval_cache[spec_id].get();
{
std::shared_lock lock(eval_cache_mutex);
auto iter = eval_cache.find(spec_id);
if (iter != eval_cache.end()) {
return iter->second.get();
}
}

std::lock_guard lock(eval_cache_mutex);
auto iter = eval_cache.find(spec_id);
if (iter != eval_cache.end()) {
return iter->second.get();
}

auto spec_iter = specs_by_id_.find(spec_id);
Expand Down Expand Up @@ -376,61 +397,63 @@ ManifestGroup::ReadEntries() {
Evaluator::Make(*DataFileFilterSchema(), file_filter_, case_sensitive_));
}

std::unordered_map<int32_t, std::vector<ManifestEntry>> result;
return ParallelCollect(
executor_, data_manifests_,
[&](const ManifestFile& manifest)
-> Result<std::unordered_map<int32_t, std::vector<ManifestEntry>>> {
const int32_t spec_id = manifest.partition_spec_id;

// TODO(gangwu): Parallelize reading manifests
for (const auto& manifest : data_manifests_) {
const int32_t spec_id = manifest.partition_spec_id;

ICEBERG_ASSIGN_OR_RAISE(auto manifest_evaluator, get_manifest_evaluator(spec_id));
ICEBERG_ASSIGN_OR_RAISE(bool should_match, manifest_evaluator->Evaluate(manifest));
if (!should_match) {
// Skip this manifest because it doesn't match partition filter
continue;
}
ICEBERG_ASSIGN_OR_RAISE(auto manifest_evaluator, get_manifest_evaluator(spec_id));
ICEBERG_ASSIGN_OR_RAISE(bool should_match,
manifest_evaluator->Evaluate(manifest));
if (!should_match) {
// Skip this manifest because it doesn't match partition filter
return {};
}

if (ignore_deleted_) {
// only scan manifests that have entries other than deletes
if (!manifest.has_added_files() && !manifest.has_existing_files()) {
continue;
}
}
if (ignore_deleted_) {
// only scan manifests that have entries other than deletes
if (!manifest.has_added_files() && !manifest.has_existing_files()) {
return {};
}
}

if (ignore_existing_) {
// only scan manifests that have entries other than existing
if (!manifest.has_added_files() && !manifest.has_deleted_files()) {
continue;
}
}
if (ignore_existing_) {
// only scan manifests that have entries other than existing
if (!manifest.has_added_files() && !manifest.has_deleted_files()) {
return {};
}
}

// Read manifest entries
ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeReader(manifest));
ICEBERG_ASSIGN_OR_RAISE(auto entries,
ignore_deleted_ ? reader->LiveEntries() : reader->Entries());
// Read manifest entries
ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeReader(manifest));
ICEBERG_ASSIGN_OR_RAISE(
auto entries, ignore_deleted_ ? reader->LiveEntries() : reader->Entries());

for (auto& entry : entries) {
if (ignore_existing_ && entry.status == ManifestStatus::kExisting) {
continue;
}
std::unordered_map<int32_t, std::vector<ManifestEntry>> manifest_result;

if (data_file_evaluator != nullptr) {
DataFileStructLike data_file(*entry.data_file);
ICEBERG_ASSIGN_OR_RAISE(bool should_match,
data_file_evaluator->Evaluate(data_file));
if (!should_match) {
continue;
}
}
for (auto& entry : entries) {
if (ignore_existing_ && entry.status == ManifestStatus::kExisting) {
continue;
}

if (!manifest_entry_predicate_(entry)) {
continue;
}
if (data_file_evaluator != nullptr) {
DataFileStructLike data_file(*entry.data_file);
ICEBERG_ASSIGN_OR_RAISE(bool should_match,
data_file_evaluator->Evaluate(data_file));
if (!should_match) {
continue;
}
}

result[spec_id].push_back(std::move(entry));
}
}
if (!manifest_entry_predicate_(entry)) {
continue;
}

return result;
manifest_result[spec_id].push_back(std::move(entry));
}
return manifest_result;
});
}

} // namespace iceberg
11 changes: 11 additions & 0 deletions src/iceberg/manifest/manifest_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"
#include "iceberg/util/error_collector.h"
#include "iceberg/util/executor.h"

namespace iceberg {

Expand Down Expand Up @@ -94,6 +95,9 @@ class ICEBERG_EXPORT ManifestGroup : public ErrorCollector {

/// \brief Set a custom manifest entry filter predicate.
///
/// When an executor is configured with PlanWith(), this predicate may be called
/// concurrently. Callers must synchronize any captured mutable state.
///
/// \param predicate A function that returns true if the entry should be included.
ManifestGroup& FilterManifestEntries(
std::function<bool(const ManifestEntry&)> predicate);
Expand All @@ -120,6 +124,12 @@ class ICEBERG_EXPORT ManifestGroup : public ErrorCollector {
/// \param column_ids Field IDs of columns whose statistics should be preserved.
ManifestGroup& ColumnsToKeepStats(std::unordered_set<int32_t> column_ids);

/// \brief Configure an optional executor for manifest planning.
///
/// \param executor Executor to use, or std::nullopt to plan manifests serially.
/// \return Reference to this for method chaining.
ManifestGroup& PlanWith(OptionalExecutor executor);

/// \brief Plan scan tasks for all matching data files.
Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles();

Expand Down Expand Up @@ -158,6 +168,7 @@ class ICEBERG_EXPORT ManifestGroup : public ErrorCollector {
std::function<bool(const ManifestEntry&)> manifest_entry_predicate_;
std::vector<std::string> columns_;
std::unordered_set<int32_t> columns_to_keep_stats_;
OptionalExecutor executor_;
bool case_sensitive_ = true;
bool ignore_deleted_ = false;
bool ignore_existing_ = false;
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ add_iceberg_test(util_test
lazy_test.cc
location_util_test.cc
math_util_internal_test.cc
executor_util_test.cc
roaring_position_bitmap_test.cc
position_delete_index_test.cc
position_delete_range_consumer_test.cc
Expand Down
Loading
Loading