diff --git a/Cargo.lock b/Cargo.lock index fdca3237b71d0..5f9af4edc8f56 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2433,6 +2433,7 @@ dependencies = [ "datafusion-pruning", "insta", "itertools 0.14.0", + "log", "recursive", "tokio", ] diff --git a/datafusion/physical-optimizer/Cargo.toml b/datafusion/physical-optimizer/Cargo.toml index 38c8a7c37211f..eb201d0c2f655 100644 --- a/datafusion/physical-optimizer/Cargo.toml +++ b/datafusion/physical-optimizer/Cargo.toml @@ -51,6 +51,7 @@ datafusion-physical-expr-common = { workspace = true } datafusion-physical-plan = { workspace = true } datafusion-pruning = { workspace = true } itertools = { workspace = true } +log = { workspace = true } recursive = { workspace = true, optional = true } [dev-dependencies] diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index b9eb248f6e843..a7c0a4a1396df 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -40,6 +40,7 @@ pub mod limit_pushdown_past_window; pub mod limited_distinct_aggregation; pub mod optimizer; pub mod output_requirements; +pub mod parallel_window; pub mod projection_pushdown; pub use datafusion_pruning as pruning; pub mod hash_join_buffering; diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index 0f81512b61c8e..a99fba4fa2b05 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -29,6 +29,7 @@ use crate::join_selection::JoinSelection; use crate::limit_pushdown::LimitPushdown; use crate::limited_distinct_aggregation::LimitedDistinctAggregation; use crate::output_requirements::OutputRequirements; +use crate::parallel_window::ParallelWindow; use crate::projection_pushdown::ProjectionPushdown; use crate::sanity_checker::SanityCheckPlan; use crate::topk_aggregation::TopKAggregation; @@ -187,6 +188,13 @@ impl PhysicalOptimizer { // 
[`EnsureRequirements`](crate::ensure_requirements) for the per-phase // breakdown, and // for the original failure mode. + // Re-shape no-PARTITION-BY RANGE-frame windows into a parallel + // form: RangeRepartitionExec + parallel-aware BWAG + HaloDropExec + // (EnsureRequirements adds the sort). Runs *before* EnsureRequirements so + // we own the distribution decision — otherwise EnsureRequirements + // would satisfy BWAG's SinglePartition requirement by inserting + // an SPM that collapses the parallelism we're trying to create. + Arc::new(ParallelWindow::new()), Arc::new(EnsureRequirements::new()), // The CombinePartialFinalAggregate rule should be applied after distribution enforcement Arc::new(CombinePartialFinalAggregate::new()), diff --git a/datafusion/physical-optimizer/src/parallel_window.rs b/datafusion/physical-optimizer/src/parallel_window.rs new file mode 100644 index 0000000000000..4216a63987db1 --- /dev/null +++ b/datafusion/physical-optimizer/src/parallel_window.rs @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Parallelize bounded RANGE-frame window functions that have an ORDER BY +//! but no PARTITION BY by re-shaping the plan above the window's input. +//! 
Runs *before* `EnsureRequirements`. For each eligible +//! `BoundedWindowAggExec`, this rule: +//! - inserts a `SortExec(preserve_partitioning=true)` on the ORDER BY +//! key above the window's existing input; +//! - inserts a `RangeRepartitionExec` carrying the halo distances above +//! that sort; +//! - rebuilds the `BoundedWindowAggExec` on top of the result with +//! `parallel_aware = true`, so its `required_input_distribution()` +//! returns `UnspecifiedDistribution` instead of `SinglePartition` — +//! which is what would otherwise force `EnsureRequirements` to insert +//! a `SortPreservingMergeExec` and collapse our parallelism. +//! +//! By owning the structural decisions before `EnsureRequirements` runs, +//! this rule avoids the post-hoc surgery of stripping an inserted +//! `SortPreservingMergeExec`. All boundary / global-extremes logic lives +//! in `RangeRepartitionExec`'s coordinator and runs against runtime +//! stats rather than plan-time `Statistics`. + +use crate::PhysicalOptimizerRule; +use datafusion_common::ScalarValue; +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; +use datafusion_expr::{WindowFrameBound, WindowFrameUnits}; +use datafusion_physical_expr::LexOrdering; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_plan::ExecutionPlan; +use datafusion_physical_plan::halo_drop::HaloDropExec; +use datafusion_physical_plan::range_repartition::RangeRepartitionExec; +use datafusion_physical_plan::windows::BoundedWindowAggExec; +use log::info; +use std::sync::Arc; + +#[derive(Default, Clone, Debug)] +pub struct ParallelWindow; + +impl ParallelWindow { + pub fn new() -> Self { + Self + } +} + +impl PhysicalOptimizerRule for ParallelWindow { + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> datafusion_common::Result> { + let out = plan.transform_down(|node| { + let Some(window) = node.downcast_ref::() else { + return 
Ok(Transformed::no(node)); + }; + let Some((halo_preceding, halo_following)) = candidate_halo(window) + else { + return Ok(Transformed::no(node)); + }; + info!( + "ParallelWindow: candidate BoundedWindowAggExec (RANGE frame, no PARTITION BY); \ + halo: {halo_preceding} preceding, {halo_following} following" + ); + // `candidate_halo` already verified order_by.len()==1. + let sort_key = window.window_expr()[0].order_by()[0].clone(); + let lex = LexOrdering::new(vec![sort_key]) + .expect("candidate_halo guarantees one sort key"); + let original_input = Arc::clone(&node.children()[0]); + // Don't pre-insert a SortExec; RangeRepartitionExec now declares + // its required input ordering, so EnsureRequirements will plant + // the pipeline-breaking sort beneath us. Doing both would just + // produce a redundant SortExec that the optimizer collapses. + let range = Arc::new(RangeRepartitionExec::new( + original_input, + lex.clone(), + halo_preceding, + halo_following, + )); + // `parallel_aware = true` flips BWAG's required_input_distribution + // to UnspecifiedDistribution, so EnsureRequirements won't wrap + // us in an SPM. `can_repartition` is vacuous because + // candidate_halo already required partition_keys empty. + let new_window: Arc = Arc::new( + BoundedWindowAggExec::try_new( + window.window_expr().to_vec(), + range, + window.input_order_mode.clone(), + true, + )? + .with_parallel_aware(true), + ); + // Drop halo rows above the per-partition window. HaloDropExec + // reads its primary range from `input.runtime_partition_extremes`, + // which BWAG passes through and RangeRepartitionExec populates. + let drop_halo: Arc = + Arc::new(HaloDropExec::try_new(new_window, &lex)?); + // Jump past the result's children: the BWAG we just emitted is + // still a candidate by shape (RANGE frame, no PARTITION BY) and + // `transform_down` would otherwise re-wrap it forever. 
+ Ok(Transformed::new(drop_halo, true, TreeNodeRecursion::Jump)) + })?; + Ok(out.data) + } + + fn name(&self) -> &str { + "ParallelWindow" + } + + fn schema_check(&self) -> bool { + true + } +} + +/// Returns `(halo_preceding, halo_following)` if the window matches the +/// v1 shape we know how to parallelize: no PARTITION BY, a single +/// `Column` sort key, RANGE frame, finite Int64 bounds (or CurrentRow). +/// Returns `None` otherwise so the rule leaves the plan alone. +fn candidate_halo(window: &BoundedWindowAggExec) -> Option<(i64, i64)> { + if !window.partition_keys().is_empty() { + return None; + } + let order_by = window.window_expr()[0].order_by(); + if order_by.len() != 1 { + return None; + } + if order_by[0].expr.downcast_ref::().is_none() { + return None; + } + let frame = window.window_expr()[0].get_window_frame(); + if frame.units != WindowFrameUnits::Range { + return None; + } + i64_halo(&frame.start_bound, &frame.end_bound) +} + +/// Extract `(halo_preceding, halo_following)` in order-key units from a +/// RANGE window frame. Returns `None` for UNBOUNDED bounds or non-`Int64` +/// distances. v1 scope: only `Preceding(Int64)` / `CurrentRow` for the +/// start bound, and `CurrentRow` / `Following(Int64)` for the end bound. 
+fn i64_halo(start: &WindowFrameBound, end: &WindowFrameBound) -> Option<(i64, i64)> { + let preceding = match start { + WindowFrameBound::Preceding(ScalarValue::Int64(Some(n))) => *n, + WindowFrameBound::CurrentRow => 0, + _ => return None, + }; + let following = match end { + WindowFrameBound::Following(ScalarValue::Int64(Some(n))) => *n, + WindowFrameBound::CurrentRow => 0, + _ => return None, + }; + Some((preceding, following)) +} diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 8577e86f00514..a64a2adb19544 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -29,7 +29,7 @@ use arrow_schema::Schema; pub use datafusion_common::hash_utils; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; pub use datafusion_common::utils::project_schema; -pub use datafusion_common::{ColumnStatistics, Statistics, internal_err}; +pub use datafusion_common::{ColumnStatistics, ScalarValue, Statistics, internal_err}; pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; pub use datafusion_expr::{Accumulator, ColumnarValue}; pub use datafusion_physical_expr::window::WindowExpr; @@ -93,6 +93,38 @@ use futures::stream::{StreamExt, TryStreamExt}; /// /// [`datafusion-examples`]: https://github.com/apache/datafusion/tree/main/datafusion-examples /// [`memory_pool_execution_plan.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs + +/// Endpoints of a single partition's output expressed in the operator's +/// declared output ordering (the `PhysicalSortExpr` list returned by +/// `equivalence_properties().output_ordering()`). +/// +/// `min` and `max` are tuples of values — one entry per sort key — taken at +/// the lex-smallest and lex-largest output rows. 
// NOTE(review): in this patch the struct is inserted between the doc comment
// of `ExecutionPlan` and the trait itself. Rust merges adjacent doc comments,
// so the trait's documentation ends up attached to `PartitionExtremes` and
// the trait loses its docs. Move this item above that doc comment (or below
// the trait).

/// Endpoints of a single partition's output expressed in the operator's
/// declared output ordering (the `PhysicalSortExpr` list returned by
/// `equivalence_properties().output_ordering()`).
///
/// `min` and `max` are tuples of values — one entry per sort key — taken at
/// the lex-smallest and lex-largest output rows. For the leading sort key
/// these are exactly the natural min/max of that key (after honoring
/// ASC/DESC). For trailing sort keys the entries hold the value of that key
/// at the lex-extreme row, not that key's own natural extreme.
///
/// Default semantics — *observed*: these are the actual min/max of data this
/// partition has produced (or will produce, by the time the upstream is fully
/// consumed). `SortExec` is the canonical observer-style override.
///
/// Exception — *intended*: `RangeRepartitionExec` returns each output
/// partition's *intended primary range* (the range a downstream `HaloDropExec`
/// should keep, with halo rows excluded), which is **narrower** than the data
/// the partition actually carries when halo is non-zero. The "useful lie"
/// is what lets the operator above the window strip halo without threading a
/// boundaries side-channel. Consumers that need observed extremes must not
/// read through a `RangeRepartitionExec` boundary when halo > 0.
#[derive(Debug, Clone)]
pub struct PartitionExtremes {
    /// Sort-key values at the lex-smallest row across the partition.
    pub min: Vec<ScalarValue>,
    /// Sort-key values at the lex-largest row across the partition.
    pub max: Vec<ScalarValue>,
    /// Number of rows that contributed to `min`/`max`. Not every producer
    /// tracks it: `RangeRepartitionExec` always reports `0` here, so
    /// consumers should not rely on it.
    pub row_count: usize,
}
    /// Runtime-derived endpoints of `partition`'s output along the operator's
    /// declared output ordering (i.e. each `PhysicalSortExpr` in
    /// `equivalence_properties().output_ordering()`).
    ///
    /// Returns `Ok(None)` by default. Operators that are pipeline-breaking
    /// along their output ordering (e.g. `SortExec`) may override to expose
    /// the lex-min / lex-max tuple once the partition's input has been fully
    /// consumed. Callers are responsible for ensuring the upstream has made
    /// enough progress (typically by polling `execute()`) before relying on
    /// the result; until then this returns `Ok(None)`.
    ///
    /// See [`PartitionExtremes`] for the result shape — and read the type doc
    /// before assuming "observed" semantics: a few operators (notably
    /// `RangeRepartitionExec`) intentionally return the *intended* range, not
    /// the data they will actually carry.
    fn runtime_partition_extremes(
        &self,
        partition: usize,
    ) -> Result<Option<PartitionExtremes>> {
        // The default tracks nothing; the binding only marks the argument as
        // used so the parameter keeps its documented name.
        let _ = partition;
        Ok(None)
    }
Each partition reads its *intended primary range* from +//! `input.runtime_partition_extremes(partition)` — which `RangeRepartitionExec` +//! exposes as a "useful lie" — and filters rows whose leading sort key +//! falls outside that range. Halo rows (rows duplicated into this +//! partition for the window's frame context at boundaries) sit *outside* +//! the primary range by construction, so the filter drops them. +//! +//! Reads extremes lazily on the first batch, because +//! `RangeRepartitionExec`'s coordinator populates ranges before routing +//! any data — so any batch arriving at us implies ranges are ready. + +use std::sync::Arc; + +use arrow::array::{Array, BooleanArray, Int64Array, RecordBatch}; +use arrow::compute::filter_record_batch; +use datafusion_common::{Result, ScalarValue, internal_datafusion_err}; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::LexOrdering; +use datafusion_physical_expr::expressions::Column; + +use crate::stream::RecordBatchStreamAdapter; +use crate::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, + SendableRecordBatchStream, +}; + +#[derive(Debug)] +pub struct HaloDropExec { + input: Arc, + /// Column index of the leading sort key in the input schema. We + /// resolve it at construction from a `LexOrdering`'s first key, + /// which must be a `Column` (the same constraint `ParallelWindow` + /// applies to candidate windows). + sort_col: usize, + cache: Arc, +} + +impl HaloDropExec { + pub fn try_new( + input: Arc, + ordering: &LexOrdering, + ) -> Result { + // PhysicalExpr: Any, so downcast_ref via Any works directly on + // the Arc through auto-deref. + let sort_col = ordering + .first() + .expr + .downcast_ref::() + .ok_or_else(|| { + internal_datafusion_err!( + "HaloDropExec: leading sort key must be a Column" + ) + })? 
+ .index(); + let cache = Arc::clone(input.properties()); + Ok(Self { + input, + sort_col, + cache, + }) + } +} + +impl DisplayAs for HaloDropExec { + fn fmt_as( + &self, + _t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "HaloDropExec") + } +} + +impl ExecutionPlan for HaloDropExec { + fn name(&self) -> &'static str { + "HaloDropExec" + } + + fn properties(&self) -> &Arc { + &self.cache + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> Result> { + let input = children.swap_remove(0); + let cache = Arc::clone(input.properties()); + Ok(Arc::new(Self { + input, + sort_col: self.sort_col, + cache, + })) + } + + fn maintains_input_order(&self) -> Vec { + vec![true] + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + let input = self.input.execute(partition, context)?; + let schema = self.schema(); + let extremes_provider = Arc::clone(&self.input); + let sort_col = self.sort_col; + let stream = filter_stream(input, extremes_provider, partition, sort_col); + Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) + } +} + +/// Drains `input`, lazy-initializing the `[min, max]` filter range on +/// the first batch by calling +/// `extremes_provider.runtime_partition_extremes(partition)`. Subsequent +/// batches reuse the cached range. 
+fn filter_stream( + input: SendableRecordBatchStream, + extremes_provider: Arc, + partition: usize, + sort_col: usize, +) -> impl futures::Stream> + Send { + struct St { + input: SendableRecordBatchStream, + range: Option<(i64, i64)>, + provider: Arc, + partition: usize, + sort_col: usize, + } + let st = St { + input, + range: None, + provider: extremes_provider, + partition, + sort_col, + }; + futures::stream::try_unfold(st, |mut st| async move { + use futures::StreamExt; + loop { + let batch = match st.input.next().await { + Some(Ok(b)) => b, + Some(Err(e)) => return Err(e), + None => return Ok(None), + }; + if st.range.is_none() { + let extremes = st + .provider + .runtime_partition_extremes(st.partition)? + .ok_or_else(|| { + internal_datafusion_err!( + "HaloDropExec: extremes unavailable on first batch \ + — RangeRepartitionExec coordinator should have \ + populated them before routing any rows" + ) + })?; + let lo = scalar_to_i64(extremes.min.first())?; + let hi = scalar_to_i64(extremes.max.first())?; + st.range = Some((lo, hi)); + } + let (lo, hi) = st.range.unwrap(); + let filtered = filter_batch(&batch, st.sort_col, lo, hi)?; + if filtered.num_rows() == 0 { + continue; // skip empty filtered batches + } + return Ok(Some((filtered, st))); + } + }) +} + +fn scalar_to_i64(s: Option<&ScalarValue>) -> Result { + match s { + Some(ScalarValue::Int64(Some(v))) => Ok(*v), + _ => Err(internal_datafusion_err!( + "HaloDropExec: leading extreme must be non-null Int64" + )), + } +} + +fn filter_batch( + batch: &RecordBatch, + sort_col: usize, + lo: i64, + hi: i64, +) -> Result { + let col = batch + .column(sort_col) + .as_any() + .downcast_ref::() + .ok_or_else(|| { + internal_datafusion_err!("HaloDropExec: leading sort column must be Int64") + })?; + let mask: BooleanArray = (0..col.len()) + .map(|i| { + if col.is_null(i) { + Some(false) + } else { + let v = col.value(i); + Some(v >= lo && v <= hi) + } + }) + .collect(); + Ok(filter_record_batch(batch, &mask)?) 
+} diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index c7b1d4729e21d..6f0aeda26cda7 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -42,9 +42,9 @@ pub use datafusion_physical_expr::{ pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; pub use crate::execution_plan::{ - ExecutionPlan, ExecutionPlanProperties, PlanProperties, collect, collect_partitioned, - displayable, execute_input_stream, execute_stream, execute_stream_partitioned, - get_plan_string, with_new_children_if_necessary, + ExecutionPlan, ExecutionPlanProperties, PartitionExtremes, PlanProperties, collect, + collect_partitioned, displayable, execute_input_stream, execute_stream, + execute_stream_partitioned, get_plan_string, with_new_children_if_necessary, }; pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; @@ -76,6 +76,7 @@ pub mod execution_plan; pub mod explain; pub mod filter; pub mod filter_pushdown; +pub mod halo_drop; pub mod joins; pub mod limit; pub mod memory; @@ -83,6 +84,7 @@ pub mod metrics; pub mod operator_statistics; pub mod placeholder_row; pub mod projection; +pub mod range_repartition; pub mod recursive_query; pub mod repartition; pub mod scalar_subquery; diff --git a/datafusion/physical-plan/src/range_repartition.rs b/datafusion/physical-plan/src/range_repartition.rs new file mode 100644 index 0000000000000..dec1763d54a9f --- /dev/null +++ b/datafusion/physical-plan/src/range_repartition.rs @@ -0,0 +1,636 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Range-partition an input stream on a single Int64 order-key into N +//! output partitions, with halo overlap for bounded RANGE-frame window +//! functions sitting above it. +//! +//! `execute()`'s first call spawns a coordinator that: +//! 1. opens `child.execute(k)` for every input partition `k`, +//! 2. drives each stream to its first batch (which makes the pipeline- +//! breaking sort child populate its `PartitionExtremes` slot), +//! 3. reads `child.runtime_partition_extremes(k)` per input, +//! 4. lex-reduces those into a single global [`PartitionExtremes`], derives +//! `N` equal-width Int64 bucket boundaries from `[global.min, +//! global.max]`, and computes per-bucket expanded ranges by +//! extending each primary [b_i, b_{i+1}) outward by +//! `halo_preceding` / `halo_following`, +//! 5. then for every batch flowing out of every input stream, splits +//! the batch into per-bucket pieces (rows whose order key lies in +//! bucket `b`'s expanded range), and sends each piece into bucket +//! `b`'s output channel. +//! +//! Halo rows therefore appear in more than one output partition (their +//! primary bucket and every neighbor whose expanded range reaches them). +//! That's correct for letting the per-bucket window operator compute frame +//! values at the seams — but it also means rows are duplicated in the +//! merged output until the `HaloDropExec` above the window strips the +//! halo rows again. 
+ +use std::sync::{Arc, Mutex}; + +use arrow::array::{Array, Int64Array, RecordBatch, UInt32Array}; +use arrow::compute::take_arrays; +use arrow::datatypes::SchemaRef; +use datafusion_common::{DataFusionError, Result, internal_datafusion_err}; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::LexOrdering; +use datafusion_physical_expr_common::sort_expr::OrderingRequirements; +use futures::StreamExt; +use log::info; +use tokio::sync::{mpsc, oneshot}; + +use datafusion_common::ScalarValue; + +use crate::sorts::sort::lex_compare; +use crate::stream::RecordBatchStreamAdapter; +use crate::{ + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, + PartitionExtremes, PlanProperties, SendableRecordBatchStream, +}; + +#[derive(Debug)] +pub struct RangeRepartitionExec { + input: Arc, + cache: Arc, + /// Required input ordering — passed down from the consumer (window + /// operator) so EnsureRequirements inserts the pipeline-breaking sort + /// *below* us, not above. Same key feeds the routing decision. + ordering: LexOrdering, + /// Halo distance preceding each bucket's primary range, in + /// leading-sort-key units. Carried over from the window frame at plan + /// time so the coordinator can derive per-bucket expanded ranges. + halo_preceding: i64, + /// Halo distance following each bucket's primary range. + halo_following: i64, + state: Arc>, + /// Per-output-partition primary `[lo, hi_exclusive)` ranges, filled + /// by the coordinator before any batch is routed. Surfaced through + /// `runtime_partition_extremes(partition)` so downstream operators + /// (e.g. HaloDropExec) can read each bucket's intended primary + /// range without needing the global extremes. + bucket_primary_ranges: Arc>>>, +} + +struct State { + initialized: bool, + /// One `oneshot::Receiver` per output partition, populated when the + /// coordinator hands off this partition's data. `take()`n by the + /// corresponding `execute(partition)` call. 
+ handoffs: Vec>>>, +} + +/// Per-output-partition payload the coordinator hands to its stream. +/// Once the coordinator has computed boundaries it starts router tasks +/// that funnel routed batches into bucket-keyed mpsc channels. Each +/// output partition's stream drains its receiver. +struct PartitionData { + rx: mpsc::Receiver>, +} + +impl std::fmt::Debug for State { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("State") + .field("initialized", &self.initialized) + .field("handoffs", &self.handoffs.len()) + .finish() + } +} + +impl RangeRepartitionExec { + pub fn new( + input: Arc, + ordering: LexOrdering, + halo_preceding: i64, + halo_following: i64, + ) -> Self { + let n = input.output_partitioning().partition_count(); + let cache = Arc::clone(input.properties()); + Self { + input, + cache, + ordering, + halo_preceding, + halo_following, + state: Arc::new(Mutex::new(State { + initialized: false, + handoffs: (0..n).map(|_| None).collect(), + })), + bucket_primary_ranges: Arc::new(Mutex::new(None)), + } + } + + pub fn input(&self) -> &Arc { + &self.input + } +} + +impl DisplayAs for RangeRepartitionExec { + fn fmt_as( + &self, + _t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "RangeRepartitionExec") + } +} + +impl ExecutionPlan for RangeRepartitionExec { + fn name(&self) -> &'static str { + "RangeRepartitionExec" + } + + fn properties(&self) -> &Arc { + &self.cache + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> Result> { + Ok(Arc::new(Self::new( + children.swap_remove(0), + self.ordering.clone(), + self.halo_preceding, + self.halo_following, + ))) + } + + fn required_input_ordering(&self) -> Vec> { + vec![Some(OrderingRequirements::from(self.ordering.clone()))] + } + + fn maintains_input_order(&self) -> Vec { + vec![true] + } + + /// Returns each output partition's *intended primary 
range* as + /// inclusive `[min, max]` — not the actual range of routed data + /// (which is wider, by `halo_preceding`/`halo_following`). This is a + /// "useful lie" the downstream `HaloDropExec` consumes to filter + /// halo rows back out. + /// + /// Returns `Ok(None)` if the coordinator hasn't computed boundaries + /// yet — callers must drive the input stream to first batch before + /// reading, per the trait contract on `runtime_partition_extremes`. + fn runtime_partition_extremes( + &self, + partition: usize, + ) -> Result> { + let guard = self.bucket_primary_ranges.lock().map_err(|_| { + internal_datafusion_err!( + "RangeRepartitionExec bucket_primary_ranges mutex poisoned" + ) + })?; + let Some(ranges) = guard.as_ref() else { + return Ok(None); + }; + let &(lo, hi_excl) = &ranges[partition]; + // Convert [lo, hi_exclusive) → inclusive [min, max]. + let max = hi_excl.saturating_sub(1); + Ok(Some(PartitionExtremes { + min: vec![ScalarValue::Int64(Some(lo))], + max: vec![ScalarValue::Int64(Some(max))], + row_count: 0, // not tracked; consumers shouldn't rely on it + })) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + let mut state = self.state.lock().map_err(|_| { + internal_datafusion_err!("RangeRepartitionExec mutex poisoned") + })?; + if !state.initialized { + state.initialized = true; + let n = state.handoffs.len(); + let mut senders = Vec::with_capacity(n); + for slot in state.handoffs.iter_mut() { + let (tx, rx) = oneshot::channel(); + senders.push(tx); + *slot = Some(rx); + } + let child = Arc::clone(&self.input); + let ctx = Arc::clone(&context); + let halo_preceding = self.halo_preceding; + let halo_following = self.halo_following; + let primaries = Arc::clone(&self.bucket_primary_ranges); + tokio::spawn(coordinator( + child, + ctx, + senders, + halo_preceding, + halo_following, + primaries, + )); + } + let rx = state + .handoffs + .get_mut(partition) + .and_then(Option::take) + .ok_or_else(|| { + 
internal_datafusion_err!("partition {partition} already taken") + })?; + drop(state); + + let schema = self.schema(); + Ok(Box::pin(RecordBatchStreamAdapter::new( + Arc::clone(&schema), + partition_stream(schema, rx), + ))) + } +} + +/// Stream that awaits the coordinator's handoff for one output partition, +/// then drains the bucket-keyed mpsc receiver router tasks are pushing +/// into. If the coordinator drops the sender (e.g. setup failed) the +/// stream surfaces an error. +fn partition_stream( + _schema: SchemaRef, + rx: oneshot::Receiver>, +) -> impl futures::Stream> + Send { + use futures::stream::{TryStreamExt, once}; + once(async move { + let data = rx + .await + .map_err(|_| internal_datafusion_err!("coordinator dropped"))??; + let mut bucket_rx = data.rx; + let inner = futures::stream::poll_fn(move |cx| bucket_rx.poll_recv(cx)); + Ok::<_, DataFusionError>(inner) + }) + .try_flatten() +} + +/// Coordinator task: drive every input partition to first batch, gather +/// runtime extremes, log the lex-reduced global, then hand off per-input +/// payloads to their corresponding output partition. +async fn coordinator( + child: Arc, + ctx: Arc, + mut senders: Vec>>, + halo_preceding: i64, + halo_following: i64, + bucket_primary_ranges: Arc>>>, +) { + let n = senders.len(); + + // Phase 1: open every input stream and pull the first batch from each. + let mut firsts: Vec<(Option, SendableRecordBatchStream)> = + Vec::with_capacity(n); + for k in 0..n { + let mut stream = match child.execute(k, Arc::clone(&ctx)) { + Ok(s) => s, + Err(e) => { + let msg = format!("input {k} open failed: {e}"); + for tx in senders.drain(..) { + let _ = tx.send(Err(internal_datafusion_err!("{msg}"))); + } + return; + } + }; + let first = match stream.next().await { + Some(Ok(batch)) => Some(batch), + Some(Err(e)) => { + let msg = format!("first batch from input {k} failed: {e}"); + for tx in senders.drain(..) 
{ + let _ = tx.send(Err(internal_datafusion_err!("{msg}"))); + } + return; + } + None => None, + }; + firsts.push((first, stream)); + } + + // Phase 2: collect per-input runtime extremes. + let per_input: Vec> = (0..n) + .map(|k| child.runtime_partition_extremes(k).ok().flatten()) + .collect(); + + // Phase 3: lex-reduce per-input → global, using the input's declared + // output ordering so direction and null ordering are honored. + let ordering: Option = child.output_ordering().cloned(); + let global = ordering + .as_ref() + .and_then(|o| reduce_global_extremes(&per_input, o)); + + info!( + "RangeRepartitionExec: coordinator gathered {} input partitions; \ + global extremes = {:?}", + n, global + ); + + // Phase 4: derive bucket boundaries from the global extremes. v1 is + // Int64-only — if anything's missing we propagate an error to all + // output streams. + let (lo, hi) = match int64_range(global.as_ref()) { + Some(v) => v, + None => { + for tx in senders.drain(..) { + let _ = tx.send(Err(internal_datafusion_err!( + "RangeRepartitionExec: leading sort key must be Int64 \ + with a non-empty global range" + ))); + } + return; + } + }; + let Some(boundaries) = equal_width_int64_boundaries(lo, hi, n) else { + for tx in senders.drain(..) { + let _ = tx.send(Err(internal_datafusion_err!( + "RangeRepartitionExec: cannot split [{lo}, {hi}] into {n} buckets" + ))); + } + return; + }; + log_buckets(lo, hi, &boundaries, halo_preceding, halo_following); + + // Stash per-bucket primary ranges where `runtime_partition_extremes` can + // see them. Done *before* any batch is routed so downstream operators + // that gate on first batch will read populated state. + if let Ok(mut guard) = bucket_primary_ranges.lock() { + *guard = Some(primary_ranges_from_boundaries(lo, hi, &boundaries)); + } + + // Phase 5: figure out which column carries the leading sort key. 
+ let col_idx = match ordering + .as_ref() + .and_then(|o| o.first().expr.downcast_ref::()) + .map(|c| c.index()) + { + Some(idx) => idx, + None => { + for tx in senders.drain(..) { + let _ = tx.send(Err(internal_datafusion_err!( + "RangeRepartitionExec: leading sort key must be a Column \ + (got {:?})", + ordering + ))); + } + return; + } + }; + + // Phase 6: build N output mpsc channels, one per output partition / + // bucket. Hand each receiver to the corresponding output stream. + let mut bucket_txs: Vec>> = Vec::with_capacity(n); + for sender in senders.drain(..) { + let (tx, rx) = mpsc::channel(4); + bucket_txs.push(tx); + let _ = sender.send(Ok(PartitionData { rx })); + } + + // Phase 7: for each input partition, route its batches into per-bucket + // pieces and push to the corresponding bucket_txs. Drop the local + // bucket_txs once all router tasks complete so receivers see EOS. + let bucket_txs = Arc::new(bucket_txs); + let mut routers = Vec::with_capacity(firsts.len()); + for (first_batch, rest) in firsts.into_iter() { + let txs = Arc::clone(&bucket_txs); + let boundaries = boundaries.clone(); + routers.push(tokio::spawn(run_router( + first_batch, + rest, + txs, + boundaries, + col_idx, + halo_preceding, + halo_following, + ))); + } + // Wait for routers so we can drop the senders here (allowing receivers + // to observe EOS). Errors are propagated through the channels. + for handle in routers { + let _ = handle.await; + } + drop(bucket_txs); +} + +/// Drain one input partition's stream and split each batch into pieces +/// keyed by the bucket whose expanded range contains the row's leading +/// sort value. Halo rows land in two buckets (their primary and the +/// neighbor that needs them as halo). 
+async fn run_router( + first_batch: Option, + mut rest: SendableRecordBatchStream, + bucket_txs: Arc>>>, + boundaries: Vec, + col_idx: usize, + halo_preceding: i64, + halo_following: i64, +) { + if let Some(batch) = first_batch + && let Err(_) = route_batch( + &batch, + col_idx, + &boundaries, + halo_preceding, + halo_following, + &bucket_txs, + ) + .await + { + return; + } + while let Some(item) = rest.next().await { + let batch = match item { + Ok(b) => b, + Err(e) => { + // Best-effort propagation: try each bucket sender. The + // error is rendered to string because `DataFusionError` + // doesn't `Clone`. + let msg = e.to_string(); + for tx in bucket_txs.iter() { + let _ = tx + .send(Err(internal_datafusion_err!("input batch error: {msg}"))) + .await; + } + return; + } + }; + if route_batch( + &batch, + col_idx, + &boundaries, + halo_preceding, + halo_following, + &bucket_txs, + ) + .await + .is_err() + { + return; + } + } +} + +/// For each output bucket, take rows whose leading-sort-key value lands in +/// `[primary_low - halo_preceding, primary_high + halo_following)` and +/// push that piece into the bucket's channel. Bucket-driven loop so each +/// bucket's filter expression is in one place. 
+async fn route_batch( + batch: &RecordBatch, + col_idx: usize, + boundaries: &[i64], + halo_preceding: i64, + halo_following: i64, + bucket_txs: &[mpsc::Sender>], +) -> Result<()> { + let n_out = bucket_txs.len(); + let arr = batch.column(col_idx); + let col = arr.as_any().downcast_ref::().ok_or_else(|| { + internal_datafusion_err!("RangeRepartitionExec: leading sort key not Int64") + })?; + let n_rows = batch.num_rows(); + + for b in 0..n_out { + let low: i64 = if b == 0 { + i64::MIN + } else { + boundaries[b - 1].saturating_sub(halo_preceding) + }; + let high: i64 = if b == n_out - 1 { + i64::MAX + } else { + boundaries[b].saturating_add(halo_following) + }; + let mut indices: Vec = Vec::new(); + for r in 0..n_rows { + if col.is_null(r) { + continue; + } + let s = col.value(r); + if s >= low && s < high { + indices.push(r as u32); + } + } + if indices.is_empty() { + continue; + } + let take_idx = UInt32Array::from(indices); + let cols = take_arrays(batch.columns(), &take_idx, None)?; + let piece = RecordBatch::try_new(batch.schema(), cols)?; + if bucket_txs[b].send(Ok(piece)).await.is_err() { + return Ok(()); // receiver dropped; bail quietly + } + } + Ok(()) +} + +/// Log the bucket layout the coordinator will route into: the leading-key +/// global range, the interior boundaries, and each output bucket's +/// primary and halo-expanded ranges. 
+fn log_buckets( + lo: i64, + hi: i64, + boundaries: &[i64], + halo_preceding: i64, + halo_following: i64, +) { + let n = boundaries.len() + 1; + info!( + "RangeRepartitionExec: global leading [{lo}, {hi}] split into {n} buckets; \ + interior boundaries: {boundaries:?}; halo: {halo_preceding} preceding, \ + {halo_following} following" + ); + let mut edges: Vec = std::iter::once(lo) + .chain(boundaries.iter().copied()) + .chain(std::iter::once(hi + 1)) + .collect(); + edges.dedup(); + for (i, win) in edges.windows(2).enumerate() { + let start = win[0] - halo_preceding; + let end = win[1] + halo_following; + info!( + "RangeRepartitionExec: bucket {i}: primary [{}, {}) expanded [{start}, {end})", + win[0], win[1] + ); + } +} + +/// Extract `(lo, hi)` from the leading slot of a global [`PartitionExtremes`]. +/// Returns `None` if the extremes are missing, the leading key isn't +/// Int64, or either endpoint is null. +fn int64_range(global: Option<&PartitionExtremes>) -> Option<(i64, i64)> { + let global = global?; + let (ScalarValue::Int64(Some(lo)), ScalarValue::Int64(Some(hi))) = + (global.min.first()?, global.max.first()?) + else { + return None; + }; + Some((*lo, *hi)) +} + +/// Per-bucket primary ranges as `[start, end_exclusive)` derived from +/// the global `[lo, hi]` (inclusive) and the interior cut points. Same +/// edge convention as `log_buckets`: edges = `[lo] ++ boundaries ++ [hi+1]`, +/// each bucket spans `[edges[i], edges[i+1])`. +pub fn primary_ranges_from_boundaries( + lo: i64, + hi: i64, + boundaries: &[i64], +) -> Vec<(i64, i64)> { + let mut edges: Vec = std::iter::once(lo) + .chain(boundaries.iter().copied()) + .chain(std::iter::once(hi.saturating_add(1))) + .collect(); + edges.dedup(); + edges.windows(2).map(|w| (w[0], w[1])).collect() +} + +/// Split the closed `Int64` interval `[lo, hi]` into `n` equal-width +/// buckets, returning the `n - 1` interior cut points. 
+pub fn equal_width_int64_boundaries(lo: i64, hi: i64, n: usize) -> Option> { + if n <= 1 { + return Some(vec![]); + } + let span = hi.checked_sub(lo)?; + let n_i = i64::try_from(n).ok()?; + let mut cuts = Vec::with_capacity(n - 1); + for i in 1..n_i { + cuts.push(lo + span * i / n_i); + } + Some(cuts) +} + +/// Lex-reduce per-input partition extremes into one global [`PartitionExtremes`] +/// honoring `ordering`'s direction / nulls-first per key. Returns `None` +/// when no input partition produced extremes (e.g. all inputs were empty, +/// or no upstream supports the trait method). +fn reduce_global_extremes( + per_input: &[Option], + ordering: &LexOrdering, +) -> Option { + let mut iter = per_input.iter().filter_map(Option::clone); + let mut global = iter.next()?; + for next in iter { + if lex_compare(&next.min, &global.min, ordering).is_lt() { + global.min = next.min; + } + if lex_compare(&next.max, &global.max, ordering).is_gt() { + global.max = next.max; + } + global.row_count += next.row_count; + } + Some(global) +} diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 929ff4f7dfc85..8ed5896d531c7 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -70,8 +70,140 @@ use datafusion_physical_expr::LexOrdering; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit}; +use crate::PartitionExtremes; +use arrow::array::ArrayRef; +use datafusion_common::ScalarValue; use futures::{StreamExt, TryStreamExt}; use log::{debug, trace}; +use std::sync::Mutex; + +/// One slot per SortExec output partition. Populated inside the sort code +/// path each time `sort_batch_chunked` produces sorted batches, and +/// surfaced via `ExecutionPlan::runtime_partition_extremes`. By the time +/// downstream sees the first output batch the slot for that partition is +/// already populated. 
+type PartitionExtremesSlot = Arc>>; +type PartitionExtremesSlots = Arc>; + +fn make_partition_extremes_slots(n_partitions: usize) -> PartitionExtremesSlots { + Arc::new( + (0..n_partitions) + .map(|_| Arc::new(Mutex::new(None))) + .collect(), + ) +} + +/// Pull row values for each sort expression at `row` of an already-sorted +/// chunk. Used to read the lex-smallest (row 0) and lex-largest (row n-1) +/// candidate tuples from each `sort_batch_chunked` invocation. +// +// TODO: this evaluates each PhysicalExpr over the whole batch and then takes +// one row out of the resulting array. We could slice the batch down to a +// single-row view first (`batch.slice(row, 1)`) and evaluate against that, or +// use a `RowConverter` to pull a single row through the expression — either +// is O(1) per call instead of O(rows). +fn evaluate_row( + expressions: &LexOrdering, + batch: &RecordBatch, + row: usize, +) -> Result> { + let n = batch.num_rows(); + expressions + .iter() + .map(|sort_expr| { + let arr: ArrayRef = sort_expr.expr.evaluate(batch)?.into_array(n)?; + ScalarValue::try_from_array(&arr, row) + }) + .collect() +} + +/// Compare two `ScalarValue` tuples using `SortOptions` per key +/// (descending and nulls_first), so callers can pick the lex-smaller or +/// lex-larger of two endpoint candidates without an arrow round-trip. 
+pub fn lex_compare( + a: &[ScalarValue], + b: &[ScalarValue], + expressions: &LexOrdering, +) -> std::cmp::Ordering { + use std::cmp::Ordering; + for ((va, vb), sort_expr) in a.iter().zip(b.iter()).zip(expressions.iter()) { + let cmp = match (va.is_null(), vb.is_null()) { + (true, true) => Ordering::Equal, + (true, false) => { + if sort_expr.options.nulls_first { + Ordering::Less + } else { + Ordering::Greater + } + } + (false, true) => { + if sort_expr.options.nulls_first { + Ordering::Greater + } else { + Ordering::Less + } + } + (false, false) => va.partial_cmp(vb).unwrap_or(Ordering::Equal), + }; + let cmp = if sort_expr.options.descending { + cmp.reverse() + } else { + cmp + }; + if cmp != Ordering::Equal { + return cmp; + } + } + Ordering::Equal +} + +/// Fold the endpoints of one already-sorted chunk produced by +/// `sort_batch_chunked` into the running [`PartitionExtremes`] for one partition. +fn merge_chunk_into_slot( + slot: &Mutex>, + expressions: &LexOrdering, + sorted_chunks: &[RecordBatch], +) -> Result<()> { + let total_rows: usize = sorted_chunks.iter().map(|b| b.num_rows()).sum(); + if total_rows == 0 { + return Ok(()); + } + let Some(first_chunk) = sorted_chunks.iter().find(|b| b.num_rows() > 0) else { + return Ok(()); + }; + let Some(last_chunk) = sorted_chunks.iter().rev().find(|b| b.num_rows() > 0) else { + return Ok(()); + }; + let chunk_min = evaluate_row(expressions, first_chunk, 0)?; + let chunk_max = evaluate_row(expressions, last_chunk, last_chunk.num_rows() - 1)?; + + let mut guard = slot.lock().unwrap(); + *guard = Some(match guard.take() { + None => PartitionExtremes { + min: chunk_min, + max: chunk_max, + row_count: total_rows, + }, + Some(prev) => { + let min = if lex_compare(&chunk_min, &prev.min, expressions).is_lt() { + chunk_min + } else { + prev.min + }; + let max = if lex_compare(&chunk_max, &prev.max, expressions).is_gt() { + chunk_max + } else { + prev.max + }; + PartitionExtremes { + min, + max, + row_count: prev.row_count 
+ total_rows, + } + } + }); + Ok(()) +} struct ExternalSorterMetrics { /// metrics @@ -261,6 +393,12 @@ struct ExternalSorter { /// How much memory to reserve for performing in-memory sort/merges /// prior to spilling. sort_spill_reservation_bytes: usize, + + /// Optional slot that `sort_batch_stream` updates after each + /// `sort_batch_chunked` call with the leading-key endpoints of the + /// sorted output. `SortExec` provides this so consumers can fetch the + /// observed extremes via `ExecutionPlan::runtime_partition_extremes`. + extremes_slot: Option>>>, } impl ExternalSorter { @@ -309,9 +447,18 @@ impl ExternalSorter { batch_size, sort_spill_reservation_bytes, sort_in_place_threshold_bytes, + extremes_slot: None, }) } + /// Provide a slot for `sort_batch_stream` to publish runtime endpoints + /// into. Used by `SortExec` so its `runtime_partition_extremes` override has + /// a value to return. + fn with_extremes_slot(mut self, slot: Arc>>) -> Self { + self.extremes_slot = Some(slot); + self + } + /// Appends an unsorted [`RecordBatch`] to `in_mem_batches` /// /// Updates memory usage metrics, and possibly triggers spilling to disk @@ -669,6 +816,7 @@ impl ExternalSorter { let expressions = self.expr.clone(); let batch_size = self.batch_size; let output_row_metrics = metrics.output_rows().clone(); + let extremes_slot = self.extremes_slot.clone(); let stream = futures::stream::once(async move { let schema = batch.schema(); @@ -676,6 +824,13 @@ impl ExternalSorter { // Sort the batch immediately and get all output batches let sorted_batches = sort_batch_chunked(&batch, &expressions, batch_size)?; + // Publish leading-key endpoints from this sorted chunk into the + // partition's extremes slot. Combining across multiple chunks + // (path 3) happens inside `merge_chunk_into_slot`. + if let Some(slot) = &extremes_slot { + merge_chunk_into_slot(slot, &expressions, &sorted_batches)?; + } + // Resize the reservation to match the actual sorted output size. 
// Using try_resize avoids a release-then-reacquire cycle, which // matters for MemoryPool implementations where grow/shrink have @@ -866,6 +1021,10 @@ pub struct SortExec { /// If `fetch` is `Some`, this will also be set and a TopK operator may be used. /// If `fetch` is `None`, this will be `None`. filter: Option>>, + /// Per-output-partition slot populated by the sort code path. Surfaced + /// via `ExecutionPlan::runtime_partition_extremes` so range-partitioning + /// callers can derive global boundaries from runtime data. + runtime_extremes: PartitionExtremesSlots, } impl SortExec { @@ -876,6 +1035,8 @@ impl SortExec { let (cache, sort_prefix) = Self::compute_properties(&input, expr.clone(), preserve_partitioning) .unwrap(); + let runtime_extremes = + make_partition_extremes_slots(cache.partitioning.partition_count()); Self { expr, input, @@ -885,6 +1046,7 @@ impl SortExec { common_sort_prefix: sort_prefix, cache: Arc::new(cache), filter: None, + runtime_extremes, } } @@ -904,6 +1066,8 @@ impl SortExec { self.preserve_partitioning = preserve_partitioning; Arc::make_mut(&mut self.cache).partitioning = Self::output_partitioning_helper(&self.input, self.preserve_partitioning); + self.runtime_extremes = + make_partition_extremes_slots(self.cache.partitioning.partition_count()); self } @@ -929,6 +1093,7 @@ impl SortExec { fetch: self.fetch, cache: Arc::clone(&self.cache), filter: self.filter.clone(), + runtime_extremes: Arc::clone(&self.runtime_extremes), } } @@ -1172,6 +1337,9 @@ impl ExecutionPlan for SortExec { )?; new_sort.cache = Arc::new(cache); new_sort.common_sort_prefix = sort_prefix; + new_sort.runtime_extremes = make_partition_extremes_slots( + new_sort.cache.partitioning.partition_count(), + ); } Ok(Arc::new(new_sort)) @@ -1263,6 +1431,9 @@ impl ExecutionPlan for SortExec { &self.metrics_set, context.runtime_env(), )?; + if let Some(slot) = self.runtime_extremes.get(partition) { + sorter = sorter.with_extremes_slot(Arc::clone(slot)); + } 
Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), futures::stream::once(async move { @@ -1293,6 +1464,20 @@ impl ExecutionPlan for SortExec { Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?)) } + /// Returns observed lex-min/lex-max for the requested output partition. + /// `None` while that partition's sort path has not yet folded any + /// chunk into its slot — the caller is expected to have driven enough of + /// `execute(partition)` to reach the first `sort_batch_chunked` call. + fn runtime_partition_extremes( + &self, + partition: usize, + ) -> Result> { + Ok(self + .runtime_extremes + .get(partition) + .and_then(|slot| slot.lock().unwrap().clone())) + } + fn with_fetch(&self, limit: Option) -> Option> { Some(Arc::new(SortExec::with_fetch(self, limit))) } diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 6c6b26c9cf49f..d45edd3d91552 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -35,8 +35,9 @@ use crate::windows::{ }; use crate::{ ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, - ExecutionPlanProperties, InputOrderMode, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, Statistics, WindowExpr, check_if_same_properties, + ExecutionPlanProperties, InputOrderMode, PartitionExtremes, PlanProperties, + RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr, + check_if_same_properties, }; use arrow::compute::take_record_batch; @@ -97,6 +98,11 @@ pub struct BoundedWindowAggExec { cache: Arc, /// If `can_rerepartition` is false, partition_keys is always empty. can_repartition: bool, + /// When true, `required_input_distribution()` returns + /// `UnspecifiedDistribution` instead of `SinglePartition`, so the operator + /// can run per-partition over a range-partitioned input (with halo). 
+ /// Set by `ParallelWindow` optimizer rule; never round-trips through proto. + parallel_aware: bool, } impl BoundedWindowAggExec { @@ -137,9 +143,16 @@ impl BoundedWindowAggExec { ordered_partition_by_indices, cache: Arc::new(cache), can_repartition, + parallel_aware: false, }) } + /// Opt this BWAG into parallel-aware mode. See `parallel_aware` field. + pub fn with_parallel_aware(mut self, value: bool) -> Self { + self.parallel_aware = value; + self + } + /// Window expressions pub fn window_expr(&self) -> &[Arc] { &self.window_expr @@ -331,6 +344,9 @@ impl ExecutionPlan for BoundedWindowAggExec { } fn required_input_distribution(&self) -> Vec { + if self.parallel_aware { + return vec![Distribution::UnspecifiedDistribution]; + } if self.partition_keys().is_empty() { debug!("No partition defined for BoundedWindowAggExec!!!"); vec![Distribution::SinglePartition] @@ -343,17 +359,30 @@ impl ExecutionPlan for BoundedWindowAggExec { vec![true] } + /// Passthrough: the window operator doesn't alter the leading sort + /// key's value range, so its `partition`-th output partition's + /// extremes are exactly its input partition's extremes. + fn runtime_partition_extremes( + &self, + partition: usize, + ) -> Result> { + self.input.runtime_partition_extremes(partition) + } + fn with_new_children( self: Arc, children: Vec>, ) -> Result> { check_if_same_properties!(self, children); - Ok(Arc::new(BoundedWindowAggExec::try_new( - self.window_expr.clone(), - Arc::clone(&children[0]), - self.input_order_mode.clone(), - self.can_repartition, - )?)) + Ok(Arc::new( + BoundedWindowAggExec::try_new( + self.window_expr.clone(), + Arc::clone(&children[0]), + self.input_order_mode.clone(), + self.can_repartition, + )? 
+ .with_parallel_aware(self.parallel_aware), + )) } fn execute( diff --git a/datafusion/sqllogictest/test_files/parallel_window.slt b/datafusion/sqllogictest/test_files/parallel_window.slt new file mode 100644 index 0000000000000..c47050367007c --- /dev/null +++ b/datafusion/sqllogictest/test_files/parallel_window.slt @@ -0,0 +1,141 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Goal: parallelize window functions that have ORDER BY but no PARTITION BY, +# over a bounded RANGE frame. Today this collapses to a single partition via +# BoundedWindowAggExec::required_input_distribution -> Distribution::SinglePartition +# (datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs:333-340). + +statement ok +set datafusion.execution.target_partitions = 4; + +statement ok +set datafusion.explain.show_statistics = true; + +# Build four parquet files with OVERLAPPING seq ranges so range-repartitioning +# actually has to move rows around. Each file has seq congruent to its index +# mod 4 in [0, 100); a deterministic scramble (seq * 37 mod 100) defeats the +# natural generate_series ordering so DataFusion does not claim output_ordering +# on seq. Per-file min/max still come back Exact from the parquet footer. 
+query I +COPY (SELECT seq, seq % 7 AS amount FROM ( + SELECT value AS seq FROM generate_series(0, 99, 4) + ) ORDER BY (seq * 37) % 100) +TO 'test_files/scratch/parallel_window/events/0.parquet' +STORED AS PARQUET; +---- +25 + +query I +COPY (SELECT seq, seq % 7 AS amount FROM ( + SELECT value AS seq FROM generate_series(1, 99, 4) + ) ORDER BY (seq * 37) % 100) +TO 'test_files/scratch/parallel_window/events/1.parquet' +STORED AS PARQUET; +---- +25 + +query I +COPY (SELECT seq, seq % 7 AS amount FROM ( + SELECT value AS seq FROM generate_series(2, 99, 4) + ) ORDER BY (seq * 37) % 100) +TO 'test_files/scratch/parallel_window/events/2.parquet' +STORED AS PARQUET; +---- +25 + +query I +COPY (SELECT seq, seq % 7 AS amount FROM ( + SELECT value AS seq FROM generate_series(3, 99, 4) + ) ORDER BY (seq * 37) % 100) +TO 'test_files/scratch/parallel_window/events/3.parquet' +STORED AS PARQUET; +---- +25 + +statement ok +CREATE EXTERNAL TABLE events +STORED AS PARQUET +LOCATION 'test_files/scratch/parallel_window/events'; + +# Bounded RANGE frame, ORDER BY only (no PARTITION BY). Canonical shape. +# Each input partition should report Exact min/max on seq from its parquet footer. 
+query TT +EXPLAIN SELECT + seq, + SUM(amount) OVER ( + ORDER BY seq + RANGE BETWEEN 5 PRECEDING AND CURRENT ROW + ) AS rolling_sum +FROM events +ORDER BY seq +LIMIT 5; +---- +logical_plan +01)Sort: events.seq ASC NULLS LAST, fetch=5 +02)--Projection: events.seq, sum(events.amount) ORDER BY [events.seq ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND CURRENT ROW AS rolling_sum +03)----WindowAggr: windowExpr=[[sum(events.amount) ORDER BY [events.seq ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND CURRENT ROW]] +04)------TableScan: events projection=[seq, amount] +physical_plan +01)SortPreservingMergeExec: [seq@0 ASC NULLS LAST], fetch=5, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:)]] +02)--ProjectionExec: expr=[seq@0 as seq, sum(events.amount) ORDER BY [events.seq ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND CURRENT ROW@2 as rolling_sum], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:)]] +03)----HaloDropExec, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]] +04)------BoundedWindowAggExec: wdw=[sum(events.amount) ORDER BY [events.seq ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND CURRENT ROW: Field { "sum(events.amount) ORDER BY [events.seq ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN 5 PRECEDING AND CURRENT ROW], mode=[Sorted], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]] +05)--------RangeRepartitionExec, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:)]] +06)----------SortExec: expr=[seq@0 ASC NULLS LAST], preserve_partitioning=[true], statistics=[Rows=Exact(100), Bytes=Exact(1600), [(Col[0]: Min=Exact(Int64(0)) Max=Exact(Int64(99)) Null=Exact(0) ScanBytes=Exact(800)),(Col[1]: Min=Exact(Int64(0)) Max=Exact(Int64(6)) Null=Exact(0) ScanBytes=Exact(800))]] +07)------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parallel_window/events/0.parquet], 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parallel_window/events/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parallel_window/events/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parallel_window/events/3.parquet]]}, projection=[seq, amount], file_type=parquet, sort_order_for_reorder=[seq@0 ASC NULLS LAST], statistics=[Rows=Exact(100), Bytes=Exact(1600), [(Col[0]: Min=Exact(Int64(0)) Max=Exact(Int64(99)) Null=Exact(0) ScanBytes=Exact(800)),(Col[1]: Min=Exact(Int64(0)) Max=Exact(Int64(6)) Null=Exact(0) ScanBytes=Exact(800))]] + +# Actually execute the query — this drives the SortExec down the (false, None) +# external-sort branch in each input partition and lets the runtime min/max +# observer fire. Sorted-output truncated to 5 for a small reproducible row set. +query II +SELECT + seq, + SUM(amount) OVER ( + ORDER BY seq + RANGE BETWEEN 5 PRECEDING AND CURRENT ROW + ) AS rolling_sum +FROM events +ORDER BY seq +LIMIT 5; +---- +0 0 +1 1 +2 3 +3 6 +4 10 + +# With `HaloDropExec` planted above the per-partition window, halo rows +# (rows duplicated into a neighbor's bucket so its window frame sees the +# boundary context) are filtered out before the merge — so the row count +# is exactly the input cardinality, 100. `count(rolling_sum)` forces the +# window to actually execute (a plain count(*) gets statistics-pruned +# from the parquet row count, never instantiates the window operator). +query I +SELECT count(rolling_sum) AS n FROM ( + SELECT seq, SUM(amount) OVER ( + ORDER BY seq + RANGE BETWEEN 5 PRECEDING AND CURRENT ROW + ) AS rolling_sum + FROM events +) t; +---- +100 + +# Reset session settings so this file doesn't leak config into the rest of the run. +statement ok +set datafusion.explain.show_statistics = false;