From 3cb03be75ebe438120b92ffa7b06dadf079f6d9d Mon Sep 17 00:00:00 2001 From: Matt Katz Date: Thu, 25 Jun 2026 15:04:44 -0700 Subject: [PATCH 1/6] feat[datafusion]: push down list_length expression Signed-off-by: Matt Katz --- Cargo.lock | 1 + Cargo.toml | 1 + vortex-datafusion/Cargo.toml | 1 + vortex-datafusion/src/convert/exprs.rs | 181 +++++++++++++++++++++- vortex-datafusion/src/persistent/tests.rs | 111 +++++++++++++ 5 files changed, 293 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 66457601e06..02dc8d43f25 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9831,6 +9831,7 @@ dependencies = [ "datafusion-execution 54.0.0", "datafusion-expr 54.0.0", "datafusion-functions 54.0.0", + "datafusion-functions-nested 54.0.0", "datafusion-physical-expr 54.0.0", "datafusion-physical-expr-adapter 54.0.0", "datafusion-physical-expr-common 54.0.0", diff --git a/Cargo.toml b/Cargo.toml index deed8b8d58a..876a0906e17 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -144,6 +144,7 @@ datafusion-datasource = { version = "54", default-features = false } datafusion-execution = { version = "54" } datafusion-expr = { version = "54" } datafusion-functions = { version = "54" } +datafusion-functions-nested = { version = "54" } datafusion-physical-expr = { version = "54" } datafusion-physical-expr-adapter = { version = "54" } datafusion-physical-expr-common = { version = "54" } diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index 4aaefec35ff..72ebb3362b4 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -24,6 +24,7 @@ datafusion-datasource = { workspace = true, default-features = false } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } datafusion-functions = { workspace = true } +datafusion-functions-nested = { workspace = true } datafusion-physical-expr = { workspace = true } datafusion-physical-expr-adapter = { workspace = true } datafusion-physical-expr-common = { workspace = true } diff --git a/vortex-datafusion/src/convert/exprs.rs b/vortex-datafusion/src/convert/exprs.rs index 7b02c6f531a..69e329dfcef 100644 --- a/vortex-datafusion/src/convert/exprs.rs +++ b/vortex-datafusion/src/convert/exprs.rs @@ -7,12 +7,14 @@ use arrow_schema::DataType; use arrow_schema::Field; use arrow_schema::Schema; use datafusion_common::Result as DFResult; +use datafusion_common::ScalarValue; use datafusion_common::exec_datafusion_err; use datafusion_common::tree_node::TreeNode; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_expr::Operator as DFOperator; use datafusion_functions::core::getfield::GetFieldFunc; use datafusion_functions::string::octet_length::OctetLengthFunc; +use datafusion_functions_nested::length::ArrayLength; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::ScalarFunctionExpr; use datafusion_physical_expr::projection::ProjectionExpr; @@ -32,6 +34,7 @@ use vortex::expr::get_item; use vortex::expr::is_not_null; use vortex::expr::is_null; use vortex::expr::list_contains; +use vortex::expr::list_length; use vortex::expr::lit; use vortex::expr::nested_case_when; use vortex::expr::not; @@ -155,6 +158,36 @@ impl DefaultExpressionConvertor { Ok(cast(byte_length(input), return_dtype)) } + /// Attempts to convert DataFusion's `array_length` function (aliased as `list_length`) to + /// Vortex `list_length`. + /// + /// Supports the single-argument form `array_length(arr)` and the equivalent two-argument + /// form with an explicit first dimension `array_length(arr, 1)`. Higher dimensions recurse + /// into nested lists and are rejected by [`can_array_length_be_pushed_down`] before reaching + /// this point. + fn try_convert_array_length(&self, scalar_fn: &ScalarFunctionExpr) -> DFResult { + let Some(input) = array_length_input(scalar_fn) else { + return Err(exec_datafusion_err!( + "array_length pushdown supports only the one-argument form or an explicit first \ + dimension" + )); + }; + + let input = self.convert(input.as_ref())?; + // Both DataFusion `array_length` and Vortex `list_length` return UInt64; the cast aligns + // nullability with DataFusion's declared return type. + let return_dtype = self + .session + .arrow() + .from_arrow_field(&Field::new( + "", + scalar_fn.return_type().clone(), + scalar_fn.nullable(), + )) + .map_err(|e| exec_datafusion_err!("Failed to convert return type to dtype: {e}"))?; + Ok(cast(list_length(input), return_dtype)) + } + /// Attempts to convert a DataFusion ScalarFunctionExpr to a Vortex expression. fn try_convert_scalar_function(&self, scalar_fn: &ScalarFunctionExpr) -> DFResult { if let Some(octet_length_fn) = @@ -163,6 +196,12 @@ impl DefaultExpressionConvertor { return self.try_convert_octet_length(octet_length_fn); } + if let Some(array_length_fn) = + ScalarFunctionExpr::try_downcast_func::(scalar_fn) + { + return self.try_convert_array_length(array_length_fn); + } + if let Some(get_field_fn) = ScalarFunctionExpr::try_downcast_func::(scalar_fn) { // DataFusion's GetFieldFunc flattens nested field access into a single call @@ -511,6 +550,7 @@ fn is_convertible_expr(expr: &Arc) -> bool { || expr.downcast_ref::().is_some_and(|sf| { ScalarFunctionExpr::try_downcast_func::(sf).is_some() || ScalarFunctionExpr::try_downcast_func::(sf).is_some() + || ScalarFunctionExpr::try_downcast_func::(sf).is_some() }) } @@ -572,14 +612,20 @@ fn supported_data_types(dt: &DataType) -> bool { } /// Checks if a scalar function can be pushed down. -/// Currently GetFieldFunc and OctetLengthFunc are supported. +/// Currently GetFieldFunc, OctetLengthFunc, and ArrayLength are supported. fn can_scalar_fn_be_pushed_down(scalar_fn: &ScalarFunctionExpr, schema: &Schema) -> bool { if ScalarFunctionExpr::try_downcast_func::(scalar_fn).is_some() { return true; } - ScalarFunctionExpr::try_downcast_func::(scalar_fn) + if ScalarFunctionExpr::try_downcast_func::(scalar_fn) .is_some_and(|octet_length| can_octet_length_be_pushed_down(octet_length, schema)) + { + return true; + } + + ScalarFunctionExpr::try_downcast_func::(scalar_fn) + .is_some_and(|array_length| can_array_length_be_pushed_down(array_length, schema)) } fn can_octet_length_be_pushed_down(scalar_fn: &ScalarFunctionExpr, schema: &Schema) -> bool { @@ -598,6 +644,59 @@ fn can_octet_length_be_pushed_down(scalar_fn: &ScalarFunctionExpr, schema: &Sche }) && can_be_pushed_down_impl(input, schema) } +fn can_array_length_be_pushed_down(scalar_fn: &ScalarFunctionExpr, schema: &Schema) -> bool { + let Some(input) = array_length_input(scalar_fn) else { + return false; + }; + + // The argument must resolve to a list type. We gate on the resolved data type rather than + // `can_be_pushed_down_impl`, since list columns are intentionally rejected there. We still + // require the argument to be a convertible expression (e.g. a column or struct field access). + input.data_type(schema).as_ref().is_ok_and(|data_type| { + matches!( + data_type, + DataType::List(_) | DataType::LargeList(_) | DataType::FixedSizeList(_, _) + ) + }) && is_convertible_expr(input) +} + +/// Returns the list argument of an `array_length` call if the call is a form we can rewrite to +/// `list_length`: either the single-argument form `array_length(arr)`, or the two-argument form +/// with an explicit first dimension `array_length(arr, 1)`, which is equivalent. Higher +/// dimensions recurse into nested lists and are not supported. +fn array_length_input(scalar_fn: &ScalarFunctionExpr) -> Option<&Arc> { + match scalar_fn.args() { + [input] => Some(input), + [input, dimension] if is_dimension_one(dimension) => Some(input), + _ => None, + } +} + +/// Returns true if `expr` is an integer literal equal to 1. The dimension argument of +/// `array_length` is coerced to `Int64`, but we accept any integer width defensively. +fn is_dimension_one(expr: &Arc) -> bool { + let Some(literal) = expr.downcast_ref::() else { + return false; + }; + + let dimension = match literal.value() { + ScalarValue::Int8(Some(v)) => i64::from(*v), + ScalarValue::Int16(Some(v)) => i64::from(*v), + ScalarValue::Int32(Some(v)) => i64::from(*v), + ScalarValue::Int64(Some(v)) => *v, + ScalarValue::UInt8(Some(v)) => i64::from(*v), + ScalarValue::UInt16(Some(v)) => i64::from(*v), + ScalarValue::UInt32(Some(v)) => i64::from(*v), + ScalarValue::UInt64(Some(v)) => match i64::try_from(*v) { + Ok(v) => v, + Err(_) => return false, + }, + _ => return false, + }; + + dimension == 1 +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -652,6 +751,21 @@ mod tests { ) } + fn array_length_expr( + args: Vec>, + schema: &Schema, + ) -> Arc { + Arc::new( + ScalarFunctionExpr::try_new( + Arc::new(ScalarUDF::from(ArrayLength::new())), + args, + schema, + Arc::new(ConfigOptions::new()), + ) + .unwrap(), + ) + } + #[test] fn test_make_vortex_predicate_empty() { let expr_convertor = DefaultExpressionConvertor::default(); @@ -798,6 +912,23 @@ mod tests { "); } + #[rstest] + fn test_expr_from_df_array_length(test_schema: Schema) { + let expr = Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc; + let array_length = array_length_expr(vec![expr], &test_schema); + + let result = DefaultExpressionConvertor::default() + .convert(array_length.as_ref()) + .unwrap(); + + assert_snapshot!(result.display_tree().to_string(), @r" + vortex.cast(u64?) + └── input: vortex.list.length() + └── input: vortex.get_item(unsupported_list) + └── input: vortex.root() + "); + } + #[rstest] // Supported types #[case::null(DataType::Null, true)] @@ -974,6 +1105,52 @@ mod tests { assert!(!can_be_pushed_down_impl(&octet_length, &test_schema)); } + #[rstest] + fn test_can_be_pushed_down_array_length_supported(test_schema: Schema) { + let expr = Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc; + let array_length = array_length_expr(vec![expr], &test_schema); + + assert!(can_be_pushed_down_impl(&array_length, &test_schema)); + } + + #[rstest] + fn test_can_be_pushed_down_array_length_unsupported_operand(test_schema: Schema) { + // `array_length` over a non-list column cannot be pushed down. + let expr = Arc::new(df_expr::Column::new("name", 1)) as Arc; + let array_length = Arc::new(ScalarFunctionExpr::new( + "array_length", + Arc::new(ScalarUDF::from(ArrayLength::new())), + vec![expr], + Arc::new(Field::new("array_length", DataType::UInt64, true)), + Arc::new(ConfigOptions::new()), + )) as Arc; + + assert!(!can_be_pushed_down_impl(&array_length, &test_schema)); + } + + #[rstest] + fn test_can_be_pushed_down_array_length_dimension_one_supported(test_schema: Schema) { + // `array_length(arr, 1)` is the first-dimension length, equivalent to `list_length`. + let list = Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc; + let dimension = + Arc::new(df_expr::Literal::new(ScalarValue::Int64(Some(1)))) as Arc; + let array_length = array_length_expr(vec![list, dimension], &test_schema); + + assert!(can_be_pushed_down_impl(&array_length, &test_schema)); + } + + #[rstest] + fn test_can_be_pushed_down_array_length_higher_dimension_not_supported(test_schema: Schema) { + // Dimensions other than 1 recurse into nested lists, which `list_length` does not model, + // so they must not be pushed down. + let list = Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc; + let dimension = + Arc::new(df_expr::Literal::new(ScalarValue::Int64(Some(2)))) as Arc; + let array_length = array_length_expr(vec![list, dimension], &test_schema); + + assert!(!can_be_pushed_down_impl(&array_length, &test_schema)); + } + // https://github.com/vortex-data/vortex/issues/6211 #[tokio::test] async fn test_cast_int_to_string() -> anyhow::Result<()> { diff --git a/vortex-datafusion/src/persistent/tests.rs b/vortex-datafusion/src/persistent/tests.rs index 220a1477b13..65660df8e21 100644 --- a/vortex-datafusion/src/persistent/tests.rs +++ b/vortex-datafusion/src/persistent/tests.rs @@ -13,6 +13,8 @@ use datafusion::execution::SessionStateBuilder; use datafusion::prelude::SessionConfig; use datafusion::prelude::SessionContext; use datafusion_common::GetExt; +use datafusion_expr::ScalarUDF; +use datafusion_functions_nested::length::ArrayLength; use datafusion_physical_plan::display::DisplayableExecutionPlan; use insta::assert_snapshot; use object_store::ObjectStore; @@ -21,6 +23,7 @@ use rstest::rstest; use vortex::VortexSessionDefault; use vortex::array::IntoArray; use vortex::array::arrays::ChunkedArray; +use vortex::array::arrays::ListArray; use vortex::array::arrays::StructArray; use vortex::array::arrays::VarBinArray; use vortex::array::validity::Validity; @@ -233,6 +236,114 @@ async fn test_octet_length_pushdown() -> anyhow::Result<()> { Ok(()) } +#[tokio::test] +async fn test_array_length_pushdown() -> anyhow::Result<()> { + // `new(true)` enables projection pushdown so the `array_length` projection is pushed into + // the Vortex scan rather than evaluated above it. + let ctx = TestSessionContext::new(true); + // `array_length` is a nested-array function; the test session is built without the + // `nested_expressions` default feature, so register it explicitly. + ctx.session + .register_udf(ScalarUDF::from(ArrayLength::new())); + let session = VortexSession::default(); + + // Five lists with element counts 3, 4, 0, 5, 2 respectively. The empty list exercises the + // 0 (not NULL) result that both DataFusion's `array_length` and Vortex's `list_length` + // produce for a non-null empty list. + let elements = buffer![ + 10i32, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140 + ] + .into_array(); + let offsets = buffer![0i32, 3, 7, 7, 12, 14].into_array(); + let int_list = ListArray::try_new(elements, offsets, Validity::AllValid)?.into_array(); + let ids = buffer![0i32, 1, 2, 3, 4].into_array(); + + let st = StructArray::try_new( + ["id", "int_list"].into(), + vec![ids, int_list], + 5, + Validity::NonNullable, + )?; + + let mut writer = ObjectStoreWrite::new(Arc::clone(&ctx.store), &"list.vortex".into()).await?; + session + .write_options() + .write(&mut writer, st.into_array().to_array_stream()) + .await?; + writer.shutdown().await?; + + // Projection: `array_length` computed from the list offsets without materializing elements. + let result = ctx + .session + .sql( + "SELECT id, array_length(int_list) AS len \ + FROM '/list.vortex' \ + ORDER BY id", + ) + .await? + .collect() + .await?; + + assert_eq!( + result[0].schema().field_with_name("len")?.data_type(), + &DataType::UInt64 + ); + assert_snapshot!(pretty_format_batches(&result)?, @r" + +----+-----+ + | id | len | + +----+-----+ + | 0 | 3 | + | 1 | 4 | + | 2 | 0 | + | 3 | 5 | + | 4 | 2 | + +----+-----+ + "); + + // The explicit first-dimension form `array_length(int_list, 1)` is equivalent and pushes + // down identically. + let with_dimension = ctx + .session + .sql( + "SELECT array_length(int_list, 1) AS len \ + FROM '/list.vortex' \ + ORDER BY id", + ) + .await? + .collect() + .await?; + + assert_snapshot!(pretty_format_batches(&with_dimension)?, @r" + +-----+ + | len | + +-----+ + | 3 | + | 4 | + | 0 | + | 5 | + | 2 | + +-----+ + "); + + // Filter: `WHERE array_length(int_list) >= 4` keeps the 4- and 5-element lists. + let filtered = ctx + .session + .sql("SELECT COUNT(*) AS cnt FROM '/list.vortex' WHERE array_length(int_list) >= 4") + .await? + .collect() + .await?; + + assert_snapshot!(pretty_format_batches(&filtered)?, @r" + +-----+ + | cnt | + +-----+ + | 2 | + +-----+ + "); + + Ok(()) +} + #[tokio::test] async fn create_table_ordered_by() -> anyhow::Result<()> { let ctx = TestSessionContext::default(); From bd0b1e8b9d711c8e8667e35392380f051d4df09d Mon Sep 17 00:00:00 2001 From: Matt Katz Date: Thu, 25 Jun 2026 17:31:02 -0700 Subject: [PATCH 2/6] comments and test change Signed-off-by: Matt Katz --- vortex-datafusion/src/convert/exprs.rs | 27 +++++++++++--------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/vortex-datafusion/src/convert/exprs.rs b/vortex-datafusion/src/convert/exprs.rs index 69e329dfcef..98d4a868965 100644 --- a/vortex-datafusion/src/convert/exprs.rs +++ b/vortex-datafusion/src/convert/exprs.rs @@ -162,9 +162,7 @@ impl DefaultExpressionConvertor { /// Vortex `list_length`. /// /// Supports the single-argument form `array_length(arr)` and the equivalent two-argument - /// form with an explicit first dimension `array_length(arr, 1)`. Higher dimensions recurse - /// into nested lists and are rejected by [`can_array_length_be_pushed_down`] before reaching - /// this point. + /// form with an explicit first dimension `array_length(arr, 1)`. fn try_convert_array_length(&self, scalar_fn: &ScalarFunctionExpr) -> DFResult { let Some(input) = array_length_input(scalar_fn) else { return Err(exec_datafusion_err!( @@ -174,8 +172,6 @@ impl DefaultExpressionConvertor { }; let input = self.convert(input.as_ref())?; - // Both DataFusion `array_length` and Vortex `list_length` return UInt64; the cast aligns - // nullability with DataFusion's declared return type. let return_dtype = self .session .arrow() @@ -732,7 +728,7 @@ mod tests { true, ), Field::new( - "unsupported_list", + "tags", DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), true, ), @@ -914,7 +910,7 @@ mod tests { #[rstest] fn test_expr_from_df_array_length(test_schema: Schema) { - let expr = Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc; + let expr = Arc::new(df_expr::Column::new("tags", 5)) as Arc; let array_length = array_length_expr(vec![expr], &test_schema); let result = DefaultExpressionConvertor::default() @@ -924,7 +920,7 @@ mod tests { assert_snapshot!(result.display_tree().to_string(), @r" vortex.cast(u64?) └── input: vortex.list.length() - └── input: vortex.get_item(unsupported_list) + └── input: vortex.get_item(tags) └── input: vortex.root() "); } @@ -992,8 +988,7 @@ mod tests { #[rstest] fn test_can_be_pushed_down_column_unsupported_type(test_schema: Schema) { - let col_expr = - Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc; + let col_expr = Arc::new(df_expr::Column::new("tags", 5)) as Arc; assert!(!can_be_pushed_down_impl(&col_expr, &test_schema)); } @@ -1050,7 +1045,7 @@ mod tests { #[rstest] fn test_can_be_pushed_down_binary_unsupported_operand(test_schema: Schema) { - let left = Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc; + let left = Arc::new(df_expr::Column::new("tags", 5)) as Arc; let right = Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(42)))) as Arc; let binary_expr = Arc::new(df_expr::BinaryExpr::new(left, DFOperator::Eq, right)) @@ -1073,7 +1068,7 @@ mod tests { #[rstest] fn test_can_be_pushed_down_like_unsupported_operand(test_schema: Schema) { - let expr = Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc; + let expr = Arc::new(df_expr::Column::new("tags", 5)) as Arc; let pattern = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some( "test%".to_string(), )))) as Arc; @@ -1093,7 +1088,7 @@ mod tests { #[rstest] fn test_can_be_pushed_down_octet_length_unsupported_operand(test_schema: Schema) { - let expr = Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc; + let expr = Arc::new(df_expr::Column::new("tags", 5)) as Arc; let octet_length = Arc::new(ScalarFunctionExpr::new( "octet_length", Arc::new(ScalarUDF::from(OctetLengthFunc::new())), @@ -1107,7 +1102,7 @@ mod tests { #[rstest] fn test_can_be_pushed_down_array_length_supported(test_schema: Schema) { - let expr = Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc; + let expr = Arc::new(df_expr::Column::new("tags", 5)) as Arc; let array_length = array_length_expr(vec![expr], &test_schema); assert!(can_be_pushed_down_impl(&array_length, &test_schema)); @@ -1131,7 +1126,7 @@ mod tests { #[rstest] fn test_can_be_pushed_down_array_length_dimension_one_supported(test_schema: Schema) { // `array_length(arr, 1)` is the first-dimension length, equivalent to `list_length`. - let list = Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc; + let list = Arc::new(df_expr::Column::new("tags", 5)) as Arc; let dimension = Arc::new(df_expr::Literal::new(ScalarValue::Int64(Some(1)))) as Arc; let array_length = array_length_expr(vec![list, dimension], &test_schema); @@ -1143,7 +1138,7 @@ mod tests { fn test_can_be_pushed_down_array_length_higher_dimension_not_supported(test_schema: Schema) { // Dimensions other than 1 recurse into nested lists, which `list_length` does not model, // so they must not be pushed down. - let list = Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc; + let list = Arc::new(df_expr::Column::new("tags", 5)) as Arc; let dimension = Arc::new(df_expr::Literal::new(ScalarValue::Int64(Some(2)))) as Arc; let array_length = array_length_expr(vec![list, dimension], &test_schema); From bde84941bcfdbe19ae4a7607d74b6df13f0a78a9 Mon Sep 17 00:00:00 2001 From: Matt Katz Date: Fri, 26 Jun 2026 12:43:20 -0700 Subject: [PATCH 3/6] address review: simplify dimension check, move e2e to sqllogictest Signed-off-by: Matt Katz --- Cargo.lock | 1 + vortex-datafusion/src/convert/exprs.rs | 27 +---- vortex-datafusion/src/persistent/tests.rs | 111 ------------------ vortex-sqllogictest/Cargo.toml | 1 + .../bin/sqllogictests-runner.rs | 8 +- .../slt/datafusion/list_length_pushdown.slt | 54 +++++++++ 6 files changed, 67 insertions(+), 135 deletions(-) create mode 100644 vortex-sqllogictest/slt/datafusion/list_length_pushdown.slt diff --git a/Cargo.lock b/Cargo.lock index 02dc8d43f25..0d8ce4d52da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10452,6 +10452,7 @@ dependencies = [ "async-trait", "bigdecimal", "datafusion 54.0.0", + "datafusion-functions-nested 54.0.0", "datafusion-sqllogictest", "indicatif", "regex", diff --git a/vortex-datafusion/src/convert/exprs.rs b/vortex-datafusion/src/convert/exprs.rs index 98d4a868965..862c56bc3cc 100644 --- a/vortex-datafusion/src/convert/exprs.rs +++ b/vortex-datafusion/src/convert/exprs.rs @@ -668,29 +668,12 @@ fn array_length_input(scalar_fn: &ScalarFunctionExpr) -> Option<&Arc) -> bool { - let Some(literal) = expr.downcast_ref::() else { - return false; - }; - - let dimension = match literal.value() { - ScalarValue::Int8(Some(v)) => i64::from(*v), - ScalarValue::Int16(Some(v)) => i64::from(*v), - ScalarValue::Int32(Some(v)) => i64::from(*v), - ScalarValue::Int64(Some(v)) => *v, - ScalarValue::UInt8(Some(v)) => i64::from(*v), - ScalarValue::UInt16(Some(v)) => i64::from(*v), - ScalarValue::UInt32(Some(v)) => i64::from(*v), - ScalarValue::UInt64(Some(v)) => match i64::try_from(*v) { - Ok(v) => v, - Err(_) => return false, - }, - _ => return false, - }; - - dimension == 1 + expr.downcast_ref::() + .is_some_and(|literal| matches!(literal.value(), ScalarValue::Int64(Some(1)))) } #[cfg(test)] diff --git a/vortex-datafusion/src/persistent/tests.rs b/vortex-datafusion/src/persistent/tests.rs index 65660df8e21..220a1477b13 100644 --- a/vortex-datafusion/src/persistent/tests.rs +++ b/vortex-datafusion/src/persistent/tests.rs @@ -13,8 +13,6 @@ use datafusion::execution::SessionStateBuilder; use datafusion::prelude::SessionConfig; use datafusion::prelude::SessionContext; use datafusion_common::GetExt; -use datafusion_expr::ScalarUDF; -use datafusion_functions_nested::length::ArrayLength; use datafusion_physical_plan::display::DisplayableExecutionPlan; use insta::assert_snapshot; use object_store::ObjectStore; @@ -23,7 +21,6 @@ use rstest::rstest; use vortex::VortexSessionDefault; use vortex::array::IntoArray; use vortex::array::arrays::ChunkedArray; -use vortex::array::arrays::ListArray; use vortex::array::arrays::StructArray; use vortex::array::arrays::VarBinArray; use vortex::array::validity::Validity; @@ -236,114 +233,6 @@ async fn test_octet_length_pushdown() -> anyhow::Result<()> { Ok(()) } -#[tokio::test] -async fn test_array_length_pushdown() -> anyhow::Result<()> { - // `new(true)` enables projection pushdown so the `array_length` projection is pushed into - // the Vortex scan rather than evaluated above it. - let ctx = TestSessionContext::new(true); - // `array_length` is a nested-array function; the test session is built without the - // `nested_expressions` default feature, so register it explicitly. - ctx.session - .register_udf(ScalarUDF::from(ArrayLength::new())); - let session = VortexSession::default(); - - // Five lists with element counts 3, 4, 0, 5, 2 respectively. The empty list exercises the - // 0 (not NULL) result that both DataFusion's `array_length` and Vortex's `list_length` - // produce for a non-null empty list. - let elements = buffer![ - 10i32, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140 - ] - .into_array(); - let offsets = buffer![0i32, 3, 7, 7, 12, 14].into_array(); - let int_list = ListArray::try_new(elements, offsets, Validity::AllValid)?.into_array(); - let ids = buffer![0i32, 1, 2, 3, 4].into_array(); - - let st = StructArray::try_new( - ["id", "int_list"].into(), - vec![ids, int_list], - 5, - Validity::NonNullable, - )?; - - let mut writer = ObjectStoreWrite::new(Arc::clone(&ctx.store), &"list.vortex".into()).await?; - session - .write_options() - .write(&mut writer, st.into_array().to_array_stream()) - .await?; - writer.shutdown().await?; - - // Projection: `array_length` computed from the list offsets without materializing elements. - let result = ctx - .session - .sql( - "SELECT id, array_length(int_list) AS len \ - FROM '/list.vortex' \ - ORDER BY id", - ) - .await? - .collect() - .await?; - - assert_eq!( - result[0].schema().field_with_name("len")?.data_type(), - &DataType::UInt64 - ); - assert_snapshot!(pretty_format_batches(&result)?, @r" - +----+-----+ - | id | len | - +----+-----+ - | 0 | 3 | - | 1 | 4 | - | 2 | 0 | - | 3 | 5 | - | 4 | 2 | - +----+-----+ - "); - - // The explicit first-dimension form `array_length(int_list, 1)` is equivalent and pushes - // down identically. - let with_dimension = ctx - .session - .sql( - "SELECT array_length(int_list, 1) AS len \ - FROM '/list.vortex' \ - ORDER BY id", - ) - .await? - .collect() - .await?; - - assert_snapshot!(pretty_format_batches(&with_dimension)?, @r" - +-----+ - | len | - +-----+ - | 3 | - | 4 | - | 0 | - | 5 | - | 2 | - +-----+ - "); - - // Filter: `WHERE array_length(int_list) >= 4` keeps the 4- and 5-element lists. - let filtered = ctx - .session - .sql("SELECT COUNT(*) AS cnt FROM '/list.vortex' WHERE array_length(int_list) >= 4") - .await? - .collect() - .await?; - - assert_snapshot!(pretty_format_batches(&filtered)?, @r" - +-----+ - | cnt | - +-----+ - | 2 | - +-----+ - "); - - Ok(()) -} - #[tokio::test] async fn create_table_ordered_by() -> anyhow::Result<()> { let ctx = TestSessionContext::default(); diff --git a/vortex-sqllogictest/Cargo.toml b/vortex-sqllogictest/Cargo.toml index 9cc2f4c63e9..ddfd79ef99e 100644 --- a/vortex-sqllogictest/Cargo.toml +++ b/vortex-sqllogictest/Cargo.toml @@ -18,6 +18,7 @@ anyhow = { workspace = true } async-trait = { workspace = true } bigdecimal = { workspace = true } datafusion = { workspace = true } +datafusion-functions-nested = { workspace = true } datafusion-sqllogictest = { workspace = true } indicatif = { workspace = true } regex = { workspace = true } diff --git a/vortex-sqllogictest/bin/sqllogictests-runner.rs b/vortex-sqllogictest/bin/sqllogictests-runner.rs index 8f99ff4ec49..80335ac8951 100644 --- a/vortex-sqllogictest/bin/sqllogictests-runner.rs +++ b/vortex-sqllogictest/bin/sqllogictests-runner.rs @@ -73,8 +73,12 @@ fn drive_datafusion(path: &Path, work_dir: &Path, mode: Mode) -> anyhow::Result< Arc::new(DefaultTableFactory::new()), ) .with_file_formats(vec![factory]); - let session = - SessionContext::new_with_state(session_state_builder.build()).enable_url_table(); + // The workspace builds `datafusion` without the `nested_expressions` feature, so array + // functions (e.g. `make_array`, `array_length`) are not registered by default. Register + // them explicitly so SLT files can construct and query list columns. + let mut session_state = session_state_builder.build(); + datafusion_functions_nested::register_all(&mut session_state)?; + let session = SessionContext::new_with_state(session_state).enable_url_table(); let mut runner = Runner::new(|| async { Ok(PathNormalizing::new( diff --git a/vortex-sqllogictest/slt/datafusion/list_length_pushdown.slt b/vortex-sqllogictest/slt/datafusion/list_length_pushdown.slt new file mode 100644 index 00000000000..df37c7f9e0b --- /dev/null +++ b/vortex-sqllogictest/slt/datafusion/list_length_pushdown.slt @@ -0,0 +1,54 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright the Vortex contributors + +# `array_length` / `list_length` over a list column is rewritten to Vortex's `list_length` and +# pushed into the scan (computed from list offsets, without materializing the elements). + +include ../setup.slt.no + +# Projection-expression pushdown is off by default; enable it so the length computation folds +# into the Vortex scan rather than being evaluated by a ProjectionExec above it. +statement ok +SET vortex.projection_pushdown = true; + +# A Vortex file with a list column whose rows have 3, 2, and 4 elements. +query I +COPY ( + SELECT 1 AS id, make_array(10, 20, 30) AS tags + UNION ALL SELECT 2, make_array(40, 50) + UNION ALL SELECT 3, make_array(60, 70, 80, 90) +) TO '${WORK_DIR}/list_length.vortex'; +---- +3 + +# `array_length(list)` returns the per-row element count as UInt64. +query II +SELECT id, array_length(tags) AS len FROM '${WORK_DIR}/list_length.vortex' ORDER BY id; +---- +1 3 +2 2 +3 4 + +# The explicit first-dimension form `array_length(list, 1)` is equivalent. +query II +SELECT id, array_length(tags, 1) AS len FROM '${WORK_DIR}/list_length.vortex' ORDER BY id; +---- +1 3 +2 2 +3 4 + +# Filtering on the length pushes the predicate into the scan. +query I +SELECT count(*) FROM '${WORK_DIR}/list_length.vortex' WHERE array_length(tags) >= 3; +---- +2 + +# The physical plan must show the projection pushed into the Vortex DataSourceExec, with no +# array_length ScalarFunctionExpr remaining above the scan. +query TT +EXPLAIN SELECT array_length(tags) AS len FROM '${WORK_DIR}/list_length.vortex'; +---- +logical_plan +01)Projection: array_length(${WORK_DIR}/list_length.vortex.tags) AS len +02)--TableScan: ${WORK_DIR}/list_length.vortex projection=[tags] +physical_plan DataSourceExec: file_groups={}, projection=[array_length(tags@1) as len], file_type=vortex From 17cbf26f817327a0d40cf8ba4d1812b4334747d1 Mon Sep 17 00:00:00 2001 From: Matt Katz Date: Fri, 26 Jun 2026 14:03:37 -0700 Subject: [PATCH 4/6] sqllogictest: mirror duckdb list_length coverage, show repartition blocks projection fold Signed-off-by: Matt Katz --- .../slt/datafusion/list_length_pushdown.slt | 149 +++++++++++++++--- 1 file changed, 124 insertions(+), 25 deletions(-) diff --git a/vortex-sqllogictest/slt/datafusion/list_length_pushdown.slt b/vortex-sqllogictest/slt/datafusion/list_length_pushdown.slt index df37c7f9e0b..71c809c1693 100644 --- a/vortex-sqllogictest/slt/datafusion/list_length_pushdown.slt +++ b/vortex-sqllogictest/slt/datafusion/list_length_pushdown.slt @@ -11,44 +11,143 @@ include ../setup.slt.no statement ok SET vortex.projection_pushdown = true; -# A Vortex file with a list column whose rows have 3, 2, and 4 elements. +# Two list columns so we can exercise list-length pushdown over the same and different columns, +# in both SELECT and WHERE. Row 5's `b` is an empty list (length 0). +statement ok +CREATE TABLE list_test (id INT, a INT[], b INT[]); + +statement ok +INSERT INTO list_test VALUES + (1, [10, 20, 30], [1]), + (2, [40, 50, 60, 70], [2, 3]), + (3, [80], [4, 5, 6]), + (4, [90, 100, 110, 120, 130], [7, 8, 9, 10]), + (5, [140, 150], []); + +statement ok +COPY (SELECT * FROM list_test) TO '${WORK_DIR}/list-length.vortex'; + +# array_length(a) projection is pushed into the scan: it appears in the DataSourceExec +# projection with no ProjectionExec above the scan. +query TT +EXPLAIN SELECT array_length(a) AS len FROM '${WORK_DIR}/list-length.vortex'; +---- +logical_plan +01)Projection: array_length(${WORK_DIR}/list-length.vortex.a) AS len +02)--TableScan: ${WORK_DIR}/list-length.vortex projection=[a] +physical_plan DataSourceExec: file_groups={}, projection=[array_length(a@1) as len], file_type=vortex + +query I +SELECT array_length(a) FROM '${WORK_DIR}/list-length.vortex' ORDER BY id; +---- +3 +4 +1 +5 +2 + +# `list_length` is an alias for `array_length`. +query I +SELECT list_length(a) FROM '${WORK_DIR}/list-length.vortex' ORDER BY id; +---- +3 +4 +1 +5 +2 + +# The explicit first-dimension form `array_length(a, 1)` is equivalent. +query I +SELECT array_length(a, 1) FROM '${WORK_DIR}/list-length.vortex' ORDER BY id; +---- +3 +4 +1 +5 +2 + +# array_length over the other list column, including the empty list (length 0, not NULL). query I -COPY ( - SELECT 1 AS id, make_array(10, 20, 30) AS tags - UNION ALL SELECT 2, make_array(40, 50) - UNION ALL SELECT 3, make_array(60, 70, 80, 90) -) TO '${WORK_DIR}/list_length.vortex'; +SELECT array_length(b) FROM '${WORK_DIR}/list-length.vortex' ORDER BY id; ---- +1 +2 3 +4 +0 + +# array_length in WHERE is pushed into the scan as a predicate (no FilterExec above the scan). +query TT +EXPLAIN SELECT id FROM '${WORK_DIR}/list-length.vortex' WHERE array_length(a) >= 4; +---- +logical_plan +01)Projection: ${WORK_DIR}/list-length.vortex.id +02)--Filter: array_length(${WORK_DIR}/list-length.vortex.a) >= UInt64(4) +03)----TableScan: ${WORK_DIR}/list-length.vortex projection=[id, a], partial_filters=[array_length(${WORK_DIR}/list-length.vortex.a) >= UInt64(4)] +physical_plan DataSourceExec: file_groups={}, projection=[id], file_type=vortex, predicate: array_length(a@1) >= 4 + +query I +SELECT id FROM '${WORK_DIR}/list-length.vortex' WHERE array_length(a) >= 4 ORDER BY id; +---- +2 +4 -# `array_length(list)` returns the per-row element count as UInt64. +# array_length on the same column in both SELECT and WHERE produces the correct result. +query I +SELECT array_length(a) FROM '${WORK_DIR}/list-length.vortex' +WHERE array_length(a) >= 4 ORDER BY id; +---- +4 +5 + +# array_length filtering a different column than the projection. query II -SELECT id, array_length(tags) AS len FROM '${WORK_DIR}/list_length.vortex' ORDER BY id; +SELECT id, array_length(a) FROM '${WORK_DIR}/list-length.vortex' +WHERE array_length(b) >= 3 ORDER BY id; ---- -1 3 -2 2 -3 4 +3 1 +4 5 -# The explicit first-dimension form `array_length(list, 1)` is equivalent. +# array_length over different columns in both SELECT and WHERE. query II -SELECT id, array_length(tags, 1) AS len FROM '${WORK_DIR}/list_length.vortex' ORDER BY id; +SELECT array_length(a), array_length(b) FROM '${WORK_DIR}/list-length.vortex' +WHERE array_length(b) >= 2 ORDER BY id; ---- +4 2 1 3 -2 2 -3 4 +5 4 -# Filtering on the length pushes the predicate into the scan. -query I -SELECT count(*) FROM '${WORK_DIR}/list_length.vortex' WHERE array_length(tags) >= 3; +# The projection fold interacts with parallelism. When a filter is present, DataFusion's +# EnforceDistribution rule inserts a RepartitionExec to parallelize the work above the scan; it +# lands between the ProjectionExec and the DataSourceExec and blocks the projection fold, so only +# the filter is pushed down. (target_partitions is pinned so the RepartitionExec count is stable.) +statement ok +SET datafusion.execution.target_partitions = 2; + +query TT +EXPLAIN SELECT array_length(a) AS len FROM '${WORK_DIR}/list-length.vortex' +WHERE array_length(a) >= 4; ---- -2 +logical_plan +01)Projection: array_length(${WORK_DIR}/list-length.vortex.a) AS len +02)--Filter: array_length(${WORK_DIR}/list-length.vortex.a) >= UInt64(4) +03)----TableScan: ${WORK_DIR}/list-length.vortex projection=[a], partial_filters=[array_length(${WORK_DIR}/list-length.vortex.a) >= UInt64(4)] +physical_plan +01)ProjectionExec: expr=[array_length(a@0) as len] +02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----DataSourceExec: file_groups={}, projection=[a], file_type=vortex, predicate: array_length(a@1) >= 4 + +# With a single partition no RepartitionExec is inserted, so the ProjectionExec stays adjacent to +# the scan and both the projection and the predicate fold into it. +statement ok +SET datafusion.execution.target_partitions = 1; -# The physical plan must show the projection pushed into the Vortex DataSourceExec, with no -# array_length ScalarFunctionExpr remaining above the scan. query TT -EXPLAIN SELECT array_length(tags) AS len FROM '${WORK_DIR}/list_length.vortex'; +EXPLAIN SELECT array_length(a) AS len FROM '${WORK_DIR}/list-length.vortex' +WHERE array_length(a) >= 4; ---- logical_plan -01)Projection: array_length(${WORK_DIR}/list_length.vortex.tags) AS len -02)--TableScan: ${WORK_DIR}/list_length.vortex projection=[tags] -physical_plan DataSourceExec: file_groups={}, projection=[array_length(tags@1) as len], file_type=vortex +01)Projection: array_length(${WORK_DIR}/list-length.vortex.a) AS len +02)--Filter: array_length(${WORK_DIR}/list-length.vortex.a) >= UInt64(4) +03)----TableScan: ${WORK_DIR}/list-length.vortex projection=[a], partial_filters=[array_length(${WORK_DIR}/list-length.vortex.a) >= UInt64(4)] +physical_plan DataSourceExec: file_groups={}, projection=[array_length(a@1) as len], file_type=vortex, predicate: array_length(a@1) >= 4 From 6b3fdfef40ff99402fdd80edf993ea5899f06086 Mon Sep 17 00:00:00 2001 From: Matt Katz Date: Fri, 26 Jun 2026 15:31:08 -0700 Subject: [PATCH 5/6] sqllogictest: back list_length test by a Vortex external table Signed-off-by: Matt Katz --- .../slt/datafusion/list_length_pushdown.slt | 62 +++++++++---------- 1 file changed, 28 insertions(+), 34 deletions(-) diff --git a/vortex-sqllogictest/slt/datafusion/list_length_pushdown.slt b/vortex-sqllogictest/slt/datafusion/list_length_pushdown.slt index 71c809c1693..f5f8fb74388 100644 --- a/vortex-sqllogictest/slt/datafusion/list_length_pushdown.slt +++ b/vortex-sqllogictest/slt/datafusion/list_length_pushdown.slt @@ -11,10 +11,12 @@ include ../setup.slt.no statement ok SET vortex.projection_pushdown = true; -# Two list columns so we can exercise list-length pushdown over the same and different columns, -# in both SELECT and WHERE. Row 5's `b` is an empty list (length 0). +# A Vortex-backed external table with two list columns, so we can exercise list-length pushdown +# over the same and different columns in both SELECT and WHERE. Row 5's `b` is an empty list. statement ok -CREATE TABLE list_test (id INT, a INT[], b INT[]); +CREATE EXTERNAL TABLE list_test (id INT, a INT[], b INT[]) +STORED AS vortex +LOCATION '${WORK_DIR}/list_test/'; statement ok INSERT INTO list_test VALUES @@ -24,21 +26,18 @@ INSERT INTO list_test VALUES (4, [90, 100, 110, 120, 130], [7, 8, 9, 10]), (5, [140, 150], []); -statement ok -COPY (SELECT * FROM list_test) TO '${WORK_DIR}/list-length.vortex'; - # array_length(a) projection is pushed into the scan: it appears in the DataSourceExec # projection with no ProjectionExec above the scan. query TT -EXPLAIN SELECT array_length(a) AS len FROM '${WORK_DIR}/list-length.vortex'; +EXPLAIN SELECT array_length(a) AS len FROM list_test; ---- logical_plan -01)Projection: array_length(${WORK_DIR}/list-length.vortex.a) AS len -02)--TableScan: ${WORK_DIR}/list-length.vortex projection=[a] +01)Projection: array_length(list_test.a) AS len +02)--TableScan: list_test projection=[a] physical_plan DataSourceExec: file_groups={}, projection=[array_length(a@1) as len], file_type=vortex query I -SELECT array_length(a) FROM '${WORK_DIR}/list-length.vortex' ORDER BY id; +SELECT array_length(a) FROM list_test ORDER BY id; ---- 3 4 @@ -48,7 +47,7 @@ SELECT array_length(a) FROM '${WORK_DIR}/list-length.vortex' ORDER BY id; # `list_length` is an alias for `array_length`. query I -SELECT list_length(a) FROM '${WORK_DIR}/list-length.vortex' ORDER BY id; +SELECT list_length(a) FROM list_test ORDER BY id; ---- 3 4 @@ -58,7 +57,7 @@ SELECT list_length(a) FROM '${WORK_DIR}/list-length.vortex' ORDER BY id; # The explicit first-dimension form `array_length(a, 1)` is equivalent. query I -SELECT array_length(a, 1) FROM '${WORK_DIR}/list-length.vortex' ORDER BY id; +SELECT array_length(a, 1) FROM list_test ORDER BY id; ---- 3 4 @@ -68,7 +67,7 @@ SELECT array_length(a, 1) FROM '${WORK_DIR}/list-length.vortex' ORDER BY id; # array_length over the other list column, including the empty list (length 0, not NULL). query I -SELECT array_length(b) FROM '${WORK_DIR}/list-length.vortex' ORDER BY id; +SELECT array_length(b) FROM list_test ORDER BY id; ---- 1 2 @@ -78,40 +77,37 @@ SELECT array_length(b) FROM '${WORK_DIR}/list-length.vortex' ORDER BY id; # array_length in WHERE is pushed into the scan as a predicate (no FilterExec above the scan). query TT -EXPLAIN SELECT id FROM '${WORK_DIR}/list-length.vortex' WHERE array_length(a) >= 4; +EXPLAIN SELECT id FROM list_test WHERE array_length(a) >= 4; ---- logical_plan -01)Projection: ${WORK_DIR}/list-length.vortex.id -02)--Filter: array_length(${WORK_DIR}/list-length.vortex.a) >= UInt64(4) -03)----TableScan: ${WORK_DIR}/list-length.vortex projection=[id, a], partial_filters=[array_length(${WORK_DIR}/list-length.vortex.a) >= UInt64(4)] +01)Projection: list_test.id +02)--Filter: array_length(list_test.a) >= UInt64(4) +03)----TableScan: list_test projection=[id, a], partial_filters=[array_length(list_test.a) >= UInt64(4)] physical_plan DataSourceExec: file_groups={}, projection=[id], file_type=vortex, predicate: array_length(a@1) >= 4 query I -SELECT id FROM '${WORK_DIR}/list-length.vortex' WHERE array_length(a) >= 4 ORDER BY id; +SELECT id FROM list_test WHERE array_length(a) >= 4 ORDER BY id; ---- 2 4 # array_length on the same column in both SELECT and WHERE produces the correct result. query I -SELECT array_length(a) FROM '${WORK_DIR}/list-length.vortex' -WHERE array_length(a) >= 4 ORDER BY id; +SELECT array_length(a) FROM list_test WHERE array_length(a) >= 4 ORDER BY id; ---- 4 5 # array_length filtering a different column than the projection. query II -SELECT id, array_length(a) FROM '${WORK_DIR}/list-length.vortex' -WHERE array_length(b) >= 3 ORDER BY id; +SELECT id, array_length(a) FROM list_test WHERE array_length(b) >= 3 ORDER BY id; ---- 3 1 4 5 # array_length over different columns in both SELECT and WHERE. query II -SELECT array_length(a), array_length(b) FROM '${WORK_DIR}/list-length.vortex' -WHERE array_length(b) >= 2 ORDER BY id; +SELECT array_length(a), array_length(b) FROM list_test WHERE array_length(b) >= 2 ORDER BY id; ---- 4 2 1 3 @@ -125,13 +121,12 @@ statement ok SET datafusion.execution.target_partitions = 2; query TT -EXPLAIN SELECT array_length(a) AS len FROM '${WORK_DIR}/list-length.vortex' -WHERE array_length(a) >= 4; +EXPLAIN SELECT array_length(a) AS len FROM list_test WHERE array_length(a) >= 4; ---- logical_plan -01)Projection: array_length(${WORK_DIR}/list-length.vortex.a) AS len -02)--Filter: array_length(${WORK_DIR}/list-length.vortex.a) >= UInt64(4) -03)----TableScan: ${WORK_DIR}/list-length.vortex projection=[a], partial_filters=[array_length(${WORK_DIR}/list-length.vortex.a) >= UInt64(4)] +01)Projection: array_length(list_test.a) AS len +02)--Filter: array_length(list_test.a) >= UInt64(4) +03)----TableScan: list_test projection=[a], partial_filters=[array_length(list_test.a) >= UInt64(4)] physical_plan 01)ProjectionExec: expr=[array_length(a@0) as len] 02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 @@ -143,11 +138,10 @@ statement ok SET datafusion.execution.target_partitions = 1; query TT -EXPLAIN SELECT array_length(a) AS len FROM '${WORK_DIR}/list-length.vortex' -WHERE array_length(a) >= 4; +EXPLAIN SELECT array_length(a) AS len FROM list_test WHERE array_length(a) >= 4; ---- logical_plan -01)Projection: array_length(${WORK_DIR}/list-length.vortex.a) AS len -02)--Filter: array_length(${WORK_DIR}/list-length.vortex.a) >= UInt64(4) -03)----TableScan: ${WORK_DIR}/list-length.vortex projection=[a], partial_filters=[array_length(${WORK_DIR}/list-length.vortex.a) >= UInt64(4)] +01)Projection: array_length(list_test.a) AS len +02)--Filter: array_length(list_test.a) >= UInt64(4) +03)----TableScan: list_test projection=[a], partial_filters=[array_length(list_test.a) >= UInt64(4)] physical_plan DataSourceExec: file_groups={}, projection=[array_length(a@1) as len], file_type=vortex, predicate: array_length(a@1) >= 4 From 8c868690d5e4d721e0ba5f2630d14e8b0946afa8 Mon Sep 17 00:00:00 2001 From: Matt Katz Date: Fri, 26 Jun 2026 15:59:08 -0700 Subject: [PATCH 6/6] sqllogictest: fix list_length test expectations, use array_length(b) >= 3 filters Signed-off-by: Matt Katz --- .../slt/datafusion/list_length_pushdown.slt | 128 ++++++++++-------- 1 file changed, 73 insertions(+), 55 deletions(-) diff --git a/vortex-sqllogictest/slt/datafusion/list_length_pushdown.slt b/vortex-sqllogictest/slt/datafusion/list_length_pushdown.slt index f5f8fb74388..5c4ed588e15 100644 --- a/vortex-sqllogictest/slt/datafusion/list_length_pushdown.slt +++ b/vortex-sqllogictest/slt/datafusion/list_length_pushdown.slt @@ -1,18 +1,14 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright the Vortex contributors -# `array_length` / `list_length` over a list column is rewritten to Vortex's `list_length` and -# pushed into the scan (computed from list offsets, without materializing the elements). +# array_length / array_length(_, 1) / list_length over a list column is rewritten to Vortex's list_length and +# pushed into the scan. include ../setup.slt.no -# Projection-expression pushdown is off by default; enable it so the length computation folds -# into the Vortex scan rather than being evaluated by a ProjectionExec above it. statement ok SET vortex.projection_pushdown = true; -# A Vortex-backed external table with two list columns, so we can exercise list-length pushdown -# over the same and different columns in both SELECT and WHERE. Row 5's `b` is an empty list. statement ok CREATE EXTERNAL TABLE list_test (id INT, a INT[], b INT[]) STORED AS vortex @@ -26,8 +22,8 @@ INSERT INTO list_test VALUES (4, [90, 100, 110, 120, 130], [7, 8, 9, 10]), (5, [140, 150], []); -# array_length(a) projection is pushed into the scan: it appears in the DataSourceExec -# projection with no ProjectionExec above the scan. +# Projection on column a is pushed down + query TT EXPLAIN SELECT array_length(a) AS len FROM list_test; ---- @@ -45,9 +41,16 @@ SELECT array_length(a) FROM list_test ORDER BY id; 5 2 -# `list_length` is an alias for `array_length`. +query TT +EXPLAIN SELECT array_length(a, 1) AS len FROM list_test; +---- +logical_plan +01)Projection: array_length(list_test.a, Int64(1)) AS len +02)--TableScan: list_test projection=[a] +physical_plan DataSourceExec: file_groups={}, projection=[array_length(a@1, 1) as len], file_type=vortex + query I -SELECT list_length(a) FROM list_test ORDER BY id; +SELECT array_length(a, 1) FROM list_test ORDER BY id; ---- 3 4 @@ -55,9 +58,16 @@ SELECT list_length(a) FROM list_test ORDER BY id; 5 2 -# The explicit first-dimension form `array_length(a, 1)` is equivalent. +query TT +EXPLAIN SELECT list_length(a) AS len FROM list_test; +---- +logical_plan +01)Projection: array_length(list_test.a) AS list_length(list_test.a) AS len +02)--TableScan: list_test projection=[a] +physical_plan DataSourceExec: file_groups={}, projection=[array_length(a@1) as len], file_type=vortex + query I -SELECT array_length(a, 1) FROM list_test ORDER BY id; +SELECT list_length(a) FROM list_test ORDER BY id; ---- 3 4 @@ -65,17 +75,27 @@ SELECT array_length(a, 1) FROM list_test ORDER BY id; 5 2 -# array_length over the other list column, including the empty list (length 0, not NULL). -query I -SELECT array_length(b) FROM list_test ORDER BY id; +# Projections on columns a and b are pushed down + +query TT +EXPLAIN SELECT array_length(a) as len_a, array_length(b) AS len_b FROM list_test; ---- -1 -2 -3 -4 -0 +logical_plan +01)Projection: array_length(list_test.a) AS len_a, array_length(list_test.b) AS len_b +02)--TableScan: list_test projection=[a, b] +physical_plan DataSourceExec: file_groups={}, projection=[array_length(a@1) as len_a, array_length(b@2) as len_b], file_type=vortex + +query II +SELECT array_length(a), array_length(b) FROM list_test ORDER BY id; +---- +3 1 +4 2 +1 3 +5 4 +2 0 + +# Filter on a is pushed down -# array_length in WHERE is pushed into the scan as a predicate (no FilterExec above the scan). query TT EXPLAIN SELECT id FROM list_test WHERE array_length(a) >= 4; ---- @@ -91,34 +111,26 @@ SELECT id FROM list_test WHERE array_length(a) >= 4 ORDER BY id; 2 4 -# array_length on the same column in both SELECT and WHERE produces the correct result. -query I -SELECT array_length(a) FROM list_test WHERE array_length(a) >= 4 ORDER BY id; ----- -4 -5 +# Filters on a and b are pushed down -# array_length filtering a different column than the projection. -query II -SELECT id, array_length(a) FROM list_test WHERE array_length(b) >= 3 ORDER BY id; +query TT +EXPLAIN SELECT id FROM list_test WHERE array_length(a) >= 4 AND array_length(b) >= 3; ---- -3 1 -4 5 +logical_plan +01)Projection: list_test.id +02)--Filter: array_length(list_test.a) >= UInt64(4) AND array_length(list_test.b) >= UInt64(3) +03)----TableScan: list_test projection=[id, a, b], partial_filters=[array_length(list_test.a) >= UInt64(4), array_length(list_test.b) >= UInt64(3)] +physical_plan DataSourceExec: file_groups={}, projection=[id], file_type=vortex, predicate: array_length(a@1) >= 4 AND array_length(b@2) >= 3 -# array_length over different columns in both SELECT and WHERE. -query II -SELECT array_length(a), array_length(b) FROM list_test WHERE array_length(b) >= 2 ORDER BY id; +query I +SELECT id FROM list_test WHERE array_length(a) >= 4 AND array_length(b) >= 3 ORDER BY id; ---- -4 2 -1 3 -5 4 +4 + +# Projection and filter on column a are both pushed down in case of no repartitioning -# The projection fold interacts with parallelism. When a filter is present, DataFusion's -# EnforceDistribution rule inserts a RepartitionExec to parallelize the work above the scan; it -# lands between the ProjectionExec and the DataSourceExec and blocks the projection fold, so only -# the filter is pushed down. (target_partitions is pinned so the RepartitionExec count is stable.) statement ok -SET datafusion.execution.target_partitions = 2; +SET datafusion.execution.target_partitions = 1; query TT EXPLAIN SELECT array_length(a) AS len FROM list_test WHERE array_length(a) >= 4; @@ -127,21 +139,27 @@ logical_plan 01)Projection: array_length(list_test.a) AS len 02)--Filter: array_length(list_test.a) >= UInt64(4) 03)----TableScan: list_test projection=[a], partial_filters=[array_length(list_test.a) >= UInt64(4)] -physical_plan -01)ProjectionExec: expr=[array_length(a@0) as len] -02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -03)----DataSourceExec: file_groups={}, projection=[a], file_type=vortex, predicate: array_length(a@1) >= 4 +physical_plan DataSourceExec: file_groups={}, projection=[array_length(a@1) as len], file_type=vortex, predicate: array_length(a@1) >= 4 -# With a single partition no RepartitionExec is inserted, so the ProjectionExec stays adjacent to -# the scan and both the projection and the predicate fold into it. -statement ok -SET datafusion.execution.target_partitions = 1; +query I +SELECT array_length(a) FROM list_test WHERE array_length(a) >= 4 ORDER BY id; +---- +4 +5 + +# Projection on column a and filter on column b are both pushed down in case of no repartitioning query TT -EXPLAIN SELECT array_length(a) AS len FROM list_test WHERE array_length(a) >= 4; +EXPLAIN SELECT array_length(a) AS len FROM list_test WHERE array_length(b) >= 3; ---- logical_plan 01)Projection: array_length(list_test.a) AS len -02)--Filter: array_length(list_test.a) >= UInt64(4) -03)----TableScan: list_test projection=[a], partial_filters=[array_length(list_test.a) >= UInt64(4)] -physical_plan DataSourceExec: file_groups={}, projection=[array_length(a@1) as len], file_type=vortex, predicate: array_length(a@1) >= 4 +02)--Filter: array_length(list_test.b) >= UInt64(3) +03)----TableScan: list_test projection=[a, b], partial_filters=[array_length(list_test.b) >= UInt64(3)] +physical_plan DataSourceExec: file_groups={}, projection=[array_length(a@1) as len], file_type=vortex, predicate: array_length(b@2) >= 3 + +query I +SELECT array_length(a) FROM list_test WHERE array_length(b) >= 3 ORDER BY id; +---- +1 +5