diff --git a/include/paimon/reader/file_batch_reader.h b/include/paimon/reader/file_batch_reader.h index 272de3c82..ddb4e999d 100644 --- a/include/paimon/reader/file_batch_reader.h +++ b/include/paimon/reader/file_batch_reader.h @@ -46,8 +46,8 @@ class PAIMON_EXPORT FileBatchReader : public BatchReader { using BatchReader::NextBatch; using BatchReader::NextBatchWithBitmap; - /// Get the row number of the first row in the previously read batch. - virtual Result GetPreviousBatchFirstRowNumber() const = 0; + /// Get the global row number of the row in the previously read batch. + virtual Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const = 0; /// Get the number of rows in the file. virtual Result GetNumberOfRows() const = 0; diff --git a/src/paimon/common/data/shredding/map_shared_shredding_file_reader.cpp b/src/paimon/common/data/shredding/map_shared_shredding_file_reader.cpp index 7d446f6f1..71deb1114 100644 --- a/src/paimon/common/data/shredding/map_shared_shredding_file_reader.cpp +++ b/src/paimon/common/data/shredding/map_shared_shredding_file_reader.cpp @@ -391,8 +391,9 @@ void MapSharedShreddingFileReader::Close() { reader_->Close(); } -Result MapSharedShreddingFileReader::GetPreviousBatchFirstRowNumber() const { - return reader_->GetPreviousBatchFirstRowNumber(); +Result MapSharedShreddingFileReader::GetPreviousBatchFileRowId( + uint64_t batch_row_id) const { + return reader_->GetPreviousBatchFileRowId(batch_row_id); } Result MapSharedShreddingFileReader::GetNumberOfRows() const { diff --git a/src/paimon/common/data/shredding/map_shared_shredding_file_reader.h b/src/paimon/common/data/shredding/map_shared_shredding_file_reader.h index d09393b3c..46f053517 100644 --- a/src/paimon/common/data/shredding/map_shared_shredding_file_reader.h +++ b/src/paimon/common/data/shredding/map_shared_shredding_file_reader.h @@ -60,7 +60,7 @@ class MapSharedShreddingFileReader : public FileBatchReader { void Close() override; - Result GetPreviousBatchFirstRowNumber() 
const override; + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override; Result GetNumberOfRows() const override; diff --git a/src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader.h b/src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader.h index 7e2c9338a..8f770478f 100644 --- a/src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader.h +++ b/src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader.h @@ -80,8 +80,8 @@ class ApplyBitmapIndexBatchReader : public FileBatchReader { return Status::Invalid("ApplyBitmapIndexBatchReader does not support SetReadSchema"); } - Result GetPreviousBatchFirstRowNumber() const override { - return reader_->GetPreviousBatchFirstRowNumber(); + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { + return reader_->GetPreviousBatchFileRowId(batch_row_id); } Result GetNumberOfRows() const override { @@ -94,14 +94,23 @@ class ApplyBitmapIndexBatchReader : public FileBatchReader { private: Result Filter(int32_t batch_size) const { - RoaringBitmap32 is_valid; - PAIMON_ASSIGN_OR_RAISE(int32_t start_pos, reader_->GetPreviousBatchFirstRowNumber()); - int32_t length = batch_size; - for (auto iter = bitmap_.EqualOrLarger(start_pos); - iter != bitmap_.End() && *iter < start_pos + length; ++iter) { - is_valid.Add(*iter - start_pos); + RoaringBitmap32 result; + auto bitmap_iter = bitmap_.Begin(); + auto bitmap_end = bitmap_.End(); + + for (int32_t i = 0; i < batch_size; ++i) { + PAIMON_ASSIGN_OR_RAISE(uint64_t file_row_id, reader_->GetPreviousBatchFileRowId(i)); + while (bitmap_iter != bitmap_end && static_cast(*bitmap_iter) < file_row_id) { + ++bitmap_iter; + } + if (bitmap_iter == bitmap_end) { + break; + } + if (static_cast(*bitmap_iter) == file_row_id) { + result.Add(i); + } } - return is_valid; + return result; } private: diff --git a/src/paimon/common/reader/delegating_prefetch_reader.h b/src/paimon/common/reader/delegating_prefetch_reader.h index 
fe2e3eda2..0a22f8263 100644 --- a/src/paimon/common/reader/delegating_prefetch_reader.h +++ b/src/paimon/common/reader/delegating_prefetch_reader.h @@ -54,8 +54,8 @@ class DelegatingPrefetchReader : public FileBatchReader { return prefetch_reader_->SetReadSchema(read_schema, predicate, selection_bitmap); } - Result GetPreviousBatchFirstRowNumber() const override { - return GetReader()->GetPreviousBatchFirstRowNumber(); + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { + return GetReader()->GetPreviousBatchFileRowId(batch_row_id); } Result GetNumberOfRows() const override { diff --git a/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp b/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp index da74f3484..5940a4d48 100644 --- a/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp +++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp @@ -40,6 +40,19 @@ class Schema; namespace paimon { +namespace { + +std::pair ComputeBatchSliceByReadRange( + const std::vector& global_row_ids, const std::pair& read_range) { + auto begin_it = + std::lower_bound(global_row_ids.begin(), global_row_ids.end(), read_range.first); + auto end_it = std::lower_bound(global_row_ids.begin(), global_row_ids.end(), read_range.second); + return {static_cast(std::distance(global_row_ids.begin(), begin_it)), + static_cast(std::distance(global_row_ids.begin(), end_it))}; +} + +} // namespace + Result> PrefetchFileBatchReaderImpl::Create( const std::string& data_file_path, const ReaderBuilder* reader_builder, const std::shared_ptr& fs, uint32_t prefetch_max_parallel_num, int32_t batch_size, @@ -265,6 +278,7 @@ Status PrefetchFileBatchReaderImpl::CleanUp() { read_ranges_.clear(); read_ranges_in_group_.clear(); + current_batch_global_row_ids_.clear(); clean_prefetch_queue(); for (size_t i = 0; i < readers_pos_.size(); i++) { readers_pos_[i]->store(0); @@ -409,42 +423,54 @@ Status PrefetchFileBatchReaderImpl::EnsureReaderPosition( Status 
PrefetchFileBatchReaderImpl::HandleReadResult( size_t reader_idx, const std::pair& read_range, ReadBatchWithBitmap&& read_batch_with_bitmap) { - PAIMON_ASSIGN_OR_RAISE(uint64_t first_row_number, - readers_[reader_idx]->GetPreviousBatchFirstRowNumber()); auto& prefetch_queue = prefetch_queues_[reader_idx]; if (!BatchReader::IsEofBatch(read_batch_with_bitmap)) { auto& [read_batch, bitmap] = read_batch_with_bitmap; auto& [c_array, c_schema] = read_batch; - - if (first_row_number >= read_range.second) { - // fully out of range, data before first_row_number has been filtered out - readers_pos_[reader_idx]->store(first_row_number); + std::vector global_row_ids; + global_row_ids.reserve(c_array->length); + for (int64_t i = 0; i < c_array->length; ++i) { + PAIMON_ASSIGN_OR_RAISE(uint64_t global_row_id, + readers_[reader_idx]->GetPreviousBatchFileRowId(i)); + global_row_ids.push_back(global_row_id); + } + if (global_row_ids.empty()) { + ReaderUtils::ReleaseReadBatch(std::move(read_batch)); + return Status::OK(); + } + auto [slice_begin, slice_end] = ComputeBatchSliceByReadRange(global_row_ids, read_range); + if (slice_begin >= slice_end) { + readers_pos_[reader_idx]->store(read_range.second); ReaderUtils::ReleaseReadBatch(std::move(read_batch)); return Status::OK(); - } else if (first_row_number + c_array->length > read_range.second) { - // partially out of range, data before read_range.second has been effectively consumed + } else if (slice_begin > 0 || slice_end < c_array->length) { readers_pos_[reader_idx]->store(read_range.second); PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr src_array, arrow::ImportArray(c_array.get(), c_schema.get())); - int32_t target_length = read_range.second - first_row_number; - auto array = src_array->Slice(/*offset=*/0, target_length); + auto array = src_array->Slice(slice_begin, slice_end - slice_begin); PAIMON_RETURN_NOT_OK_FROM_ARROW( arrow::ExportArray(*array, c_array.get(), c_schema.get())); - bitmap.RemoveRange(target_length, 
src_array->length()); + RoaringBitmap32 sliced_bitmap; + for (auto iter = bitmap.EqualOrLarger(slice_begin); + iter != bitmap.End() && *iter < slice_end; ++iter) { + sliced_bitmap.Add(*iter - slice_begin); + } + bitmap = std::move(sliced_bitmap); + global_row_ids = std::vector(global_row_ids.begin() + slice_begin, + global_row_ids.begin() + slice_end); } else { - // all within the range, data before readers_[reader_idx]->GetNextRowToRead() has been - // effectively consumed readers_pos_[reader_idx]->store(readers_[reader_idx]->GetNextRowToRead()); } if (bitmap.IsEmpty()) { ReaderUtils::ReleaseReadBatch(std::move(read_batch)); return Status::OK(); } - prefetch_queue->push({read_range, std::move(read_batch_with_bitmap), first_row_number}); + prefetch_queue->push( + {read_range, std::move(read_batch_with_bitmap), std::move(global_row_ids)}); } else { std::pair eof_range; PAIMON_ASSIGN_OR_RAISE(eof_range, EofRange()); - prefetch_queue->push({eof_range, std::move(read_batch_with_bitmap), first_row_number}); + prefetch_queue->push({eof_range, std::move(read_batch_with_bitmap), {}}); readers_pos_[reader_idx]->store(std::numeric_limits::max()); } return Status::OK(); @@ -527,7 +553,7 @@ Result PrefetchFileBatchReaderImpl::NextBatchW std::unique_lock lock(working_mutex_); cv_.notify_one(); } - previous_batch_first_row_num_ = prefetch_batch.value().previous_batch_first_row_num; + current_batch_global_row_ids_ = std::move(prefetch_batch.value().global_row_ids); return std::move(prefetch_batch).value().batch; } } @@ -537,7 +563,7 @@ Result PrefetchFileBatchReaderImpl::NextBatchW assert(false); return Status::Invalid("peek batch not suppose to be nullptr"); } - previous_batch_first_row_num_ = peek_batch->previous_batch_first_row_num; + current_batch_global_row_ids_.clear(); return BatchReader::MakeEofBatchWithBitmap(); } if (value_count == prefetch_queues_.size()) { @@ -571,8 +597,19 @@ Result> PrefetchFileBatchReaderImpl::GetFileSchem return readers_[0]->GetFileSchema(); } 
-Result PrefetchFileBatchReaderImpl::GetPreviousBatchFirstRowNumber() const { - return previous_batch_first_row_num_; +Result PrefetchFileBatchReaderImpl::GetPreviousBatchFileRowId( + uint64_t batch_row_id) const { + if (current_batch_global_row_ids_.size() == 0) { + return Status::Invalid( + "Last batch is not read or last batch is empty, cannot get previous batch global row " + "id"); + } + if (batch_row_id >= current_batch_global_row_ids_.size()) { + return Status::Invalid( + fmt::format("batch_row_id {} is out of range, last batch row count is {}", batch_row_id, + current_batch_global_row_ids_.size())); + } + return current_batch_global_row_ids_[batch_row_id]; } Result PrefetchFileBatchReaderImpl::GetNumberOfRows() const { diff --git a/src/paimon/common/reader/prefetch_file_batch_reader_impl.h b/src/paimon/common/reader/prefetch_file_batch_reader_impl.h index 5ed9fb352..19f936c8f 100644 --- a/src/paimon/common/reader/prefetch_file_batch_reader_impl.h +++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl.h @@ -76,7 +76,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader { const std::optional& selection_bitmap) override; Status SeekToRow(uint64_t row_number) override; - Result GetPreviousBatchFirstRowNumber() const override; + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override; Result GetNumberOfRows() const override; uint64_t GetNextRowToRead() const override; void Close() override; @@ -105,7 +105,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader { struct PrefetchBatch { std::pair read_range; BatchReader::ReadBatchWithBitmap batch; - uint64_t previous_batch_first_row_num; + std::vector global_row_ids; }; PrefetchFileBatchReaderImpl( @@ -160,7 +160,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader { std::unique_ptr background_thread_; Status read_status_; std::atomic is_shutdown_ = false; - uint64_t previous_batch_first_row_num_ = std::numeric_limits::max(); + std::vector 
current_batch_global_row_ids_; bool need_prefetch_ = false; bool read_ranges_freshed_ = false; const uint32_t prefetch_queue_capacity_; diff --git a/src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp b/src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp index 24fa6861b..ceeeb9c57 100644 --- a/src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp +++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp @@ -284,12 +284,10 @@ TEST_F(PrefetchFileBatchReaderImplTest, TestSimple) { /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(), GetDefaultPool())); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto result_array, ReadResultCollector::CollectResult( reader.get(), /*max simulated data processing time*/ 100)); - ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 101); auto expected_array = std::make_shared(data_array); ASSERT_TRUE(result_array->Equals(expected_array)); } @@ -607,12 +605,11 @@ TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithLargeBatchSize) { prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(), GetDefaultPool())); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto result_array, ReadResultCollector::CollectResult( reader.get(), /*max simulated data processing time*/ 100)); - ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 101); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); auto expected_array = std::make_shared(data_array); ASSERT_TRUE(result_array->Equals(expected_array)); } @@ -636,12 +633,11 @@ 
TEST_F(PrefetchFileBatchReaderImplTest, TestPartialReaderSuccessRead) { } arrow::ArrayVector result_array_vector; - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto batch_with_bitmap, reader->NextBatchWithBitmap()); auto& [batch, bitmap] = batch_with_bitmap; ASSERT_EQ(batch.first->length, bitmap.Cardinality()); - ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 0); + ASSERT_EQ(reader->GetPreviousBatchFileRowId(0).value(), 0); ASSERT_OK_AND_ASSIGN(auto array, ReadResultCollector::GetArray(std::move(batch))); result_array_vector.push_back(array); ASSERT_OK(prefetch_reader->GetReadStatus()); @@ -682,11 +678,9 @@ TEST_F(PrefetchFileBatchReaderImplTest, TestAllReaderFailedWithIOError) { ->SetNextBatchStatus(Status::IOError("mock error")); } - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); auto batch_result = reader->NextBatchWithBitmap(); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_FALSE(batch_result.ok()); ASSERT_TRUE(batch_result.status().IsIOError()); ASSERT_FALSE(prefetch_reader->is_shutdown_); @@ -695,8 +689,7 @@ TEST_F(PrefetchFileBatchReaderImplTest, TestAllReaderFailedWithIOError) { // call NextBatch again, will still return error status auto batch_result2 = reader->NextBatchWithBitmap(); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_FALSE(batch_result2.ok()); ASSERT_TRUE(batch_result2.status().IsIOError()); } @@ -713,12 +706,11 @@ TEST_F(PrefetchFileBatchReaderImplTest, TestPrefetchWithEmptyData) { prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true, 
/*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(), GetDefaultPool())); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto result_array, ReadResultCollector::CollectResult( reader.get(), /*max simulated data processing time*/ 100)); - ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 0); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_FALSE(result_array); } @@ -734,17 +726,17 @@ TEST_F(PrefetchFileBatchReaderImplTest, TestCallNextBatchAfterReadingEof) { prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(), GetDefaultPool())); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto result_array, ReadResultCollector::CollectResult( reader.get(), /*max simulated data processing time*/ 100)); - ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 10); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); auto expected_array = std::make_shared(data_array); ASSERT_TRUE(result_array->Equals(expected_array)); // continue to call NextBatch() after reading eof ASSERT_OK_AND_ASSIGN(auto batch_with_bitmap, reader->NextBatchWithBitmap()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_TRUE(BatchReader::IsEofBatch(batch_with_bitmap)); } @@ -841,12 +833,11 @@ TEST_P(PrefetchFileBatchReaderImplTest, TestPrefetchWithPredicatePushdownWithCom PreparePrefetchReader(file_format, schema.get(), predicate, /*selection_bitmap=*/std::nullopt, /*batch_size=*/10, /*prefetch_max_parallel_num=*/3, cache_mode); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto 
result_array, ReadResultCollector::CollectResult( reader.get(), /*max simulated data processing time*/ 100)); - ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 90); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); arrow::ArrayVector expected_array_vector; expected_array_vector.push_back(data_array->Slice(0, 30)); @@ -878,12 +869,11 @@ TEST_P(PrefetchFileBatchReaderImplTest, /*selection_bitmap=*/std::nullopt, /*batch_size=*/10, /*prefetch_max_parallel_num=*/3, cache_mode); ASSERT_OK(reader->RefreshReadRanges()); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto result_array, ReadResultCollector::CollectResult( reader.get(), /*max simulated data processing time*/ 100)); - ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 90); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); arrow::ArrayVector expected_array_vector; expected_array_vector.push_back(data_array->Slice(0, 20)); @@ -923,4 +913,55 @@ TEST_F(PrefetchFileBatchReaderImplTest, TestPrefetchWithBitmap) { ASSERT_TRUE(result_chunk_array->Equals(expected_chunk_array)); } +TEST_P(PrefetchFileBatchReaderImplTest, TestRowMapping) { + auto [file_format, cache_mode] = GetParam(); + auto data_array = PrepareArray(90); + PrepareTestData(file_format, data_array, /*stripe_row_count=*/30, /*row_index_stride=*/10); + auto schema = arrow::schema(fields_); + ASSERT_OK_AND_ASSIGN( + auto predicate, + PredicateBuilder::Or({ + PredicateBuilder::Between(/*field_index=*/1, /*field_name=*/"f1", FieldType::BIGINT, + Literal(20l), Literal(29l)), + PredicateBuilder::Between(/*field_index=*/1, /*field_name=*/"f1", FieldType::BIGINT, + Literal(70l), Literal(79l)), + })); + + auto reader = + PreparePrefetchReader(file_format, schema.get(), predicate, + /*selection_bitmap=*/std::nullopt, + /*batch_size=*/10, /*prefetch_max_parallel_num=*/3, cache_mode); + 
ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); + ASSERT_OK_AND_ASSIGN(std::shared_ptr batch, + paimon::test::ReadResultCollector::CollectResultOneBatch(reader.get())); + for (uint64_t i = 0; i < 10; i++) { + ASSERT_EQ(reader->GetPreviousBatchFileRowId(i).value(), 20 + i); + } + + ASSERT_OK_AND_ASSIGN(batch, + paimon::test::ReadResultCollector::CollectResultOneBatch(reader.get())); + for (uint64_t i = 0; i < 10; i++) { + ASSERT_EQ(reader->GetPreviousBatchFileRowId(i).value(), 70 + i); + } + + // Set read schema again + std::unique_ptr c_schema = std::make_unique(); + ASSERT_TRUE(arrow::ExportSchema(*schema, c_schema.get()).ok()); + predicate = PredicateBuilder::Between(/*field_index=*/1, /*field_name=*/"f1", FieldType::BIGINT, + Literal(30l), Literal(49l)); + ASSERT_OK(reader->SetReadSchema(c_schema.get(), predicate, std::nullopt)); + + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); + ASSERT_OK_AND_ASSIGN(batch, + paimon::test::ReadResultCollector::CollectResultOneBatch(reader.get())); + for (uint64_t i = 0; i < 10; i++) { + ASSERT_EQ(reader->GetPreviousBatchFileRowId(i).value(), 30 + i); + } + ASSERT_OK_AND_ASSIGN(batch, + paimon::test::ReadResultCollector::CollectResultOneBatch(reader.get())); + for (uint64_t i = 0; i < 10; i++) { + ASSERT_EQ(reader->GetPreviousBatchFileRowId(i).value(), 40 + i); + } +} + } // namespace paimon::test diff --git a/src/paimon/core/deletionvectors/apply_deletion_vector_batch_reader.h b/src/paimon/core/deletionvectors/apply_deletion_vector_batch_reader.h index 5e2ecf279..48fec8b31 100644 --- a/src/paimon/core/deletionvectors/apply_deletion_vector_batch_reader.h +++ b/src/paimon/core/deletionvectors/apply_deletion_vector_batch_reader.h @@ -82,8 +82,8 @@ class ApplyDeletionVectorBatchReader : public FileBatchReader { return Status::Invalid("ApplyDeletionVectorBatchReader does not support SetReadSchema"); } - Result GetPreviousBatchFirstRowNumber() const override { - return reader_->GetPreviousBatchFirstRowNumber(); + Result 
GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { + return reader_->GetPreviousBatchFileRowId(batch_row_id); } Result GetNumberOfRows() const override { @@ -96,9 +96,15 @@ class ApplyDeletionVectorBatchReader : public FileBatchReader { private: Result Filter(int32_t batch_size) const { - PAIMON_ASSIGN_OR_RAISE(uint64_t previous_batch_first_row_number, - reader_->GetPreviousBatchFirstRowNumber()); - return deletion_vector_->IsValid(previous_batch_first_row_number, batch_size); + RoaringBitmap32 is_valid; + for (int32_t i = 0; i < batch_size; ++i) { + PAIMON_ASSIGN_OR_RAISE(uint64_t file_row_id, reader_->GetPreviousBatchFileRowId(i)); + PAIMON_ASSIGN_OR_RAISE(bool is_deleted, deletion_vector_->IsDeleted(file_row_id)); + if (!is_deleted) { + is_valid.Add(i); + } + } + return is_valid; } private: diff --git a/src/paimon/core/io/complete_row_tracking_fields_reader.cpp b/src/paimon/core/io/complete_row_tracking_fields_reader.cpp index 2aef9b29f..29b610b16 100644 --- a/src/paimon/core/io/complete_row_tracking_fields_reader.cpp +++ b/src/paimon/core/io/complete_row_tracking_fields_reader.cpp @@ -86,15 +86,14 @@ CompleteRowTrackingFieldsBatchReader::NextBatchWithBitmap() { std::string row_id_field_name = SpecialFields::RowId().Name(); if (read_schema_->GetFieldIndex(row_id_field_name) != -1) { row_id_array = src_struct_array->GetFieldByName(row_id_field_name); - PAIMON_ASSIGN_OR_RAISE(uint64_t previous_batch_first_row_number, - reader_->GetPreviousBatchFirstRowNumber()); - auto row_id_convert_func = [previous_batch_first_row_number, - this](int32_t idx_in_array) -> Result { + auto row_id_convert_func = [this](int32_t idx_in_array) -> Result { if (first_row_id_ == std::nullopt) { return Status::Invalid( "unexpected: read _ROW_ID special field, but first row id is null in meta"); } - return first_row_id_.value() + previous_batch_first_row_number + idx_in_array; + PAIMON_ASSIGN_OR_RAISE(uint64_t file_row_id, + reader_->GetPreviousBatchFileRowId(idx_in_array)); 
+ return first_row_id_.value() + file_row_id; }; PAIMON_RETURN_NOT_OK(ConvertRowTrackingField(src_struct_array->length(), /*init_value=*/0, row_id_convert_func, &row_id_array)); diff --git a/src/paimon/core/io/complete_row_tracking_fields_reader.h b/src/paimon/core/io/complete_row_tracking_fields_reader.h index cc2f9f7bf..2aa535d79 100644 --- a/src/paimon/core/io/complete_row_tracking_fields_reader.h +++ b/src/paimon/core/io/complete_row_tracking_fields_reader.h @@ -60,8 +60,8 @@ class CompleteRowTrackingFieldsBatchReader : public FileBatchReader { reader_->Close(); } - Result GetPreviousBatchFirstRowNumber() const override { - return reader_->GetPreviousBatchFirstRowNumber(); + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { + return reader_->GetPreviousBatchFileRowId(batch_row_id); } Result GetNumberOfRows() const override { diff --git a/src/paimon/core/io/field_mapping_reader.h b/src/paimon/core/io/field_mapping_reader.h index 39fd920fa..9efd4a07c 100644 --- a/src/paimon/core/io/field_mapping_reader.h +++ b/src/paimon/core/io/field_mapping_reader.h @@ -77,8 +77,8 @@ class FieldMappingReader : public FileBatchReader { return Status::Invalid("FieldMappingReader does not support SetReadSchema"); } - Result GetPreviousBatchFirstRowNumber() const override { - return reader_->GetPreviousBatchFirstRowNumber(); + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { + return reader_->GetPreviousBatchFileRowId(batch_row_id); } Result GetNumberOfRows() const override { diff --git a/src/paimon/core/io/key_value_data_file_record_reader.cpp b/src/paimon/core/io/key_value_data_file_record_reader.cpp index a4edd04e0..8d12ec746 100644 --- a/src/paimon/core/io/key_value_data_file_record_reader.cpp +++ b/src/paimon/core/io/key_value_data_file_record_reader.cpp @@ -81,15 +81,15 @@ Result KeyValueDataFileRecordReader::Iterator::Next() { Result> KeyValueDataFileRecordReader::Iterator::NextWithFilePos() { PAIMON_ASSIGN_OR_RAISE(KeyValue kv, 
Next()); - return std::make_pair(previous_batch_first_row_number_ + cursor_ - 1, std::move(kv)); + PAIMON_ASSIGN_OR_RAISE(uint64_t global_row_id, + reader_->reader_->GetPreviousBatchFileRowId(cursor_ - 1)); + return std::make_pair(static_cast(global_row_id), std::move(kv)); } Result> KeyValueDataFileRecordReader::NextBatch() { Reset(); PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap batch_with_bitmap, reader_->NextBatchWithBitmap()); - PAIMON_ASSIGN_OR_RAISE(int64_t previous_batch_first_row_number, - reader_->GetPreviousBatchFirstRowNumber()); if (BatchReader::IsEofBatch(batch_with_bitmap)) { // reader eof, just return return std::unique_ptr(); @@ -140,8 +140,7 @@ Result> KeyValueDataFileRecordRe key_ctx_ = std::make_shared(key_fields, pool_); value_ctx_ = std::make_shared(value_fields, pool_); ArrowUtils::TraverseArray(data_batch); - return std::make_unique( - this, previous_batch_first_row_number); + return std::make_unique(this); } void KeyValueDataFileRecordReader::Reset() { diff --git a/src/paimon/core/io/key_value_data_file_record_reader.h b/src/paimon/core/io/key_value_data_file_record_reader.h index c271a3bdb..3fe87a89e 100644 --- a/src/paimon/core/io/key_value_data_file_record_reader.h +++ b/src/paimon/core/io/key_value_data_file_record_reader.h @@ -54,16 +54,13 @@ class KeyValueDataFileRecordReader : public KeyValueRecordReader { class Iterator : public KeyValueRecordReader::Iterator { public: - Iterator(KeyValueDataFileRecordReader* reader, int64_t previous_batch_first_row_number) - : previous_batch_first_row_number_(previous_batch_first_row_number), - reader_(reader), - selection_cardinality_(reader->selection_bitmap_.Cardinality()) {} + explicit Iterator(KeyValueDataFileRecordReader* reader) + : reader_(reader), selection_cardinality_(reader->selection_bitmap_.Cardinality()) {} Result HasNext() const override; Result Next() override; Result> NextWithFilePos(); private: - int64_t previous_batch_first_row_number_; mutable int64_t cursor_ = 0; 
KeyValueDataFileRecordReader* reader_ = nullptr; int64_t selection_cardinality_ = 0; diff --git a/src/paimon/format/avro/avro_file_batch_reader.h b/src/paimon/format/avro/avro_file_batch_reader.h index 98d5deede..a90d82c5a 100644 --- a/src/paimon/format/avro/avro_file_batch_reader.h +++ b/src/paimon/format/avro/avro_file_batch_reader.h @@ -45,8 +45,8 @@ class AvroFileBatchReader : public FileBatchReader { Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr& predicate, const std::optional& selection_bitmap) override; - Result GetPreviousBatchFirstRowNumber() const override { - return previous_first_row_; + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { + return previous_first_row_ + batch_row_id; } Result GetNumberOfRows() const override; diff --git a/src/paimon/format/avro/avro_file_batch_reader_test.cpp b/src/paimon/format/avro/avro_file_batch_reader_test.cpp index f4f052a34..2e1e00c42 100644 --- a/src/paimon/format/avro/avro_file_batch_reader_test.cpp +++ b/src/paimon/format/avro/avro_file_batch_reader_test.cpp @@ -327,7 +327,7 @@ TEST_F(AvroFileBatchReaderTest, TestSetReadSchemaRejectNestedSubFieldProjection) "does not support nested sub-field projection"); } -TEST_F(AvroFileBatchReaderTest, TestGetPreviousBatchFirstRowNumber) { +TEST_F(AvroFileBatchReaderTest, TestGetPreviousBatchFileRowId) { std::string path = paimon::test::GetDataDir() + "/avro/append_simple.db/" "append_simple/bucket-0/" @@ -352,26 +352,25 @@ TEST_F(AvroFileBatchReaderTest, TestGetPreviousBatchFirstRowNumber) { ASSERT_OK_AND_ASSIGN(auto num_rows, reader->GetNumberOfRows()); ASSERT_EQ(4, num_rows); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(std::numeric_limits::max(), reader->GetPreviousBatchFileRowId(0).value()); ASSERT_OK_AND_ASSIGN(auto batch1, reader->NextBatch()); ArrowArrayRelease(batch1.first.get()); ArrowSchemaRelease(batch1.second.get()); - ASSERT_EQ(0, 
reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(0, reader->GetPreviousBatchFileRowId(0).value()); ASSERT_OK_AND_ASSIGN(auto batch2, reader->NextBatch()); - ASSERT_EQ(1, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(1, reader->GetPreviousBatchFileRowId(0).value()); ArrowArrayRelease(batch2.first.get()); ArrowSchemaRelease(batch2.second.get()); ASSERT_OK_AND_ASSIGN(auto batch3, reader->NextBatch()); - ASSERT_EQ(2, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(2, reader->GetPreviousBatchFileRowId(0).value()); ArrowArrayRelease(batch3.first.get()); ArrowSchemaRelease(batch3.second.get()); ASSERT_OK_AND_ASSIGN(auto batch4, reader->NextBatch()); - ASSERT_EQ(3, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(3, reader->GetPreviousBatchFileRowId(0).value()); ArrowArrayRelease(batch4.first.get()); ArrowSchemaRelease(batch4.second.get()); ASSERT_OK_AND_ASSIGN(auto batch5, reader->NextBatch()); - ASSERT_EQ(4, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(4, reader->GetPreviousBatchFileRowId(0).value()); ASSERT_TRUE(BatchReader::IsEofBatch(batch5)); } @@ -397,7 +396,7 @@ TEST_F(AvroFileBatchReaderTest, TestSetReadSchemaResetsReaderToFirstRow) { ASSERT_OK_AND_ASSIGN(auto reader, reader_builder->Build(in)); ASSERT_OK_AND_ASSIGN(auto first_batch, reader->NextBatch()); - ASSERT_EQ(0, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(0, reader->GetPreviousBatchFileRowId(0).value()); auto first_array = arrow::ImportArray(first_batch.first.get(), first_batch.second.get()).ValueOrDie(); ASSERT_TRUE(first_array->Equals(src_array->Slice(0, 2))) << first_array->ToString(); @@ -407,11 +406,10 @@ TEST_F(AvroFileBatchReaderTest, TestSetReadSchemaResetsReaderToFirstRow) { ASSERT_TRUE(arrow::ExportSchema(*read_schema, c_schema.get()).ok()); ASSERT_OK(reader->SetReadSchema(c_schema.get(), /*predicate=*/nullptr, /*selection_bitmap=*/std::nullopt)); - ASSERT_EQ(std::numeric_limits::max(), - 
reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(std::numeric_limits::max(), reader->GetPreviousBatchFileRowId(0).value()); ASSERT_OK_AND_ASSIGN(auto projected_batch, reader->NextBatch()); - ASSERT_EQ(0, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(0, reader->GetPreviousBatchFileRowId(0).value()); auto projected_array = arrow::ImportArray(projected_batch.first.get(), projected_batch.second.get()).ValueOrDie(); auto expected_projected_array = arrow::ipc::internal::json::ArrayFromJSON( diff --git a/src/paimon/format/blob/blob_file_batch_reader.h b/src/paimon/format/blob/blob_file_batch_reader.h index 06287d759..998939c8a 100644 --- a/src/paimon/format/blob/blob_file_batch_reader.h +++ b/src/paimon/format/blob/blob_file_batch_reader.h @@ -97,14 +97,17 @@ class BlobFileBatchReader : public FileBatchReader { Result NextBatch() override; - Result GetPreviousBatchFirstRowNumber() const override { + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { if (all_blob_lengths_.size() != target_blob_lengths_.size()) { - return Status::Invalid( - "Cannot call GetPreviousBatchFirstRowNumber in BlobFileBatchReader because, after " + return Status::NotImplemented( + "Cannot call GetPreviousBatchFileRowId in BlobFileBatchReader because, after " "bitmap pushdown, rows in the array returned by NextBatch are no longer " "contiguous."); } - return previous_batch_first_row_number_; + if (previous_batch_first_row_number_ == std::numeric_limits::max()) { + return Status::Invalid("No batch has been read yet."); + } + return previous_batch_first_row_number_ + batch_row_id; } Result GetNumberOfRows() const override { diff --git a/src/paimon/format/blob/blob_file_batch_reader_test.cpp b/src/paimon/format/blob/blob_file_batch_reader_test.cpp index bde27d64d..c9d5dbd95 100644 --- a/src/paimon/format/blob/blob_file_batch_reader_test.cpp +++ b/src/paimon/format/blob/blob_file_batch_reader_test.cpp @@ -169,22 +169,21 @@ 
TEST_F(BlobFileBatchReaderTest, TestRowNumbers) { ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, std::nullopt)); ASSERT_OK_AND_ASSIGN(auto number_of_rows, reader->GetNumberOfRows()); ASSERT_EQ(3, number_of_rows); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto batch1, reader->NextBatch()); ArrowArrayRelease(batch1.first.get()); ArrowSchemaRelease(batch1.second.get()); - ASSERT_EQ(0, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(0, reader->GetPreviousBatchFileRowId(0).value()); ASSERT_OK_AND_ASSIGN(auto batch2, reader->NextBatch()); - ASSERT_EQ(1, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(1, reader->GetPreviousBatchFileRowId(0).value()); ArrowArrayRelease(batch2.first.get()); ArrowSchemaRelease(batch2.second.get()); ASSERT_OK_AND_ASSIGN(auto batch3, reader->NextBatch()); - ASSERT_EQ(2, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(2, reader->GetPreviousBatchFileRowId(0).value()); ArrowArrayRelease(batch3.first.get()); ArrowSchemaRelease(batch3.second.get()); ASSERT_OK_AND_ASSIGN(auto batch4, reader->NextBatch()); - ASSERT_EQ(3, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(3, reader->GetPreviousBatchFileRowId(0).value()); ASSERT_TRUE(BatchReader::IsEofBatch(batch4)); } @@ -255,8 +254,7 @@ TEST_P(BlobFileBatchReaderTest, EmptyFile) { ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, std::nullopt)); ASSERT_OK_AND_ASSIGN(auto number_of_rows, reader->GetNumberOfRows()); ASSERT_EQ(0, number_of_rows); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto batch, reader->NextBatch()); ASSERT_TRUE(BatchReader::IsEofBatch(batch)); } diff --git a/src/paimon/format/lance/lance_file_batch_reader.h b/src/paimon/format/lance/lance_file_batch_reader.h index 
fb2628035..5cfbdb981 100644 --- a/src/paimon/format/lance/lance_file_batch_reader.h +++ b/src/paimon/format/lance/lance_file_batch_reader.h @@ -41,15 +41,18 @@ class LanceFileBatchReader : public FileBatchReader { Result NextBatch() override; - Result GetPreviousBatchFirstRowNumber() const override { + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { if (!read_row_ids_.empty() && read_row_ids_.size() != num_rows_) { // TODO(xinyu.lxy): support function - return Status::Invalid( - "Cannot call GetPreviousBatchFirstRowNumber in LanceFileBatchReader because, after " + return Status::NotImplemented( + "Cannot call GetPreviousBatchFileRowId in LanceFileBatchReader because, after " "bitmap pushdown, rows in the array returned by NextBatch are no longer " "contiguous."); } - return previous_batch_first_row_num_; + if (previous_batch_first_row_num_ == std::numeric_limits::max()) { + return Status::Invalid("No batch has been read yet"); + } + return previous_batch_first_row_num_ + batch_row_id; } Result GetNumberOfRows() const override { diff --git a/src/paimon/format/lance/lance_format_reader_writer_test.cpp b/src/paimon/format/lance/lance_format_reader_writer_test.cpp index b1ad6be73..71a073413 100644 --- a/src/paimon/format/lance/lance_format_reader_writer_test.cpp +++ b/src/paimon/format/lance/lance_format_reader_writer_test.cpp @@ -478,27 +478,26 @@ TEST_F(LanceFileReaderWriterTest, TestPreviousBatchFirstRowNumber) { ASSERT_OK_AND_ASSIGN( std::unique_ptr reader, LanceFileBatchReader::Create(file_path, /*batch_size=*/4, /*batch_readahead=*/2)); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); // first batch row 0-3 ASSERT_OK_AND_ASSIGN(auto read_batch, reader->NextBatch()); ASSERT_OK_AND_ASSIGN(auto read_array, paimon::test::ReadResultCollector::GetArray(std::move(read_batch))); ASSERT_TRUE(read_array->Equals(array->Slice(0, 4))); - ASSERT_EQ(0, 
reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(0, reader->GetPreviousBatchFileRowId(0).value()); // second batch 4-5 ASSERT_OK_AND_ASSIGN(read_batch, reader->NextBatch()); ASSERT_OK_AND_ASSIGN(read_array, paimon::test::ReadResultCollector::GetArray(std::move(read_batch))); ASSERT_TRUE(read_array->Equals(array->Slice(4, 2))); - ASSERT_EQ(4, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(4, reader->GetPreviousBatchFileRowId(0).value()); // eof ASSERT_OK_AND_ASSIGN(read_batch, reader->NextBatch()); ASSERT_TRUE(BatchReader::IsEofBatch(read_batch)); - ASSERT_EQ(6, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(6, reader->GetPreviousBatchFileRowId(0).value()); // test with bitmap pushdown ArrowSchema c_read_schema; @@ -506,8 +505,8 @@ TEST_F(LanceFileReaderWriterTest, TestPreviousBatchFirstRowNumber) { ASSERT_OK(reader->SetReadSchema(&c_read_schema, /*predicate=*/nullptr, /*selection_bitmap=*/RoaringBitmap32::From({0, 3}))); ASSERT_NOK_WITH_MSG( - reader->GetPreviousBatchFirstRowNumber(), - "Cannot call GetPreviousBatchFirstRowNumber in LanceFileBatchReader because, after bitmap " + reader->GetPreviousBatchFileRowId(0), + "Cannot call GetPreviousBatchFileRowId in LanceFileBatchReader because, after bitmap " "pushdown, rows in the array returned by NextBatch are no longer contiguous."); } } // namespace paimon::lance::test diff --git a/src/paimon/format/orc/orc_file_batch_reader.h b/src/paimon/format/orc/orc_file_batch_reader.h index c2460f3a7..0f60c3779 100644 --- a/src/paimon/format/orc/orc_file_batch_reader.h +++ b/src/paimon/format/orc/orc_file_batch_reader.h @@ -62,8 +62,8 @@ class OrcFileBatchReader : public PrefetchFileBatchReader { // OrcFileBatchReader. Therefore, we need to hold BatchReader when using output ArrowArray. 
Result NextBatch() override; - Result GetPreviousBatchFirstRowNumber() const override { - return reader_->GetRowNumber(); + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { + return reader_->GetRowNumber() + batch_row_id; } Result GetNumberOfRows() const override { diff --git a/src/paimon/format/orc/orc_file_batch_reader_test.cpp b/src/paimon/format/orc/orc_file_batch_reader_test.cpp index 13c34d355..87490e0a2 100644 --- a/src/paimon/format/orc/orc_file_batch_reader_test.cpp +++ b/src/paimon/format/orc/orc_file_batch_reader_test.cpp @@ -493,11 +493,10 @@ TEST_P(OrcFileBatchReaderTest, TestNextBatchSimple) { for (auto batch_size : {1, 2, 3, 5, 8, 10}) { auto orc_batch_reader = PrepareOrcFileBatchReader(file_name, &read_schema, batch_size, natural_read_size); - ASSERT_EQ(std::numeric_limits::max(), - orc_batch_reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(orc_batch_reader->GetPreviousBatchFileRowId(0).value(), -1); ASSERT_OK_AND_ASSIGN(auto result_array, paimon::test::ReadResultCollector::CollectResult( orc_batch_reader.get())); - ASSERT_EQ(orc_batch_reader->GetPreviousBatchFirstRowNumber().value(), 8); + ASSERT_EQ(orc_batch_reader->GetPreviousBatchFileRowId(0).value(), 8); orc_batch_reader->Close(); auto expected_array = std::make_shared(struct_array_); ASSERT_TRUE(result_array->Equals(expected_array)); @@ -768,19 +767,18 @@ TEST_F(OrcFileBatchReaderTest, TestReadNoField) { auto orc_batch_reader = PrepareOrcFileBatchReader(file_name, &read_schema, /*batch_size=*/3, /*natural_read_size=*/10); // read 3 rows - ASSERT_EQ(std::numeric_limits::max(), - orc_batch_reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(orc_batch_reader->GetPreviousBatchFileRowId(0).value(), -1); ASSERT_OK_AND_ASSIGN(auto batch1, orc_batch_reader->NextBatch()); - ASSERT_EQ(orc_batch_reader->GetPreviousBatchFirstRowNumber().value(), 0); + ASSERT_EQ(orc_batch_reader->GetPreviousBatchFileRowId(0).value(), 0); // read 3 rows 
ASSERT_OK_AND_ASSIGN(auto batch2, orc_batch_reader->NextBatch()); - ASSERT_EQ(orc_batch_reader->GetPreviousBatchFirstRowNumber().value(), 3); + ASSERT_EQ(orc_batch_reader->GetPreviousBatchFileRowId(0).value(), 3); // read 2 rows ASSERT_OK_AND_ASSIGN(auto batch3, orc_batch_reader->NextBatch()); - ASSERT_EQ(orc_batch_reader->GetPreviousBatchFirstRowNumber().value(), 6); + ASSERT_EQ(orc_batch_reader->GetPreviousBatchFileRowId(0).value(), 6); // read rows with eof ASSERT_OK_AND_ASSIGN(auto batch4, orc_batch_reader->NextBatch()); - ASSERT_EQ(orc_batch_reader->GetPreviousBatchFirstRowNumber().value(), 8); + ASSERT_EQ(orc_batch_reader->GetPreviousBatchFileRowId(0).value(), 8); ASSERT_TRUE(BatchReader::IsEofBatch(batch4)); orc_batch_reader->Close(); diff --git a/src/paimon/format/parquet/file_reader_wrapper.cpp b/src/paimon/format/parquet/file_reader_wrapper.cpp index e7d6bf606..090da9122 100644 --- a/src/paimon/format/parquet/file_reader_wrapper.cpp +++ b/src/paimon/format/parquet/file_reader_wrapper.cpp @@ -164,14 +164,15 @@ void FileReaderWrapper::AdvanceToNextRowGroup() { current_row_group_idx_++; // Skip row groups excluded by read range. 
while (current_row_group_idx_ < target_row_groups_.size() && - target_row_groups_[current_row_group_idx_].excluded_by_read_range) { + target_row_groups_[current_row_group_idx_].IsExcludedByReadRange()) { current_row_group_idx_++; } if (current_row_group_idx_ >= target_row_groups_.size()) { next_row_to_read_ = num_rows_; } else { next_row_to_read_ = - all_row_group_ranges_[target_row_groups_[current_row_group_idx_].row_group_index].first; + all_row_group_ranges_[target_row_groups_[current_row_group_idx_].GetRowGroupIndex()] + .first; } } @@ -181,10 +182,10 @@ Status FileReaderWrapper::SeekToRow(uint64_t row_number) { filtered_global_offset_ = 0; for (uint64_t i = 0; i < target_row_groups_.size(); i++) { - if (target_row_groups_[i].excluded_by_read_range) { + if (target_row_groups_[i].IsExcludedByReadRange()) { continue; } - int32_t rg_id = target_row_groups_[i].row_group_index; + int32_t rg_id = target_row_groups_[i].GetRowGroupIndex(); uint64_t rg_start = all_row_group_ranges_[rg_id].first; uint64_t rg_end = all_row_group_ranges_[rg_id].second; if (row_number > rg_start && row_number < rg_end) { @@ -200,9 +201,9 @@ Status FileReaderWrapper::SeekToRow(uint64_t row_number) { // Rebuild batch_reader_ for non-page-filtered RGs at/after seek position. 
std::vector fully_matched_indices; for (uint64_t j = i; j < target_row_groups_.size(); j++) { - if (!target_row_groups_[j].excluded_by_read_range && - !target_row_groups_[j].is_partially_matched) { - fully_matched_indices.push_back(target_row_groups_[j].row_group_index); + if (!target_row_groups_[j].IsExcludedByReadRange() && + !target_row_groups_[j].IsPartiallyMatched()) { + fully_matched_indices.push_back(target_row_groups_[j].GetRowGroupIndex()); } } if (!fully_matched_indices.empty()) { @@ -222,7 +223,7 @@ Status FileReaderWrapper::SeekToRow(uint64_t row_number) { } Result> FileReaderWrapper::NextPageFiltered() { - int32_t rg_id = target_row_groups_[current_row_group_idx_].row_group_index; + int32_t rg_id = target_row_groups_[current_row_group_idx_].GetRowGroupIndex(); // Construct the per-RG streaming reader on demand. if (!current_page_filtered_reader_) { @@ -237,7 +238,7 @@ Result> FileReaderWrapper::NextPageFiltered( file_reader_->parquet_reader(), target_rg, target_column_indices_, page_filtered_read_schema_, file_reader_->properties().cache_options(), pre_buffered, page_ranges, max_chunksize, pool_)); - current_filtered_row_ranges_ = target_rg.row_ranges; + current_filtered_row_ranges_ = target_rg.GetRowRanges(); current_filtered_rg_start_ = all_row_group_ranges_[rg_id].first; filtered_global_offset_ = 0; } @@ -273,7 +274,7 @@ Result> FileReaderWrapper::NextFullyMatched( return std::shared_ptr(); } - int32_t rg_id = target_row_groups_[current_row_group_idx_].row_group_index; + int32_t rg_id = target_row_groups_[current_row_group_idx_].GetRowGroupIndex(); uint64_t rg_end = all_row_group_ranges_[rg_id].second; int64_t num_rows = record_batch->num_rows(); @@ -298,7 +299,7 @@ Result> FileReaderWrapper::Next() { while (current_row_group_idx_ < target_row_groups_.size()) { bool is_partially_matched = - target_row_groups_[current_row_group_idx_].is_partially_matched; + target_row_groups_[current_row_group_idx_].IsPartiallyMatched(); 
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr batch, is_partially_matched ? NextPageFiltered() : NextFullyMatched()); if (batch) { @@ -368,9 +369,9 @@ std::vector<::arrow::io::ReadRange> FileReaderWrapper::CollectPreBufferRanges( auto file_metadata = file_reader_->parquet_reader()->metadata(); for (const auto& trg : target_row_groups_) { - if (trg.excluded_by_read_range) continue; + if (trg.IsExcludedByReadRange()) continue; - if (trg.is_partially_matched) { + if (trg.IsPartiallyMatched()) { // Page-filtered RGs: only matching page byte ranges. auto page_ranges = PageFilteredRowGroupReader::ComputePageRanges( file_reader_->parquet_reader(), trg, column_indices); @@ -378,7 +379,7 @@ std::vector<::arrow::io::ReadRange> FileReaderWrapper::CollectPreBufferRanges( std::make_move_iterator(page_ranges.end())); } else { // Fully-matched RGs: entire column chunk ranges. - auto rg_metadata = file_metadata->RowGroup(trg.row_group_index); + auto rg_metadata = file_metadata->RowGroup(trg.GetRowGroupIndex()); for (int32_t col_idx : column_indices) { auto col_chunk = rg_metadata->ColumnChunk(col_idx); int64_t offset = col_chunk->data_page_offset(); @@ -416,12 +417,12 @@ Status FileReaderWrapper::PrepareForReading(const std::vector& t std::vector fully_matched_row_groups; uint64_t active_count = 0; for (const auto& trg : target_row_groups_) { - if (trg.excluded_by_read_range) { + if (trg.IsExcludedByReadRange()) { continue; } active_count++; - if (!trg.is_partially_matched) { - fully_matched_row_groups.push_back(trg.row_group_index); + if (!trg.IsPartiallyMatched()) { + fully_matched_row_groups.push_back(trg.GetRowGroupIndex()); } } @@ -455,14 +456,15 @@ Status FileReaderWrapper::PrepareForReading(const std::vector& t // Reset read state. Find the first non-excluded row group. 
uint64_t first_active_idx = 0; while (first_active_idx < target_row_groups_.size() && - target_row_groups_[first_active_idx].excluded_by_read_range) { + target_row_groups_[first_active_idx].IsExcludedByReadRange()) { first_active_idx++; } if (first_active_idx >= target_row_groups_.size()) { next_row_to_read_ = num_rows_; } else { next_row_to_read_ = - all_row_group_ranges_[target_row_groups_[first_active_idx].row_group_index].first; + all_row_group_ranges_[target_row_groups_[first_active_idx].GetRowGroupIndex()] + .first; } previous_first_row_ = std::numeric_limits::max(); current_row_group_idx_ = first_active_idx; @@ -476,7 +478,7 @@ Status FileReaderWrapper::ApplyReadRanges( const std::vector>& read_ranges) { if (read_ranges.empty()) { for (auto& trg : target_row_groups_) { - trg.excluded_by_read_range = true; + trg.SetExcludedByReadRange(true); } reader_initialized_ = false; return Status::OK(); @@ -492,7 +494,7 @@ Status FileReaderWrapper::ApplyReadRanges( } // Mark each target row group as excluded or not based on the matching set. 
for (auto& trg : target_row_groups_) { - trg.excluded_by_read_range = matching_rg_indices.count(trg.row_group_index) == 0; + trg.SetExcludedByReadRange(matching_rg_indices.count(trg.GetRowGroupIndex()) == 0); } reader_initialized_ = false; return Status::OK(); diff --git a/src/paimon/format/parquet/file_reader_wrapper.h b/src/paimon/format/parquet/file_reader_wrapper.h index 758ff703a..3221dbf12 100644 --- a/src/paimon/format/parquet/file_reader_wrapper.h +++ b/src/paimon/format/parquet/file_reader_wrapper.h @@ -33,6 +33,7 @@ #include "arrow/type_fwd.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/format/parquet/row_ranges.h" +#include "paimon/format/parquet/target_row_group.h" #include "paimon/result.h" #include "paimon/status.h" #include "parquet/arrow/reader.h" diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader.cpp b/src/paimon/format/parquet/page_filtered_row_group_reader.cpp index 9c87438b8..20c5efb97 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader.cpp +++ b/src/paimon/format/parquet/page_filtered_row_group_reader.cpp @@ -234,8 +234,8 @@ Result> PageFilteredRowGroupReader::Re const ::arrow::io::CacheOptions& cache_options, bool pre_buffered, const std::vector<::arrow::io::ReadRange>& page_ranges, int64_t max_chunksize, std::shared_ptr<::arrow::MemoryPool> pool) { - const auto& row_ranges = target_row_group.row_ranges; - int32_t row_group_index = target_row_group.row_group_index; + const auto& row_ranges = target_row_group.GetRowRanges(); + int32_t row_group_index = target_row_group.GetRowGroupIndex(); if (row_ranges.IsEmpty()) { PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr empty_table, @@ -289,8 +289,8 @@ Result> PageFilteredRowGroupReader::Re std::vector<::arrow::io::ReadRange> PageFilteredRowGroupReader::ComputePageRanges( ::parquet::ParquetFileReader* parquet_reader, const TargetRowGroup& target_row_group, const std::vector& column_indices) { - int32_t row_group_index = 
target_row_group.row_group_index; - const auto& row_ranges = target_row_group.row_ranges; + int32_t row_group_index = target_row_group.GetRowGroupIndex(); + const auto& row_ranges = target_row_group.GetRowRanges(); std::vector<::arrow::io::ReadRange> ranges; auto file_metadata = parquet_reader->metadata(); diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader.h b/src/paimon/format/parquet/page_filtered_row_group_reader.h index 5092bb5ca..c7376512f 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader.h +++ b/src/paimon/format/parquet/page_filtered_row_group_reader.h @@ -27,6 +27,7 @@ #include "arrow/record_batch.h" #include "arrow/type.h" #include "paimon/format/parquet/row_ranges.h" +#include "paimon/format/parquet/target_row_group.h" #include "paimon/result.h" #include "parquet/column_reader.h" #include "parquet/file_reader.h" diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp b/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp index d6bb36ceb..2de1d306c 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp +++ b/src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp @@ -44,6 +44,7 @@ #include "paimon/status.h" #include "paimon/testing/utils/read_result_collector.h" #include "paimon/testing/utils/testharness.h" +#include "paimon/utils/roaring_bitmap32.h" #include "parquet/arrow/reader.h" #include "parquet/file_reader.h" #include "parquet/properties.h" @@ -129,6 +130,31 @@ class PageFilteredRowGroupReaderTest : public ::testing::Test { paimon::test::ReadResultCollector::CollectResult(batch_reader.get())); } + /// Read back a Parquet file with a predicate, a bitmap, and page index filter enabled. 
+ void ReadWithPredicateAndBitmapImpl(const std::string& file_name, + const std::shared_ptr& read_schema, + const std::shared_ptr& predicate, + const RoaringBitmap32& bitmap, + std::shared_ptr* out, + int32_t batch_size = 1024, + bool enable_page_level_filter = true) { + ASSERT_OK_AND_ASSIGN(std::shared_ptr in, fs_->Open(file_name)); + ASSERT_OK_AND_ASSIGN(int64_t length, in->Length()); + auto in_stream = std::make_shared(in, arrow_pool_, length); + + std::map options; + options[PARQUET_READ_ENABLE_PAGE_INDEX_FILTER] = + enable_page_level_filter ? "true" : "false"; + ASSERT_OK_AND_ASSIGN(auto batch_reader, + ParquetFileBatchReader::Create(std::move(in_stream), options, + batch_size, nullptr, arrow_pool_)); + auto c_schema = std::make_unique(); + ASSERT_TRUE(arrow::ExportSchema(*read_schema, c_schema.get()).ok()); + ASSERT_OK(batch_reader->SetReadSchema(c_schema.get(), predicate, bitmap)); + ASSERT_OK_AND_ASSIGN(*out, + paimon::test::ReadResultCollector::CollectResult(batch_reader.get())); + } + protected: std::shared_ptr arrow_pool_; std::shared_ptr pool_; @@ -1087,6 +1113,92 @@ TEST_F(PageFilteredRowGroupReaderTest, NestedMapColumnRowGroupFilter) { ASSERT_TRUE(expected->Equals(result->chunk(0))); } +/// Test: nested map projection falls back to row-group-level filtering when page index filter is +/// unavailable for nested read schemas. +/// +/// Schema: { id: int32, props: map } +/// 100 rows, 10 per page, 50 rows per row group → 2 row groups. +/// Bitmap: {70..99} (no predicate) would be a partial match within the second row group. +/// Because nested schema disables page-level filtering, the entire second row group (50..99) is +/// read, so rows [50, 99] should all be returned.
+TEST_F(PageFilteredRowGroupReaderTest, NestedMapBitmapFallback) { + std::string file_name = dir_->Str() + "/nested_map_projection_fallback.parquet"; + auto data = MakeMapColumnData(100); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/50); + + auto field_props = arrow::field("props", arrow::map(arrow::utf8(), arrow::int32())); + auto read_schema = arrow::schema({arrow::field("id", arrow::int32()), field_props}); + + RoaringBitmap32 bitmap; + bitmap.AddRange(70, 100); + + std::shared_ptr result; + ReadWithPredicateAndBitmapImpl(file_name, read_schema, nullptr, bitmap, &result); + + ASSERT_TRUE(result); + // Because page-level filtering is skipped for nested schemas, we read full row groups. + ASSERT_EQ(50, result->length()); + + auto expected = data->Slice(50, 50); + ASSERT_TRUE(expected->Equals(result->chunk(0))); +} + +/// Test: nested list projection falls back to row-group-level filtering when page index filter is +/// unavailable for nested read schemas. +/// +/// Schema: { id: int32, tags: list } +/// Bitmap: {70..99} (no predicate) would be a partial match within the second 50-row group. +/// Nested schemas disable page-level filtering, so the entire second row group (50..99) is read.
+TEST_F(PageFilteredRowGroupReaderTest, NestedListBitmapFallback) { + std::string file_name = dir_->Str() + "/nested_list_projection_fallback.parquet"; + auto data = MakeListColumnData(100); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/50); + + auto field_tags = arrow::field("tags", arrow::list(arrow::field("item", arrow::int32()))); + auto read_schema = arrow::schema({arrow::field("id", arrow::int32()), field_tags}); + + RoaringBitmap32 bitmap; + bitmap.AddRange(70, 100); + + std::shared_ptr result; + ReadWithPredicateAndBitmapImpl(file_name, read_schema, nullptr, bitmap, &result); + + ASSERT_TRUE(result); + ASSERT_EQ(50, result->length()); + + auto expected = data->Slice(50, 50); + ASSERT_TRUE(expected->Equals(result->chunk(0))); +} + +/// Test: nested struct projection falls back to row-group-level filtering when page index filter is +/// unavailable for nested read schemas. +/// +/// Schema: { id: int32, info: struct } +/// Bitmap: {70..99} (no predicate) would be a partial match within the second 50-row group. +/// Nested schemas disable page-level filtering, so the entire second row group (50..99) is read.
+TEST_F(PageFilteredRowGroupReaderTest, NestedStructBitmapFallback) { + std::string file_name = dir_->Str() + "/nested_struct_projection_fallback.parquet"; + auto data = MakeNestedStructData(100); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/50); + + auto field_x = arrow::field("x", arrow::int32()); + auto field_y = arrow::field("y", arrow::int32()); + auto field_info = arrow::field("info", arrow::struct_({field_x, field_y})); + auto read_schema = arrow::schema({arrow::field("id", arrow::int32()), field_info}); + + RoaringBitmap32 bitmap; + bitmap.AddRange(70, 100); + + std::shared_ptr result; + ReadWithPredicateAndBitmapImpl(file_name, read_schema, nullptr, bitmap, &result); + + ASSERT_TRUE(result); + ASSERT_EQ(50, result->length()); + + auto expected = data->Slice(50, 50); + ASSERT_TRUE(expected->Equals(result->chunk(0))); +} + /// Test: rowgroup-level filtering with multiple adjacent nested columns (struct + list). /// /// Schema: { id: int32, info: struct, tags: list } @@ -1143,5 +1255,233 @@ TEST_F(PageFilteredRowGroupReaderTest, MultipleAdjacentNestedColumns) { auto expected = data->Slice(50, 50); ASSERT_TRUE(expected->Equals(result->chunk(0))); } +/// Test: bitmap hits all pages of a subset of row groups (no predicate). +/// +/// 200 rows, 10 rows per page, 100 rows per row group → 2 row groups. +/// RG0: rows 0-99, RG1: rows 100-199. +/// Bitmap: {0..99} hits all pages of RG0, RG1 is excluded entirely. +/// Expected: 100 rows (0-99). 
+TEST_F(PageFilteredRowGroupReaderTest, BitmapAllPagesSomeRowGroups) { + std::string file_name = dir_->Str() + "/bitmap_all_pages_rg.parquet"; + auto data = MakeSequentialIntData(200); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/100); + + RoaringBitmap32 bitmap; + bitmap.AddRange(0, 100); // hits all of RG0 + + auto read_schema = arrow::schema({arrow::field("val", arrow::int32())}); + std::shared_ptr result; + ReadWithPredicateAndBitmapImpl(file_name, read_schema, /*predicate=*/nullptr, bitmap, &result); + ASSERT_TRUE(result); + ASSERT_EQ(100, result->length()); + + auto flat = arrow::Concatenate(result->chunks()).ValueOrDie(); + auto struct_arr = std::dynamic_pointer_cast(flat); + ASSERT_TRUE(struct_arr); + auto val_arr = std::dynamic_pointer_cast(struct_arr->field(0)); + for (int32_t i = 0; i < 100; ++i) { + ASSERT_EQ(i, val_arr->Value(i)); + } +} + +/// Test: bitmap hits partial pages across a row group boundary (no predicate). +/// +/// 200 rows, 10 rows per page, 100 rows per row group → 2 row groups. +/// Bitmap: {90..109} hits page 9 of RG0 (rows 90-99) and page 0 of RG1 (rows 100-109). +/// Expected: 20 rows (90-109).
+TEST_F(PageFilteredRowGroupReaderTest, BitmapPartialPagesSingleRowGroup) { + std::string file_name = dir_->Str() + "/bitmap_partial_pages_rg.parquet"; + auto data = MakeSequentialIntData(200); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/100); + + RoaringBitmap32 bitmap; + bitmap.AddRange(90, 110); // last page of RG0 + first page of RG1 + + auto read_schema = arrow::schema({arrow::field("val", arrow::int32())}); + std::shared_ptr result; + ReadWithPredicateAndBitmapImpl(file_name, read_schema, /*predicate=*/nullptr, bitmap, &result); + ASSERT_TRUE(result); + ASSERT_EQ(20, result->length()); + + auto flat = arrow::Concatenate(result->chunks()).ValueOrDie(); + auto struct_arr = std::dynamic_pointer_cast(flat); + ASSERT_TRUE(struct_arr); + auto val_arr = std::dynamic_pointer_cast(struct_arr->field(0)); + for (int32_t i = 0; i < 20; ++i) { + ASSERT_EQ(90 + i, val_arr->Value(i)); + } +} + +/// Test: bitmap hits all pages of some row groups and partial pages of others. +/// +/// 200 rows, 10 rows per page, 100 rows per row group → 2 row groups. +/// Bitmap: {0..99} hits all of RG0 + {120..149} hits pages 2-4 of RG1. +/// Expected: 100 (RG0) + 30 (RG1 partial) = 130 rows.
+TEST_F(PageFilteredRowGroupReaderTest, BitmapAllAndPartialPagesMixed) { + std::string file_name = dir_->Str() + "/bitmap_all_and_partial.parquet"; + auto data = MakeSequentialIntData(200); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/100); + + RoaringBitmap32 bitmap; + bitmap.AddRange(0, 100); // all of RG0 + bitmap.AddRange(120, 150); // pages 2-4 of RG1 + + auto read_schema = arrow::schema({arrow::field("val", arrow::int32())}); + std::shared_ptr result; + ReadWithPredicateAndBitmapImpl(file_name, read_schema, /*predicate=*/nullptr, bitmap, &result); + ASSERT_TRUE(result); + ASSERT_EQ(130, result->length()); + + // Verify: rows 0-99 + 120-149 + auto flat = arrow::Concatenate(result->chunks()).ValueOrDie(); + auto struct_arr = std::dynamic_pointer_cast(flat); + ASSERT_TRUE(struct_arr); + auto val_arr = std::dynamic_pointer_cast(struct_arr->field(0)); + for (int32_t i = 0; i < 100; ++i) { + ASSERT_EQ(i, val_arr->Value(i)); + } + for (int32_t i = 0; i < 30; ++i) { + ASSERT_EQ(120 + i, val_arr->Value(100 + i)); + } +} + +/// Test: bitmap hits partial pages of a row group, with page-filtered option disabled. +/// +/// 200 rows, 10 rows per page, 100 rows per row group → 2 row groups. +/// Bitmap: {120..149} hits pages 2-4 of RG1; RG0 is excluded entirely. +/// Expected: 100 rows (100-199) because page-filtered option is disabled, so page-level bitmap is +/// ignored.
+TEST_F(PageFilteredRowGroupReaderTest, BitmapWithPageFilteredOptionDisabled) { + std::string file_name = dir_->Str() + "/bitmap_all_and_partial.parquet"; + auto data = MakeSequentialIntData(200); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/100); + + RoaringBitmap32 bitmap; + bitmap.AddRange(120, 150); // pages 2-4 of RG1 + + auto read_schema = arrow::schema({arrow::field("val", arrow::int32())}); + std::shared_ptr result; + ReadWithPredicateAndBitmapImpl(file_name, read_schema, /*predicate=*/nullptr, bitmap, &result, + 1024, false); + ASSERT_TRUE(result); + ASSERT_EQ(100, result->length()); + + // Verify: 100-199 + auto flat = arrow::Concatenate(result->chunks()).ValueOrDie(); + auto struct_arr = std::dynamic_pointer_cast(flat); + ASSERT_TRUE(struct_arr); + auto val_arr = std::dynamic_pointer_cast(struct_arr->field(0)); + for (int32_t i = 0; i < 100; ++i) { + ASSERT_EQ(100 + i, val_arr->Value(i)); + } +} + +/// Test: bitmap + predicate both applied, bitmap hits all pages of some row groups. +/// +/// 200 rows, 10 rows per page, 100 rows per row group → 2 row groups. +/// Bitmap: {0..99} hits all of RG0. +/// Predicate: val >= 50. Page-level filtering on RG0: pages 5-9. +/// Expected: 50 rows (50-99). 
+TEST_F(PageFilteredRowGroupReaderTest, BitmapAllPagesWithPredicate) { + std::string file_name = dir_->Str() + "/bitmap_all_predicate.parquet"; + auto data = MakeSequentialIntData(200); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/100); + + RoaringBitmap32 bitmap; + bitmap.AddRange(0, 100); // hits all of RG0 + + auto predicate = PredicateBuilder::GreaterOrEqual( + /*field_index=*/0, /*field_name=*/"val", FieldType::INT, Literal(50)); + + auto read_schema = arrow::schema({arrow::field("val", arrow::int32())}); + std::shared_ptr result; + ReadWithPredicateAndBitmapImpl(file_name, read_schema, predicate, bitmap, &result); + ASSERT_TRUE(result); + ASSERT_EQ(50, result->length()); + + auto flat = arrow::Concatenate(result->chunks()).ValueOrDie(); + auto struct_arr = std::dynamic_pointer_cast(flat); + ASSERT_TRUE(struct_arr); + auto val_arr = std::dynamic_pointer_cast(struct_arr->field(0)); + for (int32_t i = 0; i < 50; ++i) { + ASSERT_EQ(50 + i, val_arr->Value(i)); + } +} + +/// Test: bitmap + predicate both applied, bitmap hits partial pages of a row group. +/// +/// 200 rows, 10 rows per page, 100 rows per row group → 2 row groups. +/// Bitmap: {30..59} hits pages 3-5 of RG0 (rows 30-59). +/// Predicate: val >= 40. Page-level filtering further narrows to pages 4-5 (rows 40-59). +/// Expected: 20 rows (40-59). 
+TEST_F(PageFilteredRowGroupReaderTest, BitmapPartialPagesWithPredicate) { + std::string file_name = dir_->Str() + "/bitmap_partial_predicate.parquet"; + auto data = MakeSequentialIntData(200); + WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/100); + + RoaringBitmap32 bitmap; + bitmap.AddRange(30, 60); // hits pages 3-5 of RG0 + + auto predicate = PredicateBuilder::GreaterOrEqual( + /*field_index=*/0, /*field_name=*/"val", FieldType::INT, Literal(40)); + + auto read_schema = arrow::schema({arrow::field("val", arrow::int32())}); + std::shared_ptr result; + ReadWithPredicateAndBitmapImpl(file_name, read_schema, predicate, bitmap, &result); + ASSERT_TRUE(result); + ASSERT_EQ(20, result->length()); + + auto flat = arrow::Concatenate(result->chunks()).ValueOrDie(); + auto struct_arr = std::dynamic_pointer_cast(flat); + auto val_arr = std::dynamic_pointer_cast(struct_arr->field(0)); + for (int32_t i = 0; i < 20; ++i) { + ASSERT_EQ(40 + i, val_arr->Value(i)); + } +} + +/// Test: bitmap + predicate both applied, bitmap hits all pages of some RG and +/// partial pages of another. +/// +/// 200 rows, 10 rows per page, 100 rows per row group → 2 row groups. +/// Bitmap: {0..99} (all of RG0) + {120..149} (pages 2-4 of RG1). +/// Predicate: val >= 50 AND val < 160. +/// RG0: all pages → page-filtered to val>=50 → rows 50-99 (50 rows) +/// RG1: pages 2-4 (120-149) → page-filtered to val>=50 AND val<160 → all match (30 rows) +/// Expected: 80 rows (50-99 + 120-149). 
+TEST_F(PageFilteredRowGroupReaderTest, BitmapMixedWithPredicate) {
+    std::string file_name = dir_->Str() + "/bitmap_mixed_predicate.parquet";
+    auto data = MakeSequentialIntData(200);
+    WriteTestFile(file_name, data, /*write_batch_size=*/10, /*max_row_group_length=*/100);
+
+    RoaringBitmap32 bitmap;
+    bitmap.AddRange(0, 100);    // all of RG0
+    bitmap.AddRange(120, 150);  // pages 2-4 of RG1
+
+    ASSERT_OK_AND_ASSIGN(
+        auto predicate,
+        PredicateBuilder::And(
+            {PredicateBuilder::GreaterOrEqual(/*field_index=*/0, /*field_name=*/"val",
+                                              FieldType::INT, Literal(50)),
+             PredicateBuilder::LessThan(/*field_index=*/0, /*field_name=*/"val", FieldType::INT,
+                                        Literal(160))}));
+
+    auto read_schema = arrow::schema({arrow::field("val", arrow::int32())});
+    std::shared_ptr<arrow::ChunkedArray> result;
+    ReadWithPredicateAndBitmapImpl(file_name, read_schema, predicate, bitmap, &result);
+    ASSERT_TRUE(result);
+    ASSERT_EQ(80, result->length());
+
+    // Verify: rows 50-99 + 120-149
+    auto flat = arrow::Concatenate(result->chunks()).ValueOrDie();
+    auto struct_arr = std::dynamic_pointer_cast<arrow::StructArray>(flat);
+    ASSERT_TRUE(struct_arr);
+    auto val_arr = std::dynamic_pointer_cast<arrow::Int32Array>(struct_arr->field(0));
+    // A failed downcast must fail the test instead of dereferencing a null pointer below.
+    ASSERT_TRUE(val_arr);
+    // First 50 output rows come from RG0 (values 50-99).
+    for (int32_t i = 0; i < 50; ++i) {
+        ASSERT_EQ(50 + i, val_arr->Value(i));
+    }
+    // The remaining 30 output rows come from RG1 (values 120-149).
+    for (int32_t i = 0; i < 30; ++i) {
+        ASSERT_EQ(120 + i, val_arr->Value(50 + i));
+    }
+}
 } // namespace paimon::parquet::test diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.cpp b/src/paimon/format/parquet/parquet_file_batch_reader.cpp index 5e6893c39..38d925aa9 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader.cpp @@ -18,6 +18,7 @@ #include #include +#include #include #include "arrow/acero/options.h" @@ -39,11 +40,13 @@ #include "paimon/common/metrics/metrics_impl.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/common/utils/options_utils.h" +#include "paimon/common/utils/scope_guard.h" #include
"paimon/common/utils/string_utils.h" #include "paimon/core/schema/arrow_schema_validator.h" #include "paimon/core/utils/nested_projection_utils.h" #include "paimon/format/parquet/parquet_field_id_converter.h" #include "paimon/format/parquet/parquet_format_defs.h" +#include "paimon/format/parquet/parquet_schema_util.h" #include "paimon/format/parquet/parquet_timestamp_converter.h" #include "paimon/format/parquet/predicate_converter.h" #include "paimon/reader/batch_reader.h" @@ -156,21 +159,31 @@ Status ParquetFileBatchReader::SetReadSchema( field_index_map[field->name()] = leaf_indices; } - std::vector row_groups = arrow::internal::Iota(reader_->GetNumberOfRowGroups()); + TargetRowGroups target_row_groups = + TargetRowGroup::MakeSerialRowGroups(reader_->GetAllRowGroupRanges()); + PAIMON_ASSIGN_OR_RAISE( + bool enable_page_index_filter, + OptionsUtils::GetValueFromMap(options_, PARQUET_READ_ENABLE_PAGE_INDEX_FILTER, + DEFAULT_PARQUET_READ_ENABLE_PAGE_INDEX_FILTER)); + if (predicate) { - PAIMON_ASSIGN_OR_RAISE(row_groups, - FilterRowGroupsByPredicate(predicate, file_schema, row_groups)); + PAIMON_ASSIGN_OR_RAISE( + target_row_groups, + FilterRowGroupsByPredicate(predicate, file_schema, target_row_groups)); } if (selection_bitmap) { - PAIMON_ASSIGN_OR_RAISE(row_groups, - FilterRowGroupsByBitmap(selection_bitmap.value(), row_groups)); + // walkaround: page index filter does not support nested fields for now, skip page index + // bitmap pushdown if there is any nested field in the schema + PAIMON_ASSIGN_OR_RAISE( + target_row_groups, + FilterRowGroupsByBitmap(selection_bitmap.value(), target_row_groups, + !has_nested_field && enable_page_index_filter)); } // Apply page-level filtering after bitmap pruning so we don't read page index // pages for row groups that the bitmap already excluded. 
// If no predicate is provided, skip page-level filtering, row_group_row_ranges will be // empty - std::map row_group_row_ranges; - if (predicate && !row_groups.empty()) { + if (predicate && !target_row_groups.empty()) { PAIMON_ASSIGN_OR_RAISE( bool enable_page_index_filter, OptionsUtils::GetValueFromMap(options_, PARQUET_READ_ENABLE_PAGE_INDEX_FILTER, @@ -189,13 +202,9 @@ Status ParquetFileBatchReader::SetReadSchema( column_name_to_index[name] = indices[0]; } } - - std::pair, std::map> page_filter_result; PAIMON_ASSIGN_OR_RAISE( - page_filter_result, - FilterRowGroupsByPageIndex(predicate, column_name_to_index, row_groups)); - row_groups = std::move(page_filter_result.first); - row_group_row_ranges = std::move(page_filter_result.second); + target_row_groups, + FilterRowGroupsByPageIndex(predicate, column_name_to_index, target_row_groups)); } } @@ -203,30 +212,19 @@ Status ParquetFileBatchReader::SetReadSchema( metrics_->SetCounter(ParquetMetrics::READ_ROW_GROUPS_TOTAL, reader_->GetNumberOfRowGroups()); - metrics_->SetCounter(ParquetMetrics::READ_ROW_GROUPS_AFTER_FILTER, row_groups.size()); - - // Build TargetRowGroup list with page-filter info in one shot. 
- std::vector target_row_groups; - for (int32_t rg_id : row_groups) { - auto it = row_group_row_ranges.find(rg_id); - if (it != row_group_row_ranges.end()) { - target_row_groups.emplace_back(/*rg_index=*/rg_id, /*is_partially_matched=*/true, - /*ranges=*/it->second); - } else { - target_row_groups.emplace_back(/*rg_index=*/rg_id, - /*is_partially_matched=*/false, - /*ranges=*/RowRanges()); - } - } + metrics_->SetCounter(ParquetMetrics::READ_ROW_GROUPS_AFTER_FILTER, + target_row_groups.size()); + + PAIMON_RETURN_NOT_OK(UpdateAllTargetRowranges(target_row_groups)); PAIMON_RETURN_NOT_OK(reader_->PrepareForReadingLazy(target_row_groups, column_indices)); } PAIMON_PARQUET_CATCH_AND_RETURN_STATUS("ParquetFileBatchReader::SetReadSchema") return Status::OK(); } -Result> ParquetFileBatchReader::FilterRowGroupsByPredicate( +Result ParquetFileBatchReader::FilterRowGroupsByPredicate( const std::shared_ptr& predicate, const std::shared_ptr file_schema, - const std::vector& src_row_groups) const { + const TargetRowGroups& src_row_groups) const { if (!predicate) { return Status::Invalid("cannot pushdown an empty predicate"); } @@ -249,58 +247,114 @@ Result> ParquetFileBatchReader::FilterRowGroupsByPredicate( std::shared_ptr file_fragment, parquet_file_format->MakeFragment( file_source, /*partition_expression=*/PredicateConverter::AlwaysTrue(), - /*physical_schema=*/nullptr, /*row_groups=*/src_row_groups)); + /*physical_schema=*/nullptr, + /*row_groups=*/TargetRowGroup::GetRowGroupIndices(src_row_groups))); PAIMON_RETURN_NOT_OK_FROM_ARROW( file_fragment->EnsureCompleteMetadata(reader_->GetFileReader())); PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(arrow::dataset::FragmentVector target_fragments, file_fragment->SplitByRowGroup(bind_expr)); - std::vector target_row_groups; + TargetRowGroups target_row_groups; target_row_groups.reserve(src_row_groups.size()); for (const auto& fragment : target_fragments) { auto parquet_fragment = dynamic_cast(fragment.get()); if (!parquet_fragment) { return 
Status::Invalid("cannot cast to ParquetFileFragment in ParquetFileBatchReader"); } - target_row_groups.insert(target_row_groups.end(), parquet_fragment->row_groups().begin(), - parquet_fragment->row_groups().end()); + for (auto rg_index : parquet_fragment->row_groups()) { + target_row_groups.emplace_back(src_row_groups[rg_index]); + } } return target_row_groups; } -Result> ParquetFileBatchReader::FilterRowGroupsByBitmap( - const RoaringBitmap32& bitmap, const std::vector& src_row_groups) const { +Result ParquetFileBatchReader::FilterRowGroupsByBitmap( + const RoaringBitmap32& bitmap, const TargetRowGroups& src_row_groups, + bool enable_page_filtered) const { if (bitmap.IsEmpty()) { return Status::Invalid("cannot push down an empty bitmap to ParquetFileBatchReader"); } + + auto meta_data = reader_->GetFileReader()->parquet_reader()->metadata(); const auto& all_row_group_ranges = reader_->GetAllRowGroupRanges(); - // filter row groups by row range - std::vector target_row_groups; - for (const auto& row_group_idx : src_row_groups) { + + TargetRowGroups target_row_groups; + for (const auto& row_group : src_row_groups) { + int32_t row_group_idx = row_group.GetRowGroupIndex(); if (static_cast(row_group_idx) >= all_row_group_ranges.size()) { return Status::Invalid( fmt::format("src row group {} not in row group meta", row_group_idx)); } const auto& [start_row_idx, end_row_idx] = all_row_group_ranges[row_group_idx]; - if (bitmap.ContainsAny(start_row_idx, end_row_idx)) { - target_row_groups.push_back(row_group_idx); + if (!bitmap.ContainsAny(start_row_idx, end_row_idx)) { + continue; + } + + int64_t rg_row_count = meta_data->RowGroup(row_group_idx)->num_rows(); + if (!enable_page_filtered) { + // For nested schema, we cannot apply page-level filtering, so we directly add the whole + // row group if bitmap matches. 
+ target_row_groups.emplace_back(row_group); + continue; + } + auto page_ranges = BitmapToRowRanges(bitmap, start_row_idx, end_row_idx); + if (page_ranges.RowCount() < rg_row_count) { + target_row_groups.emplace_back(/*row_group_idx=*/row_group_idx, + /*is_partially_matched=*/true, + /*row_ranges=*/page_ranges); + } else { + target_row_groups.emplace_back(row_group); } } return target_row_groups; } +RowRanges ParquetFileBatchReader::BitmapToRowRanges(const RoaringBitmap32& bitmap, + uint64_t start_row, uint64_t end_row) { + RowRanges row_ranges; + + if (bitmap.IsEmpty() || start_row >= end_row) { + return row_ranges; + } + + auto it = bitmap.EqualOrLarger(static_cast(start_row)); + auto end = bitmap.End(); + + if (it == end || static_cast(*it) >= end_row) { + return row_ranges; + } + + int64_t range_start = *it; + int64_t range_end = *it; + + for (++it; it != end; ++it) { + int32_t current = *it; + if (static_cast(current) >= end_row) { + break; + } + + if (current == range_end + 1) { + range_end = current; + } else { + row_ranges.Add(RowRanges::Range(range_start - start_row, range_end - start_row)); + range_start = current; + range_end = current; + } + } + + row_ranges.Add(RowRanges::Range(range_start - start_row, range_end - start_row)); + return row_ranges; +} + // Uses page-level column index statistics to filter row groups and store per-row-group // RowRanges for true page-level skipping. A row group is excluded if ALL its pages are // determined to not match the predicate. For partially matched row groups, RowRanges // are stored for page-level filtering during reading. 
-Result, std::map>> -ParquetFileBatchReader::FilterRowGroupsByPageIndex( +Result ParquetFileBatchReader::FilterRowGroupsByPageIndex( const std::shared_ptr& predicate, const std::map& column_name_to_index, - const std::vector& src_row_groups) { - std::map rg_row_ranges; - + const TargetRowGroups& src_row_groups) const { if (!predicate) { - return std::make_pair(src_row_groups, rg_row_ranges); + return src_row_groups; } auto page_index_reader = reader_->GetPageIndexReader(); @@ -308,41 +362,48 @@ ParquetFileBatchReader::FilterRowGroupsByPageIndex( PAIMON_LOG_DEBUG(logger_, "Page index not available in file, skipping page-level filtering (%s)", PARQUET_WRITE_ENABLE_PAGE_INDEX); - return std::make_pair(src_row_groups, rg_row_ranges); + return src_row_groups; } auto file_metadata = reader_->GetFileReader()->parquet_reader()->metadata(); - std::vector target_row_groups; - target_row_groups.reserve(src_row_groups.size()); + TargetRowGroups target_row_groups; - for (int32_t row_group_idx : src_row_groups) { + for (const auto& row_group : src_row_groups) { + int32_t row_group_idx = row_group.GetRowGroupIndex(); auto result = reader_->CalculateFilteredRowRanges(row_group_idx, predicate, column_name_to_index); if (!result.ok()) { - target_row_groups.push_back(row_group_idx); + target_row_groups.emplace_back(row_group); continue; } const auto& row_ranges = result.value(); if (!row_ranges.IsEmpty()) { - target_row_groups.push_back(row_group_idx); - int64_t rg_row_count = file_metadata->RowGroup(row_group_idx)->num_rows(); - if (row_ranges.RowCount() < rg_row_count) { - rg_row_ranges[row_group_idx] = row_ranges; + auto intersection = row_group.IsPartiallyMatched() + ? 
RowRanges::Intersection(row_group.GetRowRanges(), row_ranges) + : row_ranges; + if (intersection.IsEmpty()) { + continue; + } + if (intersection.RowCount() < rg_row_count) { + target_row_groups.emplace_back(row_group_idx, true, intersection); + } else { + target_row_groups.emplace_back(row_group); } } } - return std::make_pair(std::move(target_row_groups), std::move(rg_row_ranges)); + return target_row_groups; } Result ParquetFileBatchReader::NextBatch() { try { PAIMON_ASSIGN_OR_RAISE(std::shared_ptr batch, reader_->Next()); if (batch == nullptr) { + row_mapping_.clear(); return BatchReader::MakeEofBatch(); } PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr array, @@ -361,6 +422,7 @@ Result ParquetFileBatchReader::NextBatch() { "equal with read schema {}", array->type()->ToString(), read_data_type_->ToString())); } + PAIMON_RETURN_NOT_OK(GenerateRowMapping(array->length())); std::unique_ptr c_array = std::make_unique(); std::unique_ptr c_schema = std::make_unique(); PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, c_array.get(), c_schema.get())); @@ -526,4 +588,58 @@ Result> ParquetFileBatchReader::ComputeNestedColumnIndices( return indices; } +Status ParquetFileBatchReader::UpdateAllTargetRowranges( + const std::vector& target_row_groups) { + row_mapping_.clear(); + auto all_row_group_ranges = reader_->GetAllRowGroupRanges(); + RowRanges all_ranges; + for (const auto& target_row_group : target_row_groups) { + auto row_group_idx = target_row_group.GetRowGroupIndex(); + for (const auto& range : target_row_group.GetRowRanges().GetRanges()) { + all_ranges.Add(Range(all_row_group_ranges[row_group_idx].first + range.from, + all_row_group_ranges[row_group_idx].first + range.to)); + } + } + all_row_ranges_ = std::move(all_ranges); + return Status::OK(); +} + +Status ParquetFileBatchReader::GenerateRowMapping(int64_t batch_length) { + const std::vector& all_ranges = all_row_ranges_.GetRanges(); + PAIMON_ASSIGN_OR_RAISE(int64_t batch_start_row, 
reader_->GetPreviousBatchFirstRowNumber()); + + auto cur_range_it = + std::upper_bound(all_ranges.begin(), all_ranges.end(), batch_start_row, + [](int64_t value, const Range& r) { return value < r.from; }); + if (cur_range_it == all_ranges.begin()) { + std::stringstream s; + for (auto range : all_ranges) { + s << "range: [" << range.from << ", " << range.to << "]" << std::endl; + } + return Status::Invalid(fmt::format("No range found! {} {}", s.str(), all_ranges.size())); + } + --cur_range_it; + if (batch_start_row < cur_range_it->from || batch_start_row > cur_range_it->to) { + return Status::Invalid( + fmt::format("Batch start row {} is not in the current range [{}, {}]!", batch_start_row, + cur_range_it->from, cur_range_it->to)); + } + + std::vector row_mapping; + row_mapping.reserve(batch_length); + int64_t global_row = batch_start_row; + for (int64_t i = 0; i < batch_length; ++i) { + if (global_row > cur_range_it->to) { + ++cur_range_it; + if (cur_range_it == all_ranges.end()) { + return Status::Invalid("Batch length exceeds the total row ranges!"); + } + global_row = cur_range_it->from; + } + row_mapping.push_back(global_row); + global_row++; + } + row_mapping_ = std::move(row_mapping); + return Status::OK(); +} } // namespace paimon::parquet diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.h b/src/paimon/format/parquet/parquet_file_batch_reader.h index 4bd684e8f..cdedd8234 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.h +++ b/src/paimon/format/parquet/parquet_file_batch_reader.h @@ -16,6 +16,8 @@ #pragma once +#include + #include #include #include @@ -37,6 +39,7 @@ #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/format/parquet/file_reader_wrapper.h" #include "paimon/format/parquet/row_ranges.h" +#include "paimon/format/parquet/target_row_group.h" #include "paimon/logging.h" #include "paimon/reader/prefetch_file_batch_reader.h" #include "paimon/result.h" @@ -94,9 +97,18 @@ class ParquetFileBatchReader : 
public PrefetchFileBatchReader { Result>> GenReadRanges( bool* need_prefetch) const override; - Result GetPreviousBatchFirstRowNumber() const override { - assert(reader_); - return reader_->GetPreviousBatchFirstRowNumber(); + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { + if (row_mapping_.size() == 0) { + return Status::Invalid( + "Last batch is not read or last batch is empty, cannot get previous batch global " + "row id"); + } + if (batch_row_id >= row_mapping_.size()) { + return Status::Invalid( + fmt::format("batch_row_id {} is out of range, last batch row count is {}", + batch_row_id, row_mapping_.size())); + } + return row_mapping_[batch_row_id]; } Result GetNumberOfRows() const override { @@ -154,6 +166,12 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { } } + static RowRanges BitmapToRowRanges(const RoaringBitmap32& bitmap, uint64_t start_row, + uint64_t end_row); + + Result FilterPagesByBitmap(const RoaringBitmap32& bitmap, + int32_t row_group_idx, uint64_t rg_start_row, + int64_t rg_row_count) const; /// Recursively collect leaf column indices for the sub-fields in read_type /// that match file_type by paimon field ID. Unmatched sub-fields in file_type /// have their leaf indices skipped. 
Partial projection inside LIST/MAP is @@ -173,21 +191,26 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { const std::shared_ptr& read_schema, const std::shared_ptr& file_schema); + Status UpdateAllTargetRowranges(const std::vector& target_row_groups); + // precondition: predicate supposed not be empty - Result> FilterRowGroupsByPredicate( + Result FilterRowGroupsByPredicate( const std::shared_ptr& predicate, const std::shared_ptr file_schema, - const std::vector& src_row_groups) const; + const TargetRowGroups& src_row_groups) const; - Result> FilterRowGroupsByBitmap( - const RoaringBitmap32& bitmap, const std::vector& src_row_groups) const;
+    // Keep only the row groups of `src_row_groups` that contain at least one row selected by
+    // `bitmap`; an empty bitmap is rejected with Status::Invalid.
+    // `enable_page_filtered`: when true, a row group that the bitmap covers only partially is
+    // narrowed to the selected row ranges; when false, every row group the bitmap touches is
+    // kept as-is. The parameter name matches the definition in the .cpp file.
+    Result<TargetRowGroups> FilterRowGroupsByBitmap(const RoaringBitmap32& bitmap,
+                                                    const TargetRowGroups& src_row_groups,
+                                                    bool enable_page_filtered) const;
 // Apply page-level filtering using column index. // Returns (filtered row groups, per-row-group RowRanges for partial matches). - Result, std::map>> - FilterRowGroupsByPageIndex(const std::shared_ptr& predicate, - const std::map& column_name_to_index, - const std::vector& src_row_groups); + Result FilterRowGroupsByPageIndex( + const std::shared_ptr& predicate, + const std::map& column_name_to_index, + const TargetRowGroups& src_row_groups) const; + + Status GenerateRowMapping(int64_t batch_length); private: std::map options_; @@ -204,6 +227,9 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { uint64_t read_rows_ = 0; uint64_t read_batch_count_ = 0; + + RowRanges all_row_ranges_; + std::vector row_mapping_; }; } // namespace paimon::parquet diff --git a/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp b/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp index ec353f071..78f26b5ea 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp @@ -168,12 +168,15 @@ class ParquetFileBatchReaderTest : public ::testing::Test, void WriteArray(const std::string&
file_path, const std::shared_ptr& src_array, const std::shared_ptr& arrow_schema, int64_t write_batch_size, - bool enable_dictionary, int64_t max_row_group_length) const { + bool enable_dictionary, int64_t max_row_group_length, + int64_t max_page_size = 1024 * 1024 * 1024) const { ASSERT_OK_AND_ASSIGN(std::shared_ptr out, fs_->Create(file_path, /*overwrite=*/true)); ::parquet::WriterProperties::Builder builder; builder.write_batch_size(write_batch_size); builder.max_row_group_length(max_row_group_length); + builder.data_pagesize(max_page_size); + builder.enable_write_page_index(); enable_dictionary ? builder.enable_dictionary() : builder.disable_dictionary(); auto writer_properties = builder.build(); ASSERT_OK_AND_ASSIGN(auto format_writer, ParquetFormatWriter::Create( @@ -192,12 +195,15 @@ class ParquetFileBatchReaderTest : public ::testing::Test, std::unique_ptr PrepareParquetFileBatchReader( const std::string& file_name, const std::shared_ptr& read_schema, const std::shared_ptr& predicate, - const std::optional& selection_bitmap, int32_t batch_size) const { + const std::optional& selection_bitmap, int32_t batch_size, + bool enable_page_level_filter = false) const { EXPECT_OK_AND_ASSIGN(auto input_stream, fs_->Open(file_name)); auto length = fs_->GetFileStatus(file_name).value()->GetLen(); auto in_stream = std::make_unique(std::move(input_stream), pool_, length); - std::map options = {}; + std::map options; + options[PARQUET_READ_ENABLE_PAGE_INDEX_FILTER] = + enable_page_level_filter ? 
"true" : "false"; return PrepareParquetFileBatchReader(std::move(in_stream), options, read_schema, predicate, selection_bitmap, batch_size); } @@ -229,6 +235,17 @@ class ParquetFileBatchReaderTest : public ::testing::Test, std::shared_ptr struct_array_; }; +static std::shared_ptr MakeSequentialIntData(int32_t num_rows) { + arrow::Int32Builder val_builder; + EXPECT_TRUE(val_builder.Reserve(num_rows).ok()); + for (int32_t i = 0; i < num_rows; ++i) { + val_builder.UnsafeAppend(i); + } + auto val_array = val_builder.Finish().ValueOrDie(); + auto field = arrow::field("f0", arrow::int32()); + return arrow::StructArray::Make({val_array}, {field}).ValueOrDie(); +} + TEST_F(ParquetFileBatchReaderTest, TestParquetMetadataCacheReusesSerializedFooter) { WriteArray(file_path_, struct_array_, schema_, /*write_batch_size=*/struct_array_->length(), /*enable_dictionary=*/false, @@ -447,11 +464,8 @@ TEST_F(ParquetFileBatchReaderTest, TestNextBatchSimple) { auto parquet_batch_reader = PrepareParquetFileBatchReader(file_name, schema_, /*predicate=*/nullptr, /*selection_bitmap=*/std::nullopt, batch_size); - ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(), - std::numeric_limits::max()); ASSERT_OK_AND_ASSIGN(auto result_array, paimon::test::ReadResultCollector::CollectResult( parquet_batch_reader.get())); - ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(), 6); parquet_batch_reader->Close(); auto expected_array = std::make_shared(struct_array_); ASSERT_TRUE(result_array->Equals(expected_array)); @@ -704,7 +718,7 @@ TEST_F(ParquetFileBatchReaderTest, TestCreateArrowReaderProperties) { } } -TEST_F(ParquetFileBatchReaderTest, TestBitmapPushDownWithMultiRowGroups) { +TEST_F(ParquetFileBatchReaderTest, TestBitmapRowGroupPushDownWithMultiRowGroups) { arrow::FieldVector fields = {arrow::field("f0", arrow::int32())}; auto arrow_type = arrow::struct_(fields); auto src_array = std::dynamic_pointer_cast( @@ -742,8 +756,48 @@ 
TEST_F(ParquetFileBatchReaderTest, TestBitmapPushDownWithMultiRowGroups) { auto expected_array = arrow::ChunkedArray::Make({src_array->Slice(0, 6)}).ValueOrDie(); ASSERT_TRUE(result_array->Equals(expected_array)) << result_array->ToString(); } +TEST_F(ParquetFileBatchReaderTest, TestBitmapPagePushDownWithMultiRowGroups) { + arrow::FieldVector fields = {arrow::field("f0", arrow::int32())}; + auto arrow_type = arrow::struct_(fields); + auto src_array = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow_type, R"([ + [0], + [1], + [2], + [3], + [4], + [5], + [6], + [7], + [8], + [9], + [10], + [11] + ])") + .ValueOrDie()); + auto src_schema = arrow::schema(fields); + std::optional bitmap = RoaringBitmap32::From({3, 5}); + // data in file rowGroup0:[0, 1, 2, 3, 4, 5] | rowGroup1:[6, 7, 8, 9, 10, 11] + + auto arrow_schema = arrow::schema(fields); + WriteArray(file_path_, src_array, arrow_schema, /*write_batch_size=*/12, + /*enable_dictionary=*/true, + /*max_row_group_length=*/6); + + auto parquet_batch_reader = + PrepareParquetFileBatchReader(file_path_, arrow_schema, /*predicate=*/nullptr, bitmap, + /*batch_size=*/12, /*enable_page_level_filter=*/true); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr result_array, + paimon::test::ReadResultCollector::CollectResult(parquet_batch_reader.get())); + + auto expected_array = + arrow::ChunkedArray::Make({src_array->Slice(3, 1), src_array->Slice(5, 1)}).ValueOrDie(); + ASSERT_TRUE(result_array->Equals(expected_array)) << result_array->ToString(); +} -TEST_F(ParquetFileBatchReaderTest, TestPredicateAndBitmapPushDown) { +TEST_F(ParquetFileBatchReaderTest, TestPredicateAndBitmapRowGroupPushDown) { arrow::FieldVector fields = {arrow::field("f0", arrow::int32())}; auto arrow_type = arrow::struct_(fields); arrow::StructBuilder struct_builder(arrow_type, arrow::default_memory_pool(), @@ -800,6 +854,64 @@ TEST_F(ParquetFileBatchReaderTest, TestPredicateAndBitmapPushDown) { ASSERT_FALSE(result_array); } } 
+// Bitmap + predicate pushdown with the page-index filter enabled: the bitmap narrows each hit
+// row group to the selected rows, and the predicate then drops row groups it rules out.
+TEST_F(ParquetFileBatchReaderTest, TestPredicateAndBitmapPagePushDown) {
+    arrow::FieldVector fields = {arrow::field("f0", arrow::int32())};
+    auto arrow_type = arrow::struct_(fields);
+    arrow::StructBuilder struct_builder(arrow_type, arrow::default_memory_pool(),
+                                        {std::make_shared<arrow::Int32Builder>()});
+    auto int_builder = static_cast<arrow::Int32Builder*>(struct_builder.field_builder(0));
+    int32_t length = 1024;
+    for (int32_t i = 0; i < length; ++i) {
+        ASSERT_TRUE(struct_builder.Append().ok());
+        ASSERT_TRUE(int_builder->Append(i).ok());
+    }
+    // data file:
+    // rowGroup0: [0, 256)
+    // rowGroup1: [256, 512)
+    // rowGroup2: [512, 768)
+    // rowGroup3: [768, 1024)
+    std::shared_ptr<arrow::StructArray> src_array;
+    ASSERT_TRUE(struct_builder.Finish(&src_array).ok());
+    auto arrow_schema = arrow::schema(fields);
+    WriteArray(file_path_, src_array, arrow_schema, /*write_batch_size=*/1024,
+               /*enable_dictionary=*/true,
+               /*max_row_group_length=*/256);
+    {
+        // simple case
+        std::optional<RoaringBitmap32> bitmap = RoaringBitmap32::From({100, 400, 600});
+        ASSERT_OK_AND_ASSIGN(
+            auto predicate,
+            PredicateBuilder::Or(
+                {PredicateBuilder::LessThan(/*field_index=*/0, /*field_name=*/"f0", FieldType::INT,
+                                            Literal(255)),
+                 PredicateBuilder::GreaterThan(/*field_index=*/0, /*field_name=*/"f0",
+                                               FieldType::INT, Literal(600))}));
+        auto parquet_batch_reader =
+            PrepareParquetFileBatchReader(file_path_, arrow_schema, predicate, bitmap,
+                                          /*batch_size=*/length, /*enable_page_level_filter=*/true);
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::ChunkedArray> result_array,
+            paimon::test::ReadResultCollector::CollectResult(parquet_batch_reader.get()));
+
+        // Row 400 lives in rowGroup1, which the predicate excludes. Row 600 is still returned:
+        // the pushed-down predicate prunes by statistics only and rowGroup2 contains values
+        // greater than 600, so the bitmap-selected row survives.
+        auto expected_array =
+            arrow::ChunkedArray::Make({src_array->Slice(100, 1), src_array->Slice(600, 1)})
+                .ValueOrDie();
+        ASSERT_TRUE(result_array->Equals(expected_array)) << result_array->ToString();
+    }
+    {
+        // test all data has been filtered out with predicate and bitmap pushdown
+        std::optional<RoaringBitmap32> bitmap = RoaringBitmap32::From({100, 400, 600});
+        auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/0, /*field_name=*/"f0",
+                                                       FieldType::INT, Literal(800));
+        // The page-level filter must be enabled here as well; with the default (false) this
+        // case would only repeat the row-group-level pushdown test.
+        auto parquet_batch_reader =
+            PrepareParquetFileBatchReader(file_path_, arrow_schema, predicate, bitmap,
+                                          /*batch_size=*/length, /*enable_page_level_filter=*/true);
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::ChunkedArray> result_array,
+            paimon::test::ReadResultCollector::CollectResult(parquet_batch_reader.get()));
+        ASSERT_FALSE(result_array);
+    }
+}
 TEST_F(ParquetFileBatchReaderTest, TestReadNoField) { // if only read partition fields, format reader will set empty read schema @@ -812,20 +924,19 @@ TEST_F(ParquetFileBatchReaderTest, TestReadNoField) { PrepareParquetFileBatchReader(file_name, read_schema, /*predicate=*/nullptr, /*selection_bitmap=*/std::nullopt, /*batch_size=*/2); // read 2 rows - ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(), - std::numeric_limits::max()); + ASSERT_NOK(parquet_batch_reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto batch1, parquet_batch_reader->NextBatch()); - ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(), 0); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(0).value(), 0); // read 2 rows ASSERT_OK_AND_ASSIGN(auto batch2, parquet_batch_reader->NextBatch()); - ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(), 2); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(0).value(), 2); // read 2 rows ASSERT_OK_AND_ASSIGN(auto batch3, parquet_batch_reader->NextBatch()); - ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(), 4); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(0).value(), 4); // read rows with eof ASSERT_OK_AND_ASSIGN(auto batch4, parquet_batch_reader->NextBatch()); - ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(), 6); ASSERT_TRUE(BatchReader::IsEofBatch(batch4)); + ASSERT_NOK(parquet_batch_reader->GetPreviousBatchFileRowId(0)); parquet_batch_reader->Close(); arrow::FieldVector fields; @@ -1013,4
+1124,157 @@ TEST_F(ParquetFileBatchReaderTest, TestAddMetadataPerFieldMetadata) { ASSERT_TRUE(data->Equals(*result_array->chunk(0))) << result_array->ToString(); } +TEST_F(ParquetFileBatchReaderTest, TestRowMappingSimple) { + arrow::FieldVector fields = {arrow::field("f0", arrow::int32())}; + auto src_array = MakeSequentialIntData(12); + // data in file rowGroup0:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] + // one row per page + auto arrow_schema = arrow::schema(fields); + WriteArray(file_path_, src_array, arrow_schema, /*write_batch_size=*/1, + /*enable_dictionary=*/true, /*max_row_group_length=*/12, /*max_page_size=*/1); + + // 1<=f0<=3 || 5<=f0<=6 + ASSERT_OK_AND_ASSIGN( + auto predicate, + PredicateBuilder::Or({PredicateBuilder::Between(/*field_index=*/0, /*field_name=*/"f0", + FieldType::INT, Literal(1), Literal(3)), + PredicateBuilder::Between(/*field_index=*/0, /*field_name=*/"f0", + FieldType::INT, Literal(5), Literal(6))})); + + auto parquet_batch_reader = PrepareParquetFileBatchReader( + file_path_, arrow_schema, /*predicate=*/predicate, std::nullopt, /*batch_size=*/2, + /*enable_page_level_filter=*/true); + + ASSERT_NOK(parquet_batch_reader->GetPreviousBatchFileRowId(0)); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr batch1, + paimon::test::ReadResultCollector::CollectResultOneBatch(parquet_batch_reader.get())); + auto expected_batch1 = src_array->Slice(1, 2); + ASSERT_TRUE(batch1->chunk(0)->Equals(expected_batch1)) << batch1->ToString(); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(0).value(), 1); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(1).value(), 2); + // out of bound return invalid + ASSERT_NOK(parquet_batch_reader->GetPreviousBatchFileRowId(2)); + + // Not adjacent pages + ASSERT_OK_AND_ASSIGN( + std::shared_ptr batch2, + paimon::test::ReadResultCollector::CollectResultOneBatch(parquet_batch_reader.get())); + auto expected_batch2 = std::dynamic_pointer_cast( + 
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ +[3], +[5] + ])") + .ValueOrDie()); + ASSERT_TRUE(batch2->chunk(0)->Equals(expected_batch2)) << batch2->ToString(); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(0).value(), 3); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(1).value(), 5); + + // Only one record read + ASSERT_OK_AND_ASSIGN( + std::shared_ptr batch3, + paimon::test::ReadResultCollector::CollectResultOneBatch(parquet_batch_reader.get())); + auto expected_batch3 = src_array->Slice(6, 1); + ASSERT_TRUE(batch3->chunk(0)->Equals(expected_batch3)) << batch3->ToString(); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(0).value(), 6); + ASSERT_NOK(parquet_batch_reader->GetPreviousBatchFileRowId(1)); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr eof_batch, + paimon::test::ReadResultCollector::CollectResultOneBatch(parquet_batch_reader.get())); + ASSERT_EQ(nullptr, eof_batch); + // previous batch is eof, return invalid. + ASSERT_NOK(parquet_batch_reader->GetPreviousBatchFileRowId(0)); +} + +TEST_F(ParquetFileBatchReaderTest, TestRowMappingFullyAndPartially) { + arrow::FieldVector fields = {arrow::field("f0", arrow::int32())}; + auto src_array = MakeSequentialIntData(12); + // data in file RowGroup0:[0, 1, 2] | RowGroup1:[3, 4, 5] | RowGroup2:[6, 7, 8] | RowGroup3:[9, + // 10, 11] one row per page + auto arrow_schema = arrow::schema(fields); + WriteArray(file_path_, src_array, arrow_schema, /*write_batch_size=*/1, + /*enable_dictionary=*/true, /*max_row_group_length=*/3, /*max_page_size=*/1); + + // 3<=f0<=5 || f0==6 || f0==8 + // RowGroup 1 is fully matched, RowGroup 2 is partially matched, RowGroup 0 and RowGroup 3 are + // not matched. 
+ ASSERT_OK_AND_ASSIGN( + auto predicate, + PredicateBuilder::Or({PredicateBuilder::Between(/*field_index=*/0, /*field_name=*/"f0", + FieldType::INT, Literal(3), Literal(5)), + PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", + FieldType::INT, Literal(6)), + PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", + FieldType::INT, Literal(8))})); + + auto parquet_batch_reader = PrepareParquetFileBatchReader( + file_path_, arrow_schema, /*predicate=*/predicate, std::nullopt, /*batch_size=*/3, + /*enable_page_level_filter=*/true); + + ASSERT_NOK(parquet_batch_reader->GetPreviousBatchFileRowId(0)); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr batch1, + paimon::test::ReadResultCollector::CollectResultOneBatch(parquet_batch_reader.get())); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(0).value(), 3); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(2).value(), 5); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr batch2, + paimon::test::ReadResultCollector::CollectResultOneBatch(parquet_batch_reader.get())); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(0).value(), 6); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(1).value(), 8); +} + +TEST_F(ParquetFileBatchReaderTest, TestRowMappingSetReadSchemaTwice) { + arrow::FieldVector fields = {arrow::field("f0", arrow::int32())}; + auto src_array = MakeSequentialIntData(12); + // Data in file: RowGroup0:[0, 1, 2] | RowGroup1:[3, 4, 5] | RowGroup2:[6, 7, 8] | + // RowGroup3:[9, 10, 11], one row per page. + auto arrow_schema = arrow::schema(fields); + WriteArray(file_path_, src_array, arrow_schema, /*write_batch_size=*/1, + /*enable_dictionary=*/true, /*max_row_group_length=*/3, /*max_page_size=*/1); + + // First predicate: 1<=f0<=3 || 6<=f0<=7 (replaced below via SetReadSchema by 3<=f0<=5). + ASSERT_OK_AND_ASSIGN( + auto predicate, + PredicateBuilder::Or({PredicateBuilder::Between(/*field_index=*/0, /*field_name=*/"f0", + FieldType::INT, Literal(1), Literal(3)), + PredicateBuilder::Between(/*field_index=*/0, /*field_name=*/"f0", 
+ FieldType::INT, Literal(6), Literal(7))})); + + auto parquet_batch_reader = PrepareParquetFileBatchReader( + file_path_, arrow_schema, /*predicate=*/predicate, std::nullopt, /*batch_size=*/3, + /*enable_page_level_filter=*/true); + + ASSERT_NOK(parquet_batch_reader->GetPreviousBatchFileRowId(0)); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr batch1, + paimon::test::ReadResultCollector::CollectResultOneBatch(parquet_batch_reader.get())); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(0).value(), 1); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(1).value(), 2); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr batch2, + paimon::test::ReadResultCollector::CollectResultOneBatch(parquet_batch_reader.get())); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(0).value(), 3); + ASSERT_NOK(parquet_batch_reader->GetPreviousBatchFileRowId(1)); + + ASSERT_OK_AND_ASSIGN( + predicate, + PredicateBuilder::Or({PredicateBuilder::Between(/*field_index=*/0, /*field_name=*/"f0", + FieldType::INT, Literal(3), Literal(5))})); + + std::unique_ptr c_schema = std::make_unique(); + ASSERT_TRUE(arrow::ExportSchema(*arrow_schema, c_schema.get()).ok()); + ASSERT_OK( + parquet_batch_reader->SetReadSchema(c_schema.get(), /*predicate=*/predicate, std::nullopt)); + ASSERT_NOK(parquet_batch_reader->GetPreviousBatchFileRowId(0)); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr batch3, + paimon::test::ReadResultCollector::CollectResultOneBatch(parquet_batch_reader.get())); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(0).value(), 3); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(2).value(), 5); +} + } // namespace paimon::parquet::test diff --git a/src/paimon/format/parquet/row_ranges.h b/src/paimon/format/parquet/row_ranges.h index 46c3f4d21..b2b8338db 100644 --- a/src/paimon/format/parquet/row_ranges.h +++ b/src/paimon/format/parquet/row_ranges.h @@ -105,21 +105,4 @@ class RowRanges { private: std::vector ranges_; }; - -struct TargetRowGroup { - 
int32_t row_group_index{-1}; - bool is_partially_matched{false}; - // page-filtered row ranges, only valid if is_partially_matched is true. - RowRanges row_ranges; - // Whether this row group has been excluded by ApplyReadRanges. - // When true, this row group is logically skipped during iteration - // but retained so that a subsequent wider ApplyReadRanges can restore it. - bool excluded_by_read_range{false}; - - TargetRowGroup() = default; - TargetRowGroup(int32_t rg_index, bool is_partially_matched, RowRanges ranges) - : row_group_index(rg_index), - is_partially_matched(is_partially_matched), - row_ranges(std::move(ranges)) {} -}; } // namespace paimon::parquet diff --git a/src/paimon/format/parquet/target_row_group.h b/src/paimon/format/parquet/target_row_group.h new file mode 100644 index 000000000..119ddb228 --- /dev/null +++ b/src/paimon/format/parquet/target_row_group.h @@ -0,0 +1,97 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "paimon/format/parquet/row_ranges.h" + +namespace paimon::parquet { +class TargetRowGroup; +using TargetRowGroups = std::vector; +/// A row group selected for reading, plus the local row ranges to read inside it. +/// +/// No copy/move operations are declared on purpose (Rule of Zero): the implicitly generated +/// ones include the move operations that user-declared copy operations would suppress. +class TargetRowGroup { + public: + explicit TargetRowGroup(int32_t rg_index) : row_group_index_(rg_index) {} + TargetRowGroup(int32_t rg_index, bool is_partially_matched, RowRanges ranges) + : row_group_index_(rg_index), + is_partially_matched_(is_partially_matched), + row_ranges_(std::move(ranges)) {} + + bool IsExcludedByReadRange() const { + return excluded_by_read_range_; + } + + void SetExcludedByReadRange(bool excluded) { + excluded_by_read_range_ = excluded; + } + + int32_t GetRowGroupIndex() const { + return row_group_index_; + } + + bool IsPartiallyMatched() const { + return is_partially_matched_; + } + + const RowRanges& GetRowRanges() const { + return row_ranges_; + } + + /// Creates one target per entry of `ranges`, with row group indices 0..ranges.size()-1 and + /// is_partially_matched=false; entry i maps to local rows [0, second - first - 1]. + static TargetRowGroups MakeSerialRowGroups( + const std::vector>& ranges) { + TargetRowGroups target_row_groups; + target_row_groups.reserve(ranges.size()); + for (size_t i = 0; i < ranges.size(); ++i) { + target_row_groups.emplace_back( + static_cast<int32_t>(i), false, + RowRanges(Range(0, ranges[i].second - ranges[i].first - 1))); + } + return target_row_groups; + } + + /// Returns the row group index of every entry, in the same order as `target_row_groups`. + static std::vector GetRowGroupIndices(const TargetRowGroups& target_row_groups) { + std::vector indices; + indices.reserve(target_row_groups.size()); + for (const auto& rg : target_row_groups) { + indices.push_back(rg.GetRowGroupIndex()); + } + return indices; + } + + private: + int32_t row_group_index_{-1}; + bool is_partially_matched_{false}; + // Row ranges to read, expressed as row offsets local to this row group. + RowRanges row_ranges_; + // Whether this row group has been excluded by ApplyReadRanges. + // When true, this row group is logically skipped during iteration + // but retained so that a subsequent wider ApplyReadRanges can restore it. 
+ bool excluded_by_read_range_{false}; +}; + +} // namespace paimon::parquet diff --git a/src/paimon/testing/mock/mock_file_batch_reader.h b/src/paimon/testing/mock/mock_file_batch_reader.h index 386af59be..439d5c296 100644 --- a/src/paimon/testing/mock/mock_file_batch_reader.h +++ b/src/paimon/testing/mock/mock_file_batch_reader.h @@ -156,8 +156,11 @@ class MockFileBatchReader : public PrefetchFileBatchReader { return metrics; } - Result GetPreviousBatchFirstRowNumber() const override { - return ToReaderRowNumber(previous_batch_first_row_num_); + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { + if (previous_batch_first_row_num_ == std::numeric_limits::max()) { + return Status::Invalid("No batch has been read yet"); + } + return previous_batch_first_row_num_ + batch_row_id; } Result GetNumberOfRows() const override { @@ -191,7 +194,7 @@ class MockFileBatchReader : public PrefetchFileBatchReader { int32_t batch_size_ = 0; int32_t current_pos_ = 0; int32_t read_end_pos_ = 0; - int32_t previous_batch_first_row_num_ = -1; + uint64_t previous_batch_first_row_num_ = std::numeric_limits::max(); Status next_batch_status_; bool enable_randomize_batch_size_ = true; std::vector> read_ranges_; diff --git a/src/paimon/testing/utils/read_result_collector.h b/src/paimon/testing/utils/read_result_collector.h index 5106f3b26..10f7f8dae 100644 --- a/src/paimon/testing/utils/read_result_collector.h +++ b/src/paimon/testing/utils/read_result_collector.h @@ -131,6 +131,56 @@ class ReadResultCollector { return chunk_array; } + // Reads one batch from `batch_reader` without simulating any data processing time. + static Result> CollectResultOneBatch( + BatchReader* batch_reader) { + return CollectResultOneBatch(batch_reader, /*max_data_processing_time_in_us=*/0); + } + + // Reads one batch as a single-chunk ChunkedArray (nullptr on EOF). Falls back to + // NextBatchWithBitmap when NextBatch fails with "should use NextBatchWithBitmap". + static Result> CollectResultOneBatch( + BatchReader* batch_reader, int64_t max_data_processing_time_in_us) { + int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs(); + std::srand(seed); + auto batch_result = batch_reader->NextBatch(); + BatchReader::ReadBatch batch; + if 
(!batch_result.ok()) { + if (batch_result.status().ToString().find("should use NextBatchWithBitmap") != + std::string::npos) { + PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap batch_with_bitmap, + batch_reader->NextBatchWithBitmap()); + if (BatchReader::IsEofBatch(batch_with_bitmap)) { + return std::shared_ptr(); + } + assert(!batch_with_bitmap.second.IsEmpty()); + PAIMON_ASSIGN_OR_RAISE( + batch, ReaderUtils::ApplyBitmapToReadBatch(std::move(batch_with_bitmap), + arrow::default_memory_pool())); + } else { + return batch_result.status(); + } + } else { + batch = std::move(batch_result).value(); + if (BatchReader::IsEofBatch(batch)) { + return std::shared_ptr(); + } + } + auto& [c_array, c_schema] = batch; + assert(c_array->length > 0); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr result_array, + arrow::ImportArray(c_array.get(), c_schema.get())); + PAIMON_ASSIGN_OR_RAISE( + auto converted_array, + DictArrayConverter::ConvertDictArray(result_array, arrow::default_memory_pool())); + if (max_data_processing_time_in_us > 0) { + usleep(std::rand() % max_data_processing_time_in_us); + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr chunk_array, + arrow::ChunkedArray::Make({converted_array})); + return chunk_array; + } + static Result> GetArray(BatchReader::ReadBatch&& batch) { if (BatchReader::IsEofBatch(batch)) { return std::shared_ptr();