Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
5d534db
fix: FileBatchReader returns discontinuous batch, and change GetPrevi…
zhf999 Jun 26, 2026
ad03498
style: change interface name
zhf999 Jun 26, 2026
cd7bd44
update header files
zhf999 Jun 26, 2026
f9721a8
Merge branch 'main' into fix-rowid
zhf999 Jun 26, 2026
41f932d
fix: return Status::Invalid instead of returning max value
zhf999 Jun 26, 2026
6bd98d8
fix: lance and blob return NotImplemented
zhf999 Jun 26, 2026
a694567
fix: add inclusive extend for fully matched rowgroup in SetReadSchema
zhf999 Jun 26, 2026
eb48e42
fix: calling SetReadSchema multiple times does not clear row_mapping
zhf999 Jun 26, 2026
7b00794
test: add test for PrefetchFileBatchReaderImpl
zhf999 Jun 26, 2026
5b28d56
Merge branch 'main' into fix-rowid
zhf999 Jun 26, 2026
0d9a174
style: replace auto in assigning macro with explicit type
zhf999 Jun 26, 2026
1336922
style: rename interfaces and parameters
zhf999 Jun 29, 2026
8f82f44
fix: use a more efficient way to apply bitmap
zhf999 Jun 29, 2026
d3b73e1
update headers
zhf999 Jun 29, 2026
fcc1ac8
fix: use iterator to apply bitmap
zhf999 Jun 29, 2026
0fb2dec
test: add assertion
zhf999 Jun 29, 2026
a3e37bd
test: use '.value()' directly to validate the result.
zhf999 Jun 29, 2026
8820e7c
update comments
zhf999 Jun 29, 2026
376a312
style: change method name
zhf999 Jun 29, 2026
f1c02db
fix: small fixes
zhf999 Jun 29, 2026
ad66b22
fix: blob test
zhf999 Jun 29, 2026
25ef3d0
fix: blob and lance tests
zhf999 Jun 29, 2026
8482e90
fix: blob
zhf999 Jun 29, 2026
fa1b068
refactor: merge 'CollectResult' and 'CollectResultOneBatch' into a s…
zhf999 Jun 30, 2026
8dc8576
refactor: extract common parts of CollectResult and CollectResultOne…
zhf999 Jun 30, 2026
db4b42e
Update src/paimon/format/parquet/parquet_file_batch_reader.cpp
zhf999 Jun 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/paimon/reader/file_batch_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ class PAIMON_EXPORT FileBatchReader : public BatchReader {
using BatchReader::NextBatch;
using BatchReader::NextBatchWithBitmap;

/// Get the row number of the first row in the previously read batch.
virtual Result<uint64_t> GetPreviousBatchFirstRowNumber() const = 0;
/// Get the file-level (global) row number of the row at index `batch_row_id` within the previously read batch.
virtual Result<uint64_t> GetPreviousBatchFileRowId(uint64_t batch_row_id) const = 0;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any explicit semantic constraints on this interface before reading begins and after EOF is reached? As per previous discussions, it returns Status::Invalid before reading starts. However, after reaching EOF, the behavior currently varies wildly—some continue to accumulate, while others return errors. Should we impose some constraints on this? cc @lxy-9602


Comment thread
zhf999 marked this conversation as resolved.
/// Get the number of rows in the file.
virtual Result<uint64_t> GetNumberOfRows() const = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,9 @@ void MapSharedShreddingFileReader::Close() {
reader_->Close();
}

Result<uint64_t> MapSharedShreddingFileReader::GetPreviousBatchFirstRowNumber() const {
return reader_->GetPreviousBatchFirstRowNumber();
Result<uint64_t> MapSharedShreddingFileReader::GetPreviousBatchFileRowId(
uint64_t batch_row_id) const {
return reader_->GetPreviousBatchFileRowId(batch_row_id);
}

Result<uint64_t> MapSharedShreddingFileReader::GetNumberOfRows() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class MapSharedShreddingFileReader : public FileBatchReader {

void Close() override;

Result<uint64_t> GetPreviousBatchFirstRowNumber() const override;
Result<uint64_t> GetPreviousBatchFileRowId(uint64_t batch_row_id) const override;

Result<uint64_t> GetNumberOfRows() const override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ class ApplyBitmapIndexBatchReader : public FileBatchReader {
return Status::Invalid("ApplyBitmapIndexBatchReader does not support SetReadSchema");
}

Result<uint64_t> GetPreviousBatchFirstRowNumber() const override {
return reader_->GetPreviousBatchFirstRowNumber();
Result<uint64_t> GetPreviousBatchFileRowId(uint64_t batch_row_id) const override {
return reader_->GetPreviousBatchFileRowId(batch_row_id);
}

Result<uint64_t> GetNumberOfRows() const override {
Expand All @@ -94,14 +94,23 @@ class ApplyBitmapIndexBatchReader : public FileBatchReader {

private:
Result<RoaringBitmap32> Filter(int32_t batch_size) const {
RoaringBitmap32 is_valid;
PAIMON_ASSIGN_OR_RAISE(int32_t start_pos, reader_->GetPreviousBatchFirstRowNumber());
int32_t length = batch_size;
for (auto iter = bitmap_.EqualOrLarger(start_pos);
iter != bitmap_.End() && *iter < start_pos + length; ++iter) {
is_valid.Add(*iter - start_pos);
RoaringBitmap32 result;
auto bitmap_iter = bitmap_.Begin();
auto bitmap_end = bitmap_.End();

for (int32_t i = 0; i < batch_size; ++i) {
PAIMON_ASSIGN_OR_RAISE(uint64_t file_row_id, reader_->GetPreviousBatchFileRowId(i));
while (bitmap_iter != bitmap_end && static_cast<uint64_t>(*bitmap_iter) < file_row_id) {
++bitmap_iter;
}
if (bitmap_iter == bitmap_end) {
break;
}
if (static_cast<uint64_t>(*bitmap_iter) == file_row_id) {
result.Add(i);
}
Comment thread
zhf999 marked this conversation as resolved.
}
return is_valid;
return result;
}

private:
Expand Down
4 changes: 2 additions & 2 deletions src/paimon/common/reader/delegating_prefetch_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ class DelegatingPrefetchReader : public FileBatchReader {
return prefetch_reader_->SetReadSchema(read_schema, predicate, selection_bitmap);
}

Result<uint64_t> GetPreviousBatchFirstRowNumber() const override {
return GetReader()->GetPreviousBatchFirstRowNumber();
Result<uint64_t> GetPreviousBatchFileRowId(uint64_t batch_row_id) const override {
return GetReader()->GetPreviousBatchFileRowId(batch_row_id);
}

Result<uint64_t> GetNumberOfRows() const override {
Expand Down
75 changes: 56 additions & 19 deletions src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,19 @@ class Schema;

namespace paimon {

namespace {

/// Translates a file-level row range into positions inside a batch.
///
/// @param global_row_ids File-level row id of every row in the batch, in batch order.
///        Binary search is used, so the ids must be sorted in ascending order.
/// @param read_range Range of file-level row ids; `first` is the inclusive lower bound and
///        `second` is treated as an exclusive upper bound.
/// @return `{begin, end}` batch positions such that rows `[begin, end)` are exactly those whose
///         id lies in `[read_range.first, read_range.second)`. `begin == end` means no row of
///         the batch falls inside the range.
std::pair<int64_t, int64_t> ComputeBatchSliceByReadRange(
    const std::vector<uint64_t>& global_row_ids, const std::pair<uint64_t, uint64_t>& read_range) {
    // Position of the first row whose id is >= row_id (equals size() when there is none).
    const auto index_of_first_at_least = [&global_row_ids](uint64_t row_id) {
        const auto pos = std::lower_bound(global_row_ids.begin(), global_row_ids.end(), row_id);
        return static_cast<int64_t>(pos - global_row_ids.begin());
    };
    return {index_of_first_at_least(read_range.first), index_of_first_at_least(read_range.second)};
}

} // namespace

Result<std::unique_ptr<PrefetchFileBatchReaderImpl>> PrefetchFileBatchReaderImpl::Create(
const std::string& data_file_path, const ReaderBuilder* reader_builder,
const std::shared_ptr<FileSystem>& fs, uint32_t prefetch_max_parallel_num, int32_t batch_size,
Expand Down Expand Up @@ -265,6 +278,7 @@ Status PrefetchFileBatchReaderImpl::CleanUp() {

read_ranges_.clear();
read_ranges_in_group_.clear();
current_batch_global_row_ids_.clear();
clean_prefetch_queue();
for (size_t i = 0; i < readers_pos_.size(); i++) {
readers_pos_[i]->store(0);
Expand Down Expand Up @@ -409,42 +423,54 @@ Status PrefetchFileBatchReaderImpl::EnsureReaderPosition(
Status PrefetchFileBatchReaderImpl::HandleReadResult(
size_t reader_idx, const std::pair<uint64_t, uint64_t>& read_range,
ReadBatchWithBitmap&& read_batch_with_bitmap) {
PAIMON_ASSIGN_OR_RAISE(uint64_t first_row_number,
readers_[reader_idx]->GetPreviousBatchFirstRowNumber());
auto& prefetch_queue = prefetch_queues_[reader_idx];
if (!BatchReader::IsEofBatch(read_batch_with_bitmap)) {
auto& [read_batch, bitmap] = read_batch_with_bitmap;
auto& [c_array, c_schema] = read_batch;

if (first_row_number >= read_range.second) {
// fully out of range, data before first_row_number has been filtered out
readers_pos_[reader_idx]->store(first_row_number);
std::vector<uint64_t> global_row_ids;
global_row_ids.reserve(c_array->length);
for (int64_t i = 0; i < c_array->length; ++i) {
PAIMON_ASSIGN_OR_RAISE(uint64_t global_row_id,
readers_[reader_idx]->GetPreviousBatchFileRowId(i));
global_row_ids.push_back(global_row_id);
}
if (global_row_ids.empty()) {
ReaderUtils::ReleaseReadBatch(std::move(read_batch));
return Status::OK();
}
auto [slice_begin, slice_end] = ComputeBatchSliceByReadRange(global_row_ids, read_range);
if (slice_begin >= slice_end) {
readers_pos_[reader_idx]->store(read_range.second);
ReaderUtils::ReleaseReadBatch(std::move(read_batch));
return Status::OK();
} else if (first_row_number + c_array->length > read_range.second) {
// partially out of range, data before read_range.second has been effectively consumed
} else if (slice_begin > 0 || slice_end < c_array->length) {
readers_pos_[reader_idx]->store(read_range.second);
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> src_array,
arrow::ImportArray(c_array.get(), c_schema.get()));
int32_t target_length = read_range.second - first_row_number;
auto array = src_array->Slice(/*offset=*/0, target_length);
auto array = src_array->Slice(slice_begin, slice_end - slice_begin);
PAIMON_RETURN_NOT_OK_FROM_ARROW(
arrow::ExportArray(*array, c_array.get(), c_schema.get()));
bitmap.RemoveRange(target_length, src_array->length());
RoaringBitmap32 sliced_bitmap;
for (auto iter = bitmap.EqualOrLarger(slice_begin);
iter != bitmap.End() && *iter < slice_end; ++iter) {
sliced_bitmap.Add(*iter - slice_begin);
}
bitmap = std::move(sliced_bitmap);
global_row_ids = std::vector<uint64_t>(global_row_ids.begin() + slice_begin,
global_row_ids.begin() + slice_end);
} else {
// all within the range, data before readers_[reader_idx]->GetNextRowToRead() has been
// effectively consumed
readers_pos_[reader_idx]->store(readers_[reader_idx]->GetNextRowToRead());
}
if (bitmap.IsEmpty()) {
ReaderUtils::ReleaseReadBatch(std::move(read_batch));
return Status::OK();
}
prefetch_queue->push({read_range, std::move(read_batch_with_bitmap), first_row_number});
prefetch_queue->push(
{read_range, std::move(read_batch_with_bitmap), std::move(global_row_ids)});
} else {
std::pair<uint64_t, uint64_t> eof_range;
PAIMON_ASSIGN_OR_RAISE(eof_range, EofRange());
prefetch_queue->push({eof_range, std::move(read_batch_with_bitmap), first_row_number});
prefetch_queue->push({eof_range, std::move(read_batch_with_bitmap), {}});
readers_pos_[reader_idx]->store(std::numeric_limits<uint64_t>::max());
}
return Status::OK();
Expand Down Expand Up @@ -527,7 +553,7 @@ Result<BatchReader::ReadBatchWithBitmap> PrefetchFileBatchReaderImpl::NextBatchW
std::unique_lock<std::mutex> lock(working_mutex_);
cv_.notify_one();
}
previous_batch_first_row_num_ = prefetch_batch.value().previous_batch_first_row_num;
current_batch_global_row_ids_ = std::move(prefetch_batch.value().global_row_ids);
return std::move(prefetch_batch).value().batch;
}
}
Expand All @@ -537,7 +563,7 @@ Result<BatchReader::ReadBatchWithBitmap> PrefetchFileBatchReaderImpl::NextBatchW
assert(false);
return Status::Invalid("peek batch not suppose to be nullptr");
}
previous_batch_first_row_num_ = peek_batch->previous_batch_first_row_num;
current_batch_global_row_ids_.clear();
return BatchReader::MakeEofBatchWithBitmap();
}
if (value_count == prefetch_queues_.size()) {
Expand Down Expand Up @@ -571,8 +597,19 @@ Result<std::unique_ptr<::ArrowSchema>> PrefetchFileBatchReaderImpl::GetFileSchem
return readers_[0]->GetFileSchema();
}

Result<uint64_t> PrefetchFileBatchReaderImpl::GetPreviousBatchFirstRowNumber() const {
return previous_batch_first_row_num_;
Result<uint64_t> PrefetchFileBatchReaderImpl::GetPreviousBatchFileRowId(
uint64_t batch_row_id) const {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can’t we just return previous_batch_first_row_num_ + batch_row_id directly?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PrefetchFileBatchReaderImpl may hold a ParquetFileBatchReader, which may return the concatenation of two discontinuous batches. Should we fall back to having PrefetchFileBatchReaderImpl::GetPreviousBatchGlobalRowId simply return previous_batch_first_row_num_ + batch_row_id, like LanceFileBatchReader or BlobFileBatchReader?

if (current_batch_global_row_ids_.size() == 0) {
return Status::Invalid(
"Last batch is not read or last batch is empty, cannot get previous batch global row "
"id");
}
if (batch_row_id >= current_batch_global_row_ids_.size()) {
return Status::Invalid(
fmt::format("batch_row_id {} is out of range, last batch row count is {}", batch_row_id,
current_batch_global_row_ids_.size()));
}
return current_batch_global_row_ids_[batch_row_id];
}

Result<uint64_t> PrefetchFileBatchReaderImpl::GetNumberOfRows() const {
Expand Down
6 changes: 3 additions & 3 deletions src/paimon/common/reader/prefetch_file_batch_reader_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
const std::optional<RoaringBitmap32>& selection_bitmap) override;

Status SeekToRow(uint64_t row_number) override;
Result<uint64_t> GetPreviousBatchFirstRowNumber() const override;
Result<uint64_t> GetPreviousBatchFileRowId(uint64_t batch_row_id) const override;
Result<uint64_t> GetNumberOfRows() const override;
uint64_t GetNextRowToRead() const override;
void Close() override;
Expand Down Expand Up @@ -105,7 +105,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
struct PrefetchBatch {
std::pair<uint64_t, uint64_t> read_range;
BatchReader::ReadBatchWithBitmap batch;
uint64_t previous_batch_first_row_num;
std::vector<uint64_t> global_row_ids;
};

PrefetchFileBatchReaderImpl(
Expand Down Expand Up @@ -160,7 +160,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
std::unique_ptr<std::thread> background_thread_;
Status read_status_;
std::atomic<bool> is_shutdown_ = false;
uint64_t previous_batch_first_row_num_ = std::numeric_limits<uint64_t>::max();
std::vector<uint64_t> current_batch_global_row_ids_;
bool need_prefetch_ = false;
bool read_ranges_freshed_ = false;
const uint32_t prefetch_queue_capacity_;
Expand Down
Loading
Loading