Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
94c434a
test: add test cases for nested columns
zhf999 Jun 16, 2026
4cdf107
Merge branch 'main' into nested-col-fallback
zhf999 Jun 17, 2026
30ea1d8
walkaround: fallback nested field to RowGroup reading
zhf999 Jun 17, 2026
bed9fdf
style: add comments
zhf999 Jun 17, 2026
dfe419d
fix: judge has_nested_field on read-schema
zhf999 Jun 17, 2026
2ceaf60
fix: make tests clearer
zhf999 Jun 17, 2026
cb1a632
feat: support page-level bitmap pushdown
zhf999 Jun 17, 2026
9a5f4ee
fix: fallback bitmap pushdown to rowgroup-level
zhf999 Jun 17, 2026
09d2121
style: update tests names
zhf999 Jun 17, 2026
3ea428d
stye: update comments
zhf999 Jun 17, 2026
76e7282
fix: set 'SupportPreciseBitmapSelection to true'
zhf999 Jun 17, 2026
72be706
Merge branch 'main' into paged-bitmap2
zhf999 Jun 18, 2026
8078ca4
fxi: set 'SupportPreciseBitmapSelection to false'
zhf999 Jun 18, 2026
48cd610
fix: add assign operator for TargetRowGroup
zhf999 Jun 22, 2026
246e854
fix: use PARQUET_READ_ENABLE_PAGE_INDEX_FILTER to control bitmap filt…
zhf999 Jun 22, 2026
d9c952d
fix: use bitmap to get row ranges instead of the first column with pa…
zhf999 Jun 22, 2026
1c495e4
fix: test
zhf999 Jun 22, 2026
5d534db
fix: FileBatchReader returns discontinuous batch, and change GetPrevi…
zhf999 Jun 26, 2026
ad03498
style: change interface name
zhf999 Jun 26, 2026
cd7bd44
update header files
zhf999 Jun 26, 2026
f9721a8
Merge branch 'main' into fix-rowid
zhf999 Jun 26, 2026
41f932d
fix: return Status::Invalid intead of returning max value
zhf999 Jun 26, 2026
6bd98d8
fix: lance and blob return NotImplemented
zhf999 Jun 26, 2026
a694567
fix: add inclusive extend for fully matched rowgroup in SetReadSchema
zhf999 Jun 26, 2026
eb48e42
fix: calling SetReadSchema many time do not clear row_mapping
zhf999 Jun 26, 2026
7b00794
test: add test for PrefetchFileBatchReaderImpl
zhf999 Jun 26, 2026
5b28d56
Merge branch 'main' into fix-rowid
zhf999 Jun 26, 2026
0d9a174
style: replace auto in assigning macro with explicit type
zhf999 Jun 26, 2026
7edf5c1
Merge branch 'main' into paged-bitmap2
zhf999 Jun 26, 2026
09c3ef5
fix: mismatched Create function call
zhf999 Jun 26, 2026
17a3e90
Merge branch 'fix-rowid' into paged-bitmap-tmp
zhf999 Jun 26, 2026
1336922
style: rename interfaces and parameters
zhf999 Jun 29, 2026
8f82f44
fix: use a more efficient way to apply bitmap
zhf999 Jun 29, 2026
d3b73e1
update headers
zhf999 Jun 29, 2026
fcc1ac8
fix: use iterator to apply bitmap
zhf999 Jun 29, 2026
0fb2dec
test: add assertion
zhf999 Jun 29, 2026
a3e37bd
test: use '.value()' directly to validate the result.
zhf999 Jun 29, 2026
8820e7c
update comments
zhf999 Jun 29, 2026
376a312
style: change method name
zhf999 Jun 29, 2026
f1c02db
fix: small fixes
zhf999 Jun 29, 2026
ad66b22
fix: blob test
zhf999 Jun 29, 2026
25ef3d0
fix: blob and lance tests
zhf999 Jun 29, 2026
4409afb
fix: unittest
zhf999 Jun 29, 2026
5bdca4a
Merge branch 'fix-rowid' into paged-bitmap-tmp
zhf999 Jun 29, 2026
8482e90
fix: blob
zhf999 Jun 29, 2026
a2d7036
Merge branch 'fix-rowid' into paged-bitmap-tmp
zhf999 Jun 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/paimon/reader/file_batch_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ class PAIMON_EXPORT FileBatchReader : public BatchReader {
using BatchReader::NextBatch;
using BatchReader::NextBatchWithBitmap;

/// Get the row number of the first row in the previously read batch.
virtual Result<uint64_t> GetPreviousBatchFirstRowNumber() const = 0;
/// Get the global row number of the row in the previously read batch.
virtual Result<uint64_t> GetPreviousBatchFileRowId(uint64_t batch_row_id) const = 0;

/// Get the number of rows in the file.
virtual Result<uint64_t> GetNumberOfRows() const = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,9 @@ void MapSharedShreddingFileReader::Close() {
reader_->Close();
}

Result<uint64_t> MapSharedShreddingFileReader::GetPreviousBatchFirstRowNumber() const {
return reader_->GetPreviousBatchFirstRowNumber();
Result<uint64_t> MapSharedShreddingFileReader::GetPreviousBatchFileRowId(
uint64_t batch_row_id) const {
return reader_->GetPreviousBatchFileRowId(batch_row_id);
}

Result<uint64_t> MapSharedShreddingFileReader::GetNumberOfRows() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class MapSharedShreddingFileReader : public FileBatchReader {

void Close() override;

Result<uint64_t> GetPreviousBatchFirstRowNumber() const override;
Result<uint64_t> GetPreviousBatchFileRowId(uint64_t batch_row_id) const override;

Result<uint64_t> GetNumberOfRows() const override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ class ApplyBitmapIndexBatchReader : public FileBatchReader {
return Status::Invalid("ApplyBitmapIndexBatchReader does not support SetReadSchema");
}

Result<uint64_t> GetPreviousBatchFirstRowNumber() const override {
return reader_->GetPreviousBatchFirstRowNumber();
Result<uint64_t> GetPreviousBatchFileRowId(uint64_t batch_row_id) const override {
return reader_->GetPreviousBatchFileRowId(batch_row_id);
}

Result<uint64_t> GetNumberOfRows() const override {
Expand All @@ -94,14 +94,23 @@ class ApplyBitmapIndexBatchReader : public FileBatchReader {

private:
Result<RoaringBitmap32> Filter(int32_t batch_size) const {
RoaringBitmap32 is_valid;
PAIMON_ASSIGN_OR_RAISE(int32_t start_pos, reader_->GetPreviousBatchFirstRowNumber());
int32_t length = batch_size;
for (auto iter = bitmap_.EqualOrLarger(start_pos);
iter != bitmap_.End() && *iter < start_pos + length; ++iter) {
is_valid.Add(*iter - start_pos);
RoaringBitmap32 result;
auto bitmap_iter = bitmap_.Begin();
auto bitmap_end = bitmap_.End();

for (int32_t i = 0; i < batch_size; ++i) {
PAIMON_ASSIGN_OR_RAISE(uint64_t file_row_id, reader_->GetPreviousBatchFileRowId(i));
while (bitmap_iter != bitmap_end && static_cast<uint64_t>(*bitmap_iter) < file_row_id) {
++bitmap_iter;
}
if (bitmap_iter == bitmap_end) {
break;
}
if (static_cast<uint64_t>(*bitmap_iter) == file_row_id) {
result.Add(i);
}
}
return is_valid;
return result;
}

private:
Expand Down
4 changes: 2 additions & 2 deletions src/paimon/common/reader/delegating_prefetch_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ class DelegatingPrefetchReader : public FileBatchReader {
return prefetch_reader_->SetReadSchema(read_schema, predicate, selection_bitmap);
}

Result<uint64_t> GetPreviousBatchFirstRowNumber() const override {
return GetReader()->GetPreviousBatchFirstRowNumber();
Result<uint64_t> GetPreviousBatchFileRowId(uint64_t batch_row_id) const override {
return GetReader()->GetPreviousBatchFileRowId(batch_row_id);
}

Result<uint64_t> GetNumberOfRows() const override {
Expand Down
75 changes: 56 additions & 19 deletions src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,19 @@ class Schema;

namespace paimon {

namespace {

std::pair<int64_t, int64_t> ComputeBatchSliceByReadRange(
const std::vector<uint64_t>& global_row_ids, const std::pair<uint64_t, uint64_t>& read_range) {
auto begin_it =
std::lower_bound(global_row_ids.begin(), global_row_ids.end(), read_range.first);
auto end_it = std::lower_bound(global_row_ids.begin(), global_row_ids.end(), read_range.second);
return {static_cast<int64_t>(std::distance(global_row_ids.begin(), begin_it)),
static_cast<int64_t>(std::distance(global_row_ids.begin(), end_it))};
}

} // namespace

Result<std::unique_ptr<PrefetchFileBatchReaderImpl>> PrefetchFileBatchReaderImpl::Create(
const std::string& data_file_path, const ReaderBuilder* reader_builder,
const std::shared_ptr<FileSystem>& fs, uint32_t prefetch_max_parallel_num, int32_t batch_size,
Expand Down Expand Up @@ -265,6 +278,7 @@ Status PrefetchFileBatchReaderImpl::CleanUp() {

read_ranges_.clear();
read_ranges_in_group_.clear();
current_batch_global_row_ids_.clear();
clean_prefetch_queue();
for (size_t i = 0; i < readers_pos_.size(); i++) {
readers_pos_[i]->store(0);
Expand Down Expand Up @@ -409,42 +423,54 @@ Status PrefetchFileBatchReaderImpl::EnsureReaderPosition(
Status PrefetchFileBatchReaderImpl::HandleReadResult(
size_t reader_idx, const std::pair<uint64_t, uint64_t>& read_range,
ReadBatchWithBitmap&& read_batch_with_bitmap) {
PAIMON_ASSIGN_OR_RAISE(uint64_t first_row_number,
readers_[reader_idx]->GetPreviousBatchFirstRowNumber());
auto& prefetch_queue = prefetch_queues_[reader_idx];
if (!BatchReader::IsEofBatch(read_batch_with_bitmap)) {
auto& [read_batch, bitmap] = read_batch_with_bitmap;
auto& [c_array, c_schema] = read_batch;

if (first_row_number >= read_range.second) {
// fully out of range, data before first_row_number has been filtered out
readers_pos_[reader_idx]->store(first_row_number);
std::vector<uint64_t> global_row_ids;
global_row_ids.reserve(c_array->length);
for (int64_t i = 0; i < c_array->length; ++i) {
PAIMON_ASSIGN_OR_RAISE(uint64_t global_row_id,
readers_[reader_idx]->GetPreviousBatchFileRowId(i));
global_row_ids.push_back(global_row_id);
}
if (global_row_ids.empty()) {
ReaderUtils::ReleaseReadBatch(std::move(read_batch));
return Status::OK();
}
auto [slice_begin, slice_end] = ComputeBatchSliceByReadRange(global_row_ids, read_range);
if (slice_begin >= slice_end) {
readers_pos_[reader_idx]->store(read_range.second);
ReaderUtils::ReleaseReadBatch(std::move(read_batch));
return Status::OK();
} else if (first_row_number + c_array->length > read_range.second) {
// partially out of range, data before read_range.second has been effectively consumed
} else if (slice_begin > 0 || slice_end < c_array->length) {
readers_pos_[reader_idx]->store(read_range.second);
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> src_array,
arrow::ImportArray(c_array.get(), c_schema.get()));
int32_t target_length = read_range.second - first_row_number;
auto array = src_array->Slice(/*offset=*/0, target_length);
auto array = src_array->Slice(slice_begin, slice_end - slice_begin);
PAIMON_RETURN_NOT_OK_FROM_ARROW(
arrow::ExportArray(*array, c_array.get(), c_schema.get()));
bitmap.RemoveRange(target_length, src_array->length());
RoaringBitmap32 sliced_bitmap;
for (auto iter = bitmap.EqualOrLarger(slice_begin);
iter != bitmap.End() && *iter < slice_end; ++iter) {
sliced_bitmap.Add(*iter - slice_begin);
}
bitmap = std::move(sliced_bitmap);
global_row_ids = std::vector<uint64_t>(global_row_ids.begin() + slice_begin,
global_row_ids.begin() + slice_end);
} else {
// all within the range, data before readers_[reader_idx]->GetNextRowToRead() has been
// effectively consumed
readers_pos_[reader_idx]->store(readers_[reader_idx]->GetNextRowToRead());
}
if (bitmap.IsEmpty()) {
ReaderUtils::ReleaseReadBatch(std::move(read_batch));
return Status::OK();
}
prefetch_queue->push({read_range, std::move(read_batch_with_bitmap), first_row_number});
prefetch_queue->push(
{read_range, std::move(read_batch_with_bitmap), std::move(global_row_ids)});
} else {
std::pair<uint64_t, uint64_t> eof_range;
PAIMON_ASSIGN_OR_RAISE(eof_range, EofRange());
prefetch_queue->push({eof_range, std::move(read_batch_with_bitmap), first_row_number});
prefetch_queue->push({eof_range, std::move(read_batch_with_bitmap), {}});
readers_pos_[reader_idx]->store(std::numeric_limits<uint64_t>::max());
}
return Status::OK();
Expand Down Expand Up @@ -527,7 +553,7 @@ Result<BatchReader::ReadBatchWithBitmap> PrefetchFileBatchReaderImpl::NextBatchW
std::unique_lock<std::mutex> lock(working_mutex_);
cv_.notify_one();
}
previous_batch_first_row_num_ = prefetch_batch.value().previous_batch_first_row_num;
current_batch_global_row_ids_ = std::move(prefetch_batch.value().global_row_ids);
return std::move(prefetch_batch).value().batch;
}
}
Expand All @@ -537,7 +563,7 @@ Result<BatchReader::ReadBatchWithBitmap> PrefetchFileBatchReaderImpl::NextBatchW
assert(false);
return Status::Invalid("peek batch not suppose to be nullptr");
}
previous_batch_first_row_num_ = peek_batch->previous_batch_first_row_num;
current_batch_global_row_ids_.clear();
return BatchReader::MakeEofBatchWithBitmap();
}
if (value_count == prefetch_queues_.size()) {
Expand Down Expand Up @@ -571,8 +597,19 @@ Result<std::unique_ptr<::ArrowSchema>> PrefetchFileBatchReaderImpl::GetFileSchem
return readers_[0]->GetFileSchema();
}

Result<uint64_t> PrefetchFileBatchReaderImpl::GetPreviousBatchFirstRowNumber() const {
return previous_batch_first_row_num_;
Result<uint64_t> PrefetchFileBatchReaderImpl::GetPreviousBatchFileRowId(
uint64_t batch_row_id) const {
if (current_batch_global_row_ids_.size() == 0) {
return Status::Invalid(
"Last batch is not read or last batch is empty, cannot get previous batch global row "
"id");
}
if (batch_row_id >= current_batch_global_row_ids_.size()) {
return Status::Invalid(
fmt::format("batch_row_id {} is out of range, last batch row count is {}", batch_row_id,
current_batch_global_row_ids_.size()));
}
return current_batch_global_row_ids_[batch_row_id];
}

Result<uint64_t> PrefetchFileBatchReaderImpl::GetNumberOfRows() const {
Expand Down
6 changes: 3 additions & 3 deletions src/paimon/common/reader/prefetch_file_batch_reader_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
const std::optional<RoaringBitmap32>& selection_bitmap) override;

Status SeekToRow(uint64_t row_number) override;
Result<uint64_t> GetPreviousBatchFirstRowNumber() const override;
Result<uint64_t> GetPreviousBatchFileRowId(uint64_t batch_row_id) const override;
Result<uint64_t> GetNumberOfRows() const override;
uint64_t GetNextRowToRead() const override;
void Close() override;
Expand Down Expand Up @@ -105,7 +105,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
struct PrefetchBatch {
std::pair<uint64_t, uint64_t> read_range;
BatchReader::ReadBatchWithBitmap batch;
uint64_t previous_batch_first_row_num;
std::vector<uint64_t> global_row_ids;
};

PrefetchFileBatchReaderImpl(
Expand Down Expand Up @@ -160,7 +160,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
std::unique_ptr<std::thread> background_thread_;
Status read_status_;
std::atomic<bool> is_shutdown_ = false;
uint64_t previous_batch_first_row_num_ = std::numeric_limits<uint64_t>::max();
std::vector<uint64_t> current_batch_global_row_ids_;
bool need_prefetch_ = false;
bool read_ranges_freshed_ = false;
const uint32_t prefetch_queue_capacity_;
Expand Down
Loading
Loading