fix(core): make appendcol row ordering deterministic on parallel engines#5474

Open
ahkcs wants to merge 1 commit into
opensearch-project:mainfrom
ahkcs:fix/appendcol-deterministic-ordering

@ahkcs
Collaborator

@ahkcs ahkcs commented May 27, 2026

Description

appendcol zips a subsearch's columns onto the main search's rows by position. Its lowering (CalciteRelNodeVisitor.visitAppendCol) implements this as a FULL JOIN of two ROW_NUMBER() OVER () windows (empty PARTITION BY / ORDER BY) on _row_number_main_ = _row_number_subsearch_, with no trailing sort.

That positional zip is only correct on a serial, order-preserving executor: a bare ROW_NUMBER() OVER () assigns sequence numbers in input order, and the join preserves it. On a parallel/distributed backend the row-number assignment is arbitrary and the hash join drops ordering, so columns get zipped onto the wrong rows and a downstream head slices a non-deterministic subset.
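
The failure mode can be reproduced without any query engine. The sketch below is a plain-Java stand-in, not the Calcite lowering: the class name `PositionalZipDemo`, the method `zipByArrival`, and the sample rows are all hypothetical. It numbers rows purely by arrival order, as a bare ROW_NUMBER() OVER () would, and shows that delivering the same subsearch rows in a different order changes which values get paired.

```java
import java.util.ArrayList;
import java.util.List;

public class PositionalZipDemo {
  /**
   * Pairs main[i] with sub[i]. Row numbers are implied by list position,
   * which mimics ROW_NUMBER() OVER () numbering rows in arrival order.
   * The shorter side is padded with "null", like a FULL JOIN would do.
   */
  public static List<String> zipByArrival(List<String> main, List<String> sub) {
    List<String> out = new ArrayList<>();
    int n = Math.max(main.size(), sub.size());
    for (int i = 0; i < n; i++) {
      String left = i < main.size() ? main.get(i) : "null";
      String right = i < sub.size() ? sub.get(i) : "null";
      out.add(left + "|" + right);
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> main = List.of("F,AK", "F,AL", "M,AK");
    // Same subsearch rows, two arrival orders: one as a serial executor
    // would deliver them after the sort, one as a parallel scan might.
    List<String> subSerial = List.of("cnt=F", "cnt=M");
    List<String> subParallel = List.of("cnt=M", "cnt=F");
    System.out.println(zipByArrival(main, subSerial));
    System.out.println(zipByArrival(main, subParallel));
  }
}
```

The two printed lists differ only because the arrival order differs, which is the dependency the fix removes.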

This is currently masked on the serial v2/Calcite engine, but it is a latent correctness bug for any parallel backend (the analytics engine, and the Spark pushdown path — the verifyPPLToSparkSQL golden output bakes in the same non-deterministic ROW_NUMBER() OVER ()).

Root cause (observed)

Running the query below through a parallel backend returned rows out of sort order, with cnt attached to the wrong rows and rows with gender M leaking into the top 10:

source=<idx> | stats sum(age) as sum by gender, state | sort gender, state
  | appendcol [ stats count(age) as cnt by gender | sort gender ]
  | fields gender, state, sum, cnt | head 10

A baseline ... | sort gender, state | head 10 (no appendcol) returned correctly ordered rows on the same backend, isolating the cause to the row-number join.

Fix

Make visitAppendCol independent of implicit input-order preservation:

  1. Deterministic assignment — derive an explicit window ORDER BY from each child's collation (deriveCollationOrderKeys), so ROW_NUMBER follows the upstream sort. Falls back to the prior bare OVER () when the input carries no collation (positional correspondence is undefined without a sort).
  2. Deterministic output order — add a trailing sort by the row-number columns after the join (NULLS LAST; extra subsearch-only rows sort last), the same pattern streamstats already uses, so output order no longer depends on how the backend executes the join.
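
The two steps above can be sketched in plain Java. This is an illustrative model only, not the code in visitAppendCol: `OrderedZipDemo`, `numberByKey`, and `zip` are hypothetical names, rows are modeled as strings whose natural order stands in for the upstream collation, and the join deliberately emits rows in reverse to mimic a backend that does not preserve order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OrderedZipDemo {
  /** Step 1: number rows by an explicit sort key instead of arrival order. */
  public static Map<Integer, String> numberByKey(List<String> rows) {
    List<String> sorted = new ArrayList<>(rows);
    Collections.sort(sorted); // stands in for ROW_NUMBER() OVER (ORDER BY <collation keys>)
    Map<Integer, String> numbered = new HashMap<>();
    for (int i = 0; i < sorted.size(); i++) {
      numbered.put(i + 1, sorted.get(i));
    }
    return numbered;
  }

  /** Step 2: full join on row number, then a trailing sort by the row-number columns. */
  public static List<String> zip(List<String> main, List<String> sub) {
    Map<Integer, String> left = numberByKey(main);
    Map<Integer, String> right = numberByKey(sub);
    // Each joined row holds [mainRowNumber, subRowNumber]; null marks a missing side.
    List<Integer[]> joined = new ArrayList<>();
    int n = Math.max(left.size(), right.size());
    for (int rn = n; rn >= 1; rn--) { // reverse emission: the join gives no order guarantee
      joined.add(new Integer[] {
        left.containsKey(rn) ? rn : null, right.containsKey(rn) ? rn : null
      });
    }
    // Trailing sort, NULLS LAST, so subsearch-only rows end up at the bottom.
    Comparator<Integer> nullsLast = Comparator.nullsLast(Comparator.naturalOrder());
    joined.sort(
        Comparator.<Integer[], Integer>comparing(p -> p[0], nullsLast)
            .thenComparing(p -> p[1], nullsLast));
    List<String> out = new ArrayList<>();
    for (Integer[] p : joined) {
      out.add(left.get(p[0]) + "|" + right.get(p[1]));
    }
    return out;
  }
}
```

In this model, `zip` returns the same list for any permutation of its inputs, because neither the numbering nor the final order depends on arrival order. When an input has no sort key at all, no such guarantee is possible, which corresponds to the fallback case in step 1.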

No behavior change on the serial v2/Calcite engine; the lowering becomes correct on parallel backends.

Results

CalcitePPLAppendcolIT was run against the analytics-engine route (force-routed, parquet-backed indices) before and after the change, and on the v2/Calcite path:

| Test | analytics route (before) | analytics route (after) | v2/Calcite |
| --- | --- | --- | --- |
| testAppendCol | fail | pass | pass |
| testAppendColOverride | fail | pass | pass |
| Total | 0/2 | 2/2 | 2/2 |

Testing

  • CalcitePPLAppendcolTest (5 unit tests) — updated expected logical plans + Spark SQL; all pass.
  • CalcitePPLAppendcolIT — 2/2 on the analytics-engine route and 2/2 on v2/Calcite.
  • NewAddedCommandsIT.testAppendcol — passes.
  • spotlessCheck clean on :core and :ppl.

Check List

  • New functionality includes testing.
  • New functionality has been documented (n/a — behavior-preserving fix).
  • Commits are signed per the DCO using --signoff.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

appendcol lowers to a FULL JOIN of two ROW_NUMBER() OVER () windows
(empty PARTITION BY / ORDER BY) on _row_number_main_ = _row_number_subsearch_,
with no trailing sort. That positional zip is only correct on a serial,
order-preserving executor: a bare ROW_NUMBER() OVER () assigns sequence
numbers in input order and the join preserves it. On a parallel/distributed
backend the row-number assignment is arbitrary and the hash join drops
ordering, so columns get zipped onto the wrong rows and downstream `head`
slices a non-deterministic subset.

Fix visitAppendCol to not depend on implicit input-order preservation:
- derive an explicit window ORDER BY from each child's collation
  (deriveCollationOrderKeys), so ROW_NUMBER assignment follows the upstream
  sort; falls back to the prior bare OVER () when the input has no collation
  (positional correspondence is undefined without a sort).
- add a trailing sort by the row-number columns after the join (NULLS LAST,
  same pattern as streamstats) so output order is deterministic regardless of
  how the backend executes the join.

No behavior change on the serial v2/Calcite engine; makes the lowering correct
on parallel backends. Updates CalcitePPLAppendcolTest expected plans/SparkSQL.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
@github-actions
Contributor

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

When the input has no collation (empty list returned by deriveCollationOrderKeys), the ROW_NUMBER() window is created with an empty ORDER BY clause. On parallel backends this still assigns row numbers arbitrarily, defeating the fix's purpose. The code should either fall back to a deterministic ordering (e.g., all fields) or document that appendcol without an upstream sort remains non-deterministic on parallel engines.

private static List<RexNode> deriveCollationOrderKeys(CalcitePlanContext context) {
  RelBuilder relBuilder = context.relBuilder;
  List<RelCollation> collations =
      relBuilder.getCluster().getMetadataQuery().collations(relBuilder.peek());
  if (collations == null || collations.isEmpty()) {
    return List.of();
  }
  List<RexNode> orderKeys = new ArrayList<>();
  for (RelFieldCollation fieldCollation : collations.get(0).getFieldCollations()) {
    RexNode key = relBuilder.field(fieldCollation.getFieldIndex());
    if (fieldCollation.direction.isDescending()) {
      key = relBuilder.desc(key);
    }
    if (fieldCollation.nullDirection == RelFieldCollation.NullDirection.LAST) {
      key = relBuilder.nullsLast(key);
    } else if (fieldCollation.nullDirection == RelFieldCollation.NullDirection.FIRST) {
      key = relBuilder.nullsFirst(key);
    }
    orderKeys.add(key);
  }
  return orderKeys;
}
Incomplete Handling

The code only checks nullDirection for LAST and FIRST, but does not handle UNSPECIFIED. If nullDirection is UNSPECIFIED, no null-ordering modifier is applied, which may cause the window's ORDER BY to differ from the upstream collation's null semantics, leading to mismatched row numbering.

if (fieldCollation.nullDirection == RelFieldCollation.NullDirection.LAST) {
  key = relBuilder.nullsLast(key);
} else if (fieldCollation.nullDirection == RelFieldCollation.NullDirection.FIRST) {
  key = relBuilder.nullsFirst(key);
}
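
One way to close this gap is to resolve UNSPECIFIED to a concrete null direction before building the window key, so the ORDER BY always states its null ordering explicitly. The sketch below is a standalone model, not Calcite code: the enums and the `resolve` and `render` helpers are hypothetical, and the chosen default (nulls treated as the largest value, so LAST for ascending and FIRST for descending) is an assumption that would need to match what the upstream sort actually does on the target backend.

```java
public class NullDirectionDemo {
  public enum Direction { ASCENDING, DESCENDING }

  public enum NullDirection { FIRST, LAST, UNSPECIFIED }

  /** Returns an explicit null direction, resolving UNSPECIFIED to an assumed default. */
  public static NullDirection resolve(Direction direction, NullDirection nulls) {
    if (nulls != NullDirection.UNSPECIFIED) {
      return nulls;
    }
    // Assumed default: nulls sort as the largest value.
    return direction == Direction.DESCENDING ? NullDirection.FIRST : NullDirection.LAST;
  }

  /** Renders one ORDER BY key with its null ordering always spelled out. */
  public static String render(String field, Direction direction, NullDirection nulls) {
    String desc = direction == Direction.DESCENDING ? " DESC" : "";
    return field + desc + " NULLS " + resolve(direction, nulls).name();
  }
}
```

With this shape, the window key never relies on a backend-specific default for null placement, at the cost of committing to one default in the lowering.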

@ahkcs ahkcs added the PPL (Piped processing language) and enhancement (New feature or request) labels May 27, 2026