Skip to content

Commit c256ccd

Browse files
authored
fix: support total_size=0 in byte_range_to_row_range (#7115)
## Summary

In certain cases DataFusion might decide to create ranges for empty files that have zero rows. In order to avoid hitting a division-by-zero error in the

```rust
let average_row = total_size / row_count;
```

line, add an early exit check to `VortexOpener::open` to immediately return an empty stream if the file contains zero rows:

```rust
if vxf.row_count() == 0 {
    let empty_stream = stream::iter(vec![]).boxed();
    return Ok(empty_stream);
}
```

The enforced `row_count > 0` invariant for the `byte_range_to_row_range` call is now also called out with a `debug_assert!` macro.

## Testing

Add a new `test_open_empty_file` test case.

Signed-off-by: Alexander Alexandrov <alexander.s.alexandrov@gmail.com>
1 parent a773f86 commit c256ccd

1 file changed

Lines changed: 35 additions & 0 deletions

File tree

vortex-datafusion/src/persistent/opener.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,12 @@ impl FileOpener for VortexOpener {
198198
.await
199199
.map_err(|e| exec_datafusion_err!("Failed to open Vortex file {e}"))?;
200200

201+
// Check if there are rows in this file. If not, we can save
202+
// ourselves some work and return an empty stream.
203+
if vxf.row_count() == 0 {
204+
return Ok(stream::empty().boxed());
205+
}
206+
201207
// This is the expected arrow types of the actual columns in the file, which might have different types
202208
// from the unified logical schema or miss
203209
let this_file_schema = Arc::new(calculate_physical_schema(
@@ -430,6 +436,8 @@ fn apply_byte_range(
430436
}
431437

432438
fn byte_range_to_row_range(byte_range: Range<u64>, row_count: u64, total_size: u64) -> Range<u64> {
439+
debug_assert!(row_count > 0); // Asserted by an early exit check in VortexOpener::open
440+
433441
let average_row = total_size / row_count;
434442
assert!(average_row > 0, "A row must always have at least one byte");
435443

@@ -619,6 +627,33 @@ mod tests {
619627
Ok(())
620628
}
621629

630+
#[tokio::test]
631+
async fn test_open_empty_file() -> anyhow::Result<()> {
632+
use futures::TryStreamExt;
633+
634+
let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
635+
let data_batch = record_batch!(("a", Int32, Vec::<i32>::new())).unwrap();
636+
let file_path = "part=1/empty.vortex";
637+
let file_size =
638+
write_arrow_to_vortex(object_store.clone(), file_path, data_batch.clone()).await?;
639+
640+
let file_schema = data_batch.schema();
641+
// Parallel scans may attach a byte range even for empty files; the
642+
// opener must not call byte_range_to_row_range when the row_count is 0.
643+
let file =
644+
PartitionedFile::new_with_range(file_path.to_string(), file_size, 0, file_size as i64);
645+
646+
let table_schema = TableSchema::from_file_schema(file_schema.clone());
647+
648+
let opener = make_opener(object_store, table_schema, None);
649+
let stream = opener.open(file)?.await?;
650+
let data = stream.try_collect::<Vec<_>>().await?;
651+
652+
assert_eq!(data.len(), 0);
653+
654+
Ok(())
655+
}
656+
622657
#[rstest]
623658
#[tokio::test]
624659
async fn test_open_files_different_table_schema() -> anyhow::Result<()> {

0 commit comments

Comments
 (0)