diff --git a/datafusion/physical-expr/src/expressions/binary/kernels.rs b/datafusion/physical-expr/src/expressions/binary/kernels.rs index e573d7ece2afa..54182540476ad 100644 --- a/datafusion/physical-expr/src/expressions/binary/kernels.rs +++ b/datafusion/physical-expr/src/expressions/binary/kernels.rs @@ -17,8 +17,9 @@ //! This module contains computation kernels that are specific to //! datafusion and not (yet) targeted to port upstream to arrow + use arrow::array::*; -use arrow::buffer::{MutableBuffer, NullBuffer}; +use arrow::buffer::{Buffer, NullBuffer, ScalarBuffer}; use arrow::compute::kernels::bitwise::{ bitwise_and, bitwise_and_scalar, bitwise_or, bitwise_or_scalar, bitwise_shift_left, bitwise_shift_left_scalar, bitwise_shift_right, bitwise_shift_right_scalar, @@ -26,10 +27,11 @@ use arrow::compute::kernels::bitwise::{ }; use arrow::compute::kernels::boolean::not; use arrow::compute::kernels::comparison::{regexp_is_match, regexp_is_match_scalar}; -use arrow::datatypes::DataType; +use arrow::datatypes::{ByteViewType, DataType}; use arrow::error::ArrowError; use datafusion_common::{Result, ScalarValue}; use datafusion_common::{internal_err, plan_err}; +use std::marker::PhantomData; use std::sync::Arc; @@ -161,6 +163,169 @@ create_left_integral_dyn_scalar_kernel!( bitwise_shift_left_scalar ); +struct ConcatByteViewBuilder +where + T: ByteViewType, +{ + views: Vec, + data: Vec, + inline: Vec, + phantom: PhantomData, +} + +impl ConcatByteViewBuilder +where + T: ByteViewType, +{ + /// Returns the elementwise concatenation of two [`GenericByteViewArray`]s. + fn concat_elements_view_array( + left: &GenericByteViewArray, + right: &GenericByteViewArray, + ) -> Result, ArrowError> { + let len = left.len(); + if len != right.len() { + return Err(ArrowError::ComputeError(format!( + "Arrays must have the same length: {} != {}", + len, + right.len() + ))); + } + + let mut null_buffer = NullBuffer::union(left.nulls(), right.nulls()); + if let Some(n) = &null_buffer + && n.null_count() == 0 + { + null_buffer = None + } + + match null_buffer { + None => { + let data_size = left + .lengths() + .zip(right.lengths()) + .map(|(l, r)| l + r) + .filter(|len| *len > MAX_INLINE_VIEW_LEN) + .map(|len| len as usize) + .sum(); + + if data_size > u32::MAX as usize { + return Err(ArrowError::ArithmeticOverflow( + "byte array offset overflow".to_string(), + )); + } + + let mut builder = Self::with_capacity(len, data_size); + for (l, r) in left.bytes_iter().zip(right.bytes_iter()) { + builder.append_concat_view(l, r); + } + builder.finish(None) + } + Some(nulls) => { + let data_size = left + .lengths() + .zip(right.lengths()) + .zip(nulls.iter()) + .filter(|((_, _), not_null)| *not_null) + .map(|((l, r), _)| l + r) + .filter(|len| *len > MAX_INLINE_VIEW_LEN) + .map(|len| len as usize) + .sum(); + + if data_size > u32::MAX as usize { + return Err(ArrowError::ArithmeticOverflow( + "byte array offset overflow".to_string(), + )); + } + + let mut builder = Self::with_capacity(len, data_size); + for ((l, r), not_null) in + left.bytes_iter().zip(right.bytes_iter()).zip(nulls.iter()) + { + if not_null { + builder.append_concat_view(l, r); + } else { + builder.append_empty_view(); + } + } + + builder.finish(Some(nulls)) + } + } + } + + fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self { + Self { + views: Vec::with_capacity(item_capacity), + data: Vec::with_capacity(data_capacity), + inline: Vec::with_capacity(MAX_INLINE_VIEW_LEN as usize), + phantom: PhantomData, + } + } + + /// Append a view containing the concatenation of `left` and `right`. + fn append_concat_view(&mut self, left: &[u8], right: &[u8]) { + let total_len = left.len() + right.len(); + if total_len > MAX_INLINE_VIEW_LEN as usize { + let offset = self.data.len(); + + // SAFETY: we've checked that the total data size is within u32::MAX + // so offset cannot exceed it. + // Not using `u32::try_from` on each insertion makes a ~5% difference + // in benchmarking + debug_assert!(offset <= u32::MAX as usize); + let view_offset: u32 = offset as u32; + self.data.extend_from_slice(left); + self.data.extend_from_slice(right); + self.views + .push(make_view(&self.data[offset..], 0, view_offset)); + } else { + self.inline.extend_from_slice(left); + self.inline.extend_from_slice(right); + self.views.push(make_view(&self.inline, 0, 0)); + self.inline.clear(); + }; + } + + /// Append an empty view. + #[inline] + fn append_empty_view(&mut self) { + self.views.push(0); + } + + fn finish( + self, + null_buffer: Option, + ) -> Result, ArrowError> { + if let Some(ref nulls) = null_buffer + && nulls.len() != self.views.len() + { + return Err(ArrowError::ComputeError(format!( + "Null buffer length ({}) must match row count ({})", + nulls.len(), + self.views.len() + ))); + } + + let buffers = if self.data.is_empty() { + Arc::from([]) + } else { + Arc::from([Buffer::from(self.data)]) + }; + + // SAFETY: views were constructed with correct lengths, offsets, and + // prefixes. UTF-8 validity is implicitly guaranteed by never concatenating + // arrays with mixed ByteViewTypes. + let array = unsafe { + GenericByteViewArray::::new_unchecked( + ScalarBuffer::from(self.views), + buffers, + null_buffer, + ) + }; + Ok(array) + } +} + /// Concatenates two `StringViewArray`s element-wise. /// If either element is `Null`, the result element is also `Null`. /// @@ -171,37 +336,9 @@ pub fn concat_elements_utf8view( left: &StringViewArray, right: &StringViewArray, ) -> std::result::Result { - if left.len() != right.len() { - return Err(ArrowError::ComputeError(format!( - "Arrays must have the same length: {} != {}", - left.len(), - right.len() - ))); - } - let mut result = StringViewBuilder::with_capacity(left.len()); - - // Avoid reallocations by writing to a reused buffer (note we could be even - // more efficient by creating the view directly here and avoid the buffer - // but that would be more complex) - let mut buffer = String::new(); - - // Pre-compute combined null bitmap, so the per-row NULL check is more - // efficient - let nulls = NullBuffer::union(left.nulls(), right.nulls()); - - for i in 0..left.len() { - if nulls.as_ref().is_some_and(|n| n.is_null(i)) { - result.append_null(); - } else { - let l = left.value(i); - let r = right.value(i); - buffer.clear(); - buffer.push_str(l); - buffer.push_str(r); - result.try_append_value(&buffer)?; - } - } - Ok(result.finish()) + // TODO Use the kernel for arrow-rs once https://github.com/apache/arrow-rs/pull/10161 + // has been merged + ConcatByteViewBuilder::concat_elements_view_array(left, right) } /// Concatenates two `BinaryViewArray`s element-wise. @@ -214,38 +351,9 @@ pub fn concat_elements_binary_view_array( left: &BinaryViewArray, right: &BinaryViewArray, ) -> std::result::Result { - if left.len() != right.len() { - return Err(ArrowError::ComputeError(format!( - "Arrays must have the same length: {} != {}", - left.len(), - right.len() - ))); - } - let mut result = BinaryViewBuilder::with_capacity(left.len()); - - // Avoid reallocations by writing to a reused buffer (note we could be even - // more efficient by creating the view directly here and avoid the buffer - // but that would be more complex) - let mut buffer = MutableBuffer::new(0); - - // Pre-compute combined null bitmap, so the per-row NULL check is more - // efficient - let nulls = NullBuffer::union(left.nulls(), right.nulls()); - - for i in 0..left.len() { - if nulls.as_ref().is_some_and(|n| n.is_null(i)) { - result.append_null(); - } else { - let l = left.value(i); - let r = right.value(i); - buffer.clear(); - buffer.extend_from_slice(l); - buffer.extend_from_slice(r); - // No try-version of append_value - result.try_append_value(&buffer)?; - } - } - Ok(result.finish()) + // TODO Use the kernel for arrow-rs once https://github.com/apache/arrow-rs/pull/10161 + // has been merged + ConcatByteViewBuilder::concat_elements_view_array(left, right) } /// Invoke a compute kernel on a pair of binary data arrays with flags