Skip to content

Commit 7e441eb

Browse files
committed
Add repartition guard
1 parent bca67ca commit 7e441eb

1 file changed

Lines changed: 22 additions & 0 deletions

File tree

  • datafusion/physical-plan/src/joins/grace_hash_join

datafusion/physical-plan/src/joins/grace_hash_join/stream.rs

Lines changed: 22 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -459,6 +459,7 @@ pub struct GraceHashJoinStream {
459459
adaptive_budget: AdaptivePartitionBudget,
460460
max_partition_passes: usize,
461461
compute_soft_cap_bytes: usize,
462+
repartition_enabled: bool,
462463
partition_stats: PartitionStatsSummary,
463464
state: GraceJoinState,
464465
}
@@ -551,6 +552,7 @@ impl GraceHashJoinStream {
551552
adaptive_budget,
552553
max_partition_passes,
553554
compute_soft_cap_bytes,
555+
repartition_enabled: true,
554556
partition_stats: PartitionStatsSummary::default(),
555557
state: GraceJoinState::WaitPartitioning,
556558
}
@@ -598,6 +600,17 @@ impl GraceHashJoinStream {
598600
self.partition_stats = stats.clone();
599601
self.adaptive_budget.prime_with_stats(&stats);
600602
self.adaptive_budget.update_active_partitions(1);
603+
// If every partition already fits in the current budget, disable further repartitioning.
604+
if let Some(max_bytes) = stats.max_partition_bytes() {
605+
if max_bytes <= self.adaptive_budget.current_limit() {
606+
self.repartition_enabled = false;
607+
debug!(
608+
"Grace hash join repartition disabled: max partition {} fits current limit {}",
609+
human_readable_size(max_bytes),
610+
human_readable_size(self.adaptive_budget.current_limit())
611+
);
612+
}
613+
}
601614
let left_bytes = Arc::new(Mutex::new(0usize));
602615
let right_bytes = Arc::new(Mutex::new(0usize));
603616
self.state = GraceJoinState::JoinPartition {
@@ -874,6 +887,15 @@ impl GraceHashJoinStream {
874887
need_repartition = true;
875888
}
876889

890+
if need_repartition && !self.repartition_enabled {
891+
debug!(
892+
"Grace hash join repartition suppressed: current limit {} covers loaded {}",
893+
human_readable_size(self.adaptive_budget.current_limit()),
894+
human_readable_size(total_loaded_bytes)
895+
);
896+
need_repartition = false;
897+
}
898+
877899
if need_repartition {
878900
// Do not split already-small partitions; instead keep them as-is.
879901
if work.total_bytes() <= MIN_REPARTITION_BYTES {

0 commit comments

Comments
 (0)