Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 173 additions & 65 deletions datafusion/physical-expr/src/expressions/binary/kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,21 @@

//! This module contains computation kernels that are specific to
//! datafusion and not (yet) targeted to port upstream to arrow

use arrow::array::*;
use arrow::buffer::{MutableBuffer, NullBuffer};
use arrow::buffer::{Buffer, NullBuffer, ScalarBuffer};
use arrow::compute::kernels::bitwise::{
bitwise_and, bitwise_and_scalar, bitwise_or, bitwise_or_scalar, bitwise_shift_left,
bitwise_shift_left_scalar, bitwise_shift_right, bitwise_shift_right_scalar,
bitwise_xor, bitwise_xor_scalar,
};
use arrow::compute::kernels::boolean::not;
use arrow::compute::kernels::comparison::{regexp_is_match, regexp_is_match_scalar};
use arrow::datatypes::DataType;
use arrow::datatypes::{ByteViewType, DataType};
use arrow::error::ArrowError;
use datafusion_common::{Result, ScalarValue};
use datafusion_common::{internal_err, plan_err};
use std::marker::PhantomData;

use std::sync::Arc;

Expand Down Expand Up @@ -161,6 +163,169 @@ create_left_integral_dyn_scalar_kernel!(
bitwise_shift_left_scalar
);

/// Incrementally assembles a [`GenericByteViewArray`] whose rows are the
/// concatenation of two input values.
///
/// Rows longer than `MAX_INLINE_VIEW_LEN` bytes are appended to a single
/// shared data buffer; shorter rows are stored inside the view itself.
struct ConcatByteViewBuilder<T: ByteViewType> {
    /// One encoded 128-bit view per output row.
    views: Vec<u128>,
    /// Backing bytes for every row that does not fit inline in its view.
    data: Vec<u8>,
    /// Reusable scratch space used to assemble a short row before it is
    /// encoded inline.
    inline: Vec<u8>,
    /// Binds the builder to a concrete view type without storing a `T`.
    phantom: PhantomData<T>,
}

impl<T> ConcatByteViewBuilder<T>
where
    T: ByteViewType,
{
    /// Returns the elementwise concatenation of two [`GenericByteViewArray`]s.
    ///
    /// A row of the result is null when the corresponding row of either input
    /// is null.
    ///
    /// # Errors
    ///
    /// * `ArrowError::ComputeError` if the inputs have different lengths.
    /// * `ArrowError::ArithmeticOverflow` if the bytes that must be stored in
    ///   the data buffer exceed `u32::MAX`, the largest offset a view can
    ///   address.
    fn concat_elements_view_array(
        left: &GenericByteViewArray<T>,
        right: &GenericByteViewArray<T>,
    ) -> Result<GenericByteViewArray<T>, ArrowError> {
        let len = left.len();
        if len != right.len() {
            return Err(ArrowError::ComputeError(format!(
                "Arrays must have the same length: {} != {}",
                len,
                right.len()
            )));
        }

        // A validity mask that contains no nulls is dropped so that the
        // cheaper mask-free path below is taken.
        let null_buffer = NullBuffer::union(left.nulls(), right.nulls())
            .filter(|nulls| nulls.null_count() > 0);

        match null_buffer {
            None => {
                // Widen to u64 *before* adding: the sum of two u32 lengths
                // may not fit in a u32.
                let data_size = Self::spilled_data_size(
                    left.lengths()
                        .zip(right.lengths())
                        .map(|(l, r)| u64::from(l) + u64::from(r)),
                )?;

                let mut builder = Self::with_capacity(len, data_size);
                for (l, r) in left.bytes_iter().zip(right.bytes_iter()) {
                    builder.append_concat_view(l, r);
                }
                builder.finish(None)
            }
            Some(nulls) => {
                // Null rows contribute no bytes, so they are excluded from
                // the size computation.
                let data_size = Self::spilled_data_size(
                    left.lengths()
                        .zip(right.lengths())
                        .zip(nulls.iter())
                        .filter(|(_, not_null)| *not_null)
                        .map(|((l, r), _)| u64::from(l) + u64::from(r)),
                )?;

                let mut builder = Self::with_capacity(len, data_size);
                for ((l, r), not_null) in
                    left.bytes_iter().zip(right.bytes_iter()).zip(nulls.iter())
                {
                    if not_null {
                        builder.append_concat_view(l, r);
                    } else {
                        builder.append_empty_view();
                    }
                }

                builder.finish(Some(nulls))
            }
        }
    }

    /// Sums the combined row lengths that are too long to be stored inline,
    /// i.e. the number of bytes that will be written to the data buffer.
    ///
    /// Returns `ArrowError::ArithmeticOverflow` when that total exceeds
    /// `u32::MAX`, because view offsets are 32-bit.
    fn spilled_data_size(
        combined_lengths: impl Iterator<Item = u64>,
    ) -> Result<usize, ArrowError> {
        let inline_limit = u64::from(MAX_INLINE_VIEW_LEN);
        let total: u64 = combined_lengths
            .filter(|row_len| *row_len > inline_limit)
            .sum();

        if total > u64::from(u32::MAX) {
            return Err(ArrowError::ArithmeticOverflow(
                "byte array offset overflow".to_string(),
            ));
        }

        // `total` fits in a u32 here, so narrowing to usize is lossless.
        Ok(total as usize)
    }

    /// Creates a builder with room for `item_capacity` views and
    /// `data_capacity` bytes of out-of-line data.
    fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
        Self {
            views: Vec::with_capacity(item_capacity),
            data: Vec::with_capacity(data_capacity),
            inline: Vec::with_capacity(MAX_INLINE_VIEW_LEN as usize),
            phantom: PhantomData,
        }
    }

    /// Append a view containing the concatenation of `left` and `right`.
    ///
    /// The caller must already have verified (see `spilled_data_size`) that
    /// all out-of-line bytes together fit in `u32::MAX`.
    fn append_concat_view(&mut self, left: &[u8], right: &[u8]) {
        let total_len = left.len() + right.len();
        if total_len > MAX_INLINE_VIEW_LEN as usize {
            let offset = self.data.len();

            // The total out-of-line size was checked against u32::MAX up
            // front, so this offset cannot exceed it. Not using
            // `u32::try_from` on each insertion makes a ~5% difference in
            // benchmarking.
            debug_assert!(offset <= u32::MAX as usize);
            let view_offset: u32 = offset as u32;
            self.data.extend_from_slice(left);
            self.data.extend_from_slice(right);
            // The view is built from the bytes just written, which start at
            // `offset` in the single data buffer (buffer index 0).
            self.views
                .push(make_view(&self.data[offset..], 0, view_offset));
        } else {
            // Short rows are assembled in the scratch buffer and encoded
            // entirely inside the view; nothing is written to `data`.
            self.inline.extend_from_slice(left);
            self.inline.extend_from_slice(right);
            self.views.push(make_view(&self.inline, 0, 0));
            self.inline.clear();
        }
    }

    /// Append an empty view, used as the placeholder for a null row.
    #[inline]
    fn append_empty_view(&mut self) {
        self.views.push(0);
    }

    /// Consumes the builder and produces the finished array.
    ///
    /// # Errors
    ///
    /// Returns `ArrowError::ComputeError` if `null_buffer` is present and its
    /// length differs from the number of appended rows.
    fn finish(
        self,
        null_buffer: Option<NullBuffer>,
    ) -> Result<GenericByteViewArray<T>, ArrowError> {
        if let Some(ref nulls) = null_buffer
            && nulls.len() != self.views.len()
        {
            return Err(ArrowError::ComputeError(format!(
                "Null buffer length ({}) must match row count ({})",
                nulls.len(),
                self.views.len()
            )));
        }

        // When every row was stored inline there is no data buffer to attach.
        let buffers = if self.data.is_empty() {
            Arc::from([])
        } else {
            Arc::from([Buffer::from(self.data)])
        };

        // SAFETY: views were constructed with correct lengths, offsets, and
        // prefixes. UTF-8 validity is implicitly guaranteed by never concatenating
        // arrays with mixed ByteViewTypes.
        let array = unsafe {
            GenericByteViewArray::<T>::new_unchecked(
                ScalarBuffer::from(self.views),
                buffers,
                null_buffer,
            )
        };
        Ok(array)
    }
}

/// Concatenates two `StringViewArray`s element-wise.
/// If either element is `Null`, the result element is also `Null`.
///
Expand All @@ -171,37 +336,9 @@ pub fn concat_elements_utf8view(
left: &StringViewArray,
right: &StringViewArray,
) -> std::result::Result<StringViewArray, ArrowError> {
if left.len() != right.len() {
return Err(ArrowError::ComputeError(format!(
"Arrays must have the same length: {} != {}",
left.len(),
right.len()
)));
}
let mut result = StringViewBuilder::with_capacity(left.len());

// Avoid reallocations by writing to a reused buffer (note we could be even
// more efficient by creating the view directly here and avoid the buffer
// but that would be more complex)
let mut buffer = String::new();

// Pre-compute combined null bitmap, so the per-row NULL check is more
// efficient
let nulls = NullBuffer::union(left.nulls(), right.nulls());

for i in 0..left.len() {
if nulls.as_ref().is_some_and(|n| n.is_null(i)) {
result.append_null();
} else {
let l = left.value(i);
let r = right.value(i);
buffer.clear();
buffer.push_str(l);
buffer.push_str(r);
result.try_append_value(&buffer)?;
}
}
Ok(result.finish())
// TODO: use the kernel from arrow-rs once https://github.com/apache/arrow-rs/pull/10161
// has been merged
ConcatByteViewBuilder::concat_elements_view_array(left, right)
}

/// Concatenates two `BinaryViewArray`s element-wise.
Expand All @@ -214,38 +351,9 @@ pub fn concat_elements_binary_view_array(
left: &BinaryViewArray,
right: &BinaryViewArray,
) -> std::result::Result<BinaryViewArray, ArrowError> {
if left.len() != right.len() {
return Err(ArrowError::ComputeError(format!(
"Arrays must have the same length: {} != {}",
left.len(),
right.len()
)));
}
let mut result = BinaryViewBuilder::with_capacity(left.len());

// Avoid reallocations by writing to a reused buffer (note we could be even
// more efficient by creating the view directly here and avoid the buffer
// but that would be more complex)
let mut buffer = MutableBuffer::new(0);

// Pre-compute combined null bitmap, so the per-row NULL check is more
// efficient
let nulls = NullBuffer::union(left.nulls(), right.nulls());

for i in 0..left.len() {
if nulls.as_ref().is_some_and(|n| n.is_null(i)) {
result.append_null();
} else {
let l = left.value(i);
let r = right.value(i);
buffer.clear();
buffer.extend_from_slice(l);
buffer.extend_from_slice(r);
// No try-version of append_value
result.try_append_value(&buffer)?;
}
}
Ok(result.finish())
// TODO: use the kernel from arrow-rs once https://github.com/apache/arrow-rs/pull/10161
// has been merged
ConcatByteViewBuilder::concat_elements_view_array(left, right)
}

/// Invoke a compute kernel on a pair of binary data arrays with flags
Expand Down
Loading