diff --git a/include/paimon/reader/file_batch_reader.h b/include/paimon/reader/file_batch_reader.h index 272de3c82..ddb4e999d 100644 --- a/include/paimon/reader/file_batch_reader.h +++ b/include/paimon/reader/file_batch_reader.h @@ -46,8 +46,8 @@ class PAIMON_EXPORT FileBatchReader : public BatchReader { using BatchReader::NextBatch; using BatchReader::NextBatchWithBitmap; - /// Get the row number of the first row in the previously read batch. - virtual Result GetPreviousBatchFirstRowNumber() const = 0; + /// Get the global row number of the row in the previously read batch. + virtual Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const = 0; /// Get the number of rows in the file. virtual Result GetNumberOfRows() const = 0; diff --git a/src/paimon/common/data/shredding/map_shared_shredding_file_reader.cpp b/src/paimon/common/data/shredding/map_shared_shredding_file_reader.cpp index 7d446f6f1..71deb1114 100644 --- a/src/paimon/common/data/shredding/map_shared_shredding_file_reader.cpp +++ b/src/paimon/common/data/shredding/map_shared_shredding_file_reader.cpp @@ -391,8 +391,9 @@ void MapSharedShreddingFileReader::Close() { reader_->Close(); } -Result MapSharedShreddingFileReader::GetPreviousBatchFirstRowNumber() const { - return reader_->GetPreviousBatchFirstRowNumber(); +Result MapSharedShreddingFileReader::GetPreviousBatchFileRowId( + uint64_t batch_row_id) const { + return reader_->GetPreviousBatchFileRowId(batch_row_id); } Result MapSharedShreddingFileReader::GetNumberOfRows() const { diff --git a/src/paimon/common/data/shredding/map_shared_shredding_file_reader.h b/src/paimon/common/data/shredding/map_shared_shredding_file_reader.h index d09393b3c..46f053517 100644 --- a/src/paimon/common/data/shredding/map_shared_shredding_file_reader.h +++ b/src/paimon/common/data/shredding/map_shared_shredding_file_reader.h @@ -60,7 +60,7 @@ class MapSharedShreddingFileReader : public FileBatchReader { void Close() override; - Result GetPreviousBatchFirstRowNumber() const override; + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override; Result GetNumberOfRows() const override; diff --git a/src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader.h b/src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader.h index 7e2c9338a..8f770478f 100644 --- a/src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader.h +++ b/src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader.h @@ -80,8 +80,8 @@ class ApplyBitmapIndexBatchReader : public FileBatchReader { return Status::Invalid("ApplyBitmapIndexBatchReader does not support SetReadSchema"); } - Result GetPreviousBatchFirstRowNumber() const override { - return reader_->GetPreviousBatchFirstRowNumber(); + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { + return reader_->GetPreviousBatchFileRowId(batch_row_id); } Result GetNumberOfRows() const override { @@ -94,14 +94,23 @@ class ApplyBitmapIndexBatchReader : public FileBatchReader { private: Result Filter(int32_t batch_size) const { - RoaringBitmap32 is_valid; - PAIMON_ASSIGN_OR_RAISE(int32_t start_pos, reader_->GetPreviousBatchFirstRowNumber()); - int32_t length = batch_size; - for (auto iter = bitmap_.EqualOrLarger(start_pos); - iter != bitmap_.End() && *iter < start_pos + length; ++iter) { - is_valid.Add(*iter - start_pos); + RoaringBitmap32 result; + auto bitmap_iter = bitmap_.Begin(); + auto bitmap_end = bitmap_.End(); + + for (int32_t i = 0; i < batch_size; ++i) { + PAIMON_ASSIGN_OR_RAISE(uint64_t file_row_id, reader_->GetPreviousBatchFileRowId(i)); + while (bitmap_iter != bitmap_end && static_cast(*bitmap_iter) < file_row_id) { + ++bitmap_iter; + } + if (bitmap_iter == bitmap_end) { + break; + } + if (static_cast(*bitmap_iter) == file_row_id) { + result.Add(i); + } } - return is_valid; + return result; } private: diff --git a/src/paimon/common/reader/delegating_prefetch_reader.h b/src/paimon/common/reader/delegating_prefetch_reader.h index fe2e3eda2..0a22f8263 100644 --- a/src/paimon/common/reader/delegating_prefetch_reader.h +++ b/src/paimon/common/reader/delegating_prefetch_reader.h @@ -54,8 +54,8 @@ class DelegatingPrefetchReader : public FileBatchReader { return prefetch_reader_->SetReadSchema(read_schema, predicate, selection_bitmap); } - Result GetPreviousBatchFirstRowNumber() const override { - return GetReader()->GetPreviousBatchFirstRowNumber(); + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { + return GetReader()->GetPreviousBatchFileRowId(batch_row_id); } Result GetNumberOfRows() const override { diff --git a/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp b/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp index da74f3484..5940a4d48 100644 --- a/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp +++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp @@ -40,6 +40,19 @@ class Schema; namespace paimon { +namespace { + +std::pair ComputeBatchSliceByReadRange( + const std::vector& global_row_ids, const std::pair& read_range) { + auto begin_it = + std::lower_bound(global_row_ids.begin(), global_row_ids.end(), read_range.first); + auto end_it = std::lower_bound(global_row_ids.begin(), global_row_ids.end(), read_range.second); + return {static_cast(std::distance(global_row_ids.begin(), begin_it)), + static_cast(std::distance(global_row_ids.begin(), end_it))}; +} + +} // namespace + Result> PrefetchFileBatchReaderImpl::Create( const std::string& data_file_path, const ReaderBuilder* reader_builder, const std::shared_ptr& fs, uint32_t prefetch_max_parallel_num, int32_t batch_size, @@ -265,6 +278,7 @@ Status PrefetchFileBatchReaderImpl::CleanUp() { read_ranges_.clear(); read_ranges_in_group_.clear(); + current_batch_global_row_ids_.clear(); clean_prefetch_queue(); for (size_t i = 0; i < readers_pos_.size(); i++) { readers_pos_[i]->store(0); @@ -409,42 +423,54 @@ Status PrefetchFileBatchReaderImpl::EnsureReaderPosition( Status PrefetchFileBatchReaderImpl::HandleReadResult( size_t reader_idx, const std::pair& read_range, ReadBatchWithBitmap&& read_batch_with_bitmap) { - PAIMON_ASSIGN_OR_RAISE(uint64_t first_row_number, - readers_[reader_idx]->GetPreviousBatchFirstRowNumber()); auto& prefetch_queue = prefetch_queues_[reader_idx]; if (!BatchReader::IsEofBatch(read_batch_with_bitmap)) { auto& [read_batch, bitmap] = read_batch_with_bitmap; auto& [c_array, c_schema] = read_batch; - - if (first_row_number >= read_range.second) { - // fully out of range, data before first_row_number has been filtered out - readers_pos_[reader_idx]->store(first_row_number); + std::vector global_row_ids; + global_row_ids.reserve(c_array->length); + for (int64_t i = 0; i < c_array->length; ++i) { + PAIMON_ASSIGN_OR_RAISE(uint64_t global_row_id, + readers_[reader_idx]->GetPreviousBatchFileRowId(i)); + global_row_ids.push_back(global_row_id); + } + if (global_row_ids.empty()) { + ReaderUtils::ReleaseReadBatch(std::move(read_batch)); + return Status::OK(); + } + auto [slice_begin, slice_end] = ComputeBatchSliceByReadRange(global_row_ids, read_range); + if (slice_begin >= slice_end) { + readers_pos_[reader_idx]->store(read_range.second); ReaderUtils::ReleaseReadBatch(std::move(read_batch)); return Status::OK(); - } else if (first_row_number + c_array->length > read_range.second) { - // partially out of range, data before read_range.second has been effectively consumed + } else if (slice_begin > 0 || slice_end < c_array->length) { readers_pos_[reader_idx]->store(read_range.second); PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr src_array, arrow::ImportArray(c_array.get(), c_schema.get())); - int32_t target_length = read_range.second - first_row_number; - auto array = src_array->Slice(/*offset=*/0, target_length); + auto array = src_array->Slice(slice_begin, slice_end - slice_begin); PAIMON_RETURN_NOT_OK_FROM_ARROW( arrow::ExportArray(*array, c_array.get(), c_schema.get())); - bitmap.RemoveRange(target_length, src_array->length()); + RoaringBitmap32 sliced_bitmap; + for (auto iter = bitmap.EqualOrLarger(slice_begin); + iter != bitmap.End() && *iter < slice_end; ++iter) { + sliced_bitmap.Add(*iter - slice_begin); + } + bitmap = std::move(sliced_bitmap); + global_row_ids = std::vector(global_row_ids.begin() + slice_begin, + global_row_ids.begin() + slice_end); } else { - // all within the range, data before readers_[reader_idx]->GetNextRowToRead() has been - // effectively consumed readers_pos_[reader_idx]->store(readers_[reader_idx]->GetNextRowToRead()); } if (bitmap.IsEmpty()) { ReaderUtils::ReleaseReadBatch(std::move(read_batch)); return Status::OK(); } - prefetch_queue->push({read_range, std::move(read_batch_with_bitmap), first_row_number}); + prefetch_queue->push( + {read_range, std::move(read_batch_with_bitmap), std::move(global_row_ids)}); } else { std::pair eof_range; PAIMON_ASSIGN_OR_RAISE(eof_range, EofRange()); - prefetch_queue->push({eof_range, std::move(read_batch_with_bitmap), first_row_number}); + prefetch_queue->push({eof_range, std::move(read_batch_with_bitmap), {}}); readers_pos_[reader_idx]->store(std::numeric_limits::max()); } return Status::OK(); @@ -527,7 +553,7 @@ Result PrefetchFileBatchReaderImpl::NextBatchW std::unique_lock lock(working_mutex_); cv_.notify_one(); } - previous_batch_first_row_num_ = prefetch_batch.value().previous_batch_first_row_num; + current_batch_global_row_ids_ = std::move(prefetch_batch.value().global_row_ids); return std::move(prefetch_batch).value().batch; } } @@ -537,7 +563,7 @@ Result PrefetchFileBatchReaderImpl::NextBatchW assert(false); return Status::Invalid("peek batch not suppose to be nullptr"); } - previous_batch_first_row_num_ = peek_batch->previous_batch_first_row_num; + current_batch_global_row_ids_.clear(); return BatchReader::MakeEofBatchWithBitmap(); } if (value_count == prefetch_queues_.size()) { @@ -571,8 +597,19 @@ Result> PrefetchFileBatchReaderImpl::GetFileSchem return readers_[0]->GetFileSchema(); } -Result PrefetchFileBatchReaderImpl::GetPreviousBatchFirstRowNumber() const { - return previous_batch_first_row_num_; +Result PrefetchFileBatchReaderImpl::GetPreviousBatchFileRowId( + uint64_t batch_row_id) const { + if (current_batch_global_row_ids_.size() == 0) { + return Status::Invalid( + "Last batch is not read or last batch is empty, cannot get previous batch global row " + "id"); + } + if (batch_row_id >= current_batch_global_row_ids_.size()) { + return Status::Invalid( + fmt::format("batch_row_id {} is out of range, last batch row count is {}", batch_row_id, + current_batch_global_row_ids_.size())); + } + return current_batch_global_row_ids_[batch_row_id]; } Result PrefetchFileBatchReaderImpl::GetNumberOfRows() const { diff --git a/src/paimon/common/reader/prefetch_file_batch_reader_impl.h b/src/paimon/common/reader/prefetch_file_batch_reader_impl.h index 5ed9fb352..19f936c8f 100644 --- a/src/paimon/common/reader/prefetch_file_batch_reader_impl.h +++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl.h @@ -76,7 +76,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader { const std::optional& selection_bitmap) override; Status SeekToRow(uint64_t row_number) override; - Result GetPreviousBatchFirstRowNumber() const override; + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override; Result GetNumberOfRows() const override; uint64_t GetNextRowToRead() const override; void Close() override; @@ -105,7 +105,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader { struct PrefetchBatch { std::pair read_range; BatchReader::ReadBatchWithBitmap batch; - uint64_t previous_batch_first_row_num; + std::vector global_row_ids; }; PrefetchFileBatchReaderImpl( @@ -160,7 +160,7 @@ class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader { std::unique_ptr background_thread_; Status read_status_; std::atomic is_shutdown_ = false; - uint64_t previous_batch_first_row_num_ = std::numeric_limits::max(); + std::vector current_batch_global_row_ids_; bool need_prefetch_ = false; bool read_ranges_freshed_ = false; const uint32_t prefetch_queue_capacity_; diff --git a/src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp b/src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp index 24fa6861b..ceeeb9c57 100644 --- a/src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp +++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp @@ -284,12 +284,10 @@ TEST_F(PrefetchFileBatchReaderImplTest, TestSimple) { /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(), GetDefaultPool())); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto result_array, ReadResultCollector::CollectResult( reader.get(), /*max simulated data processing time*/ 100)); - ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 101); auto expected_array = std::make_shared(data_array); ASSERT_TRUE(result_array->Equals(expected_array)); } @@ -607,12 +605,11 @@ TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithLargeBatchSize) { prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(), GetDefaultPool())); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto result_array, ReadResultCollector::CollectResult( reader.get(), /*max simulated data processing time*/ 100)); - ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 101); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); auto expected_array = std::make_shared(data_array); ASSERT_TRUE(result_array->Equals(expected_array)); } @@ -636,12 +633,11 @@ TEST_F(PrefetchFileBatchReaderImplTest, TestPartialReaderSuccessRead) { } arrow::ArrayVector result_array_vector; - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto batch_with_bitmap, reader->NextBatchWithBitmap()); auto& [batch, bitmap] = batch_with_bitmap; ASSERT_EQ(batch.first->length, bitmap.Cardinality()); - ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 0); + ASSERT_EQ(reader->GetPreviousBatchFileRowId(0).value(), 0); ASSERT_OK_AND_ASSIGN(auto array, ReadResultCollector::GetArray(std::move(batch))); result_array_vector.push_back(array); ASSERT_OK(prefetch_reader->GetReadStatus()); @@ -682,11 +678,9 @@ TEST_F(PrefetchFileBatchReaderImplTest, TestAllReaderFailedWithIOError) { ->SetNextBatchStatus(Status::IOError("mock error")); } - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); auto batch_result = reader->NextBatchWithBitmap(); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_FALSE(batch_result.ok()); ASSERT_TRUE(batch_result.status().IsIOError()); ASSERT_FALSE(prefetch_reader->is_shutdown_); @@ -695,8 +689,7 @@ TEST_F(PrefetchFileBatchReaderImplTest, TestAllReaderFailedWithIOError) { // call NextBatch again, will still return error status auto batch_result2 = reader->NextBatchWithBitmap(); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_FALSE(batch_result2.ok()); ASSERT_TRUE(batch_result2.status().IsIOError()); } @@ -713,12 +706,11 @@ TEST_F(PrefetchFileBatchReaderImplTest, TestPrefetchWithEmptyData) { prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(), GetDefaultPool())); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto result_array, ReadResultCollector::CollectResult( reader.get(), /*max simulated data processing time*/ 100)); - ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 0); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_FALSE(result_array); } @@ -734,17 +726,17 @@ TEST_F(PrefetchFileBatchReaderImplTest, TestCallNextBatchAfterReadingEof) { prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true, /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(), GetDefaultPool())); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto result_array, ReadResultCollector::CollectResult( reader.get(), /*max simulated data processing time*/ 100)); - ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 10); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); auto expected_array = std::make_shared(data_array); ASSERT_TRUE(result_array->Equals(expected_array)); // continue to call NextBatch() after reading eof ASSERT_OK_AND_ASSIGN(auto batch_with_bitmap, reader->NextBatchWithBitmap()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_TRUE(BatchReader::IsEofBatch(batch_with_bitmap)); } @@ -841,12 +833,11 @@ TEST_P(PrefetchFileBatchReaderImplTest, TestPrefetchWithPredicatePushdownWithCom PreparePrefetchReader(file_format, schema.get(), predicate, /*selection_bitmap=*/std::nullopt, /*batch_size=*/10, /*prefetch_max_parallel_num=*/3, cache_mode); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto result_array, ReadResultCollector::CollectResult( reader.get(), /*max simulated data processing time*/ 100)); - ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 90); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); arrow::ArrayVector expected_array_vector; expected_array_vector.push_back(data_array->Slice(0, 30)); @@ -878,12 +869,11 @@ TEST_P(PrefetchFileBatchReaderImplTest, /*selection_bitmap=*/std::nullopt, /*batch_size=*/10, /*prefetch_max_parallel_num=*/3, cache_mode); ASSERT_OK(reader->RefreshReadRanges()); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto result_array, ReadResultCollector::CollectResult( reader.get(), /*max simulated data processing time*/ 100)); - ASSERT_EQ(reader->GetPreviousBatchFirstRowNumber().value(), 90); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); arrow::ArrayVector expected_array_vector; expected_array_vector.push_back(data_array->Slice(0, 20)); @@ -923,4 +913,55 @@ TEST_F(PrefetchFileBatchReaderImplTest, TestPrefetchWithBitmap) { ASSERT_TRUE(result_chunk_array->Equals(expected_chunk_array)); } +TEST_P(PrefetchFileBatchReaderImplTest, TestRowMapping) { + auto [file_format, cache_mode] = GetParam(); + auto data_array = PrepareArray(90); + PrepareTestData(file_format, data_array, /*stripe_row_count=*/30, /*row_index_stride=*/10); + auto schema = arrow::schema(fields_); + ASSERT_OK_AND_ASSIGN( + auto predicate, + PredicateBuilder::Or({ + PredicateBuilder::Between(/*field_index=*/1, /*field_name=*/"f1", FieldType::BIGINT, + Literal(20l), Literal(29l)), + PredicateBuilder::Between(/*field_index=*/1, /*field_name=*/"f1", FieldType::BIGINT, + Literal(70l), Literal(79l)), + })); + + auto reader = + PreparePrefetchReader(file_format, schema.get(), predicate, + /*selection_bitmap=*/std::nullopt, + /*batch_size=*/10, /*prefetch_max_parallel_num=*/3, cache_mode); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); + ASSERT_OK_AND_ASSIGN(std::shared_ptr batch, + paimon::test::ReadResultCollector::CollectResultOneBatch(reader.get())); + for (uint64_t i = 0; i < 10; i++) { + ASSERT_EQ(reader->GetPreviousBatchFileRowId(i).value(), 20 + i); + } + + ASSERT_OK_AND_ASSIGN(batch, + paimon::test::ReadResultCollector::CollectResultOneBatch(reader.get())); + for (uint64_t i = 0; i < 10; i++) { + ASSERT_EQ(reader->GetPreviousBatchFileRowId(i).value(), 70 + i); + } + + // Set read schema again + std::unique_ptr c_schema = std::make_unique(); + ASSERT_TRUE(arrow::ExportSchema(*schema, c_schema.get()).ok()); + predicate = PredicateBuilder::Between(/*field_index=*/1, /*field_name=*/"f1", FieldType::BIGINT, + Literal(30l), Literal(49l)); + ASSERT_OK(reader->SetReadSchema(c_schema.get(), predicate, std::nullopt)); + + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); + ASSERT_OK_AND_ASSIGN(batch, + paimon::test::ReadResultCollector::CollectResultOneBatch(reader.get())); + for (uint64_t i = 0; i < 10; i++) { + ASSERT_EQ(reader->GetPreviousBatchFileRowId(i).value(), 30 + i); + } + ASSERT_OK_AND_ASSIGN(batch, + paimon::test::ReadResultCollector::CollectResultOneBatch(reader.get())); + for (uint64_t i = 0; i < 10; i++) { + ASSERT_EQ(reader->GetPreviousBatchFileRowId(i).value(), 40 + i); + } +} + } // namespace paimon::test diff --git a/src/paimon/core/deletionvectors/apply_deletion_vector_batch_reader.h b/src/paimon/core/deletionvectors/apply_deletion_vector_batch_reader.h index 5e2ecf279..48fec8b31 100644 --- a/src/paimon/core/deletionvectors/apply_deletion_vector_batch_reader.h +++ b/src/paimon/core/deletionvectors/apply_deletion_vector_batch_reader.h @@ -82,8 +82,8 @@ class ApplyDeletionVectorBatchReader : public FileBatchReader { return Status::Invalid("ApplyDeletionVectorBatchReader does not support SetReadSchema"); } - Result GetPreviousBatchFirstRowNumber() const override { - return reader_->GetPreviousBatchFirstRowNumber(); + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { + return reader_->GetPreviousBatchFileRowId(batch_row_id); } Result GetNumberOfRows() const override { @@ -96,9 +96,15 @@ class ApplyDeletionVectorBatchReader : public FileBatchReader { private: Result Filter(int32_t batch_size) const { - PAIMON_ASSIGN_OR_RAISE(uint64_t previous_batch_first_row_number, - reader_->GetPreviousBatchFirstRowNumber()); - return deletion_vector_->IsValid(previous_batch_first_row_number, batch_size); + RoaringBitmap32 is_valid; + for (int32_t i = 0; i < batch_size; ++i) { + PAIMON_ASSIGN_OR_RAISE(uint64_t file_row_id, reader_->GetPreviousBatchFileRowId(i)); + PAIMON_ASSIGN_OR_RAISE(bool is_deleted, deletion_vector_->IsDeleted(file_row_id)); + if (!is_deleted) { + is_valid.Add(i); + } + } + return is_valid; } private: diff --git a/src/paimon/core/io/complete_row_tracking_fields_reader.cpp b/src/paimon/core/io/complete_row_tracking_fields_reader.cpp index 2aef9b29f..29b610b16 100644 --- a/src/paimon/core/io/complete_row_tracking_fields_reader.cpp +++ b/src/paimon/core/io/complete_row_tracking_fields_reader.cpp @@ -86,15 +86,14 @@ CompleteRowTrackingFieldsBatchReader::NextBatchWithBitmap() { std::string row_id_field_name = SpecialFields::RowId().Name(); if (read_schema_->GetFieldIndex(row_id_field_name) != -1) { row_id_array = src_struct_array->GetFieldByName(row_id_field_name); - PAIMON_ASSIGN_OR_RAISE(uint64_t previous_batch_first_row_number, - reader_->GetPreviousBatchFirstRowNumber()); - auto row_id_convert_func = [previous_batch_first_row_number, - this](int32_t idx_in_array) -> Result { + auto row_id_convert_func = [this](int32_t idx_in_array) -> Result { if (first_row_id_ == std::nullopt) { return Status::Invalid( "unexpected: read _ROW_ID special field, but first row id is null in meta"); } - return first_row_id_.value() + previous_batch_first_row_number + idx_in_array; + PAIMON_ASSIGN_OR_RAISE(uint64_t file_row_id, + reader_->GetPreviousBatchFileRowId(idx_in_array)); + return first_row_id_.value() + file_row_id; }; PAIMON_RETURN_NOT_OK(ConvertRowTrackingField(src_struct_array->length(), /*init_value=*/0, row_id_convert_func, &row_id_array)); diff --git a/src/paimon/core/io/complete_row_tracking_fields_reader.h b/src/paimon/core/io/complete_row_tracking_fields_reader.h index cc2f9f7bf..2aa535d79 100644 --- a/src/paimon/core/io/complete_row_tracking_fields_reader.h +++ b/src/paimon/core/io/complete_row_tracking_fields_reader.h @@ -60,8 +60,8 @@ class CompleteRowTrackingFieldsBatchReader : public FileBatchReader { reader_->Close(); } - Result GetPreviousBatchFirstRowNumber() const override { - return reader_->GetPreviousBatchFirstRowNumber(); + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { + return reader_->GetPreviousBatchFileRowId(batch_row_id); } Result GetNumberOfRows() const override { diff --git a/src/paimon/core/io/field_mapping_reader.h b/src/paimon/core/io/field_mapping_reader.h index 39fd920fa..9efd4a07c 100644 --- a/src/paimon/core/io/field_mapping_reader.h +++ b/src/paimon/core/io/field_mapping_reader.h @@ -77,8 +77,8 @@ class FieldMappingReader : public FileBatchReader { return Status::Invalid("FieldMappingReader does not support SetReadSchema"); } - Result GetPreviousBatchFirstRowNumber() const override { - return reader_->GetPreviousBatchFirstRowNumber(); + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { + return reader_->GetPreviousBatchFileRowId(batch_row_id); } Result GetNumberOfRows() const override { diff --git a/src/paimon/core/io/key_value_data_file_record_reader.cpp b/src/paimon/core/io/key_value_data_file_record_reader.cpp index a4edd04e0..8d12ec746 100644 --- a/src/paimon/core/io/key_value_data_file_record_reader.cpp +++ b/src/paimon/core/io/key_value_data_file_record_reader.cpp @@ -81,15 +81,15 @@ Result KeyValueDataFileRecordReader::Iterator::Next() { Result> KeyValueDataFileRecordReader::Iterator::NextWithFilePos() { PAIMON_ASSIGN_OR_RAISE(KeyValue kv, Next()); - return std::make_pair(previous_batch_first_row_number_ + cursor_ - 1, std::move(kv)); + PAIMON_ASSIGN_OR_RAISE(uint64_t global_row_id, + reader_->reader_->GetPreviousBatchFileRowId(cursor_ - 1)); + return std::make_pair(static_cast(global_row_id), std::move(kv)); } Result> KeyValueDataFileRecordReader::NextBatch() { Reset(); PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap batch_with_bitmap, reader_->NextBatchWithBitmap()); - PAIMON_ASSIGN_OR_RAISE(int64_t previous_batch_first_row_number, - reader_->GetPreviousBatchFirstRowNumber()); if (BatchReader::IsEofBatch(batch_with_bitmap)) { // reader eof, just return return std::unique_ptr(); @@ -140,8 +140,7 @@ Result> KeyValueDataFileRecordRe key_ctx_ = std::make_shared(key_fields, pool_); value_ctx_ = std::make_shared(value_fields, pool_); ArrowUtils::TraverseArray(data_batch); - return std::make_unique( - this, previous_batch_first_row_number); + return std::make_unique(this); } void KeyValueDataFileRecordReader::Reset() { diff --git a/src/paimon/core/io/key_value_data_file_record_reader.h b/src/paimon/core/io/key_value_data_file_record_reader.h index c271a3bdb..3fe87a89e 100644 --- a/src/paimon/core/io/key_value_data_file_record_reader.h +++ b/src/paimon/core/io/key_value_data_file_record_reader.h @@ -54,16 +54,13 @@ class KeyValueDataFileRecordReader : public KeyValueRecordReader { class Iterator : public KeyValueRecordReader::Iterator { public: - Iterator(KeyValueDataFileRecordReader* reader, int64_t previous_batch_first_row_number) - : previous_batch_first_row_number_(previous_batch_first_row_number), - reader_(reader), - selection_cardinality_(reader->selection_bitmap_.Cardinality()) {} + explicit Iterator(KeyValueDataFileRecordReader* reader) + : reader_(reader), selection_cardinality_(reader->selection_bitmap_.Cardinality()) {} Result HasNext() const override; Result Next() override; Result> NextWithFilePos(); private: - int64_t previous_batch_first_row_number_; mutable int64_t cursor_ = 0; KeyValueDataFileRecordReader* reader_ = nullptr; int64_t selection_cardinality_ = 0; diff --git a/src/paimon/format/avro/avro_file_batch_reader.cpp b/src/paimon/format/avro/avro_file_batch_reader.cpp index 0c55e0260..eb27049d0 100644 --- a/src/paimon/format/avro/avro_file_batch_reader.cpp +++ b/src/paimon/format/avro/avro_file_batch_reader.cpp @@ -116,6 +116,7 @@ Result AvroFileBatchReader::NextBatch() { previous_first_row_ = next_row_to_read_; next_row_to_read_ += array_builder_->length(); if (array_builder_->length() == 0) { + previous_batch_row_count_ = 0; return BatchReader::MakeEofBatch(); } PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr array, @@ -123,6 +124,7 @@ Result AvroFileBatchReader::NextBatch() { std::unique_ptr c_array = std::make_unique(); std::unique_ptr c_schema = std::make_unique(); PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, c_array.get(), c_schema.get())); + previous_batch_row_count_ = c_array->length; return make_pair(std::move(c_array), std::move(c_schema)); } catch (const ::avro::Exception& e) { return Status::Invalid(fmt::format("avro reader next batch failed. {}", e.what())); @@ -168,6 +170,7 @@ Status AvroFileBatchReader::SetReadSchema(::ArrowSchema* read_schema, reader_ = std::move(reader); array_builder_ = std::move(array_builder); previous_first_row_ = std::numeric_limits::max(); + previous_batch_row_count_ = 0; next_row_to_read_ = std::numeric_limits::max(); close_ = false; return Status::OK(); diff --git a/src/paimon/format/avro/avro_file_batch_reader.h b/src/paimon/format/avro/avro_file_batch_reader.h index 98d5deede..b70cdb784 100644 --- a/src/paimon/format/avro/avro_file_batch_reader.h +++ b/src/paimon/format/avro/avro_file_batch_reader.h @@ -45,8 +45,20 @@ class AvroFileBatchReader : public FileBatchReader { Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr& predicate, const std::optional& selection_bitmap) override; - Result GetPreviousBatchFirstRowNumber() const override { - return previous_first_row_; + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { + if (previous_batch_row_count_ == 0) { + if (previous_first_row_ == std::numeric_limits::max()) { + return Status::Invalid("No batch has been read yet."); + } else { + return Status::Invalid("Last batch was EOF."); + } + } + if (batch_row_id >= previous_batch_row_count_) { + return Status::Invalid( + fmt::format("batch_row_id {} is out of range, last batch row count is {}", + batch_row_id, previous_batch_row_count_)); + } + return previous_first_row_ + batch_row_id; } Result GetNumberOfRows() const override; @@ -90,6 +102,7 @@ class AvroFileBatchReader : public FileBatchReader { std::optional> read_fields_projection_; uint64_t previous_first_row_ = std::numeric_limits::max(); uint64_t next_row_to_read_ = std::numeric_limits::max(); + uint64_t previous_batch_row_count_ = 0; mutable std::optional total_rows_ = std::nullopt; const int32_t batch_size_; bool close_ = false; diff --git a/src/paimon/format/avro/avro_file_batch_reader_test.cpp b/src/paimon/format/avro/avro_file_batch_reader_test.cpp index f4f052a34..54e598c6d 100644 --- a/src/paimon/format/avro/avro_file_batch_reader_test.cpp +++ b/src/paimon/format/avro/avro_file_batch_reader_test.cpp @@ -327,7 +327,7 @@ TEST_F(AvroFileBatchReaderTest, TestSetReadSchemaRejectNestedSubFieldProjection) "does not support nested sub-field projection"); } -TEST_F(AvroFileBatchReaderTest, TestGetPreviousBatchFirstRowNumber) { +TEST_F(AvroFileBatchReaderTest, TestGetPreviousBatchFileRowId) { std::string path = paimon::test::GetDataDir() + "/avro/append_simple.db/" "append_simple/bucket-0/" @@ -352,26 +352,25 @@ TEST_F(AvroFileBatchReaderTest, TestGetPreviousBatchFirstRowNumber) { ASSERT_OK_AND_ASSIGN(auto num_rows, reader->GetNumberOfRows()); ASSERT_EQ(4, num_rows); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto batch1, reader->NextBatch()); ArrowArrayRelease(batch1.first.get()); ArrowSchemaRelease(batch1.second.get()); - ASSERT_EQ(0, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(0, reader->GetPreviousBatchFileRowId(0).value()); ASSERT_OK_AND_ASSIGN(auto batch2, reader->NextBatch()); - ASSERT_EQ(1, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(1, reader->GetPreviousBatchFileRowId(0).value()); ArrowArrayRelease(batch2.first.get()); ArrowSchemaRelease(batch2.second.get()); ASSERT_OK_AND_ASSIGN(auto batch3, reader->NextBatch()); - ASSERT_EQ(2, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(2, reader->GetPreviousBatchFileRowId(0).value()); ArrowArrayRelease(batch3.first.get()); ArrowSchemaRelease(batch3.second.get()); ASSERT_OK_AND_ASSIGN(auto batch4, reader->NextBatch()); - ASSERT_EQ(3, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(3, reader->GetPreviousBatchFileRowId(0).value()); ArrowArrayRelease(batch4.first.get()); ArrowSchemaRelease(batch4.second.get()); ASSERT_OK_AND_ASSIGN(auto batch5, reader->NextBatch()); - ASSERT_EQ(4, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_TRUE(BatchReader::IsEofBatch(batch5)); } @@ -397,7 +396,7 @@ TEST_F(AvroFileBatchReaderTest, TestSetReadSchemaResetsReaderToFirstRow) { ASSERT_OK_AND_ASSIGN(auto reader, reader_builder->Build(in)); ASSERT_OK_AND_ASSIGN(auto first_batch, reader->NextBatch()); - ASSERT_EQ(0, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(0, reader->GetPreviousBatchFileRowId(0).value()); auto first_array = arrow::ImportArray(first_batch.first.get(), first_batch.second.get()).ValueOrDie(); ASSERT_TRUE(first_array->Equals(src_array->Slice(0, 2))) << first_array->ToString(); @@ -407,11 +406,10 @@ TEST_F(AvroFileBatchReaderTest, TestSetReadSchemaResetsReaderToFirstRow) { ASSERT_TRUE(arrow::ExportSchema(*read_schema, c_schema.get()).ok()); ASSERT_OK(reader->SetReadSchema(c_schema.get(), /*predicate=*/nullptr, /*selection_bitmap=*/std::nullopt)); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto projected_batch, reader->NextBatch()); - ASSERT_EQ(0, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(0, reader->GetPreviousBatchFileRowId(0).value()); auto projected_array = arrow::ImportArray(projected_batch.first.get(), projected_batch.second.get()).ValueOrDie(); auto expected_projected_array = arrow::ipc::internal::json::ArrayFromJSON( diff --git a/src/paimon/format/blob/blob_file_batch_reader.cpp b/src/paimon/format/blob/blob_file_batch_reader.cpp index 08fa6149b..d5b5a2fe2 100644 --- a/src/paimon/format/blob/blob_file_batch_reader.cpp +++ b/src/paimon/format/blob/blob_file_batch_reader.cpp @@ -157,7 +157,7 @@ Status BlobFileBatchReader::SetReadSchema(::ArrowSchema* read_schema, target_type_ = arrow::struct_(arrow_schema->fields()); current_pos_ = 0; previous_batch_first_row_number_ = std::numeric_limits::max(); - + previous_batch_row_count_ = 0; return Status::OK(); } @@ -292,6 +292,7 @@ Result BlobFileBatchReader::NextBatch() { } if (current_pos_ >= target_blob_lengths_.size()) { PAIMON_ASSIGN_OR_RAISE(previous_batch_first_row_number_, GetNumberOfRows()); + previous_batch_row_count_ = 0; return BatchReader::MakeEofBatch(); } int32_t left_rows = target_blob_lengths_.size() - current_pos_; @@ -303,6 +304,7 @@ Result BlobFileBatchReader::NextBatch() { PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*blob_array, c_array.get(), c_schema.get())); previous_batch_first_row_number_ = target_blob_row_indexes_[current_pos_]; current_pos_ += rows_to_read; + previous_batch_row_count_ = c_array->length; return make_pair(std::move(c_array), std::move(c_schema)); } diff --git a/src/paimon/format/blob/blob_file_batch_reader.h b/src/paimon/format/blob/blob_file_batch_reader.h index 06287d759..27549c190 100644 --- a/src/paimon/format/blob/blob_file_batch_reader.h +++ b/src/paimon/format/blob/blob_file_batch_reader.h @@ -24,6 +24,7 @@ #include "arrow/memory_pool.h" #include "arrow/type.h" +#include "fmt/format.h" #include "paimon/common/data/blob_defs.h" #include "paimon/fs/file_system.h" #include "paimon/memory/bytes.h" @@ -97,14 +98,26 @@ class BlobFileBatchReader : public FileBatchReader { Result NextBatch() override; - Result GetPreviousBatchFirstRowNumber() const override { + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { if (all_blob_lengths_.size() != target_blob_lengths_.size()) { - return Status::Invalid( - "Cannot call GetPreviousBatchFirstRowNumber in BlobFileBatchReader because, after " + return Status::NotImplemented( + "Cannot call GetPreviousBatchFileRowId in BlobFileBatchReader because, after " "bitmap pushdown, rows in the array returned by NextBatch are no longer " "contiguous."); } - return previous_batch_first_row_number_; + if (previous_batch_row_count_ == 0) { + if (previous_batch_first_row_number_ == std::numeric_limits::max()) { + return Status::Invalid("No batch has been read yet."); + } else { + return Status::Invalid("Last batch was EOF."); + } + } + if (batch_row_id >= previous_batch_row_count_) { + return Status::Invalid( + fmt::format("batch_row_id {} is out of range, last batch row count is {}", + batch_row_id, previous_batch_row_count_)); + } + return previous_batch_first_row_number_ + batch_row_id; } Result GetNumberOfRows() const override { @@ -174,6 +187,7 @@ class BlobFileBatchReader : public FileBatchReader { size_t current_pos_ = 0; uint64_t previous_batch_first_row_number_ = std::numeric_limits::max(); + uint64_t previous_batch_row_count_ = 0; bool closed_ = false; }; diff --git a/src/paimon/format/blob/blob_file_batch_reader_test.cpp b/src/paimon/format/blob/blob_file_batch_reader_test.cpp index bde27d64d..c372fe2a2 100644 --- a/src/paimon/format/blob/blob_file_batch_reader_test.cpp +++ b/src/paimon/format/blob/blob_file_batch_reader_test.cpp @@ -169,22 +169,21 @@ TEST_F(BlobFileBatchReaderTest, TestRowNumbers) { ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, std::nullopt)); ASSERT_OK_AND_ASSIGN(auto number_of_rows, reader->GetNumberOfRows()); ASSERT_EQ(3, number_of_rows); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto batch1, reader->NextBatch()); ArrowArrayRelease(batch1.first.get()); ArrowSchemaRelease(batch1.second.get()); - ASSERT_EQ(0, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(0, reader->GetPreviousBatchFileRowId(0).value()); ASSERT_OK_AND_ASSIGN(auto batch2, reader->NextBatch()); - ASSERT_EQ(1, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(1, reader->GetPreviousBatchFileRowId(0).value()); ArrowArrayRelease(batch2.first.get()); ArrowSchemaRelease(batch2.second.get()); ASSERT_OK_AND_ASSIGN(auto batch3, reader->NextBatch()); - ASSERT_EQ(2, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(2, reader->GetPreviousBatchFileRowId(0).value()); ArrowArrayRelease(batch3.first.get()); ArrowSchemaRelease(batch3.second.get()); ASSERT_OK_AND_ASSIGN(auto batch4, reader->NextBatch()); - ASSERT_EQ(3, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_TRUE(BatchReader::IsEofBatch(batch4)); } @@ -255,8 +254,7 @@ TEST_P(BlobFileBatchReaderTest, EmptyFile) { ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, std::nullopt)); ASSERT_OK_AND_ASSIGN(auto number_of_rows, reader->GetNumberOfRows()); ASSERT_EQ(0, number_of_rows); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto batch, reader->NextBatch()); ASSERT_TRUE(BatchReader::IsEofBatch(batch)); } diff --git a/src/paimon/format/lance/lance_file_batch_reader.cpp b/src/paimon/format/lance/lance_file_batch_reader.cpp index 1ae1deb84..acbcb835a 100644 --- a/src/paimon/format/lance/lance_file_batch_reader.cpp +++ b/src/paimon/format/lance/lance_file_batch_reader.cpp @@ -96,7 +96,7 @@ Status LanceFileBatchReader::SetReadSchema(::ArrowSchema* read_schema, PAIMON_RETURN_NOT_OK(LanceToPaimonStatus(err_code, error_message_)); stream_reader_ = nullptr; previous_batch_first_row_num_ = std::numeric_limits::max(); - last_batch_row_num_ = 0; + previous_batch_row_count_ = 0; } return Status::OK(); } @@ -119,7 +119,7 @@ Result LanceFileBatchReader::NextBatch() { // first read previous_batch_first_row_num_ = 0; } else { - previous_batch_first_row_num_ += last_batch_row_num_; + previous_batch_first_row_num_ += previous_batch_row_count_; } auto c_array = std::make_unique(); auto c_schema = std::make_unique(); @@ -130,7 +130,7 @@ Result LanceFileBatchReader::NextBatch() { if (is_eof) { return BatchReader::MakeEofBatch(); } - last_batch_row_num_ = c_array->length; + previous_batch_row_count_ = c_array->length; return std::make_pair(std::move(c_array), std::move(c_schema)); } diff --git a/src/paimon/format/lance/lance_file_batch_reader.h b/src/paimon/format/lance/lance_file_batch_reader.h index fb2628035..129fdcc2d 100644 --- a/src/paimon/format/lance/lance_file_batch_reader.h +++ b/src/paimon/format/lance/lance_file_batch_reader.h @@ -22,6 +22,7 @@ #include #include "arrow/c/bridge.h" +#include "fmt/format.h" #include "lance_lib/lance_api.h" #include "paimon/metrics.h" #include "paimon/reader/file_batch_reader.h" @@ -41,15 +42,27 @@ class LanceFileBatchReader : public FileBatchReader { Result NextBatch() override; - Result GetPreviousBatchFirstRowNumber() const override { + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { if (!read_row_ids_.empty() && read_row_ids_.size() != num_rows_) { // TODO(xinyu.lxy): support function - return Status::Invalid( - "Cannot call GetPreviousBatchFirstRowNumber in LanceFileBatchReader because, after " + return Status::NotImplemented( + "Cannot call GetPreviousBatchFileRowId in LanceFileBatchReader because, after " "bitmap pushdown, rows in the array returned by NextBatch are no longer " "contiguous."); } - return previous_batch_first_row_num_; + if (previous_batch_row_count_ == 0) { + if (previous_batch_first_row_num_ == std::numeric_limits::max()) { + return Status::Invalid("No batch has been read yet."); + } else { + return Status::Invalid("Last batch was EOF."); + } + } + if (batch_row_id >= previous_batch_row_count_) { + return Status::Invalid( + fmt::format("batch_row_id {} is out of range, last batch row count is {}", + batch_row_id, previous_batch_row_count_)); + } + return previous_batch_first_row_num_ + batch_row_id; } Result GetNumberOfRows() const override { @@ -81,7 +94,7 @@ class LanceFileBatchReader : public FileBatchReader { uint64_t num_rows_ = 0; // only validate when there is no bitmap pushdown uint64_t previous_batch_first_row_num_ = std::numeric_limits::max(); - uint64_t last_batch_row_num_ = 0; + uint64_t previous_batch_row_count_ = 0; mutable std::string error_message_; LanceFileReader* file_reader_ = nullptr; LanceReaderAdapter* stream_reader_ = nullptr; diff --git a/src/paimon/format/lance/lance_format_reader_writer_test.cpp b/src/paimon/format/lance/lance_format_reader_writer_test.cpp index b1ad6be73..71a073413 100644 --- a/src/paimon/format/lance/lance_format_reader_writer_test.cpp +++ b/src/paimon/format/lance/lance_format_reader_writer_test.cpp @@ -478,27 +478,26 @@ TEST_F(LanceFileReaderWriterTest, TestPreviousBatchFirstRowNumber) { ASSERT_OK_AND_ASSIGN( std::unique_ptr reader, LanceFileBatchReader::Create(file_path, /*batch_size=*/4, /*batch_readahead=*/2)); - ASSERT_EQ(std::numeric_limits::max(), - reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(reader->GetPreviousBatchFileRowId(0)); // first batch row 0-3 ASSERT_OK_AND_ASSIGN(auto read_batch, reader->NextBatch()); ASSERT_OK_AND_ASSIGN(auto read_array, paimon::test::ReadResultCollector::GetArray(std::move(read_batch))); ASSERT_TRUE(read_array->Equals(array->Slice(0, 4))); - ASSERT_EQ(0, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(0, reader->GetPreviousBatchFileRowId(0).value()); // second batch 4-5 ASSERT_OK_AND_ASSIGN(read_batch, reader->NextBatch()); ASSERT_OK_AND_ASSIGN(read_array, paimon::test::ReadResultCollector::GetArray(std::move(read_batch))); ASSERT_TRUE(read_array->Equals(array->Slice(4, 2))); - ASSERT_EQ(4, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(4, reader->GetPreviousBatchFileRowId(0).value()); // eof ASSERT_OK_AND_ASSIGN(read_batch, reader->NextBatch()); ASSERT_TRUE(BatchReader::IsEofBatch(read_batch)); - ASSERT_EQ(6, reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_EQ(6, reader->GetPreviousBatchFileRowId(0).value()); // test with bitmap pushdown ArrowSchema c_read_schema; @@ -506,8 +505,8 @@ TEST_F(LanceFileReaderWriterTest, TestPreviousBatchFirstRowNumber) { ASSERT_OK(reader->SetReadSchema(&c_read_schema, /*predicate=*/nullptr, /*selection_bitmap=*/RoaringBitmap32::From({0, 3}))); ASSERT_NOK_WITH_MSG( - reader->GetPreviousBatchFirstRowNumber(), - "Cannot call GetPreviousBatchFirstRowNumber in LanceFileBatchReader because, after bitmap " + reader->GetPreviousBatchFileRowId(0), + "Cannot call GetPreviousBatchFileRowId in LanceFileBatchReader because, after bitmap " "pushdown, rows in the array returned by NextBatch are no longer contiguous."); } } // namespace paimon::lance::test diff --git a/src/paimon/format/orc/orc_file_batch_reader.cpp b/src/paimon/format/orc/orc_file_batch_reader.cpp index 76ad15688..45c60b574 100644 --- a/src/paimon/format/orc/orc_file_batch_reader.cpp +++ b/src/paimon/format/orc/orc_file_batch_reader.cpp @@ -166,6 +166,7 @@ Status OrcFileBatchReader::SetReadSchema(::ArrowSchema* read_schema, options_, &target_column_ids)); target_column_ids_ = target_column_ids; + previous_batch_row_count_ = 0; PAIMON_RETURN_NOT_OK(reader_->SetReadSchema(target_type, row_reader_options)); return Status::OK(); } @@ -179,7 +180,13 @@ Result>> OrcFileBatchReader::PreBuffer } Result OrcFileBatchReader::NextBatch() { - return reader_->Next(); + PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatch batch, reader_->Next()); + if (BatchReader::IsEofBatch(batch)) { + previous_batch_row_count_ = 0; + } else { + previous_batch_row_count_ = batch.first->length; + } + return batch; } std::shared_ptr OrcFileBatchReader::GetReaderMetrics() const { diff --git a/src/paimon/format/orc/orc_file_batch_reader.h b/src/paimon/format/orc/orc_file_batch_reader.h index c2460f3a7..da9c40696 100644 --- a/src/paimon/format/orc/orc_file_batch_reader.h +++ b/src/paimon/format/orc/orc_file_batch_reader.h @@ -62,8 +62,21 @@ class OrcFileBatchReader : public PrefetchFileBatchReader { // OrcFileBatchReader. Therefore, we need to hold BatchReader when using output ArrowArray. Result NextBatch() override; - Result GetPreviousBatchFirstRowNumber() const override { - return reader_->GetRowNumber(); + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { + uint64_t previous_first_row = reader_->GetRowNumber(); + if (previous_batch_row_count_ == 0) { + if (previous_first_row == std::numeric_limits::max()) { + return Status::Invalid("No batch has been read yet."); + } else { + return Status::Invalid("Last batch was EOF."); + } + } + if (batch_row_id >= previous_batch_row_count_) { + return Status::Invalid( + fmt::format("batch_row_id {} is out of range, last batch row count is {}", + batch_row_id, previous_batch_row_count_)); + } + return previous_first_row + batch_row_id; } Result GetNumberOfRows() const override { @@ -120,6 +133,8 @@ class OrcFileBatchReader : public PrefetchFileBatchReader { std::shared_ptr metrics_; std::vector target_column_ids_; std::vector> cache_ranges_; + + uint64_t previous_batch_row_count_ = 0; }; } // namespace paimon::orc diff --git a/src/paimon/format/orc/orc_file_batch_reader_test.cpp b/src/paimon/format/orc/orc_file_batch_reader_test.cpp index 13c34d355..b9e781553 100644 --- a/src/paimon/format/orc/orc_file_batch_reader_test.cpp +++ b/src/paimon/format/orc/orc_file_batch_reader_test.cpp @@ -493,14 +493,22 @@ TEST_P(OrcFileBatchReaderTest, TestNextBatchSimple) { for (auto batch_size : {1, 2, 3, 5, 8, 10}) { auto orc_batch_reader = PrepareOrcFileBatchReader(file_name, &read_schema, batch_size, natural_read_size); - ASSERT_EQ(std::numeric_limits::max(), - orc_batch_reader->GetPreviousBatchFirstRowNumber().value()); - ASSERT_OK_AND_ASSIGN(auto result_array, paimon::test::ReadResultCollector::CollectResult( - orc_batch_reader.get())); - ASSERT_EQ(orc_batch_reader->GetPreviousBatchFirstRowNumber().value(), 8); + ASSERT_NOK(orc_batch_reader->GetPreviousBatchFileRowId(0)); + int i = 0; + while (true) { + ASSERT_OK_AND_ASSIGN( + auto result_array, + paimon::test::ReadResultCollector::CollectResultOneBatch(orc_batch_reader.get())); + if (!result_array) { + ASSERT_NOK(orc_batch_reader->GetPreviousBatchFileRowId(0)); + break; + } + ASSERT_EQ(orc_batch_reader->GetPreviousBatchFileRowId(0).value(), i * batch_size); + ASSERT_TRUE(result_array->Equals(std::make_shared( + struct_array_->Slice(i * batch_size, result_array->length())))); + i++; + } orc_batch_reader->Close(); - auto expected_array = std::make_shared(struct_array_); - ASSERT_TRUE(result_array->Equals(expected_array)); // test metrics auto reader_metrics = orc_batch_reader->GetReaderMetrics(); ASSERT_OK_AND_ASSIGN(uint64_t io_count, @@ -768,19 +776,18 @@ TEST_F(OrcFileBatchReaderTest, TestReadNoField) { auto orc_batch_reader = PrepareOrcFileBatchReader(file_name, &read_schema, /*batch_size=*/3, /*natural_read_size=*/10); // read 3 rows - ASSERT_EQ(std::numeric_limits::max(), - orc_batch_reader->GetPreviousBatchFirstRowNumber().value()); + ASSERT_NOK(orc_batch_reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto batch1, orc_batch_reader->NextBatch()); - ASSERT_EQ(orc_batch_reader->GetPreviousBatchFirstRowNumber().value(), 0); + ASSERT_EQ(orc_batch_reader->GetPreviousBatchFileRowId(0).value(), 0); // read 3 rows ASSERT_OK_AND_ASSIGN(auto batch2, orc_batch_reader->NextBatch()); - ASSERT_EQ(orc_batch_reader->GetPreviousBatchFirstRowNumber().value(), 3); + ASSERT_EQ(orc_batch_reader->GetPreviousBatchFileRowId(0).value(), 3); // read 2 rows ASSERT_OK_AND_ASSIGN(auto batch3, orc_batch_reader->NextBatch()); - ASSERT_EQ(orc_batch_reader->GetPreviousBatchFirstRowNumber().value(), 6); + ASSERT_EQ(orc_batch_reader->GetPreviousBatchFileRowId(0).value(), 6); // read rows with eof ASSERT_OK_AND_ASSIGN(auto batch4, orc_batch_reader->NextBatch()); - ASSERT_EQ(orc_batch_reader->GetPreviousBatchFirstRowNumber().value(), 8); + ASSERT_NOK(orc_batch_reader->GetPreviousBatchFileRowId(0)); ASSERT_TRUE(BatchReader::IsEofBatch(batch4)); orc_batch_reader->Close(); diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.cpp b/src/paimon/format/parquet/parquet_file_batch_reader.cpp index 5e6893c39..3fe9f340d 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader.cpp @@ -39,6 +39,7 @@ #include "paimon/common/metrics/metrics_impl.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/common/utils/options_utils.h" +#include "paimon/common/utils/scope_guard.h" #include "paimon/common/utils/string_utils.h" #include "paimon/core/schema/arrow_schema_validator.h" #include "paimon/core/utils/nested_projection_utils.h" @@ -213,11 +214,13 @@ Status ParquetFileBatchReader::SetReadSchema( target_row_groups.emplace_back(/*rg_index=*/rg_id, /*is_partially_matched=*/true, /*ranges=*/it->second); } else { - target_row_groups.emplace_back(/*rg_index=*/rg_id, - /*is_partially_matched=*/false, - /*ranges=*/RowRanges()); + target_row_groups.emplace_back( + /*rg_index=*/rg_id, /*is_partially_matched=*/false, /*ranges=*/ + RowRanges(Range(0, reader_->GetAllRowGroupRanges()[rg_id].second - + reader_->GetAllRowGroupRanges()[rg_id].first - 1))); } } + PAIMON_RETURN_NOT_OK(UpdateAllTargetRowranges(target_row_groups)); PAIMON_RETURN_NOT_OK(reader_->PrepareForReadingLazy(target_row_groups, column_indices)); } PAIMON_PARQUET_CATCH_AND_RETURN_STATUS("ParquetFileBatchReader::SetReadSchema") @@ -343,6 +346,7 @@ Result ParquetFileBatchReader::NextBatch() { try { PAIMON_ASSIGN_OR_RAISE(std::shared_ptr batch, reader_->Next()); if (batch == nullptr) { + row_mapping_.clear(); return BatchReader::MakeEofBatch(); } PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr array, @@ -361,6 +365,7 @@ Result ParquetFileBatchReader::NextBatch() { "equal with read schema {}", array->type()->ToString(), read_data_type_->ToString())); } + PAIMON_RETURN_NOT_OK(GenerateRowMapping(array->length())); std::unique_ptr c_array = std::make_unique(); std::unique_ptr c_schema = std::make_unique(); PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, c_array.get(), c_schema.get())); @@ -526,4 +531,54 @@ Result> ParquetFileBatchReader::ComputeNestedColumnIndices( return indices; } +Status ParquetFileBatchReader::UpdateAllTargetRowranges( + const std::vector& target_row_groups) { + row_mapping_.clear(); + auto all_row_group_ranges = reader_->GetAllRowGroupRanges(); + RowRanges all_ranges; + for (const auto& target_row_group : target_row_groups) { + for (const auto& range : target_row_group.row_ranges.GetRanges()) { + all_ranges.Add( + Range(range.from + all_row_group_ranges[target_row_group.row_group_index].first, + range.to + all_row_group_ranges[target_row_group.row_group_index].first)); + } + } + all_row_ranges_ = std::move(all_ranges); + return Status::OK(); +} + +Status ParquetFileBatchReader::GenerateRowMapping(int64_t batch_length) { + const std::vector& all_ranges = all_row_ranges_.GetRanges(); + PAIMON_ASSIGN_OR_RAISE(int64_t batch_start_row, reader_->GetPreviousBatchFirstRowNumber()); + + auto cur_range_it = + std::upper_bound(all_ranges.begin(), all_ranges.end(), batch_start_row, + [](int64_t value, const Range& r) { return value < r.from; }); + if (cur_range_it == all_ranges.begin()) { + return Status::Invalid("No range found!"); + } + --cur_range_it; + if (batch_start_row < cur_range_it->from || batch_start_row > cur_range_it->to) { + return Status::Invalid( + fmt::format("Batch start row {} is not in the current range [{}, {}]!", batch_start_row, + cur_range_it->from, cur_range_it->to)); + } + + std::vector row_mapping; + row_mapping.reserve(batch_length); + int64_t global_row = batch_start_row; + for (int64_t i = 0; i < batch_length; ++i) { + if (global_row > cur_range_it->to) { + ++cur_range_it; + if (cur_range_it == all_ranges.end()) { + return Status::Invalid("Batch length exceeds the total row ranges!"); + } + global_row = cur_range_it->from; + } + row_mapping.push_back(global_row); + global_row++; + } + row_mapping_ = std::move(row_mapping); + return Status::OK(); +} } // namespace paimon::parquet diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.h b/src/paimon/format/parquet/parquet_file_batch_reader.h index 4bd684e8f..b020f7e39 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.h +++ b/src/paimon/format/parquet/parquet_file_batch_reader.h @@ -16,6 +16,8 @@ #pragma once +#include + #include #include #include @@ -94,9 +96,22 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { Result>> GenReadRanges( bool* need_prefetch) const override; - Result GetPreviousBatchFirstRowNumber() const override { - assert(reader_); - return reader_->GetPreviousBatchFirstRowNumber(); + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { + if (row_mapping_.size() == 0) { + PAIMON_ASSIGN_OR_RAISE(uint64_t previous_first_row, + reader_->GetPreviousBatchFirstRowNumber()); + if (previous_first_row == std::numeric_limits::max()) { + return Status::Invalid("No batch has been read yet."); + } else { + return Status::Invalid("Last batch was EOF."); + } + } + if (batch_row_id >= row_mapping_.size()) { + return Status::Invalid( + fmt::format("batch_row_id {} is out of range, last batch row count is {}", + batch_row_id, row_mapping_.size())); + } + return row_mapping_[batch_row_id]; } Result GetNumberOfRows() const override { @@ -173,6 +188,8 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { const std::shared_ptr& read_schema, const std::shared_ptr& file_schema); + Status UpdateAllTargetRowranges(const std::vector& target_row_groups); + // precondition: predicate supposed not be empty Result> FilterRowGroupsByPredicate( const std::shared_ptr& predicate, @@ -189,6 +206,8 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { const std::map& column_name_to_index, const std::vector& src_row_groups); + Status GenerateRowMapping(int64_t batch_length); + private: std::map options_; // hold the lifecycle of arrow memory pool. @@ -204,6 +223,9 @@ class ParquetFileBatchReader : public PrefetchFileBatchReader { uint64_t read_rows_ = 0; uint64_t read_batch_count_ = 0; + + RowRanges all_row_ranges_; + std::vector row_mapping_; }; } // namespace paimon::parquet diff --git a/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp b/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp index ec353f071..3e2f51f58 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp @@ -168,12 +168,15 @@ class ParquetFileBatchReaderTest : public ::testing::Test, void WriteArray(const std::string& file_path, const std::shared_ptr& src_array, const std::shared_ptr& arrow_schema, int64_t write_batch_size, - bool enable_dictionary, int64_t max_row_group_length) const { + bool enable_dictionary, int64_t max_row_group_length, + int64_t max_page_size = 1024 * 1024 * 1024) const { ASSERT_OK_AND_ASSIGN(std::shared_ptr out, fs_->Create(file_path, /*overwrite=*/true)); ::parquet::WriterProperties::Builder builder; builder.write_batch_size(write_batch_size); builder.max_row_group_length(max_row_group_length); + builder.data_pagesize(max_page_size); + builder.enable_write_page_index(); enable_dictionary ? builder.enable_dictionary() : builder.disable_dictionary(); auto writer_properties = builder.build(); ASSERT_OK_AND_ASSIGN(auto format_writer, ParquetFormatWriter::Create( @@ -229,6 +232,17 @@ class ParquetFileBatchReaderTest : public ::testing::Test, std::shared_ptr struct_array_; }; +static std::shared_ptr MakeSequentialIntData(int32_t num_rows) { + arrow::Int32Builder val_builder; + EXPECT_TRUE(val_builder.Reserve(num_rows).ok()); + for (int32_t i = 0; i < num_rows; ++i) { + val_builder.UnsafeAppend(i); + } + auto val_array = val_builder.Finish().ValueOrDie(); + auto field = arrow::field("f0", arrow::int32()); + return arrow::StructArray::Make({val_array}, {field}).ValueOrDie(); +} + TEST_F(ParquetFileBatchReaderTest, TestParquetMetadataCacheReusesSerializedFooter) { WriteArray(file_path_, struct_array_, schema_, /*write_batch_size=*/struct_array_->length(), /*enable_dictionary=*/false, @@ -447,11 +461,8 @@ TEST_F(ParquetFileBatchReaderTest, TestNextBatchSimple) { auto parquet_batch_reader = PrepareParquetFileBatchReader(file_name, schema_, /*predicate=*/nullptr, /*selection_bitmap=*/std::nullopt, batch_size); - ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(), - std::numeric_limits::max()); ASSERT_OK_AND_ASSIGN(auto result_array, paimon::test::ReadResultCollector::CollectResult( parquet_batch_reader.get())); - ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(), 6); parquet_batch_reader->Close(); auto expected_array = std::make_shared(struct_array_); ASSERT_TRUE(result_array->Equals(expected_array)); @@ -812,20 +823,19 @@ TEST_F(ParquetFileBatchReaderTest, TestReadNoField) { PrepareParquetFileBatchReader(file_name, read_schema, /*predicate=*/nullptr, /*selection_bitmap=*/std::nullopt, /*batch_size=*/2); // read 2 rows - ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(), - std::numeric_limits::max()); + ASSERT_NOK(parquet_batch_reader->GetPreviousBatchFileRowId(0)); ASSERT_OK_AND_ASSIGN(auto batch1, parquet_batch_reader->NextBatch()); - ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(), 0); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(0).value(), 0); // read 2 rows ASSERT_OK_AND_ASSIGN(auto batch2, parquet_batch_reader->NextBatch()); - ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(), 2); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(0).value(), 2); // read 2 rows ASSERT_OK_AND_ASSIGN(auto batch3, parquet_batch_reader->NextBatch()); - ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(), 4); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(0).value(), 4); // read rows with eof ASSERT_OK_AND_ASSIGN(auto batch4, parquet_batch_reader->NextBatch()); - ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(), 6); ASSERT_TRUE(BatchReader::IsEofBatch(batch4)); + ASSERT_NOK(parquet_batch_reader->GetPreviousBatchFileRowId(0)); parquet_batch_reader->Close(); arrow::FieldVector fields; @@ -1013,4 +1023,154 @@ TEST_F(ParquetFileBatchReaderTest, TestAddMetadataPerFieldMetadata) { ASSERT_TRUE(data->Equals(*result_array->chunk(0))) << result_array->ToString(); } +TEST_F(ParquetFileBatchReaderTest, TestRowMappingSimple) { + arrow::FieldVector fields = {arrow::field("f0", arrow::int32())}; + auto src_array = MakeSequentialIntData(12); + // data in file rowGroup0:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] + // one row per page + auto arrow_schema = arrow::schema(fields); + WriteArray(file_path_, src_array, arrow_schema, /*write_batch_size=*/1, + /*enable_dictionary=*/true, /*max_row_group_length=*/12, /*max_page_size=*/1); + + // 1<=f0<=3 || 5<=f0<=6 + ASSERT_OK_AND_ASSIGN( + auto predicate, + PredicateBuilder::Or({PredicateBuilder::Between(/*field_index=*/0, /*field_name=*/"f0", + FieldType::INT, Literal(1), Literal(3)), + PredicateBuilder::Between(/*field_index=*/0, /*field_name=*/"f0", + FieldType::INT, Literal(5), Literal(6))})); + + auto parquet_batch_reader = PrepareParquetFileBatchReader( + file_path_, arrow_schema, /*predicate=*/predicate, std::nullopt, /*batch_size=*/2); + + ASSERT_NOK(parquet_batch_reader->GetPreviousBatchFileRowId(0)); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr batch1, + paimon::test::ReadResultCollector::CollectResultOneBatch(parquet_batch_reader.get())); + auto expected_batch1 = src_array->Slice(1, 2); + ASSERT_TRUE(batch1->chunk(0)->Equals(expected_batch1)) << batch1->ToString(); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(0).value(), 1); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(1).value(), 2); + // out of bound return invalid + ASSERT_NOK(parquet_batch_reader->GetPreviousBatchFileRowId(2)); + + // Not adjacent pages + ASSERT_OK_AND_ASSIGN( + std::shared_ptr batch2, + paimon::test::ReadResultCollector::CollectResultOneBatch(parquet_batch_reader.get())); + auto expected_batch2 = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ +[3], +[5] + ])") + .ValueOrDie()); + ASSERT_TRUE(batch2->chunk(0)->Equals(expected_batch2)) << batch2->ToString(); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(0).value(), 3); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(1).value(), 5); + + // Only one record read + ASSERT_OK_AND_ASSIGN( + std::shared_ptr batch3, + paimon::test::ReadResultCollector::CollectResultOneBatch(parquet_batch_reader.get())); + auto expected_batch3 = src_array->Slice(6, 1); + ASSERT_TRUE(batch3->chunk(0)->Equals(expected_batch3)) << batch3->ToString(); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(0).value(), 6); + ASSERT_NOK(parquet_batch_reader->GetPreviousBatchFileRowId(1)); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr eof_batch, + paimon::test::ReadResultCollector::CollectResultOneBatch(parquet_batch_reader.get())); + ASSERT_EQ(nullptr, eof_batch); + // previous batch is eof, return invalid. + ASSERT_NOK(parquet_batch_reader->GetPreviousBatchFileRowId(0)); +} + +TEST_F(ParquetFileBatchReaderTest, TestRowMappingFullyAndPartially) { + arrow::FieldVector fields = {arrow::field("f0", arrow::int32())}; + auto src_array = MakeSequentialIntData(12); + // data in file RowGroup0:[0, 1, 2] | RowGroup1:[3, 4, 5] | RowGroup2:[6, 7, 8] | RowGroup3:[9, + // 10, 11] one row per page + auto arrow_schema = arrow::schema(fields); + WriteArray(file_path_, src_array, arrow_schema, /*write_batch_size=*/1, + /*enable_dictionary=*/true, /*max_row_group_length=*/3, /*max_page_size=*/1); + + // 3<=f0<=5 || f0==6 || f0==8 + // RowGroup 1 is fully matched, RowGroup 2 is partially matched, RowGroup 0 and RowGroup 3 are + // not matched. + ASSERT_OK_AND_ASSIGN( + auto predicate, + PredicateBuilder::Or({PredicateBuilder::Between(/*field_index=*/0, /*field_name=*/"f0", + FieldType::INT, Literal(3), Literal(5)), + PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", + FieldType::INT, Literal(6)), + PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", + FieldType::INT, Literal(8))})); + + auto parquet_batch_reader = PrepareParquetFileBatchReader( + file_path_, arrow_schema, /*predicate=*/predicate, std::nullopt, /*batch_size=*/3); + + ASSERT_NOK(parquet_batch_reader->GetPreviousBatchFileRowId(0)); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr batch1, + paimon::test::ReadResultCollector::CollectResultOneBatch(parquet_batch_reader.get())); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(0).value(), 3); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(2).value(), 5); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr batch2, + paimon::test::ReadResultCollector::CollectResultOneBatch(parquet_batch_reader.get())); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(0).value(), 6); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(1).value(), 8); +} + +TEST_F(ParquetFileBatchReaderTest, TestRowMappingSetReadSchemaTwice) { + arrow::FieldVector fields = {arrow::field("f0", arrow::int32())}; + auto src_array = MakeSequentialIntData(12); + // data in file RowGroup0:[0, 1, 2] | RowGroup1:[3, 4, 5] | RowGroup2:[6, 7, 8] | RowGroup3:[9, + // 10, 11] one row per page + auto arrow_schema = arrow::schema(fields); + WriteArray(file_path_, src_array, arrow_schema, /*write_batch_size=*/1, + /*enable_dictionary=*/true, /*max_row_group_length=*/3, /*max_page_size=*/1); + + // 1<=f0<=3 || 6<=f0<=7 + ASSERT_OK_AND_ASSIGN( + auto predicate, + PredicateBuilder::Or({PredicateBuilder::Between(/*field_index=*/0, /*field_name=*/"f0", + FieldType::INT, Literal(1), Literal(3)), + PredicateBuilder::Between(/*field_index=*/0, /*field_name=*/"f0", + FieldType::INT, Literal(6), Literal(7))})); + + auto parquet_batch_reader = PrepareParquetFileBatchReader( + file_path_, arrow_schema, /*predicate=*/predicate, std::nullopt, /*batch_size=*/3); + + ASSERT_NOK(parquet_batch_reader->GetPreviousBatchFileRowId(0)); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr batch1, + paimon::test::ReadResultCollector::CollectResultOneBatch(parquet_batch_reader.get())); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(0).value(), 1); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(1).value(), 2); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr batch2, + paimon::test::ReadResultCollector::CollectResultOneBatch(parquet_batch_reader.get())); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(0).value(), 3); + ASSERT_NOK(parquet_batch_reader->GetPreviousBatchFileRowId(1)); + + ASSERT_OK_AND_ASSIGN( + predicate, + PredicateBuilder::Or({PredicateBuilder::Between(/*field_index=*/0, /*field_name=*/"f0", + FieldType::INT, Literal(3), Literal(5))})); + + std::unique_ptr c_schema = std::make_unique(); + ASSERT_TRUE(arrow::ExportSchema(*arrow_schema, c_schema.get()).ok()); + ASSERT_OK( + parquet_batch_reader->SetReadSchema(c_schema.get(), /*predicate=*/predicate, std::nullopt)); + ASSERT_NOK(parquet_batch_reader->GetPreviousBatchFileRowId(0)); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr batch3, + paimon::test::ReadResultCollector::CollectResultOneBatch(parquet_batch_reader.get())); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(0).value(), 3); + ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFileRowId(2).value(), 5); +} + } // namespace paimon::parquet::test diff --git a/src/paimon/format/parquet/row_ranges.h b/src/paimon/format/parquet/row_ranges.h index 46c3f4d21..6174ac4eb 100644 --- a/src/paimon/format/parquet/row_ranges.h +++ b/src/paimon/format/parquet/row_ranges.h @@ -109,7 +109,7 @@ class RowRanges { struct TargetRowGroup { int32_t row_group_index{-1}; bool is_partially_matched{false}; - // page-filtered row ranges, only valid if is_partially_matched is true. + RowRanges row_ranges; // Whether this row group has been excluded by ApplyReadRanges. // When true, this row group is logically skipped during iteration diff --git a/src/paimon/testing/mock/mock_file_batch_reader.h b/src/paimon/testing/mock/mock_file_batch_reader.h index 386af59be..439d5c296 100644 --- a/src/paimon/testing/mock/mock_file_batch_reader.h +++ b/src/paimon/testing/mock/mock_file_batch_reader.h @@ -156,8 +156,11 @@ class MockFileBatchReader : public PrefetchFileBatchReader { return metrics; } - Result GetPreviousBatchFirstRowNumber() const override { - return ToReaderRowNumber(previous_batch_first_row_num_); + Result GetPreviousBatchFileRowId(uint64_t batch_row_id) const override { + if (previous_batch_first_row_num_ == std::numeric_limits::max()) { + return Status::Invalid("No batch has been read yet"); + } + return previous_batch_first_row_num_ + batch_row_id; } Result GetNumberOfRows() const override { @@ -191,7 +194,7 @@ class MockFileBatchReader : public PrefetchFileBatchReader { int32_t batch_size_ = 0; int32_t current_pos_ = 0; int32_t read_end_pos_ = 0; - int32_t previous_batch_first_row_num_ = -1; + uint64_t previous_batch_first_row_num_ = std::numeric_limits::max(); Status next_batch_status_; bool enable_randomize_batch_size_ = true; std::vector> read_ranges_; diff --git a/src/paimon/testing/utils/read_result_collector.h b/src/paimon/testing/utils/read_result_collector.h index 5106f3b26..55f0358ea 100644 --- a/src/paimon/testing/utils/read_result_collector.h +++ b/src/paimon/testing/utils/read_result_collector.h @@ -68,10 +68,6 @@ class ReadResultCollector { return results; } - static Result> CollectResult(BatchReader* batch_reader) { - return CollectResult(batch_reader, /*max simulated data processing time*/ 0); - } - // will convert dictionary array to string array for comparing results static Result> CollectResult( BatchReader* batch_reader, int64_t max_data_processing_time_in_us) { @@ -79,35 +75,10 @@ class ReadResultCollector { int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs(); std::srand(seed); while (true) { - // Prioritize calling NextBatch. If it fails (paimon inner reader e.g., - // PrefetchBatchReader, ApplyBitmapIndexBatchReader...), call NextBatchWithBitmap. - auto batch_result = batch_reader->NextBatch(); - BatchReader::ReadBatch batch; - if (!batch_result.ok()) { - if (batch_result.status().ToString().find("should use NextBatchWithBitmap") != - std::string::npos) { - PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap batch_with_bitmap, - batch_reader->NextBatchWithBitmap()); - if (BatchReader::IsEofBatch(batch_with_bitmap)) { - break; - } - assert(!batch_with_bitmap.second.IsEmpty()); - PAIMON_ASSIGN_OR_RAISE( - batch, ReaderUtils::ApplyBitmapToReadBatch(std::move(batch_with_bitmap), - arrow::default_memory_pool())); - } else { - return batch_result.status(); - } - } else { - batch = std::move(batch_result).value(); - if (BatchReader::IsEofBatch(batch)) { - break; - } + PAIMON_ASSIGN_OR_RAISE(auto result_array, ReadOneBatch(batch_reader)); + if (result_array == nullptr) { + break; } - auto& [c_array, c_schema] = batch; - assert(c_array->length > 0); - PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(auto result_array, - arrow::ImportArray(c_array.get(), c_schema.get())); result_array_vector.push_back(result_array); if (max_data_processing_time_in_us > 0) { usleep(std::rand() % max_data_processing_time_in_us); @@ -131,6 +102,35 @@ class ReadResultCollector { return chunk_array; } + static Result> CollectResult(BatchReader* batch_reader) { + return CollectResult(batch_reader, /*max simulated data processing time*/ 0); + } + + static Result> CollectResultOneBatch( + BatchReader* batch_reader) { + return CollectResultOneBatch(batch_reader, /*max_simulated_data_processing_time*/ 0); + } + + static Result> CollectResultOneBatch( + BatchReader* batch_reader, int64_t max_data_processing_time_in_us) { + int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs(); + std::srand(seed); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr result_array, + ReadOneBatch(batch_reader)); + if (result_array == nullptr) { + return std::shared_ptr(); + } + PAIMON_ASSIGN_OR_RAISE( + auto converted_array, + DictArrayConverter::ConvertDictArray(result_array, arrow::default_memory_pool())); + if (max_data_processing_time_in_us > 0) { + usleep(std::rand() % max_data_processing_time_in_us); + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr chunk_array, + arrow::ChunkedArray::Make({converted_array})); + return chunk_array; + } + static Result> GetArray(BatchReader::ReadBatch&& batch) { if (BatchReader::IsEofBatch(batch)) { return std::shared_ptr(); @@ -165,5 +165,39 @@ class ReadResultCollector { arrow::compute::Take(arrow::Datum(array), arrow::Datum(sorted_indices))); return sorted_batch.chunked_array(); } + + private: + static Result> ReadOneBatch(BatchReader* batch_reader) { + // Prioritize calling NextBatch. If it fails (paimon inner reader e.g., + // PrefetchBatchReader, ApplyBitmapIndexBatchReader...), call NextBatchWithBitmap. + auto batch_result = batch_reader->NextBatch(); + BatchReader::ReadBatch batch; + if (!batch_result.ok()) { + if (batch_result.status().ToString().find("should use NextBatchWithBitmap") != + std::string::npos) { + PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap batch_with_bitmap, + batch_reader->NextBatchWithBitmap()); + if (BatchReader::IsEofBatch(batch_with_bitmap)) { + return std::shared_ptr(); + } + assert(!batch_with_bitmap.second.IsEmpty()); + PAIMON_ASSIGN_OR_RAISE( + batch, ReaderUtils::ApplyBitmapToReadBatch(std::move(batch_with_bitmap), + arrow::default_memory_pool())); + } else { + return batch_result.status(); + } + } else { + batch = std::move(batch_result).value(); + if (BatchReader::IsEofBatch(batch)) { + return std::shared_ptr(); + } + } + auto& [c_array, c_schema] = batch; + assert(c_array->length > 0); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(auto result_array, + arrow::ImportArray(c_array.get(), c_schema.get())); + return result_array; + } }; } // namespace paimon::test