diff --git a/crates/paimon/src/arrow/format/mosaic.rs b/crates/paimon/src/arrow/format/mosaic.rs index 963226b8..23f97803 100644 --- a/crates/paimon/src/arrow/format/mosaic.rs +++ b/crates/paimon/src/arrow/format/mosaic.rs @@ -22,7 +22,8 @@ use crate::io::FileRead; use crate::spec::{DataField, DataType as PaimonDataType, Datum, Predicate}; use crate::table::{ArrowRecordBatchStream, RowRange}; use crate::Error; -use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions, UInt64Array}; +use arrow_array::RecordBatch; +use arrow_array::RecordBatchOptions; use arrow_schema::{DataType as ArrowDataType, SchemaRef, TimeUnit}; use async_stream::try_stream; use async_trait::async_trait; @@ -90,7 +91,7 @@ impl FormatFileReader for MosaicFormatReader { let row_group_rows = mosaic_reader .row_group_num_rows(row_group_index) .map_err(mosaic_read_error)?; - let selected_indices = selected_indices_for_row_group( + let selected_slices = selected_slices_for_row_group( row_group_rows, row_group_start, row_selection.as_deref(), @@ -102,8 +103,8 @@ impl FormatFileReader for MosaicFormatReader { source: None, })?; - if let Some(indices) = selected_indices.as_ref() { - if indices.is_empty() { + if let Some(slices) = selected_slices.as_ref() { + if slices.is_empty() { continue; } } @@ -124,9 +125,9 @@ impl FormatFileReader for MosaicFormatReader { } let batch = if all_projected_columns_missing { - let row_count = selected_indices + let row_count = selected_slices .as_ref() - .map_or(row_group_rows, UInt64Array::len); + .map_or(row_group_rows, |slices| selected_row_count(slices)); empty_batch(read_schema.clone(), row_count)? } else { let names = projected_names @@ -140,7 +141,7 @@ impl FormatFileReader for MosaicFormatReader { let batch = row_group_reader .read_columns() .map_err(mosaic_read_error)?; - take_rows(batch, selected_indices.as_ref(), &read_schema)? + take_row_slices(batch, selected_slices.as_deref(), &read_schema)? 
}; for chunk in split_batch(batch, batch_size) { yield chunk; @@ -397,11 +398,11 @@ fn is_timestamp_nanos_struct(fields: &arrow_schema::Fields) -> bool { && *fields[1].data_type() == ArrowDataType::Int32 } -fn selected_indices_for_row_group( +fn selected_slices_for_row_group( row_group_rows: usize, row_group_start: usize, row_selection: Option<&[RowRange]>, -) -> crate::Result<Option<UInt64Array>> { +) -> crate::Result<Option<Vec<(usize, usize)>>> { let Some(row_selection) = row_selection else { return Ok(None); }; @@ -414,7 +415,7 @@ fn selected_indices_for_row_group( source: None, })?; - let mut indices = Vec::new(); + let mut slices = Vec::new(); for range in row_selection { let from = usize::try_from(range.from()).map_err(|e| Error::DataInvalid { message: format!( @@ -438,41 +439,43 @@ fn selected_indices_for_row_group( if start >= end { continue; } - indices.extend((start - row_group_start..end - row_group_start).map(|idx| idx as u64)); + slices.push((start - row_group_start, end - start)); } - Ok(Some(UInt64Array::from(indices))) + Ok(Some(slices)) } -fn take_rows( +fn selected_row_count(slices: &[(usize, usize)]) -> usize { + slices.iter().map(|(_, len)| *len).sum() +} + +fn take_row_slices( batch: RecordBatch, - indices: Option<&UInt64Array>, + slices: Option<&[(usize, usize)]>, target_schema: &SchemaRef, ) -> crate::Result<RecordBatch> { - let Some(indices) = indices else { + let Some(slices) = slices else { return ensure_schema(batch, target_schema); }; - if batch.num_columns() == 0 { - return empty_batch(target_schema.clone(), indices.len()); + if slices.is_empty() { + return empty_batch(target_schema.clone(), 0); } - let columns = batch - .columns() - .iter() - .map(|column| { - arrow_select::take::take(column.as_ref(), indices, None).map_err(|e| { - Error::UnexpectedError { - message: format!("Failed to apply Mosaic row selection: {e}"), - source: Some(Box::new(e)), - } - }) - }) - .collect::<crate::Result<Vec<ArrayRef>>>()?; + if slices.len() == 1 { + let (offset, len) = slices[0]; + return ensure_schema(batch.slice(offset, len), 
target_schema); + } - RecordBatch::try_new(target_schema.clone(), columns).map_err(|e| Error::UnexpectedError { - message: format!("Failed to build Mosaic RecordBatch: {e}"), - source: Some(Box::new(e)), + let sliced_batches = slices + .iter() + .map(|(offset, len)| batch.slice(*offset, *len)) + .collect::<Vec<_>>(); + arrow_select::concat::concat_batches(target_schema, &sliced_batches).map_err(|e| { + Error::UnexpectedError { + message: format!("Failed to apply Mosaic row selection: {e}"), + source: Some(Box::new(e)), + } }) } @@ -854,6 +857,24 @@ mod tests { assert_eq!(ids.values(), &[2, 3, 5]); } + #[test] + fn test_large_row_selection_stays_as_ranges() { + let row_group_rows = 1_000_000; + let selected = selected_slices_for_row_group( + row_group_rows, + 0, + Some(&[ + RowRange::new(0, 499_999), + RowRange::new(500_001, row_group_rows as i64 - 1), + ]), + ) + .unwrap() + .unwrap(); + + assert_eq!(selected, vec![(0, 500_000), (500_001, 499_999)]); + assert_eq!(selected_row_count(&selected), row_group_rows - 1); + } + #[tokio::test] async fn test_read_predicate_prunes_non_matching_row_groups() { let fields = data_fields();