diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index db8c75b4a578b..cf8d98d45bc88 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -74,7 +74,8 @@ use datafusion_physical_expr::equivalence::{
 };
 use datafusion_physical_expr::projection::{ProjectionRef, combine_projections};
-use futures::{Stream, StreamExt, TryStreamExt};
+use futures::future::BoxFuture;
+use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
 use log::debug;
 use parking_lot::Mutex;
 
@@ -216,6 +217,13 @@ pub struct NestedLoopJoinExec {
     /// partitions share the same spill file via this `OnceAsync`,
     /// avoiding redundant re-execution of the left child.
     left_spill_data: Arc<OnceAsync<LeftSpillData>>,
+    /// Coordinator that, in the memory-limited fallback path, shares
+    /// per-chunk `JoinLeftData` (visited bitmap + probe-thread counter)
+    /// across all right-side output partitions. This makes the fallback
+    /// path's left-side tracking consistent with the single-pass path
+    /// (where `collect_left_input(..., probe_threads_count)` initializes
+    /// the counter to `right_partition_count`).
+    fallback_coordinator: Arc<FallbackCoordinator>,
     /// Information of index and left / right placement of columns
     column_indices: Vec<ColumnIndex>,
     /// Projection to apply to the output of the join
@@ -292,6 +300,8 @@ impl NestedLoopJoinExecBuilder {
             join_type,
             projection.as_deref(),
         )?;
+        let right_partition_count = right.output_partitioning().partition_count().max(1);
+        let with_visited_bitmap = need_produce_result_in_final(join_type);
         Ok(NestedLoopJoinExec {
             left,
             right,
@@ -300,6 +310,10 @@ impl NestedLoopJoinExecBuilder {
             join_schema,
             build_side_data: Default::default(),
             left_spill_data: Arc::new(OnceAsync::default()),
+            fallback_coordinator: Arc::new(FallbackCoordinator::new(
+                right_partition_count,
+                with_visited_bitmap,
+            )),
             column_indices,
             projection,
             metrics: Default::default(),
@@ -496,6 +510,8 @@ impl NestedLoopJoinExec {
     ) -> Self {
         let left = children.swap_remove(0);
         let right = children.swap_remove(0);
+        let right_partition_count = right.output_partitioning().partition_count().max(1);
+        let with_visited_bitmap = need_produce_result_in_final(self.join_type);
 
         Self {
             left,
@@ -503,6 +519,10 @@ impl NestedLoopJoinExec {
             metrics: ExecutionPlanMetricsSet::new(),
             build_side_data: Default::default(),
             left_spill_data: Arc::new(OnceAsync::default()),
+            fallback_coordinator: Arc::new(FallbackCoordinator::new(
+                right_partition_count,
+                with_visited_bitmap,
+            )),
             cache: Arc::clone(&self.cache),
             filter: self.filter.clone(),
             join_type: self.join_type,
@@ -656,29 +676,20 @@ impl ExecutionPlan for NestedLoopJoinExec {
         let probe_side_data = self.right.execute(partition, Arc::clone(&context))?;
 
         // Determine if OOM fallback to memory-limited mode is possible.
-        // Conditions:
-        // 1. Disk manager supports temp files (needed for spilling).
-        // 2. FULL join with multiple right partitions is not yet supported
-        //    in the fallback path. FULL join needs to track BOTH left-side
-        //    matches (for unmatched left rows) AND right-side matches (for
-        //    unmatched right rows). The fallback path builds a per-partition
-        //    `JoinLeftData` with `probe_threads_counter == 1`, so each
-        //    partition emits unmatched left rows based only on its own
-        //    right-side matches, producing incorrect duplicate output for
-        //    left rows that match in another partition.
-        //    Other join types that need only one-sided final emission
-        //    (LEFT, LEFT SEMI, LEFT ANTI, LEFT MARK) have a similar latent
-        //    issue in the fallback path which predates this change;
-        //    tracking is out of scope for this PR.
-        let full_join_multi_partition =
-            matches!(self.join_type, JoinType::Full) && right_partition_count > 1;
-        let spill_state = if context.runtime_env().disk_manager.tmp_files_enabled()
-            && !full_join_multi_partition
-        {
+        // Condition: disk manager supports temp files (needed for spilling).
+        //
+        // For join types that emit unmatched left rows in the final output
+        // (LEFT, LEFT SEMI, LEFT ANTI, LEFT MARK, FULL), the fallback path
+        // shares per-chunk `JoinLeftData` (visited bitmap + probe-thread
+        // counter) across all right-side partitions via
+        // [`FallbackCoordinator`], so left-side tracking is coordinated
+        // exactly as in the single-pass path.
+        let spill_state = if context.runtime_env().disk_manager.tmp_files_enabled() {
             SpillState::Pending {
                 left_plan: Arc::clone(&self.left),
                 task_context: Arc::clone(&context),
                 left_spill_data: Arc::clone(&self.left_spill_data),
+                fallback_coordinator: Arc::clone(&self.fallback_coordinator),
             }
         } else {
             SpillState::Disabled
@@ -906,6 +917,365 @@ pub(crate) struct LeftSpillData {
     schema: SchemaRef,
 }
 
+/// Per-chunk shared state in the memory-limited fallback path.
+///
+/// Each chunk's `JoinLeftData` is loaded once by a "leader" partition and
+/// shared (via `Arc`) with every right-side output partition. The
+/// `probe_threads_counter` inside the `JoinLeftData` is initialized to
+/// `right_partition_count`, so `report_probe_completed` returns `true`
+/// only when the *last* partition has finished probing the chunk. That
+/// last partition is then responsible for emitting unmatched left rows
+/// for the chunk, mirroring the single-pass path's coordination via
+/// `collect_left_input(..., probe_threads_count)`.
+struct CurrentChunk {
+    /// 0-based monotonically increasing chunk index.
+    chunk_index: usize,
+    /// Shared per-chunk left data. Cloned by every partition that probes
+    /// this chunk; the last to call `report_probe_completed` emits
+    /// unmatched left rows.
+    data: Arc<JoinLeftData>,
+    /// True if the left stream was exhausted while loading this chunk —
+    /// no further chunks will be produced after it.
+    is_last: bool,
+}
+
+/// Inner state of [`FallbackCoordinator`], guarded by an async mutex.
+struct FallbackCoordinatorInner {
+    /// Reservation owned by the coordinator. Holds the memory for the
+    /// currently-loaded chunk. Reset (`resize(0)`) between chunks.
+    /// Lazily registered by the first leader, after the runtime context
+    /// becomes available via `initiate_fallback`.
+    reservation: Option<MemoryReservation>,
+    /// The shared left spill stream from which chunks are read. Owned by
+    /// the coordinator so only one partition reads it at a time.
+    left_stream: Option<SendableRecordBatchStream>,
+    /// Left schema. Set after the first leader resolves the spill future.
+    left_schema: Option<SchemaRef>,
+    /// One batch carried over from the previous chunk's load: when
+    /// reservation `try_grow` failed for chunk N, the offending batch is
+    /// recorded here and becomes the first batch of chunk N+1.
+    carryover: Option<RecordBatch>,
+    /// True once the left spill stream has produced `None`.
+    left_exhausted: bool,
+    /// Index of the next chunk to be loaded.
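+    /// While a chunk is loaded this always equals `current.chunk_index`;
+    /// it only advances (in `release_chunk`) once the last partition has
+    /// released that chunk.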
+    next_chunk_index: usize,
+    /// The currently-loaded chunk, or `None` if no chunk is currently
+    /// loaded (initial state, or the last partition has just released
+    /// chunk `next_chunk_index - 1` and the next leader hasn't taken
+    /// over yet).
+    current: Option<CurrentChunk>,
+    /// True while a partition has claimed leader role for the next
+    /// chunk and is loading it; prevents two partitions from racing.
+    loader_in_flight: bool,
+}
+
+/// Plan-level shared coordinator for the memory-limited fallback path.
+///
+/// All right-side output partitions share one of these. It serializes
+/// access to the left spill stream (so each chunk is read exactly once),
+/// publishes the loaded chunk as an `Arc` for every
+/// partition to clone, and uses a `Notify` so partitions waiting for the
+/// next chunk can sleep without busy-looping.
+pub(crate) struct FallbackCoordinator {
+    /// Number of right-side partitions; equals the
+    /// `probe_threads_counter` initial value for each chunk.
+    right_partition_count: usize,
+    /// Whether `JoinLeftData` should carry a left visited bitmap (for
+    /// join types that emit unmatched left rows in the final output).
+    with_visited_bitmap: bool,
+    inner: tokio::sync::Mutex<FallbackCoordinatorInner>,
+    /// Notified when a new chunk becomes available, when the left stream
+    /// is exhausted, or when a chunk is released.
+    notify: tokio::sync::Notify,
+}
+
+impl FallbackCoordinator {
+    fn new(right_partition_count: usize, with_visited_bitmap: bool) -> Self {
+        Self {
+            right_partition_count,
+            with_visited_bitmap,
+            inner: tokio::sync::Mutex::new(FallbackCoordinatorInner {
+                reservation: None,
+                left_stream: None,
+                left_schema: None,
+                carryover: None,
+                left_exhausted: false,
+                next_chunk_index: 0,
+                current: None,
+                loader_in_flight: false,
+            }),
+            notify: tokio::sync::Notify::new(),
+        }
+    }
+
+    /// After the last partition finishes processing chunk
+    /// `released_chunk_index`, drop the slot so the next leader can
+    /// load chunk `released_chunk_index + 1`.
+    async fn release_chunk(self: &Arc<Self>, released_chunk_index: usize) {
+        let mut inner = self.inner.lock().await;
+        if let Some(cur) = &inner.current
+            && cur.chunk_index == released_chunk_index
+        {
+            inner.current = None;
+            inner.next_chunk_index = released_chunk_index + 1;
+        }
+        // Always notify: waiters may be blocked because they couldn't
+        // become leader while a previous chunk was current.
+        drop(inner);
+        self.notify.notify_waiters();
+    }
+
+    /// Fetch `expected_chunk_index`, becoming leader to load it from the
+    /// left spill stream if no other partition has done so. Returns
+    /// `Ok(None)` when the left stream is exhausted and no chunk with
+    /// the requested index exists.
+    async fn next_chunk(
+        self: Arc<Self>,
+        expected_chunk_index: usize,
+        left_spill_fut: OnceFut<LeftSpillData>,
+        task_context: Arc<TaskContext>,
+    ) -> Result<Option<(Arc<JoinLeftData>, bool)>> {
+        // Resolve the left spill future once. All partitions share the
+        // same OnceFut so this only does real work the first time.
+        let spill_data = left_spill_fut_get_shared(left_spill_fut).await?;
+
+        loop {
+            let mut inner = self.inner.lock().await;
+
+            // Lazily initialize the shared left stream and schema from
+            // the spill file.
+            if inner.left_stream.is_none() && !inner.left_exhausted {
+                let stream = spill_data
+                    .spill_manager
+                    .read_spill_as_stream(spill_data.spill_file.clone(), None)?;
+                inner.left_stream = Some(stream);
+                inner.left_schema = Some(Arc::clone(&spill_data.schema));
+            }
+            // Lazily register the coordinator's chunk reservation.
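+            // Registration must be lazy: the memory pool is only reachable
+            // through the `TaskContext` captured at execution time, not at
+            // plan construction.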
+            if inner.reservation.is_none() {
+                inner.reservation = Some(
+                    MemoryConsumer::new("NestedLoopJoinFallbackChunk".to_string())
+                        .with_can_spill(true)
+                        .register(task_context.memory_pool()),
+                );
+            }
+
+            // Case 1: requested chunk is already loaded.
+            if let Some(cur) = &inner.current
+                && cur.chunk_index == expected_chunk_index
+            {
+                return Ok(Some((Arc::clone(&cur.data), cur.is_last)));
+            }
+
+            // Case 2: left stream exhausted and no current chunk to
+            // deliver — caller is past the last chunk.
+            if inner.left_exhausted
+                && inner.current.is_none()
+                && inner.carryover.is_none()
+            {
+                return Ok(None);
+            }
+
+            // Case 3: no chunk loaded and no leader yet — claim leader.
+            if inner.current.is_none() && !inner.loader_in_flight {
+                inner.loader_in_flight = true;
+                // Drop the lock while reading the stream + concatenating
+                // batches (this can take significant time and memory).
+                let mut left_stream = inner
+                    .left_stream
+                    .take()
+                    .expect("left_stream installed above");
+                let mut reservation = inner
+                    .reservation
+                    .take()
+                    .expect("reservation installed above");
+                let left_schema = Arc::clone(
+                    inner
+                        .left_schema
+                        .as_ref()
+                        .expect("left_schema installed above"),
+                );
+                let carryover = inner.carryover.take();
+                let chunk_index_to_load = inner.next_chunk_index;
+                debug_assert_eq!(chunk_index_to_load, expected_chunk_index);
+                drop(inner);
+
+                let load_result = Arc::clone(&self)
+                    .load_one_chunk(
+                        chunk_index_to_load,
+                        &mut left_stream,
+                        &mut reservation,
+                        carryover,
+                        Arc::clone(&left_schema),
+                    )
+                    .await;
+
+                // Re-acquire lock and publish the result.
+                let mut inner = self.inner.lock().await;
+                inner.left_stream = Some(left_stream);
+                inner.reservation = Some(reservation);
+                inner.loader_in_flight = false;
+
+                match load_result {
+                    Ok(LoadOutcome::Chunk {
+                        data,
+                        is_last,
+                        carryover,
+                    }) => {
+                        inner.carryover = carryover;
+                        if is_last {
+                            inner.left_exhausted = true;
+                        }
+                        let arc_data = Arc::new(data);
+                        inner.current = Some(CurrentChunk {
+                            chunk_index: chunk_index_to_load,
+                            data: Arc::clone(&arc_data),
+                            is_last,
+                        });
+                        drop(inner);
+                        self.notify.notify_waiters();
+                        return Ok(Some((arc_data, is_last)));
+                    }
+                    Ok(LoadOutcome::Empty) => {
+                        // No data at all. Mark exhausted; let other
+                        // partitions observe and exit.
+                        inner.left_exhausted = true;
+                        drop(inner);
+                        self.notify.notify_waiters();
+                        return Ok(None);
+                    }
+                    Err(e) => {
+                        drop(inner);
+                        self.notify.notify_waiters();
+                        return Err(e);
+                    }
+                }
+            }
+
+            // Case 4: another partition is loading the next chunk, or
+            // the current chunk is for a previous index we've already
+            // moved past — wait to be notified.
+            let notified = self.notify.notified();
+            drop(inner);
+            notified.await;
+        }
+    }
+
+    /// Read one chunk worth of left batches into a `JoinLeftData`,
+    /// honoring the coordinator's reservation as the memory budget.
+    async fn load_one_chunk(
+        self: Arc<Self>,
+        _chunk_index: usize,
+        left_stream: &mut SendableRecordBatchStream,
+        reservation: &mut MemoryReservation,
+        carryover: Option<RecordBatch>,
+        left_schema: SchemaRef,
+    ) -> Result<LoadOutcome> {
+        // Reset the per-chunk reservation budget. The previous chunk's
+        // memory has been released when its `JoinLeftData` was dropped.
+        reservation.resize(0);
+
+        let mut pending_batches: Vec<RecordBatch> = Vec::new();
+        let mut left_stream_exhausted = false;
+        let mut next_carryover: Option<RecordBatch> = None;
+
+        // First, account for any carryover batch from the previous
+        // chunk's load attempt. Its memory is already in-flight, so we
+        // grow the reservation infallibly.
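+        // (`MemoryReservation::grow` never fails; it simply records the
+        // usage, unlike the fallible `try_grow` used for fresh batches
+        // below.)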
+        if let Some(batch) = carryover {
+            let bytes = batch.get_array_memory_size();
+            reservation.grow(bytes);
+            pending_batches.push(batch);
+        }
+
+        loop {
+            match left_stream.next().await {
+                Some(Ok(batch)) => {
+                    if batch.num_rows() == 0 {
+                        continue;
+                    }
+                    let bytes = batch.get_array_memory_size();
+                    let can_grow = reservation.try_grow(bytes).is_ok();
+                    if !can_grow && !pending_batches.is_empty() {
+                        // Defer this batch to the next chunk.
+                        next_carryover = Some(batch);
+                        break;
+                    } else if !can_grow {
+                        // No pending batches — accept the batch even
+                        // over budget so we make progress.
+                        reservation.grow(bytes);
+                    }
+                    pending_batches.push(batch);
+                }
+                Some(Err(e)) => return Err(e),
+                None => {
+                    left_stream_exhausted = true;
+                    break;
+                }
+            }
+        }
+
+        if pending_batches.is_empty() {
+            debug_assert!(left_stream_exhausted);
+            return Ok(LoadOutcome::Empty);
+        }
+
+        let merged_batch = concat_batches(&left_schema, &pending_batches)?;
+        let n_rows = merged_batch.num_rows();
+        let visited_left_side = if self.with_visited_bitmap {
+            let buffer_size = n_rows.div_ceil(8);
+            reservation.grow(buffer_size);
+            let mut buffer = BooleanBufferBuilder::new(n_rows);
+            buffer.append_n(n_rows, false);
+            buffer
+        } else {
+            BooleanBufferBuilder::new(0)
+        };
+
+        // The chunk's reservation is owned by the coordinator (so it
+        // can be reset between chunks). `JoinLeftData` carries an empty
+        // RAII placeholder.
+        let dummy_reservation = reservation.new_empty();
+
+        let data = JoinLeftData::new(
+            merged_batch,
+            Mutex::new(visited_left_side),
+            AtomicUsize::new(self.right_partition_count),
+            dummy_reservation,
+        );
+
+        Ok(LoadOutcome::Chunk {
+            data,
+            is_last: left_stream_exhausted,
+            carryover: next_carryover,
+        })
+    }
+}
+
+enum LoadOutcome {
+    Chunk {
+        data: JoinLeftData,
+        is_last: bool,
+        carryover: Option<RecordBatch>,
+    },
+    Empty,
+}
+
+/// Helper to await `OnceFut::get_shared` outside of `poll_next` context.
+async fn left_spill_fut_get_shared(
+    mut fut: OnceFut<LeftSpillData>,
+) -> Result<Arc<LeftSpillData>> {
+    futures::future::poll_fn(move |cx| fut.get_shared(cx)).await
+}
+
+impl std::fmt::Debug for FallbackCoordinator {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("FallbackCoordinator")
+            .field("right_partition_count", &self.right_partition_count)
+            .finish()
+    }
+}
+
 /// Tracks the state of the memory-limited spill fallback for NLJ.
 ///
 /// The NLJ always starts with the standard OnceFut path. If the in-memory
@@ -928,6 +1298,9 @@ pub(crate) enum SpillState {
         /// Shared OnceAsync for left-side spill data. The first partition
         /// to initiate fallback spills the left side; others share the file.
         left_spill_data: Arc<OnceAsync<LeftSpillData>>,
+        /// Shared coordinator that publishes per-chunk `JoinLeftData` to
+        /// every right-side partition.
+        fallback_coordinator: Arc<FallbackCoordinator>,
     },
 
     /// Fallback has been triggered. Left data is being loaded in chunks
@@ -935,21 +1308,30 @@ pub(crate) enum SpillState {
     Active(Box<SpillStateActive>),
 }
 
+/// Result of a single chunk fetch from the [`FallbackCoordinator`]:
+/// either the chunk itself with a flag indicating whether it is the
+/// final chunk, or `None` if the left input is fully consumed.
+type ChunkFetchOutput = Option<(Arc<JoinLeftData>, bool)>;
+/// In-flight future for a chunk fetch.
+type ChunkFetchFuture = BoxFuture<'static, Result<ChunkFetchOutput>>;
+
 /// State for active memory-limited spill execution.
 /// Boxed inside [`SpillState::Active`] to reduce enum size.
 pub(crate) struct SpillStateActive {
     /// Shared future for left-side spill data. All partitions wait on
     /// the same future — the first to poll triggers the actual spill.
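+    /// (Each fetch clones this `OnceFut` and hands the clone to
+    /// `FallbackCoordinator::next_chunk`, which resolves it via
+    /// `left_spill_fut_get_shared`.)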
     left_spill_fut: OnceFut<LeftSpillData>,
-    /// Left input stream for incremental chunk reading (from spill file).
-    /// None until `left_spill_fut` resolves.
-    left_stream: Option<SendableRecordBatchStream>,
-    /// Left-side schema (set once `left_spill_fut` resolves)
-    left_schema: Option<SchemaRef>,
-    /// Memory reservation for left-side buffering
-    reservation: MemoryReservation,
-    /// Accumulated left batches for the current chunk
-    pending_batches: Vec<RecordBatch>,
+    /// Plan-level coordinator that publishes per-chunk `JoinLeftData`
+    /// shared across all right-side partitions.
+    coordinator: Arc<FallbackCoordinator>,
+    /// Index of the next chunk this partition expects from the
+    /// coordinator. Increments after the partition finishes processing
+    /// a chunk (regardless of whether it was the one that emitted
+    /// unmatched left rows).
+    next_chunk_index: usize,
+    /// Captured `TaskContext` so that the first leader can register the
+    /// coordinator's reservation against the runtime's memory pool.
+    task_context: Arc<TaskContext>,
     /// Right input that spills on the first pass and replays from spill later.
     right_input: ReplayableStreamSource,
     /// Per-batch accumulated right bitmaps across all left chunks.
@@ -957,12 +1339,22 @@ pub(crate) struct SpillStateActive {
     /// Only populated when `should_track_unmatched_right` is true.
     global_right_bitmaps: Vec<BooleanBufferBuilder>,
     /// Separate reservation for `global_right_bitmaps`. These buffers live
-    /// for the full operator lifetime (not per-chunk), so they must be
-    /// tracked separately from `reservation`, which gets `resize(0)`-ed
-    /// between chunks.
+    /// for the full operator lifetime (not per-chunk).
     global_right_bitmaps_reservation: MemoryReservation,
     /// Current right batch sequence index within the current pass.
     right_batch_index: usize,
+    /// In-flight chunk fetch future. Created by `BufferingLeft` when a
+    /// new chunk is needed; polled across iterations of `poll_next`
+    /// until it resolves to either the next chunk or `None` (left side
+    /// exhausted with no chunk to deliver).
+    chunk_fetch_in_flight: Option<ChunkFetchFuture>,
+    /// In-flight chunk release future. Created by `EmitLeftUnmatched`
+    /// when the last partition for a chunk has finished its work.
+    chunk_release_in_flight: Option<BoxFuture<'static, ()>>,
+    /// Cached left schema, set after the first chunk is fetched.
+    /// Used by `EmitGlobalRightUnmatched` to build NULL-padded left
+    /// columns for unmatched right rows.
+    left_schema: Option<SchemaRef>,
 }
 
 impl SpillStateActive {
@@ -1063,6 +1455,16 @@ pub(crate) struct NestedLoopJoinStream {
     left_exhausted: bool,
     /// If we can buffer all left data in one pass (false means memory-limited multi-pass)
     left_buffered_in_one_pass: bool,
+    /// True iff this partition is the one that brought the current
+    /// chunk's `probe_threads_counter` to zero, i.e. it is responsible
+    /// for emitting unmatched left rows for the chunk and (in
+    /// memory-limited mode) for releasing the chunk via
+    /// `FallbackCoordinator::release_chunk`.
+    is_chunk_emitter: bool,
+    /// True once we have decided emitter status for the current chunk
+    /// (i.e. `report_probe_completed` has been called once). Reset to
+    /// `false` when transitioning to a new chunk's `BufferingLeft`.
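+    /// For example, with 4 right partitions, exactly one of the four
+    /// `report_probe_completed` calls for a chunk returns `true`; only
+    /// that partition becomes the emitter and later releases the chunk.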
+    chunk_emitter_decided: bool,
 
     // Probe(right) side
     // -----------------
@@ -1262,7 +1664,7 @@ impl Stream for NestedLoopJoinStream {
                     let join_metric = self.metrics.join_metrics.join_time.clone();
                     let _join_timer = join_metric.timer();
 
-                    match self.handle_emit_left_unmatched() {
+                    match self.handle_emit_left_unmatched(cx) {
                         ControlFlow::Continue(()) => continue,
                         ControlFlow::Break(poll) => {
                             return self.metrics.join_metrics.baseline.record_poll(poll);
@@ -1344,6 +1746,8 @@ impl NestedLoopJoinStream {
             left_emit_idx: 0,
             left_exhausted: false,
            left_buffered_in_one_pass: true,
+            is_chunk_emitter: false,
+            chunk_emitter_decided: false,
             handled_empty_output: false,
             should_track_unmatched_right: need_produce_right_in_final(join_type),
             spill_state,
@@ -1371,13 +1775,19 @@ impl NestedLoopJoinStream {
     /// it to disk. Other partitions share the same spill file.
     fn initiate_fallback(&mut self) -> Result<()> {
         // Take ownership of Pending state
-        let (left_plan, context, left_spill_data) =
+        let (left_plan, context, left_spill_data, fallback_coordinator) =
             match std::mem::replace(&mut self.spill_state, SpillState::Disabled) {
                 SpillState::Pending {
                     left_plan,
                     task_context,
                    left_spill_data,
-                } => (left_plan, task_context, left_spill_data),
+                    fallback_coordinator,
+                } => (
+                    left_plan,
+                    task_context,
+                    left_spill_data,
+                    fallback_coordinator,
+                ),
                 _ => {
                     return internal_err!(
                         "initiate_fallback called in non-Pending spill state"
@@ -1422,14 +1832,9 @@ impl NestedLoopJoinStream {
                 })
             })?;
 
-        // Create reservation with can_spill for fair memory allocation
-        let reservation = MemoryConsumer::new("NestedLoopJoinLoad[fallback]".to_string())
-            .with_can_spill(true)
-            .register(context.memory_pool());
-
         // Separate reservation for the global right bitmaps. These buffers
-        // persist across all left chunks, whereas `reservation` is reset
-        // between chunks via `resize(0)`.
+        // are per-partition (each partition tracks matches against its own
+        // right input) and persist across all left chunks.
         let global_right_bitmaps_reservation =
             MemoryConsumer::new("NestedLoopJoinGlobalRightBitmaps".to_string())
                 .register(context.memory_pool());
@@ -1453,10 +1858,9 @@ impl NestedLoopJoinStream {
 
         self.spill_state = SpillState::Active(Box::new(SpillStateActive {
             left_spill_fut,
-            left_stream: None,
-            left_schema: None,
-            reservation,
-            pending_batches: Vec::new(),
+            coordinator: fallback_coordinator,
+            next_chunk_index: 0,
+            task_context: Arc::clone(&context),
             right_input: ReplayableStreamSource::new(
                 right_data,
                 right_spill_manager,
@@ -1465,6 +1869,9 @@ impl NestedLoopJoinStream {
             global_right_bitmaps: Vec::new(),
             global_right_bitmaps_reservation,
             right_batch_index: 0,
+            chunk_fetch_in_flight: None,
+            chunk_release_in_flight: None,
+            left_schema: None,
         }));
 
         // State stays BufferingLeft — next poll will enter
@@ -1519,9 +1926,8 @@ impl NestedLoopJoinStream {
 
     /// Memory-limited path for handle_buffering_left.
     ///
-    /// Incrementally polls the left stream and accumulates batches until:
-    /// - Memory reservation fails (chunk is full, more data remains)
-    /// - Left stream is exhausted (this is the last/only chunk)
+    /// Drives an in-flight `next_chunk` future on the coordinator, which
+    /// loads (or re-uses) the next per-chunk shared `JoinLeftData`.
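+    ///
+    /// Per-partition lifecycle of one chunk (illustrative sketch):
+    ///
+    /// ```text
+    /// BufferingLeft: drain chunk_release_in_flight, then fetch chunk N
+    ///   -> FetchingRight: replay the right side against chunk N
+    ///   -> EmitLeftUnmatched: the emitter partition releases chunk N
+    ///   -> BufferingLeft for chunk N + 1
+    ///      (or EmitGlobalRightUnmatched / Done after the last chunk)
+    /// ```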
     fn handle_buffering_left_memory_limited(
         &mut self,
         cx: &mut std::task::Context<'_>,
     ) -> ControlFlow<Poll<Option<Result<RecordBatch>>>> {
@@ -1532,147 +1938,79 @@ impl NestedLoopJoinStream {
             );
         };
 
-        // On first entry (or after re-entry for a new chunk pass when
-        // left_stream was consumed), wait for the shared left spill
-        // future to resolve and then open a stream from the spill file.
-        if active.left_stream.is_none() {
-            match active.left_spill_fut.get_shared(cx) {
-                Poll::Ready(Ok(spill_data)) => {
-                    match spill_data
-                        .spill_manager
-                        .read_spill_as_stream(spill_data.spill_file.clone(), None)
-                    {
-                        Ok(stream) => {
-                            active.left_schema = Some(Arc::clone(&spill_data.schema));
-                            active.left_stream = Some(stream);
-                        }
-                        Err(e) => {
-                            return ControlFlow::Break(Poll::Ready(Some(Err(e))));
-                        }
-                    }
-                }
-                Poll::Ready(Err(e)) => {
-                    return ControlFlow::Break(Poll::Ready(Some(Err(e))));
-                }
-                Poll::Pending => {
-                    return ControlFlow::Break(Poll::Pending);
+        // Drain any pending chunk-release future before fetching the next
+        // chunk. The coordinator slot must be released so the next leader
+        // can load the following chunk.
+        if let Some(fut) = active.chunk_release_in_flight.as_mut() {
+            match fut.poll_unpin(cx) {
+                Poll::Ready(()) => {
+                    active.chunk_release_in_flight = None;
                 }
+                Poll::Pending => return ControlFlow::Break(Poll::Pending),
             }
         }
 
-        let left_stream = active
-            .left_stream
-            .as_mut()
-            .expect("left_stream must be set after spill future resolves");
+        // Lazily start a chunk-fetch future for `active.next_chunk_index`.
+        if active.chunk_fetch_in_flight.is_none() {
+            let coordinator = Arc::clone(&active.coordinator);
+            let left_spill_fut = active.left_spill_fut.clone();
+            let task_context = Arc::clone(&active.task_context);
+            let expected = active.next_chunk_index;
+            active.chunk_fetch_in_flight = Some(
+                coordinator
+                    .next_chunk(expected, left_spill_fut, task_context)
+                    .boxed(),
+            );
+        }
 
-        // Poll left stream for more batches.
-        // Note: pending_batches may already contain a batch from the
-        // previous chunk iteration (the batch that triggered the memory limit).
-        loop {
-            match left_stream.poll_next_unpin(cx) {
-                Poll::Ready(Some(Ok(batch))) => {
-                    if batch.num_rows() == 0 {
-                        continue;
-                    }
-                    let batch_rows = batch.num_rows();
-                    let batch_size = batch.get_array_memory_size();
-                    let can_grow = active.reservation.try_grow(batch_size).is_ok();
-
-                    if !can_grow && !active.pending_batches.is_empty() {
-                        // Memory limit reached and we already have data.
-                        // Push this batch into pending (it's already in memory)
-                        // and stop buffering for this chunk.
-                        active.pending_batches.push(batch);
-                        self.left_exhausted = false;
-                        self.left_buffered_in_one_pass = false;
-                        break;
-                    } else if !can_grow {
-                        // No pending batches yet — we must accept this batch
-                        // to make progress, even if it exceeds the budget.
-                        active.reservation.grow(batch_size);
-                    }
+        let fut = active
+            .chunk_fetch_in_flight
+            .as_mut()
+            .expect("chunk_fetch_in_flight installed above");
+        let result = match fut.poll_unpin(cx) {
+            Poll::Ready(r) => r,
+            Poll::Pending => return ControlFlow::Break(Poll::Pending),
+        };
+        active.chunk_fetch_in_flight = None;
 
-                    self.metrics.join_metrics.build_mem_used.add(batch_size);
-                    self.metrics.join_metrics.build_input_batches.add(1);
-                    self.metrics.join_metrics.build_input_rows.add(batch_rows);
-                    active.pending_batches.push(batch);
-                }
-                Poll::Ready(Some(Err(e))) => {
-                    return ControlFlow::Break(Poll::Ready(Some(Err(e))));
+        match result {
+            Err(e) => ControlFlow::Break(Poll::Ready(Some(Err(e)))),
+            Ok(None) => {
+                // No chunk to deliver: left side fully consumed.
+                self.left_exhausted = true;
+                if self.is_memory_limited() && self.should_track_unmatched_right {
+                    self.right_data = None;
+                    self.state = NLJState::EmitGlobalRightUnmatched;
+                } else {
+                    self.state = NLJState::Done;
                 }
-                Poll::Ready(None) => {
-                    // Left stream exhausted
-                    self.left_exhausted = true;
-                    break;
+                ControlFlow::Continue(())
+            }
+            Ok(Some((data, is_last))) => {
+                let n_rows = data.batch().num_rows();
+                self.metrics.join_metrics.build_input_batches.add(1);
+                self.metrics.join_metrics.build_input_rows.add(n_rows);
+                if active.left_schema.is_none() {
+                    active.left_schema = Some(data.batch().schema());
                 }
-                Poll::Pending => {
-                    return ControlFlow::Break(Poll::Pending);
+                self.buffered_left_data = Some(data);
+                self.left_exhausted = is_last;
+                self.left_buffered_in_one_pass = is_last && active.next_chunk_index == 0;
+
+                active.right_batch_index = 0;
+                match active.right_input.open_pass() {
+                    Ok(stream) => {
+                        self.right_data = Some(stream);
+                    }
+                    Err(e) => {
+                        return ControlFlow::Break(Poll::Ready(Some(Err(e))));
+                    }
                 }
-            }
-        }
-
-        if active.pending_batches.is_empty() {
-            // No data at all — go directly to Done
-            self.left_exhausted = true;
-            self.state = NLJState::Done;
-            return ControlFlow::Continue(());
-        }
 
-        let merged_batch = match concat_batches(
-            active
-                .left_schema
-                .as_ref()
-                .expect("left_schema must be set"),
-            &active.pending_batches,
-        ) {
-            Ok(batch) => batch,
-            Err(e) => {
-                return ControlFlow::Break(Poll::Ready(Some(Err(e.into()))));
-            }
-        };
-        active.pending_batches.clear();
-
-        // Build visited bitmap if needed for this join type
-        let with_visited = need_produce_result_in_final(self.join_type);
-        let n_rows = merged_batch.num_rows();
-        let visited_left_side = if with_visited {
-            let buffer_size = n_rows.div_ceil(8);
-            // Use infallible grow for bitmap — it's small
-            active.reservation.grow(buffer_size);
-            self.metrics.join_metrics.build_mem_used.add(buffer_size);
-            let mut buffer = BooleanBufferBuilder::new(n_rows);
-            buffer.append_n(n_rows, false);
-            buffer
-        } else {
-            BooleanBufferBuilder::new(0)
-        };
-
-        // Create an empty reservation for JoinLeftData's RAII field.
-        // The actual memory tracking is managed by the Active state's reservation.
-        let dummy_reservation = active.reservation.new_empty();
-
-        let left_data = JoinLeftData::new(
-            merged_batch,
-            Mutex::new(visited_left_side),
-            // In memory-limited mode, only 1 probe thread per chunk
-            AtomicUsize::new(1),
-            dummy_reservation,
-        );
-
-        self.buffered_left_data = Some(Arc::new(left_data));
-
-        active.right_batch_index = 0;
-        match active.right_input.open_pass() {
-            Ok(stream) => {
-                self.right_data = Some(stream);
-            }
-            Err(e) => {
-                return ControlFlow::Break(Poll::Ready(Some(Err(e))));
+                self.state = NLJState::FetchingRight;
+                ControlFlow::Continue(())
             }
         }
-
-        self.state = NLJState::FetchingRight;
-        ControlFlow::Continue(())
     }
 
     /// Handle FetchingRight state - fetch next right batch and prepare for processing.
@@ -1836,13 +2174,27 @@ impl NestedLoopJoinStream {
     /// next chunk (if the left stream is not yet exhausted).
     fn handle_emit_left_unmatched(
         &mut self,
+        cx: &mut std::task::Context<'_>,
     ) -> ControlFlow<Poll<Option<Result<RecordBatch>>>> {
         // Return any completed batches first
         if let Some(poll) = self.maybe_flush_ready_batch() {
            return ControlFlow::Break(poll);
         }
 
-        // Process current unmatched state
+        // First, drive any pending chunk-release future to completion so
+        // we don't transition to the next state while another partition
+        // is waiting for the slot to be freed.
+        if let SpillState::Active(active) = &mut self.spill_state
+            && let Some(fut) = active.chunk_release_in_flight.as_mut()
+        {
+            match fut.poll_unpin(cx) {
+                Poll::Ready(()) => {
+                    active.chunk_release_in_flight = None;
+                }
+                Poll::Pending => return ControlFlow::Break(Poll::Pending),
+            }
+        }
+
         match self.process_left_unmatched() {
             // State unchanged (EmitLeftUnmatched)
             // Continue processing until we have processed all unmatched rows
@@ -1858,13 +2210,35 @@ impl NestedLoopJoinStream {
                     return ControlFlow::Break(poll);
                 }
 
-                if !self.left_exhausted && self.is_memory_limited() {
-                    // More left data to process — free current chunk and
-                    // go back to BufferingLeft for the next chunk
-                    if let SpillState::Active(ref active) = self.spill_state {
-                        active.reservation.resize(0);
+                // Drop our reference to the current chunk's
+                // `JoinLeftData` before releasing the slot. Once the
+                // last partition does this, the `Arc` reaches zero
+                // refcount and the per-chunk reservation is freed.
+                self.buffered_left_data = None;
+
+                if self.is_memory_limited() {
+                    if let SpillState::Active(active) = &mut self.spill_state {
+                        // The last partition for this chunk releases
+                        // the coordinator slot so the next leader can
+                        // load the following chunk.
+                        if self.is_chunk_emitter {
+                            let coordinator = Arc::clone(&active.coordinator);
+                            let released_index = active.next_chunk_index;
+                            active.chunk_release_in_flight = Some(
+                                async move {
+                                    coordinator.release_chunk(released_index).await
+                                }
+                                .boxed(),
+                            );
+                        }
+                        active.next_chunk_index += 1;
                     }
-                    self.buffered_left_data = None;
+                    // Reset emitter state for the next chunk.
+                    self.is_chunk_emitter = false;
+                    self.chunk_emitter_decided = false;
+                }
+
+                if !self.left_exhausted && self.is_memory_limited() {
                     self.left_probe_idx = 0;
                     self.left_emit_idx = 0;
                     self.state = NLJState::BufferingLeft;
@@ -1874,9 +2248,7 @@ impl NestedLoopJoinStream {
                     // All left chunks done — emit global right unmatched.
                    // Drop the exhausted right stream so that
                     // EmitGlobalRightUnmatched opens a fresh replay pass
-                    // from the spill file. (process_left_unmatched_range
-                    // already ran with right_data still set, so its
-                    // schema access is not affected.)
+                    // from the spill file.
                     self.right_data = None;
                     self.state = NLJState::EmitGlobalRightUnmatched;
                 } else {
@@ -2345,18 +2717,24 @@ impl NestedLoopJoinStream {
     /// true -> continue in the same EmitLeftUnmatched state
     /// false -> next state (Done)
     fn process_left_unmatched(&mut self) -> Result<bool> {
-        let left_data = self.get_left_data()?;
-        let left_batch = left_data.batch();
-
-        // ========
-        // Check early return conditions
-        // ========
-
         // Early return if join type can't have unmatched rows
         let join_type_no_produce_left = !need_produce_result_in_final(self.join_type);
-        // Early return if another thread is already processing unmatched rows
-        let handled_by_other_partition =
-            self.left_emit_idx == 0 && !left_data.report_probe_completed();
+
+        // Decide emitter status exactly once per chunk: the partition
+        // whose `report_probe_completed` brings the counter to zero is
+        // the chunk emitter (and, in memory-limited mode, releases the
+        // chunk slot so the next leader can load the following chunk).
+        if !self.chunk_emitter_decided {
+            let left_data = self.get_left_data()?;
+            self.is_chunk_emitter = left_data.report_probe_completed();
+            self.chunk_emitter_decided = true;
+        }
+        // Early return if another partition is the chunk emitter — we
+        // are not allowed to emit unmatched left rows.
+        let handled_by_other_partition = !self.is_chunk_emitter;
+
+        let left_data = self.get_left_data()?;
+        let left_batch = left_data.batch();
 
         // Stop processing unmatched rows, the caller will go to the next state
         let finished = self.left_emit_idx >= left_batch.num_rows();
@@ -3520,8 +3898,9 @@ pub(crate) mod tests {
         );
         let filter = prepare_join_filter();
 
-        // Join types that support memory-limited fallback should succeed
-        // even under tight memory limits (they spill to disk instead of OOM).
+        // All join types support memory-limited fallback under
+        // multi-partition right inputs (left visited state is shared
+        // across partitions via `FallbackCoordinator`).
         let fallback_join_types = vec![
             JoinType::Inner,
             JoinType::Left,
@@ -3532,6 +3911,7 @@ pub(crate) mod tests {
             JoinType::RightSemi,
             JoinType::RightAnti,
             JoinType::RightMark,
+            JoinType::Full,
         ];
 
         for join_type in &fallback_join_types {
@@ -3552,25 +3932,6 @@ pub(crate) mod tests {
                 .await?;
         }
 
-        // FULL JOIN with multiple right partitions is intentionally not
-        // supported in the fallback path yet (cross-partition left-bitmap
-        // coordination is missing). It should still OOM under tight memory.
-        let runtime = RuntimeEnvBuilder::new()
-            .with_memory_limit(100, 1.0)
-            .build_arc()?;
-        let task_ctx = TaskContext::default().with_runtime(runtime);
-        let task_ctx = Arc::new(task_ctx);
-        let err = multi_partitioned_join_collect(
-            Arc::clone(&left),
-            Arc::clone(&right),
-            &JoinType::Full,
-            Some(filter.clone()),
-            task_ctx,
-        )
-        .await
-        .unwrap_err();
-        assert_contains!(err.to_string(), "Resources exhausted");
-
         Ok(())
     }
 
@@ -3935,4 +4296,249 @@ pub(crate) mod tests {
         "));
         Ok(())
     }
+
+    // ========================================================================
+    // Multi-partition memory-limited correctness tests
+    //
+    // These tests guard against the cross-partition coordination bug in the
+    // memory-limited fallback path: previously, each output partition
+    // independently constructed a per-chunk `JoinLeftData` with
+    // `AtomicUsize::new(1)`, so left-side visited state was not shared
+    // across right partitions.
+    // For join types that emit unmatched left rows in the final output
+    // (LEFT, LEFT SEMI, LEFT ANTI, LEFT MARK, FULL), this led to a
+    // left row being emitted as unmatched by partitions whose right
+    // input did not match it — even when another partition did match.
+    // ========================================================================
+
+    /// Build the right table as one batch per row, so RepartitionExec can
+    /// distribute rows across multiple output partitions.
+    fn build_right_table_one_batch_per_row() -> Arc<dyn ExecutionPlan> {
+        build_table(
+            ("a2", &vec![12, 2, 10]),
+            ("b2", &vec![10, 2, 10]),
+            ("c2", &vec![40, 80, 100]),
+            Some(1),
+            Vec::new(),
+        )
+    }
+
+    /// Run an NLJ across 4 right partitions under a tight memory limit, so
+    /// every output partition takes the memory-limited fallback path. The
+    /// right side is shuffled via `RepartitionExec(RoundRobinBatch(4))`.
+    async fn multi_partition_memory_limited_join_collect(
+        left: Arc<dyn ExecutionPlan>,
+        right: Arc<dyn ExecutionPlan>,
+        join_type: &JoinType,
+        join_filter: Option<JoinFilter>,
+        context: Arc<TaskContext>,
+    ) -> Result<(Vec<String>, Vec<RecordBatch>, MetricsSet)> {
+        let partition_count = 4;
+        let right = Arc::new(RepartitionExec::try_new(
+            right,
+            Partitioning::RoundRobinBatch(partition_count),
+        )?) as Arc<dyn ExecutionPlan>;
+
+        let nested_loop_join =
+            NestedLoopJoinExec::try_new(left, right, join_filter, join_type, None)?;
+        let columns = columns(&nested_loop_join.schema());
+
+        let mut batches = vec![];
+        for i in 0..partition_count {
+            let stream = nested_loop_join.execute(i, Arc::clone(&context))?;
+            let more = common::collect(stream).await?;
+            batches.extend(more.into_iter().filter(|b| b.num_rows() > 0));
+        }
+
+        let metrics = nested_loop_join.metrics().unwrap();
+        Ok((columns, batches, metrics))
+    }
+
+    #[tokio::test]
+    async fn test_nlj_memory_limited_multi_partition_left_join() -> Result<()> {
+        let task_ctx = task_ctx_with_memory_limit(50, 16)?;
+        let left = build_left_table();
+        let right = build_right_table_one_batch_per_row();
+        let filter = prepare_join_filter();
+
+        let (columns, batches, metrics) = multi_partition_memory_limited_join_collect(
+            left,
+            right,
+            &JoinType::Left,
+            Some(filter),
+            task_ctx,
+        )
+        .await?;
+
+        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
+        assert!(
+            metrics.spill_count().unwrap_or(0) > 0,
+            "Expected spilling under tight memory limit"
+        );
+
+        // Expected output is identical to the single-partition spill path
+        // and the multi-partition non-spill path. Each left row appears
+        // exactly once: the matched (5,5,50)+(2,2,80) row, plus the two
+        // left rows filtered out by `b1 != 8` as unmatched.
+        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
+        +----+----+-----+----+----+----+
+        | a1 | b1 | c1  | a2 | b2 | c2 |
+        +----+----+-----+----+----+----+
+        | 11 | 8  | 110 |    |    |    |
+        | 5  | 5  | 50  | 2  | 2  | 80 |
+        | 9  | 8  | 90  |    |    |    |
+        +----+----+-----+----+----+----+
+        "));
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_nlj_memory_limited_multi_partition_full_join() -> Result<()> {
+        let task_ctx = task_ctx_with_memory_limit(50, 16)?;
+        let left = build_left_table();
+        let right = build_right_table_one_batch_per_row();
+        let filter = prepare_join_filter();
+
+        let (columns, batches, metrics) = multi_partition_memory_limited_join_collect(
+            left,
+            right,
+            &JoinType::Full,
+            Some(filter),
+            task_ctx,
+        )
+        .await?;
+
+        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
+        assert!(
+            metrics.spill_count().unwrap_or(0) > 0,
+            "Expected spilling under tight memory limit"
+        );
+
+        // Expected: 1 matched + 2 left-unmatched + 2 right-unmatched.
+        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
+        +----+----+-----+----+----+-----+
+        | a1 | b1 | c1  | a2 | b2 | c2  |
+        +----+----+-----+----+----+-----+
+        |    |    |     | 10 | 10 | 100 |
+        |    |    |     | 12 | 10 | 40  |
+        | 11 | 8  | 110 |    |    |     |
+        | 5  | 5  | 50  | 2  | 2  | 80  |
+        | 9  | 8  | 90  |    |    |     |
+        +----+----+-----+----+----+-----+
+        "));
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_nlj_memory_limited_multi_partition_left_semi_join() -> Result<()> {
+        let task_ctx = task_ctx_with_memory_limit(50, 16)?;
+        let left = build_left_table();
+        // Two right rows that both match the same left row (5,5,50). Without
+        // shared left-visited state across partitions, each matching partition
+        // emits the left row once, producing duplicates.
+        let right = build_table(
+            ("a2", &vec![2, 3, 10]),
+            ("b2", &vec![2, 2, 10]),
+            ("c2", &vec![80, 70, 100]),
+            Some(1),
+            Vec::new(),
+        );
+        let filter = prepare_join_filter();
+
+        let (columns, batches, metrics) = multi_partition_memory_limited_join_collect(
+            left,
+            right,
+            &JoinType::LeftSemi,
+            Some(filter),
+            task_ctx,
+        )
+        .await?;
+
+        assert_eq!(columns, vec!["a1", "b1", "c1"]);
+        assert!(
+            metrics.spill_count().unwrap_or(0) > 0,
+            "Expected spilling under tight memory limit"
+        );
+
+        // Left semi: each left row appears at most once, even if it matches
+        // multiple right rows distributed across partitions.
+        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
+        +----+----+----+
+        | a1 | b1 | c1 |
+        +----+----+----+
+        | 5  | 5  | 50 |
+        +----+----+----+
+        "));
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_nlj_memory_limited_multi_partition_left_anti_join() -> Result<()> {
+        let task_ctx = task_ctx_with_memory_limit(50, 16)?;
+        let left = build_left_table();
+        let right = build_right_table_one_batch_per_row();
+        let filter = prepare_join_filter();
+
+        let (columns, batches, metrics) = multi_partition_memory_limited_join_collect(
+            left,
+            right,
+            &JoinType::LeftAnti,
+            Some(filter),
+            task_ctx,
+        )
+        .await?;
+
+        assert_eq!(columns, vec!["a1", "b1", "c1"]);
+        assert!(
+            metrics.spill_count().unwrap_or(0) > 0,
+            "Expected spilling under tight memory limit"
+        );
+
+        // Left anti: only left rows with no matching right row.
+        // (5,5,50) matches (2,2,80) under the filter, so it must NOT appear.
+        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
+        +----+----+-----+
+        | a1 | b1 | c1  |
+        +----+----+-----+
+        | 11 | 8  | 110 |
+        | 9  | 8  | 90  |
+        +----+----+-----+
+        "));
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_nlj_memory_limited_multi_partition_left_mark_join() -> Result<()> {
+        let task_ctx = task_ctx_with_memory_limit(50, 16)?;
+        let left = build_left_table();
+        let right = build_right_table_one_batch_per_row();
+        let filter = prepare_join_filter();
+
+        let (columns, batches, metrics) = multi_partition_memory_limited_join_collect(
+            left,
+            right,
+            &JoinType::LeftMark,
+            Some(filter),
+            task_ctx,
+        )
+        .await?;
+
+        assert_eq!(columns, vec!["a1", "b1", "c1", "mark"]);
+        assert!(
+            metrics.spill_count().unwrap_or(0) > 0,
+            "Expected spilling under tight memory limit"
+        );
+
+        // Left mark: every left row appears exactly once with a bool
+        // indicating whether it matched at least one right row.
+        allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r"
+        +----+----+-----+-------+
+        | a1 | b1 | c1  | mark  |
+        +----+----+-----+-------+
+        | 11 | 8  | 110 | false |
+        | 5  | 5  | 50  | true  |
+        | 9  | 8  | 90  | false |
+        +----+----+-----+-------+
+        "));
+        Ok(())
+    }
 }
diff --git a/datafusion/sqllogictest/test_files/nested_loop_join_spill.slt b/datafusion/sqllogictest/test_files/nested_loop_join_spill.slt
index b47fc5ac877c1..6eaf2b38a5bcf 100644
--- a/datafusion/sqllogictest/test_files/nested_loop_join_spill.slt
+++ b/datafusion/sqllogictest/test_files/nested_loop_join_spill.slt
@@ -128,12 +128,120 @@ FULL JOIN generate_series(1, 200) AS t2(v2)
 ----
 100199 199 99999
 
-# Restore settings to slt runner defaults
-statement ok
-RESET datafusion.runtime.memory_limit
+# =============================================================================
+# Multi-partition memory-limited correctness tests
+#
+# These exercise the cross-partition shared-state path of NLJ's spill
+# fallback: every right partition must observe the same per-chunk
+# `JoinLeftData` (visited bitmap + probe-thread counter) so that join
+# types which emit unmatched left rows (LEFT, LEFT SEMI, LEFT ANTI,
+# LEFT MARK, FULL) produce the same results as the single-pass and
+# single-partition paths. Without the coordinator, each partition would
+# independently emit unmatched left rows from its own bitmap, producing
+# duplicates.
+# =============================================================================
 
 statement ok
 SET datafusion.execution.target_partitions = 4
 
+# Memory budget tight enough to force NLJ left-side OOM and trigger the
+# memory-limited fallback. Shared by all queries in this section.
+statement ok
+SET datafusion.runtime.memory_limit = '50K'
+
+# --- LEFT JOIN ---
+# v1 in [1,5000], v2 in [1,100] with predicate (v1+v2)=101 AND v2<=50.
+# Matches: v2 in [1..50] each pairs with v1 = 101 - v2 in [51..100] → 50 pairs
+# Left only: v1 in [1..5000] \ [51..100] → 4950 unmatched
+# Total LEFT JOIN output rows = 50 + 4950 = 5000 (each left row exactly once).
+query I nosort
+SELECT count(*) as cnt
+FROM generate_series(1, 5000) AS t1(v1)
+LEFT JOIN generate_series(1, 100) AS t2(v2)
+  ON (t1.v1 + t2.v2) = 101 AND t2.v2 <= 50
+----
+5000
+
+# Verify the memory-limited fallback was actually taken (spill_count > 0)
+# under target_partitions=4. The fallback shares per-chunk `JoinLeftData`
+# across right partitions via `FallbackCoordinator`; without that path
+# the test would simply OOM rather than spill.
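+# (The exact spill_count value in the metrics below depends on batch sizes
+# under the 50K budget; the essential signal is that it is non-zero.)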
+query TT
+EXPLAIN ANALYZE
+SELECT count(*) as cnt
+FROM generate_series(1, 5000) AS t1(v1)
+LEFT JOIN generate_series(1, 100) AS t2(v2)
+  ON (t1.v1 + t2.v2) = 101 AND t2.v2 <= 50
+----
+Plan with Metrics
+01)ProjectionExec: expr=[count(Int64(1))@0 as cnt], metrics=[]
+02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))], metrics=[]
+03)----CoalescePartitionsExec, metrics=[]
+04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))], metrics=[]
+05)--------NestedLoopJoinExec: join_type=Left, filter=v1@0 + v2@1 = 101, projection=[], metrics=[output_rows=5.00 K, spill_count=2, ]
+06)----------ProjectionExec: expr=[value@0 as v1], metrics=[]
+07)------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=5000, batch_size=8192], metrics=[]
+08)----------ProjectionExec: expr=[value@0 as v2], metrics=[]
+09)------------FilterExec: value@0 <= 50, metrics=[]
+10)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, metrics=[]
+11)----------------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100, batch_size=8192], metrics=[]
+
+# Same predicate, also asserts the unmatched-left count is correct
+# (would be 4950 * N, where N is the partition count, without coordination).
+query II nosort
+SELECT count(*) as cnt,
+       sum(case when t2.v2 IS NULL then 1 else 0 end) as unmatched_left
+FROM generate_series(1, 5000) AS t1(v1)
+LEFT JOIN generate_series(1, 100) AS t2(v2)
+  ON (t1.v1 + t2.v2) = 101 AND t2.v2 <= 50
+----
+5000 4950
+
+# --- FULL JOIN ---
+# Same predicate. Output:
+#   - 50 matched pairs
+#   - 4950 unmatched left rows (NULL-padded right)
+#   - 50 unmatched right rows (v2 in [51..100], rejected by `v2 <= 50`)
+# Total = 5050.
+query III nosort
+SELECT count(*) as cnt,
+       sum(case when t2.v2 IS NULL then 1 else 0 end) as unmatched_left,
+       sum(case when t1.v1 IS NULL then 1 else 0 end) as unmatched_right
+FROM generate_series(1, 5000) AS t1(v1)
+FULL JOIN generate_series(1, 100) AS t2(v2)
+  ON (t1.v1 + t2.v2) = 101 AND t2.v2 <= 50
+----
+5050 4950 50
+
+# --- LEFT SEMI JOIN ---
+# Each left row appears at most once, even though multiple right rows
+# (potentially across multiple right partitions) could match it.
+# The 50 left rows in [51..100] each match exactly one right row.
+query I nosort
+SELECT count(*) as cnt
+FROM generate_series(1, 5000) AS t1(v1)
+WHERE EXISTS (
+    SELECT 1 FROM generate_series(1, 100) AS t2(v2)
+    WHERE (t1.v1 + t2.v2) = 101 AND t2.v2 <= 50
+)
+----
+50
+
+# --- LEFT ANTI JOIN ---
+# Left rows with NO matching right row: 5000 - 50 = 4950.
+query I nosort
+SELECT count(*) as cnt
+FROM generate_series(1, 5000) AS t1(v1)
+WHERE NOT EXISTS (
+    SELECT 1 FROM generate_series(1, 100) AS t2(v2)
+    WHERE (t1.v1 + t2.v2) = 101 AND t2.v2 <= 50
+)
+----
+4950
+
+# Restore settings to slt runner defaults
+statement ok
+RESET datafusion.runtime.memory_limit
+
 statement ok
 RESET datafusion.catalog.create_default_catalog_and_schema