Skip to content

[AURON #1840] Preserve collect_set first-occurrence order#2285

Open
peter941221 wants to merge 5 commits into
apache:masterfrom
peter941221:fix/auron-1840-collect-set-order
Open

[AURON #1840] Preserve collect_set first-occurrence order#2285
peter941221 wants to merge 5 commits into
apache:masterfrom
peter941221:fix/auron-1840-collect-set-order

Conversation

@peter941221

Copy link
Copy Markdown
Contributor

What changed

AccSet::merge no longer swaps accumulators by set size. That swap changed the encounter order of collect_set, so no-shuffle Spark checks could see values in rhs-first order instead of first-occurrence order.

Why

Spark's collect_set preserves first-occurrence order in the no-shuffle path used by the affected aggregate suite.

Testing

  • git diff --check
  • cargo +nightly test --manifest-path native-engine/datafusion-ext-plans/Cargo.toml test_acc_set_merge -- --nocapture (blocked here by rdkafka-sys Windows build failure: %1 is not a valid Win32 application. (os error 193))

@peter941221 peter941221 marked this pull request as ready for review May 29, 2026 10:17
@peter941221 peter941221 force-pushed the fix/auron-1840-collect-set-order branch from 46ef225 to f092709 Compare June 1, 2026 22:43
@peter941221

Copy link
Copy Markdown
Contributor Author

Rebased this onto current master and kept the fix minimal. The change still just removes the size-based swap in AccSet::merge and adds order assertions for the merge cases. New CI is running on f092709.

@richox

richox commented Jun 3, 2026

Copy link
Copy Markdown
Contributor

do we really need to keep this order? if i dont understand wrong, this order we be discarded in bucket-merge spill strategy even if we maintain it in updating

@peter941221

Copy link
Copy Markdown
Contributor Author

@richox thanks for calling out the spill path. I checked that path and added a focused regression for it.

The spill bucket merge still comes back through the same collect_set merge path this PR changes:
AggTable::output bucket merge in agg_table.rs
-> partial_merge
-> merge_items in collect.rs
-> AccSet::merge in collect.rs

I also checked the spill round-trip itself. AccSetColumn::save_raw writes list.raw as-is, and unspill rebuilds it in order through load_raw / append_item, so spill does not discard the internal encounter order by itself.

To make that concrete, I pushed 645b3236, which adds test_acc_set_merge_preserves_first_occurrence_order_after_rhs_spill. It spills the larger RHS accumulator, unspills it, merges it into the LHS, and still gets [1, 2, 3] on the repo's pinned nightly-2025-05-09.

So I think we still need this fix for the spill path too.

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR aims to match Spark’s collect_set no-shuffle behavior by preserving first-occurrence ordering during accumulator merges, avoiding RHS-first ordering caused by size-based swapping.

Changes:

  • Removed AccSet::merge’s size-based accumulator swap to preserve encounter order.
  • Added regression tests to validate first-occurrence order, including a spill/unspill merge path.
  • Added a source-compatible overload for NativeHelper.getDefaultNativeMetrics to support an in-progress keyed migration.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.

File Description
spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala Adds a compatibility overload delegating to the keyed metrics API.
native-engine/datafusion-ext-plans/src/agg/collect.rs Adjusts AccSet::merge behavior and expands tests to cover ordering expectations.
Comments suppressed due to low confidence (1)

native-engine/datafusion-ext-plans/src/agg/collect.rs:567

  • AccSet::merge drains other.set but leaves other.list.raw intact. This breaks AccSet invariants (list contains values not reflected in the set), can cause merged-away values to be serialized/spilled later via AccSetColumn::save_raw, and makes AccSetColumn::mem_used accounting incorrect because merge_items subtracts other_value_mem_size even though the underlying list memory is still retained. Also, for InternalSet::Huge, iterating the hash table does not preserve first-occurrence order; sorting by the stored position offset restores encounter order for large sets.
    pub fn merge(&mut self, other: &mut Self) {
        for pos_len in std::mem::take(&mut other.set).into_iter() {
            self.append_raw(other.list.ref_raw(pos_len));
        }
    }

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants