[SPARK-57133][SQL] Add BIN BY relation operator parsing and resolution#56247
vranes wants to merge 7 commits into
cloud-fan left a comment
1 blocking, 1 non-blocking, 0 nits.
A clean, well-tested incremental PR; the one blocking item is an analysis-time integration gap (DeduplicateRelations).
Correctness (1)
- basicLogicalOperators.scala:1803: BinBy produces new attributes but isn't registered in DeduplicateRelations; self-joins over a shared BinBy subtree (e.g. a temp view referenced twice) leave conflicting ExprIds the analyzer can't resolve (see inline)
Suggestions (1)
- ResolveBinBy.scala:153: BIN_BY_COLUMN_NOT_FOUND is misleading for a nested/computed reference (the column exists); see inline
Verification
Traced DeduplicateRelations: it renews produced attributes only for explicitly-enumerated node types (Generate/Expand/Window/ScriptTransformation/AttachDistributedSequence/FlatMap*/MapIn*) in both renewDuplicatedRelations and collectConflictPlans; BinBy matches neither and hits the child-only fallbacks, so its appendedAttributes are never renewed. The Unpivot analogue is safe only because it lowers to Expand (a registered producer). The comment at DeduplicateRelations.scala:494-499 confirms an unregistered producer fails analysis.
    override def output: Seq[Attribute] = child.output ++ appendedAttributes

    override def producedAttributes: AttributeSet = AttributeSet(appendedAttributes)
BinBy is a new attribute-producing logical node (producedAttributes = AttributeSet(appendedAttributes)), but it isn't registered with DeduplicateRelations.
That rule resolves self-join attribute conflicts by enumerating every attribute-producing node explicitly — Generate, Expand, Window, ScriptTransformation, AttachDistributedSequence, the FlatMap*/MapIn* family — in both renewDuplicatedRelations (DeduplicateRelations.scala:114-220) and collectConflictPlans (:363-487). BinBy is in neither, so it falls to the generic fallbacks (case plan: LogicalPlan at :222, case _ => plan.children.flatMap(...) at :489), which renew/recurse the children but never renew the node's own appendedAttributes. The comment at :494-499 spells out the result: an unhandled producer of new references makes "the analysis ... fail" unless another rule resolves it — and there's none for BinBy.
The Unpivot analogue avoids this only because it lowers to an Expand (UnpivotTransformer.scala), which is a registered producer; BinBy resolves to a bespoke node that skipped registration. So a query that self-joins a shared BinBy subtree (a temp view referenced twice, or a future DataFrame self-join) leaves bin_start/bin_end/bin_distribute_ratio with conflicting ExprIds the analyzer can't resolve — failing at analysis time, before the stubbed strategy, with a confusing ambiguous-reference/conflict error.
Since analysis is fully functional in this PR, this is in scope here. Suggested fix: add a BinBy case to both dedup phases (renew appendedAttributes, exactly as the Expand case does) plus a self-join / duplicated-view regression test. If you'd rather defer the integration to the execution PR, please add a test documenting the current behavior so the gap is tracked.
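The gap can be illustrated with a toy model. This is a minimal sketch and not Spark's code: `Attr`, `Relation`, `Appender`, `renewChildrenOnly`, and `renewAll` are hypothetical stand-ins for an attribute with an ExprId, a leaf relation, an attribute-producing node such as BinBy, the child-only fallback, and a registered-producer case respectively.

```scala
// Toy model of self-join attribute conflicts. NOT Spark's classes; all names are hypothetical.
object DedupSketch {
  final case class Attr(name: String, id: Long)

  sealed trait Plan { def output: Seq[Attr] }
  final case class Relation(output: Seq[Attr]) extends Plan
  // Stands in for a node like BinBy: passes the child through and appends new columns.
  final case class Appender(child: Plan, appended: Seq[Attr]) extends Plan {
    def output: Seq[Attr] = child.output ++ appended
  }

  private var nextId = 100L
  private def freshId(): Long = { nextId += 1; nextId }

  // Child-only fallback: leaves get fresh ids, the node's own appended attributes do not.
  def renewChildrenOnly(p: Plan): Plan = p match {
    case Relation(out)    => Relation(out.map(_.copy(id = freshId())))
    case Appender(c, app) => Appender(renewChildrenOnly(c), app)
  }

  // Registered-producer behaviour: the appended attributes are renewed as well.
  def renewAll(p: Plan): Plan = p match {
    case Relation(out)    => Relation(out.map(_.copy(id = freshId())))
    case Appender(c, app) => Appender(renewAll(c), app.map(_.copy(id = freshId())))
  }

  // Ids that appear on both sides of a join; any overlap is an unresolved conflict.
  def conflicts(left: Plan, right: Plan): Set[Long] =
    left.output.map(_.id).toSet intersect right.output.map(_.id).toSet

  // A "view" that is referenced twice in a self-join.
  val view: Plan = Appender(
    Relation(Seq(Attr("ts", 1L), Attr("v", 2L))),
    Seq(Attr("bin_start", 3L), Attr("bin_end", 4L)))

  def main(args: Array[String]): Unit = {
    println(conflicts(view, renewChildrenOnly(view))) // appended ids still collide
    println(conflicts(view, renewAll(view)))          // no collisions remain
  }
}
```

In this model the child-only path leaves the two appended ids shared between both join sides, which is the situation the comment describes for an unregistered producer.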
Good catch, fixed.
Registered BinBy in both dedup phases. Added a self-join regression test in ResolveBinBySuite - it fails without the fix.
    case Some(_) =>
      // Resolved to a NamedExpression that is not a top-level Attribute (e.g.,
      // `RANGE struct_col.field TO ...` resolves to an Alias wrapping GetStructField).
      throw QueryCompilationErrors.binByColumnNotFoundError(u.name)
When a reference resolves to something other than a top-level Attribute — e.g. RANGE struct_col.field TO ... resolving to an Alias(GetStructField) — this throws BIN_BY_COLUMN_NOT_FOUND ("The column outer.field was not found in the input relation"). That's a bit misleading: the column does exist; BIN BY simply requires a plain top-level column. A distinct message (nested/computed columns unsupported in BIN BY) would be clearer for the user.
Non-blocking — the comment here already acknowledges the case.
Added a distinct BIN_BY_REQUIRES_TOP_LEVEL_COLUMN condition for the case where the reference resolves to a non-top-level attribute, mirroring EVENT_TIME_MUST_BE_TOP_LEVEL_COLUMN.
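The distinction between the two conditions can be sketched as a three-way match. This is an illustrative model only: `Resolved`, `TopLevelAttribute`, `NestedField`, and `check` are hypothetical names, and the message strings are placeholders rather than the text in error-conditions.json.

```scala
// Toy classification of a BIN BY column reference. NOT the actual ResolveBinBy code.
object BinByRefSketch {
  sealed trait Resolved
  final case class TopLevelAttribute(name: String) extends Resolved
  // Stands in for something like an Alias wrapping GetStructField.
  final case class NestedField(base: String, field: String) extends Resolved

  // Returns the usable column name, or the error condition that would apply.
  def check(refName: String, resolved: Option[Resolved]): Either[String, String] =
    resolved match {
      case Some(TopLevelAttribute(n)) => Right(n)
      // The column exists but is not a plain top-level column.
      case Some(_: NestedField)       => Left(s"BIN_BY_REQUIRES_TOP_LEVEL_COLUMN: $refName")
      // Nothing in the child output matched the name.
      case None                       => Left(s"BIN_BY_COLUMN_NOT_FOUND: $refName")
    }
}
```

Keeping the nested case separate from the `None` case is what lets the user-facing message say that the column was found but is unsupported, instead of claiming it does not exist.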
What changes were proposed in this pull request?
This is the first PR in a planned series implementing the BIN BY relation operator (SPARK-57133). It adds the parser, analyzer, and error classes. Physical execution is intentionally stubbed and lands in a follow-up PR.
BIN BY is a relation-level operator (same grammar position as PIVOT/UNPIVOT) that aligns range-typed rows to fixed-width bin boundaries: it splits any row whose [range_start, range_end) crosses a boundary and proportionally redistributes selected numeric or day-time-interval values across the resulting sub-ranges. The target use case is telemetry and observability data, where each row carries its own measurement window (OpenTelemetry, Prometheus exports).
Syntax:
What this PR adds:
- SqlBaseLexer.g4, SqlBaseParser.g4: the binByClause rule and 7 new non-reserved keywords (BIN, WIDTH, ALIGN, UNIFORM, BIN_START, BIN_END, BIN_DISTRIBUTE_RATIO), wired into relationExtension and the pipe operatorPipeRightSide, with an optional trailing table alias.
- basicLogicalOperators.scala: UnresolvedBinBy (parser output) and the resolved BinBy, plus the BinByOutputAliases helper. This follows the two-class Unpivot -> UnpivotTransformer precedent.
- AstBuilder.scala: withBinBy, which wraps the node in a SubqueryAlias when a trailing alias is present.
- ResolveBinBy.scala (wired into Analyzer.scala): resolves column references against the child output, validates types and foldability, fills the default origin (session-zone-anchored for TIMESTAMP, wall-clock epoch for TIMESTAMP_NTZ), captures the session time zone, and builds the output schema. Registered in RuleIdCollection; the BIN_BY/UNRESOLVED_BIN_BY tree patterns are added in TreePatterns.
- error-conditions.json, QueryCompilationErrors.scala: the 8 BIN_BY_* conditions, with analysis-time builders for the 7 raised during resolution (the runtime BIN_BY_INVALID_RANGE is defined here and raised in the execution PR).
- SparkStrategies.scala: the lowering throws UnsupportedOperationException until the execution PR lands.
The output is the input columns plus three appended columns: bin_start and bin_end (matching the range column type) and bin_distribute_ratio (DOUBLE, the fraction of the original range that fell into the bin). All three are renameable.
Why are the changes needed?
Telemetry and observability sources emit rows that each carry their own [start, end) measurement window. Re-bucketing such data onto a fixed grid today requires verbose SQL with manual boundary arithmetic, row explosion, and proportional splitting. BIN BY expresses this as a single relation operator.
Does this PR introduce any user-facing change?
No. The operator parses and resolves, but physical execution is intentionally stubbed in this PR (the strategy throws UnsupportedOperationException), so BIN BY is not usable end to end yet; execution arrives in a follow-up PR. The 7 new keywords are non-reserved, so existing queries that use them as identifiers continue to parse unchanged.
How was this patch tested?
New unit tests, all passing:
- PlanParserSuite: BIN BY parsing (minimal and maximal clauses, qualified column references, output renames, trailing alias, and the pipe form), parse-error cases, and confirmation that the new keywords remain usable as identifiers.
- ResolveBinBySuite: resolution against the child output, session-zone capture, default-origin arithmetic (UTC, non-UTC, NTZ), output schema and renames, multipart disambiguation across a join, and every analysis-time error class.
build/sbt 'catalyst/testOnly *ResolveBinBySuite *PlanParserSuite' reports 107 tests passed.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code
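For reference, the splitting semantics described under "What changes were proposed in this pull request?" can be modeled in a few lines. This is a sketch under stated assumptions, not the stubbed execution: ranges are plain `Long` values, `bin_start`/`bin_end` are taken to be the boundaries of the bin itself, and `Piece`, `split`, and the `origin` parameter are hypothetical names introduced for illustration.

```scala
// Toy model of aligning one [start, end) range to fixed-width bins.
// Assumption: each output row carries the bin's own boundaries and the
// fraction of the original range that fell into that bin.
object BinSplitSketch {
  final case class Piece(binStart: Long, binEnd: Long, ratio: Double)

  def split(start: Long, end: Long, width: Long, origin: Long = 0L): Seq[Piece] = {
    require(width > 0, "width must be positive")
    require(start < end, "range must be non-empty")
    val total = (end - start).toDouble
    // Largest bin boundary at or below `start`; floorDiv keeps negatives aligned.
    val firstBin = origin + Math.floorDiv(start - origin, width) * width
    Iterator.iterate(firstBin)(_ + width)
      .takeWhile(_ < end)
      .map { b =>
        // Length of the intersection between the bin and the original range.
        val overlap = math.min(end, b + width) - math.max(start, b)
        Piece(b, b + width, overlap / total)
      }
      .toList
  }

  def main(args: Array[String]): Unit =
    // A range of length 20 crossing two boundaries of width-10 bins.
    split(5L, 25L, 10L).foreach(println)
}
```

A proportionally redistributed value would then be the original value multiplied by each piece's `ratio`, so the pieces of one input row sum back to the original.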