feat(spark): add concat_ws with array support #20928
davidlghellin wants to merge 7 commits into apache:main
Pull request overview
Adds Spark-compatible support for concat_ws in the DataFusion Spark shim, including array-argument expansion semantics, and replaces previously-commented SLT examples with executable Spark SQLLogicTest coverage.
Changes:
- Register new Spark UDF `concat_ws` in the Spark string function module.
- Implement `SparkConcatWs` with support for array arguments and Spark-style null handling.
- Expand `concat_ws.slt` into a broader suite of executable tests (scalar, arrays, columns, and edge cases).
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| datafusion/sqllogictest/test_files/spark/string/concat_ws.slt | Converts commented examples into runnable SLT queries and adds broader coverage for Spark concat_ws behavior. |
| datafusion/spark/src/function/string/mod.rs | Registers and exports the new Spark concat_ws UDF. |
| datafusion/spark/src/function/string/concat_ws.rs | New Spark-compatible concat_ws implementation (including array expansion) plus unit tests. |
c08bca3 to b251419

Thanks @andygrove for the branch update! The merge commit got lost on my `pull --rebase`, so I re-synced and fixed the errors manually. Should be green now.

        if args.args.is_empty() {
            return Ok(ColumnarValue::Scalar(ScalarValue::Utf8(
                Some(String::new()),
            )));
        }

I believe Spark throws an error on 0 arguments.

            spark_concat_ws_with_arrays(&args.args, args.number_rows)
        }

        fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {

This function is only called when the signature is `user_defined`; as the code stands, it is essentially dead code.

        // Convert all to arrays for uniform processing
        let arrays: Vec<ArrayRef> = args
            .iter()
            .map(|arg| arg.to_array(num_rows))
            .collect::<Result<Vec<_>>>()?;

            for _ in 0..num_rows {
                builder.append_null();
            }
            return Ok(ColumnarValue::Array(Arc::new(builder.finish()) as ArrayRef));

Can just return a scalar here instead of allocating an array
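
To illustrate the suggestion, here is a minimal sketch using a hypothetical `Columnar` enum as a simplified stand-in for DataFusion's `ColumnarValue`. It is not the real type, and the function names are invented for the example. The idea is that an all-NULL result can be carried as a single scalar and only expanded to `num_rows` values if a consumer actually needs an array.

```rust
/// Hypothetical, string-only stand-in for DataFusion's `ColumnarValue`.
#[derive(Debug, PartialEq)]
enum Columnar {
    /// One value that logically applies to every row.
    Scalar(Option<String>),
    /// One value per row.
    Array(Vec<Option<String>>),
}

/// Mirrors the reviewed code path: allocates `num_rows` NULL entries.
fn null_result_as_array(num_rows: usize) -> Columnar {
    Columnar::Array(vec![None; num_rows])
}

/// The suggested alternative: a single NULL scalar, no per-row allocation.
fn null_result_as_scalar() -> Columnar {
    Columnar::Scalar(None)
}

/// Expands a value to one entry per row, only when an array is required.
fn materialize(value: Columnar, num_rows: usize) -> Vec<Option<String>> {
    match value {
        Columnar::Scalar(v) => vec![v; num_rows],
        Columnar::Array(values) => values,
    }
}
```

Both constructors describe the same logical column once materialized, so returning the scalar form should not change results. In the real crate this would presumably be something like `ColumnarValue::Scalar(ScalarValue::Utf8(None))`, following the pattern already used for the empty-argument case above.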

    }

    #[cfg(test)]
    mod tests {

How many of these tests can instead be SLTs?

    }

    /// Get a string value from an array at a given row index, supporting Utf8/LargeUtf8/Utf8View.
    fn get_string_value(arr: &ArrayRef, row_idx: usize) -> Result<String> {

It doesn't seem ideal to cast the array every time a value is extracted; we should look into restructuring this so the cast is pulled higher up, e.g. by looking into `StringArrayType`.
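
A rough sketch of the difference, in plain Rust with `std::any::Any` standing in for Arrow's dynamically typed arrays. The function names and the use of `Vec<String>` as the "typed array" are assumptions made for illustration; this is not the PR's code or the `StringArrayType` API.

```rust
use std::any::Any;

/// Per-row helper: repeats the downcast for every value it extracts.
fn value_at(col: &dyn Any, row: usize) -> Option<&str> {
    let typed = col.downcast_ref::<Vec<String>>()?;
    typed.get(row).map(String::as_str)
}

/// Calls the per-row helper in a loop, so the downcast runs once per row.
fn join_per_row(col: &dyn Any, num_rows: usize, sep: &str) -> Option<String> {
    let mut parts = Vec::with_capacity(num_rows);
    for row in 0..num_rows {
        parts.push(value_at(col, row)?);
    }
    Some(parts.join(sep))
}

/// Hoisted variant: downcast once, then work on the typed values directly.
fn join_hoisted(col: &dyn Any, sep: &str) -> Option<String> {
    let typed = col.downcast_ref::<Vec<String>>()?;
    Some(typed.join(sep))
}
```

Both variants produce the same output; the hoisted one does the type check once per column rather than once per row.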

            }
            DataType::Binary => {
                let bin_arr =
                    arr.as_any().downcast_ref::<BinaryArray>().ok_or_else(|| {

Why do we cast using `as_any()` here, when for strings above we use the proper methods?

    }

    /// Convert binary bytes to UTF-8 string, matching core `concat`/`concat_ws` behavior.
    fn binary_to_utf8(bytes: &[u8]) -> Result<String> {

It seems we might benefit from coercing binary arrays to string arrays in the function signature, so we don't need this handling logic here.
Which issue does this PR close?
Part of #15914
Rationale for this change
DataFusion core's `concat_ws` does not support array arguments. Spark's `concat_ws(sep, ...)` accepts both scalar strings and arrays, expanding array elements and skipping nulls. This is needed for Spark compatibility in the `datafusion-spark` crate.
What changes are included in this PR?
- `SparkConcatWs` UDF in `datafusion/spark/src/function/string/concat_ws.rs`
- `concat_ws(sep, str1, str2, ...)` with scalar strings
- `concat_ws(',', array('a', 'b'), 'c')` → `"a,b,c"`
- `concat_ws(',')` returns empty string
- `mod.rs` (`make_udf_function!`, `export_functions!`, `functions()`)
Are these changes tested?
Yes.
- `concat_ws.rs` (basic, null values skipped, null separator, list arrays, list with nulls, mixed scalar+list, multiple rows)
- `spark/string/concat_ws.slt` covering scalars, arrays, nulls, column expressions, and edge cases
Are there any user-facing changes?
No. This is a new function in the `datafusion-spark` crate only.
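
For reference, the row-level semantics described in the rationale (array elements are expanded, nulls are skipped) can be sketched in plain Rust as below. `Arg` and `concat_ws_row` are hypothetical names, not the PR's code, and the sketch ignores Arrow arrays entirely. It also assumes that a NULL separator yields a NULL result, which is how I understand Spark to behave and what the "null separator" test presumably covers.

```rust
/// Hypothetical stand-in for one `concat_ws` argument within a single row.
#[derive(Debug, Clone)]
enum Arg {
    /// A scalar string, or NULL.
    Str(Option<String>),
    /// An array of nullable strings, or a NULL array.
    List(Option<Vec<Option<String>>>),
}

/// Joins all non-null values with `sep`.
/// Assumption: a NULL separator makes the whole result NULL.
fn concat_ws_row(sep: Option<&str>, args: &[Arg]) -> Option<String> {
    let sep = sep?;
    let mut parts: Vec<&str> = Vec::new();
    for arg in args {
        match arg {
            Arg::Str(Some(s)) => parts.push(s),
            // Expand the array in place, dropping its NULL elements.
            Arg::List(Some(items)) => {
                parts.extend(items.iter().flatten().map(String::as_str));
            }
            // NULL scalars and NULL arrays contribute nothing.
            Arg::Str(None) | Arg::List(None) => {}
        }
    }
    Some(parts.join(sep))
}
```

With only a separator and no further arguments, this sketch returns an empty string, matching the `concat_ws(',')` case listed above.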