Skip to content

Add MultiDistinctToCrossJoin optimizer rule for parallel distinct aggregates#20940

Open
Dandandan wants to merge 10 commits intoapache:mainfrom
Dandandan:multi-distinct-to-union
Open

Add MultiDistinctToCrossJoin optimizer rule for parallel distinct aggregates#20940
Dandandan wants to merge 10 commits intoapache:mainfrom
Dandandan:multi-distinct-to-union

Conversation

@Dandandan
Copy link
Copy Markdown
Contributor

@Dandandan Dandandan commented Mar 14, 2026

Which issue does this PR close?

Rationale for this change

Speedup / use less memory for multipleCOUNT(distinct)s.

│ QQuery 0 │        807.05 / 817.89 ±8.20 / 832.09 ms │       531.43 / 546.87 ±11.66 / 566.48 ms │ +1.50x faster │
│ QQuery 1 │        227.70 / 228.71 ±0.52 / 229.11 ms │        206.07 / 208.48 ±1.88 / 211.83 ms │ +1.10x faster │

What changes are included in this PR?

Add the MultiDistinctToCrossJoin with some tests, update plans.

Are these changes tested?

New and existing.

Are there any user-facing changes?

Note: I used Claude to generate the code, I reviewed the code myself.

@github-actions github-actions Bot added optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt) labels Mar 14, 2026
@Dandandan
Copy link
Copy Markdown
Contributor Author

run benchmark clickbench_extended

@adriangbot
Copy link
Copy Markdown

🤖 Benchmark running (GKE) | trigger
Linux bench-c4059947897-230-zrd9d 6.12.55+ #1 SMP Sun Feb 1 08:59:41 UTC 2026 aarch64 GNU/Linux
Comparing multi-distinct-to-union (f03018e) to 9c3c01a (merge-base) diff using: clickbench_extended
Results will be posted here when complete

@Dandandan Dandandan changed the title feat: Add MultiDistinctToUnion optimizer rule for parallel distinct a… dd MultiDistinctToUnion optimizer rule for parallel distinct aggregates Mar 14, 2026
@Dandandan Dandandan changed the title dd MultiDistinctToUnion optimizer rule for parallel distinct aggregates Add MultiDistinctToUnion optimizer rule for parallel distinct aggregates Mar 14, 2026
@adriangbot
Copy link
Copy Markdown

🤖 Benchmark completed (GKE) | trigger

Details

Comparing HEAD and multi-distinct-to-union
--------------------
Benchmark clickbench_extended.json
--------------------
┏━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query    ┃                                    HEAD ┃                   multi-distinct-to-union ┃        Change ┃
┡━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0 │      868.72 / 883.32 ±15.10 / 910.39 ms │        529.35 / 558.48 ±21.42 / 592.18 ms │ +1.58x faster │
│ QQuery 1 │       228.13 / 229.39 ±1.30 / 231.56 ms │         204.65 / 208.60 ±2.30 / 210.71 ms │ +1.10x faster │
│ QQuery 2 │       534.63 / 535.33 ±0.48 / 536.14 ms │         529.52 / 533.14 ±3.00 / 536.94 ms │     no change │
│ QQuery 3 │       332.54 / 333.57 ±0.78 / 334.61 ms │         335.89 / 338.00 ±2.63 / 343.17 ms │     no change │
│ QQuery 4 │      671.13 / 691.89 ±12.82 / 703.04 ms │        690.63 / 710.50 ±14.61 / 727.56 ms │     no change │
│ QQuery 5 │ 9666.38 / 9914.13 ±142.81 / 10111.98 ms │ 10029.90 / 10119.15 ±135.48 / 10387.78 ms │     no change │
│ QQuery 6 │   1003.30 / 1010.08 ±10.25 / 1030.40 ms │      994.45 / 1010.25 ±12.36 / 1031.13 ms │     no change │
│ QQuery 7 │      791.30 / 814.39 ±14.18 / 831.49 ms │       827.78 / 880.70 ±76.41 / 1030.16 ms │  1.08x slower │
└──────────┴─────────────────────────────────────────┴───────────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                      ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)                      │ 14412.11ms │
│ Total Time (multi-distinct-to-union)   │ 14358.81ms │
│ Average Time (HEAD)                    │  1801.51ms │
│ Average Time (multi-distinct-to-union) │  1794.85ms │
│ Queries Faster                         │          2 │
│ Queries Slower                         │          1 │
│ Queries with No Change                 │          5 │
│ Queries with Failure                   │          0 │
└────────────────────────────────────────┴────────────┘

Resource Usage

clickbench_extended — base (merge-base)

Metric Value
Wall time 72.7s
Peak memory 33.2 GiB
Avg memory 28.3 GiB
CPU user 719.6s
CPU sys 24.1s
Disk read 0 B
Disk write 22.5 MiB

clickbench_extended — branch

Metric Value
Wall time 72.5s
Peak memory 34.2 GiB
Avg memory 27.5 GiB
CPU user 726.3s
CPU sys 29.0s
Disk read 0 B
Disk write 708.0 KiB

@Dandandan
Copy link
Copy Markdown
Contributor Author

run benchmark clickbench_extended

@adriangbot
Copy link
Copy Markdown

🤖 Benchmark running (GKE) | trigger
Linux bench-c4060020382-231-2jwlp 6.12.55+ #1 SMP Sun Feb 1 08:59:41 UTC 2026 aarch64 GNU/Linux
Comparing multi-distinct-to-union (4f23297) to 9c3c01a (merge-base) diff using: clickbench_extended
Results will be posted here when complete

@adriangbot
Copy link
Copy Markdown

🤖 Benchmark completed (GKE) | trigger

Details

Comparing HEAD and multi-distinct-to-union
--------------------
Benchmark clickbench_extended.json
--------------------
┏━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query    ┃                                     HEAD ┃                  multi-distinct-to-union ┃        Change ┃
┡━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0 │        807.05 / 817.89 ±8.20 / 832.09 ms │       531.43 / 546.87 ±11.66 / 566.48 ms │ +1.50x faster │
│ QQuery 1 │        227.70 / 228.71 ±0.52 / 229.11 ms │        206.07 / 208.48 ±1.88 / 211.83 ms │ +1.10x faster │
│ QQuery 2 │        535.00 / 538.15 ±2.28 / 541.01 ms │        534.82 / 537.43 ±1.48 / 539.15 ms │     no change │
│ QQuery 3 │        332.84 / 335.91 ±2.09 / 337.99 ms │        333.69 / 336.70 ±3.05 / 342.52 ms │     no change │
│ QQuery 4 │        669.30 / 678.09 ±6.68 / 687.54 ms │        688.11 / 696.03 ±8.92 / 712.84 ms │     no change │
│ QQuery 5 │ 9883.21 / 10062.74 ±202.87 / 10455.76 ms │ 9630.03 / 10016.96 ±200.87 / 10182.07 ms │     no change │
│ QQuery 6 │     989.16 / 1012.00 ±11.69 / 1019.74 ms │      978.54 / 999.13 ±13.15 / 1013.69 ms │     no change │
│ QQuery 7 │       790.67 / 818.91 ±28.07 / 853.39 ms │       796.78 / 818.32 ±17.57 / 847.16 ms │     no change │
└──────────┴──────────────────────────────────────────┴──────────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                      ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)                      │ 14492.40ms │
│ Total Time (multi-distinct-to-union)   │ 14159.92ms │
│ Average Time (HEAD)                    │  1811.55ms │
│ Average Time (multi-distinct-to-union) │  1769.99ms │
│ Queries Faster                         │          2 │
│ Queries Slower                         │          0 │
│ Queries with No Change                 │          6 │
│ Queries with Failure                   │          0 │
└────────────────────────────────────────┴────────────┘

Resource Usage

clickbench_extended — base (merge-base)

Metric Value
Wall time 73.1s
Peak memory 32.5 GiB
Avg memory 28.3 GiB
CPU user 724.5s
CPU sys 23.0s
Disk read 0 B
Disk write 21.8 MiB

clickbench_extended — branch

Metric Value
Wall time 71.5s
Peak memory 34.5 GiB
Avg memory 29.3 GiB
CPU user 722.7s
CPU sys 25.0s
Disk read 0 B
Disk write 76.0 KiB

@Dandandan Dandandan marked this pull request as ready for review March 14, 2026 09:53
@Dandandan Dandandan changed the title Add MultiDistinctToUnion optimizer rule for parallel distinct aggregates Add MultiDistinctToCrossJoin optimizer rule for parallel distinct aggregates Mar 14, 2026
@Dandandan Dandandan added the performance Make DataFusion faster label Mar 18, 2026
@github-actions github-actions Bot added the documentation Improvements or additions to documentation label Mar 21, 2026
/// SELECT count(DISTINCT a), count(DISTINCT b)
/// FROM (SELECT COUNT(DISTINCT a) FROM t)
/// CROSS JOIN (SELECT COUNT(DISTINCT b) FROM t)
/// ```
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you trying to express rewriting it like this?

SELECT s.cnt_a, t.cnt_b
FROM (SELECT COUNT(DISTINCT a) AS cnt_a FROM t) s
CROSS JOIN (SELECT COUNT(DISTINCT b) AS cnt_b FROM t) t


/// Returns true if all aggregate expressions are distinct aggregates on
/// different fields, with no filters or order_by, and no GROUP BY.
fn is_multi_distinct_agg(group_expr: &[Expr], aggr_expr: &[Expr]) -> bool {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we support rewriting
"SELECT COUNT(DISTINCT a), SUM(DISTINCT a) FROM t"? Or are there other rules that prioritize aggregating
"a" first and then calculating the count and sum separately? I don't know much about DataFusion's optimization logic; I just thought of this point.

Dandandan and others added 10 commits March 21, 2026 12:53
…ggregates

Rewrites queries with multiple DISTINCT aggregates on different columns
(e.g. SELECT COUNT(DISTINCT a), COUNT(DISTINCT b) FROM t) into a UNION ALL
of individual aggregates with an outer MAX to combine results. This enables
parallel execution of each distinct aggregate via InterleaveExec, and each
branch also benefits from SingleDistinctToGroupBy optimization.

Also adds function_registry support to OptimizerContext and removes the
redundant clickbench_extended.slt (already covered by clickbench.slt).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… method

Use the existing OptimizerConfig::function_registry() trait method (which
SessionState provides in production) instead of adding registry support to
OptimizerContext. Tests use a simple TestConfig wrapper. Also remove
redundant tests and clean up imports.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
CrossJoinExec runs each branch sequentially, so only one hash table is
live at a time. This is better for memory locality on high-cardinality
distinct aggregates vs UNION ALL which runs branches concurrently.

Also removes the function registry dependency (no MAX aggregate needed),
simplifying the implementation significantly.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Replace double-match (ref guard + unreachable move) with single
  destructure-by-move and early return via try_new_with_schema
- Use into_iter() instead of iter() to move Exprs into sub-aggregates
  instead of cloning them

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Use assert_optimized_plan_eq_display_indent_snapshot directly.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@Dandandan Dandandan force-pushed the multi-distinct-to-union branch from f1deeaa to 6458291 Compare March 21, 2026 11:57
@github-actions github-actions Bot removed the documentation Improvements or additions to documentation label Mar 21, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

optimizer Optimizer rules performance Make DataFusion faster sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add MultiDistinctToCrossJoin optimizer rule for parallel distinct aggregates

3 participants