fix: handle non-contiguous RowRanges when resolving global row IDs #383
zhf999 wants to merge 23 commits into
…ousBatchFirstRowId to GetGlobalRowId
| Result<uint64_t> PrefetchFileBatchReaderImpl::GetPreviousBatchFirstRowNumber() const { | ||
| return previous_batch_first_row_num_; | ||
| Result<uint64_t> PrefetchFileBatchReaderImpl::GetPreviousBatchGlobalRowId( | ||
| uint64_t batch_row_id) const { |
Why can’t we just return previous_batch_first_row_num_ + batch_row_id directly?
PrefetchFileBatchReaderImpl may hold a ParquetFileBatchReader, which may return the concatenation of two non-contiguous batches. Should we fall back to having PrefetchFileBatchReaderImpl::GetPreviousBatchGlobalRowId simply return previous_batch_first_row_num_ + batch_row_id, like LanceFileBatchReader or BlobFileBatchReader?
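To illustrate why start + offset breaks down when a batch is stitched together from non-contiguous ranges, here is a minimal standalone sketch. It is not the code in this PR: `InclusiveRange` and `ResolveFileRowId` are hypothetical names, and `std::optional` stands in for the project's `Result<uint64_t>`.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical stand-in for the file row ranges that make up one returned batch.
// Each pair is an inclusive [first, last] range of file row IDs, in ascending order.
using InclusiveRange = std::pair<uint64_t, uint64_t>;

// Maps a row index inside the batch to its file row ID by walking the ranges.
// Returns std::nullopt when batch_row_id is past the end of the batch.
std::optional<uint64_t> ResolveFileRowId(const std::vector<InclusiveRange>& batch_ranges,
                                         uint64_t batch_row_id) {
    uint64_t remaining = batch_row_id;
    for (const auto& [first, last] : batch_ranges) {
        assert(first <= last);  // assumption of this sketch: ranges are well-formed
        const uint64_t length = last - first + 1;
        if (remaining < length) {
            return first + remaining;  // the row falls inside this range
        }
        remaining -= length;  // skip this range and keep counting
    }
    return std::nullopt;
}
```

With ranges {10..12} and {50..51}, batch row 3 resolves to file row 50, whereas start + offset would give 13.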
|
|
||
| static Result<std::shared_ptr<arrow::ChunkedArray>> CollectResultOneBatch( | ||
| BatchReader* batch_reader, int64_t max_data_processing_time_in_us) { | ||
| int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs(); |
Can CollectResultOneBatch just return an Arrow array directly? It doesn’t look like we need a ChunkedArray here.
CollectResultOneBatch is designed to align with CollectResult. Should we implement CollectResultOneBatch with a different return type?
| PrepareOrcFileBatchReader(file_name, &read_schema, batch_size, natural_read_size); | ||
| ASSERT_EQ(std::numeric_limits<uint64_t>::max(), | ||
| orc_batch_reader->GetPreviousBatchFirstRowNumber().value()); | ||
| ASSERT_EQ(orc_batch_reader->GetPreviousBatchFileRowId(0).value(), -1); |
Why -1 here? Should this be Status::Invalid?
| return CollectResultOneBatch(batch_reader, /*max_simulated_data_processing_time*/ 0); | ||
| } | ||
|
|
||
| static Result<std::shared_ptr<arrow::ChunkedArray>> CollectResultOneBatch( |
This is very similar to CollectResult. Can you refactor to extract the common parts?
| Result<uint64_t> GetPreviousBatchFirstRowNumber() const override { | ||
| assert(reader_); | ||
| return reader_->GetPreviousBatchFirstRowNumber(); | ||
| Result<uint64_t> GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { |
Change the interface in FileReaderWrapper as well.
| target_row_groups.emplace_back( | ||
| /*rg_index=*/rg_id, | ||
| /*is_partially_matched=*/false, | ||
| /*ranges=*/ | ||
| RowRanges(Range(0, reader_->GetAllRowGroupRanges()[rg_id].second - | ||
| reader_->GetAllRowGroupRanges()[rg_id].first - 1))); |
format it
| target_row_groups.emplace_back( | |
| /*rg_index=*/rg_id, | |
| /*is_partially_matched=*/false, | |
| /*ranges=*/ | |
| RowRanges(Range(0, reader_->GetAllRowGroupRanges()[rg_id].second - | |
| reader_->GetAllRowGroupRanges()[rg_id].first - 1))); | |
| target_row_groups.emplace_back( | |
| /*rg_index=*/rg_id, /*is_partially_matched=*/false, /*ranges=*/ | |
| RowRanges(Range(0, reader_->GetAllRowGroupRanges()[rg_id].second - | |
| reader_->GetAllRowGroupRanges()[rg_id].first - 1))); |
| ReadResultCollector::CollectResult( | ||
| reader.get(), /*max simulated data processing time*/ 100)); | ||
| ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 101); | ||
| ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); |
Now that the interface has been modified, these calls will always trigger ASSERT_NOK, so there's no point in testing them anymore, right? There seem to be similar issues in other test files as well.
| /// Get the row number of the first row in the previously read batch. | ||
| virtual Result<uint64_t> GetPreviousBatchFirstRowNumber() const = 0; | ||
| /// Get the global row number of the row in the previously read batch. | ||
| virtual Result<uint64_t> GetPreviousBatchFileRowId(uint64_t batch_row_id) const = 0; |
Are there any explicit semantic constraints on this interface before reading begins and after EOF is reached? As per previous discussions, it returns Status::Invalid before reading starts. However, after reaching EOF, the behavior currently varies widely: some implementations continue to accumulate, while others return errors. Should we impose some constraints on this? cc @lxy-9602
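One possible uniform contract, sketched only to make the question concrete: lookups fail both before the first batch and after EOF. `PreviousBatchRowIds` is a hypothetical helper that does not exist in the PR, and `std::nullopt` stands in for `Status::Invalid`.

```cpp
#include <cstdint>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical helper, not part of the PR: tracks the file row IDs of the most
// recently returned batch and applies the same rule at both ends of a scan.
class PreviousBatchRowIds {
 public:
    // Call after each successfully returned batch.
    void OnBatch(std::vector<uint64_t> file_row_ids) {
        file_row_ids_ = std::move(file_row_ids);
    }

    // Call when the reader reports EOF: drop the mapping so later lookups fail.
    void OnEof() { file_row_ids_.clear(); }

    // std::nullopt stands in for Status::Invalid in this sketch.
    std::optional<uint64_t> GetPreviousBatchFileRowId(uint64_t batch_row_id) const {
        if (batch_row_id >= file_row_ids_.size()) {
            return std::nullopt;  // no batch yet, past EOF, or index out of range
        }
        return file_row_ids_[batch_row_id];
    }

 private:
    std::vector<uint64_t> file_row_ids_;  // empty before the first batch and after EOF
};
```

Under this rule every implementation would behave the same after EOF, at the cost of losing access to the last batch's row IDs once EOF has been observed.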
Purpose
- … previous_batch_start + offset.
- … GetPreviousBatchFileRowId(batch_row_id) to resolve the file row ID for a row index inside the current batch.
- … PrefetchFileBatchReaderImpl, cache the actual file row IDs for each returned batch and keep row-id mapping aligned when a batch is sliced by read_range.
- … batch_row_id -> file_row_id mapping in NextBatch().
- … GetPreviousBatchFileRowId() correct under non-contiguous rows caused by predicate + bitmap filtering.
- … _ROW_ID field conversion, KeyValue iteration positions).
Tests
- src/paimon/format/avro/avro_file_batch_reader_test.cpp
- src/paimon/format/blob/blob_file_batch_reader_test.cpp
- src/paimon/format/lance/lance_format_reader_writer_test.cpp
- src/paimon/format/orc/orc_file_batch_reader_test.cpp
- src/paimon/format/parquet/parquet_file_batch_reader_test.cpp
- src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp
- … TestRowMapping to validate file row mapping across non-contiguous ranges.
API and Format
- FileBatchReader and implementations now use GetPreviousBatchFileRowId(uint64_t batch_row_id).
- … batch_row_id inside the current batch (instead of deriving by batch start + offset under contiguous assumptions).
Documentation
No.
Generative AI tooling
gpt-5.3-codex