Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 53 additions & 32 deletions crates/paimon/src/arrow/format/mosaic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use crate::io::FileRead;
use crate::spec::{DataField, DataType as PaimonDataType, Datum, Predicate};
use crate::table::{ArrowRecordBatchStream, RowRange};
use crate::Error;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions, UInt64Array};
use arrow_array::RecordBatch;
use arrow_array::RecordBatchOptions;
use arrow_schema::{DataType as ArrowDataType, SchemaRef, TimeUnit};
use async_stream::try_stream;
use async_trait::async_trait;
Expand Down Expand Up @@ -90,7 +91,7 @@ impl FormatFileReader for MosaicFormatReader {
let row_group_rows = mosaic_reader
.row_group_num_rows(row_group_index)
.map_err(mosaic_read_error)?;
let selected_indices = selected_indices_for_row_group(
let selected_slices = selected_slices_for_row_group(
row_group_rows,
row_group_start,
row_selection.as_deref(),
Expand All @@ -102,8 +103,8 @@ impl FormatFileReader for MosaicFormatReader {
source: None,
})?;

if let Some(indices) = selected_indices.as_ref() {
if indices.is_empty() {
if let Some(slices) = selected_slices.as_ref() {
if slices.is_empty() {
continue;
}
}
Expand All @@ -124,9 +125,9 @@ impl FormatFileReader for MosaicFormatReader {
}

let batch = if all_projected_columns_missing {
let row_count = selected_indices
let row_count = selected_slices
.as_ref()
.map_or(row_group_rows, UInt64Array::len);
.map_or(row_group_rows, |slices| selected_row_count(slices));
empty_batch(read_schema.clone(), row_count)?
} else {
let names = projected_names
Expand All @@ -140,7 +141,7 @@ impl FormatFileReader for MosaicFormatReader {
let batch = row_group_reader
.read_columns()
.map_err(mosaic_read_error)?;
take_rows(batch, selected_indices.as_ref(), &read_schema)?
take_row_slices(batch, selected_slices.as_deref(), &read_schema)?
};
for chunk in split_batch(batch, batch_size) {
yield chunk;
Expand Down Expand Up @@ -397,11 +398,11 @@ fn is_timestamp_nanos_struct(fields: &arrow_schema::Fields) -> bool {
&& *fields[1].data_type() == ArrowDataType::Int32
}

fn selected_indices_for_row_group(
fn selected_slices_for_row_group(
row_group_rows: usize,
row_group_start: usize,
row_selection: Option<&[RowRange]>,
) -> crate::Result<Option<UInt64Array>> {
) -> crate::Result<Option<Vec<(usize, usize)>>> {
let Some(row_selection) = row_selection else {
return Ok(None);
};
Expand All @@ -414,7 +415,7 @@ fn selected_indices_for_row_group(
source: None,
})?;

let mut indices = Vec::new();
let mut slices = Vec::new();
for range in row_selection {
let from = usize::try_from(range.from()).map_err(|e| Error::DataInvalid {
message: format!(
Expand All @@ -438,41 +439,43 @@ fn selected_indices_for_row_group(
if start >= end {
continue;
}
indices.extend((start - row_group_start..end - row_group_start).map(|idx| idx as u64));
slices.push((start - row_group_start, end - start));
}

Ok(Some(UInt64Array::from(indices)))
Ok(Some(slices))
}

fn take_rows(
fn selected_row_count(slices: &[(usize, usize)]) -> usize {
slices.iter().map(|(_, len)| *len).sum()
}

fn take_row_slices(
batch: RecordBatch,
indices: Option<&UInt64Array>,
slices: Option<&[(usize, usize)]>,
target_schema: &SchemaRef,
) -> crate::Result<RecordBatch> {
let Some(indices) = indices else {
let Some(slices) = slices else {
return ensure_schema(batch, target_schema);
};

if batch.num_columns() == 0 {
return empty_batch(target_schema.clone(), indices.len());
if slices.is_empty() {
return empty_batch(target_schema.clone(), 0);
}

let columns = batch
.columns()
.iter()
.map(|column| {
arrow_select::take::take(column.as_ref(), indices, None).map_err(|e| {
Error::UnexpectedError {
message: format!("Failed to apply Mosaic row selection: {e}"),
source: Some(Box::new(e)),
}
})
})
.collect::<crate::Result<Vec<ArrayRef>>>()?;
if slices.len() == 1 {
let (offset, len) = slices[0];
return ensure_schema(batch.slice(offset, len), target_schema);
}

RecordBatch::try_new(target_schema.clone(), columns).map_err(|e| Error::UnexpectedError {
message: format!("Failed to build Mosaic RecordBatch: {e}"),
source: Some(Box::new(e)),
let sliced_batches = slices
.iter()
.map(|(offset, len)| batch.slice(*offset, *len))
.collect::<Vec<_>>();
arrow_select::concat::concat_batches(target_schema, &sliced_batches).map_err(|e| {
Error::UnexpectedError {
message: format!("Failed to apply Mosaic row selection: {e}"),
source: Some(Box::new(e)),
}
})
}

Expand Down Expand Up @@ -854,6 +857,24 @@ mod tests {
assert_eq!(ids.values(), &[2, 3, 5]);
}

#[test]
fn test_large_row_selection_stays_as_ranges() {
let row_group_rows = 1_000_000;
let selected = selected_slices_for_row_group(
row_group_rows,
0,
Some(&[
RowRange::new(0, 499_999),
RowRange::new(500_001, row_group_rows as i64 - 1),
]),
)
.unwrap()
.unwrap();

assert_eq!(selected, vec![(0, 500_000), (500_001, 499_999)]);
assert_eq!(selected_row_count(&selected), row_group_rows - 1);
}

#[tokio::test]
async fn test_read_predicate_prunes_non_matching_row_groups() {
let fields = data_fields();
Expand Down
Loading