diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 1eefb4f0ae1..deb67a908d5 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -198,6 +198,12 @@ impl FileOpener for VortexOpener { .await .map_err(|e| exec_datafusion_err!("Failed to open Vortex file {e}"))?; + // Check if there are rows in this file. If not, we can save + // ourselves some work and return an empty stream. + if vxf.row_count() == 0 { + return Ok(stream::empty().boxed()); + } + // This is the expected arrow types of the actual columns in the file, which might have different types // from the unified logical schema or miss let this_file_schema = Arc::new(calculate_physical_schema( @@ -430,6 +436,8 @@ fn apply_byte_range( } fn byte_range_to_row_range(byte_range: Range, row_count: u64, total_size: u64) -> Range { + debug_assert!(row_count > 0); // Asserted by an early exit check in VortexOpener::open + let average_row = total_size / row_count; assert!(average_row > 0, "A row must always have at least one byte"); @@ -619,6 +627,33 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_open_empty_file() -> anyhow::Result<()> { + use futures::TryStreamExt; + + let object_store = Arc::new(InMemory::new()) as Arc; + let data_batch = record_batch!(("a", Int32, Vec::::new())).unwrap(); + let file_path = "part=1/empty.vortex"; + let file_size = + write_arrow_to_vortex(object_store.clone(), file_path, data_batch.clone()).await?; + + let file_schema = data_batch.schema(); + // Parallel scans may attach a byte range even for empty files; the + // opener must not call byte_range_to_row_range when the row_count is 0. + let file = + PartitionedFile::new_with_range(file_path.to_string(), file_size, 0, file_size as i64); + + let table_schema = TableSchema::from_file_schema(file_schema.clone()); + + let opener = make_opener(object_store, table_schema, None); + let stream = opener.open(file)?.await?; + let data = stream.try_collect::>().await?; + + assert_eq!(data.len(), 0); + + Ok(()) + } + #[rstest] #[tokio::test] async fn test_open_files_different_table_schema() -> anyhow::Result<()> {