Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 132 additions & 83 deletions src/iceberg/delete_file_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
#include <algorithm>
#include <cstdint>
#include <iterator>
#include <mutex>
#include <ranges>
#include <shared_mutex>
#include <vector>

#include "iceberg/expression/expression.h"
Expand All @@ -37,6 +39,7 @@
#include "iceberg/schema.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/content_file_util.h"
#include "iceberg/util/executor_util_internal.h"
#include "iceberg/util/macros.h"

namespace iceberg {
Expand Down Expand Up @@ -528,107 +531,153 @@ DeleteFileIndex::Builder& DeleteFileIndex::Builder::IgnoreResiduals() {
return *this;
}

DeleteFileIndex::Builder& DeleteFileIndex::Builder::PlanWith(OptionalExecutor executor) {
executor_ = executor;
return *this;
}

Result<std::vector<ManifestEntry>> DeleteFileIndex::Builder::LoadDeleteFiles() {
// Build expression caches per spec ID
std::unordered_map<int32_t, std::shared_ptr<Expression>> part_expr_cache;
// TODO(zehua): Replace with a thread-safe LRU cache.
std::shared_mutex projected_expr_cache_mutex;
std::unordered_map<int32_t, std::shared_ptr<Expression>> projected_expr_cache;
std::shared_mutex eval_cache_mutex;
std::unordered_map<int32_t, std::unique_ptr<ManifestEvaluator>> eval_cache;

auto data_filter = ignore_residuals_ ? True::Instance() : data_filter_;

// Filter and read manifests into manifest entries
std::vector<ManifestEntry> files;
for (const auto& manifest : delete_manifests_) {
if (manifest.content != ManifestContent::kDeletes) {
continue;
auto and_filters =
[](std::shared_ptr<Expression> left,
std::shared_ptr<Expression> right) -> Result<std::shared_ptr<Expression>> {
if (left && right) {
return And::MakeFolded(std::move(left), std::move(right));
}
if (!manifest.has_added_files() && !manifest.has_existing_files()) {
continue;
return right ? std::move(right) : std::move(left);
};

auto get_projected_expr = [&](int32_t spec_id,
const std::shared_ptr<PartitionSpec>& spec)
-> Result<std::shared_ptr<Expression>> {
if (!data_filter_) {
return std::shared_ptr<Expression>();
}

const int32_t spec_id = manifest.partition_spec_id;
auto spec_iter = specs_by_id_.find(spec_id);
ICEBERG_CHECK(spec_iter != specs_by_id_.cend(),
"Partition spec ID {} not found when loading delete files", spec_id);

const auto& spec = spec_iter->second;

// Get or compute projected partition expression
if (!part_expr_cache.contains(spec_id) && data_filter_) {
auto projector = Projections::Inclusive(*spec, *schema_, case_sensitive_);
ICEBERG_ASSIGN_OR_RAISE(auto projected, projector->Project(data_filter_));
part_expr_cache[spec_id] = std::move(projected);
{
std::shared_lock lock(projected_expr_cache_mutex);
auto iter = projected_expr_cache.find(spec_id);
if (iter != projected_expr_cache.end()) {
return iter->second;
}
}

// Get or create manifest evaluator
if (!eval_cache.contains(spec_id)) {
auto filter = partition_filter_;
if (auto it = part_expr_cache.find(spec_id); it != part_expr_cache.cend()) {
if (filter) {
ICEBERG_ASSIGN_OR_RAISE(filter, And::Make(filter, it->second));
} else {
filter = it->second;
}
}
if (filter) {
ICEBERG_ASSIGN_OR_RAISE(auto evaluator,
ManifestEvaluator::MakePartitionFilter(
std::move(filter), spec, *schema_, case_sensitive_));
eval_cache[spec_id] = std::move(evaluator);
}
std::lock_guard lock(projected_expr_cache_mutex);
auto iter = projected_expr_cache.find(spec_id);
if (iter != projected_expr_cache.end()) {
return iter->second;
}

// Evaluate manifest against filter
if (auto it = eval_cache.find(spec_id); it != eval_cache.end()) {
ICEBERG_ASSIGN_OR_RAISE(auto should_match, it->second->Evaluate(manifest));
if (!should_match) {
continue; // Manifest doesn't match filter
}
auto projector = Projections::Inclusive(*spec, *schema_, case_sensitive_);
ICEBERG_ASSIGN_OR_RAISE(auto projected, projector->Project(data_filter_));
auto [inserted_iter, _] = projected_expr_cache.emplace(spec_id, std::move(projected));
return inserted_iter->second;
};

auto get_manifest_evaluator =
[&](int32_t spec_id, const std::shared_ptr<PartitionSpec>& spec,
const std::shared_ptr<Expression>& filter) -> Result<ManifestEvaluator*> {
if (!filter) {
return nullptr;
}

// Read manifest entries
ICEBERG_ASSIGN_OR_RAISE(auto reader,
ManifestReader::Make(manifest, io_, schema_, spec));

auto partition_filter = partition_filter_;
if (auto it = part_expr_cache.find(spec_id); it != part_expr_cache.cend()) {
if (partition_filter) {
ICEBERG_ASSIGN_OR_RAISE(partition_filter,
And::Make(partition_filter, it->second));
} else {
partition_filter = it->second;
{
std::shared_lock lock(eval_cache_mutex);
auto iter = eval_cache.find(spec_id);
if (iter != eval_cache.end()) {
return iter->second.get();
}
}
if (partition_filter) {
reader->FilterPartitions(std::move(partition_filter));
}
if (partition_set_) {
reader->FilterPartitions(partition_set_);
}
reader->FilterRows(data_filter).CaseSensitive(case_sensitive_).TryDropStats();

ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries());
files.reserve(files.size() + entries.size());

for (auto& entry : entries) {
ICEBERG_CHECK(entry.data_file != nullptr, "ManifestEntry must have a data file");
ICEBERG_CHECK(entry.sequence_number.has_value(),
"Missing sequence number from delete file: {}",
entry.data_file->file_path);
if (entry.sequence_number.value() > min_sequence_number_) {
auto& file = *entry.data_file;
// keep minimum stats to avoid memory pressure
std::unordered_set<int32_t> columns =
file.content == DataFile::Content::kPositionDeletes
? std::unordered_set<int32_t>{MetadataColumns::kDeleteFilePathColumnId}
: std::unordered_set<int32_t>(file.equality_ids.begin(),
file.equality_ids.end());
ContentFileUtil::DropUnselectedStats(*entry.data_file, columns);
files.emplace_back(std::move(entry));
}

std::lock_guard lock(eval_cache_mutex);
auto iter = eval_cache.find(spec_id);
if (iter != eval_cache.end()) {
return iter->second.get();
}
}

return files;
ICEBERG_ASSIGN_OR_RAISE(auto evaluator, ManifestEvaluator::MakePartitionFilter(
filter, spec, *schema_, case_sensitive_));
auto [inserted_iter, _] = eval_cache.emplace(spec_id, std::move(evaluator));
return inserted_iter->second.get();
};

return ParallelCollect(
executor_, delete_manifests_,
[&](const ManifestFile& manifest) -> Result<std::vector<ManifestEntry>> {
std::vector<ManifestEntry> manifest_result;
if (manifest.content != ManifestContent::kDeletes) {
return manifest_result;
}
if (!manifest.has_added_files() && !manifest.has_existing_files()) {
return manifest_result;
}

const int32_t spec_id = manifest.partition_spec_id;
auto spec_iter = specs_by_id_.find(spec_id);
ICEBERG_CHECK(spec_iter != specs_by_id_.cend(),
"Partition spec ID {} not found when loading delete files",
spec_id);

const auto& spec = spec_iter->second;

ICEBERG_ASSIGN_OR_RAISE(auto projected_data_filter,
get_projected_expr(spec_id, spec));
ICEBERG_ASSIGN_OR_RAISE(auto delete_partition_filter,
and_filters(partition_filter_, projected_data_filter));
ICEBERG_ASSIGN_OR_RAISE(
auto manifest_evaluator,
get_manifest_evaluator(spec_id, spec, delete_partition_filter));
if (manifest_evaluator != nullptr) {
ICEBERG_ASSIGN_OR_RAISE(auto should_match,
manifest_evaluator->Evaluate(manifest));
if (!should_match) {
return manifest_result;
}
}

// Read manifest entries
ICEBERG_ASSIGN_OR_RAISE(auto reader,
ManifestReader::Make(manifest, io_, schema_, spec));

if (delete_partition_filter) {
reader->FilterPartitions(std::move(delete_partition_filter));
}
if (partition_set_) {
reader->FilterPartitions(partition_set_);
}
reader->FilterRows(data_filter).CaseSensitive(case_sensitive_).TryDropStats();

ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries());
manifest_result.reserve(entries.size());

for (auto& entry : entries) {
ICEBERG_CHECK(entry.data_file != nullptr,
"ManifestEntry must have a data file");
ICEBERG_CHECK(entry.sequence_number.has_value(),
"Missing sequence number from delete file: {}",
entry.data_file->file_path);
if (entry.sequence_number.value() > min_sequence_number_) {
auto& file = *entry.data_file;
// keep minimum stats to avoid memory pressure
std::unordered_set<int32_t> columns =
file.content == DataFile::Content::kPositionDeletes
? std::unordered_set<
int32_t>{MetadataColumns::kDeleteFilePathColumnId}
: std::unordered_set<int32_t>(file.equality_ids.begin(),
file.equality_ids.end());
ContentFileUtil::DropUnselectedStats(*entry.data_file, columns);
manifest_result.emplace_back(std::move(entry));
}
}
return manifest_result;
});
}

Status DeleteFileIndex::Builder::AddDV(
Expand Down
8 changes: 8 additions & 0 deletions src/iceberg/delete_file_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"
#include "iceberg/util/error_collector.h"
#include "iceberg/util/executor.h"
#include "iceberg/util/partition_value_util.h"

namespace iceberg {
Expand Down Expand Up @@ -356,6 +357,12 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector {
/// \brief Ignore residual expressions after partition filtering.
Builder& IgnoreResiduals();

/// \brief Configure an optional executor for reading delete manifests.
///
/// \param executor Executor to use, or std::nullopt to read manifests serially.
/// \return Reference to this for method chaining.
Builder& PlanWith(OptionalExecutor executor);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this public API also take Executor&? If OptionalExecutor is only internal plumbing, it should live in an _internal.h header so users do not depend on it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can rename the file in future PR.


/// \brief Build the DeleteFileIndex.
Result<std::unique_ptr<DeleteFileIndex>> Build();

Expand Down Expand Up @@ -388,6 +395,7 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector {
std::shared_ptr<Expression> data_filter_;
std::shared_ptr<Expression> partition_filter_;
std::shared_ptr<PartitionSet> partition_set_;
OptionalExecutor executor_;
bool case_sensitive_ = true;
bool ignore_residuals_ = false;
};
Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/manifest/manifest_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ ManifestGroup& ManifestGroup::ColumnsToKeepStats(std::unordered_set<int32_t> col

ManifestGroup& ManifestGroup::PlanWith(OptionalExecutor executor) {
executor_ = executor;
delete_index_builder_.PlanWith(executor);
return *this;
}

Expand Down Expand Up @@ -314,8 +315,7 @@ Result<std::unique_ptr<ManifestReader>> ManifestGroup::MakeReader(

auto columns = columns_;
if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue &&
!columns.empty() &&
std::ranges::find(columns, Schema::kAllColumns) == columns.end()) {
!columns.empty() && !std::ranges::contains(columns, Schema::kAllColumns)) {
auto data_file_schema = DataFileFilterSchema();
ICEBERG_ASSIGN_OR_RAISE(
auto bound_file_filter,
Expand Down
15 changes: 12 additions & 3 deletions src/iceberg/table_scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,12 @@ TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::MinRowsRequested(
return *this;
}

template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::PlanWith(Executor& executor) {
context_.plan_executor = std::ref(executor);
return *this;
}

template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::UseSnapshot(int64_t snapshot_id) {
ICEBERG_BUILDER_CHECK(!context_.snapshot_id.has_value(),
Expand Down Expand Up @@ -538,7 +544,8 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
.Select(ScanColumns())
.FilterData(filter())
.IgnoreDeleted()
.ColumnsToKeepStats(context_.columns_to_keep_stats);
.ColumnsToKeepStats(context_.columns_to_keep_stats)
.PlanWith(context_.plan_executor);
if (context_.ignore_residuals) {
manifest_group->IgnoreResiduals();
}
Expand Down Expand Up @@ -641,7 +648,8 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> IncrementalAppendScan::PlanFi
entry.status == ManifestStatus::kAdded;
})
.IgnoreDeleted()
.ColumnsToKeepStats(context_.columns_to_keep_stats);
.ColumnsToKeepStats(context_.columns_to_keep_stats)
.PlanWith(context_.plan_executor);

if (context_.ignore_residuals) {
manifest_group->IgnoreResiduals();
Expand Down Expand Up @@ -737,7 +745,8 @@ IncrementalChangelogScan::PlanFiles(std::optional<int64_t> from_snapshot_id_excl
snapshot_ids.contains(entry.snapshot_id.value());
})
.IgnoreExisting()
.ColumnsToKeepStats(context_.columns_to_keep_stats);
.ColumnsToKeepStats(context_.columns_to_keep_stats)
.PlanWith(context_.plan_executor);

if (context_.ignore_residuals) {
manifest_group->IgnoreResiduals();
Expand Down
8 changes: 8 additions & 0 deletions src/iceberg/table_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "iceberg/table_metadata.h"
#include "iceberg/type_fwd.h"
#include "iceberg/util/error_collector.h"
#include "iceberg/util/executor.h"

namespace iceberg {

Expand Down Expand Up @@ -228,6 +229,7 @@ struct TableScanContext {
std::optional<int64_t> to_snapshot_id;
std::string branch{};
std::optional<int64_t> min_rows_requested;
OptionalExecutor plan_executor;

// Validate the context parameters to see if they have conflicts.
[[nodiscard]] Status Validate() const;
Expand Down Expand Up @@ -302,6 +304,12 @@ class ICEBERG_TEMPLATE_CLASS_EXPORT TableScanBuilder : public ErrorCollector {
/// \param num_rows The minimum number of rows requested
TableScanBuilder& MinRowsRequested(int64_t num_rows);

/// \brief Configure an executor for manifest planning.
///
/// \param executor Executor to use while planning manifests.
/// \return Reference to this for method chaining.
TableScanBuilder& PlanWith(Executor& executor);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please document the executor lifetime here too. The built scan stores this by reference and may use it later in PlanFiles().


/// \brief Request this scan to use the given snapshot by ID.
/// \param snapshot_id a snapshot ID
/// \note InvalidArgument will be returned if the snapshot cannot be found
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/test/arrow_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ TEST(ArrowExecutorAdapterTest, RunsTaskGroupOnThreadPool) {
std::mutex mutex;
std::vector<std::thread::id> thread_ids;

auto status = TaskGroup<>()
auto status = TaskGroup()
.SetExecutor(std::ref(executor))
.Submit([&]() -> Status {
std::lock_guard lock(mutex);
Expand Down
Loading
Loading