[VL] Fix ClassCastException in Delta stats for non-offloadable aggregations#12292
Open
felipepessoto wants to merge 1 commit into
Open
[VL] Fix ClassCastException in Delta stats for non-offloadable aggregations#12292felipepessoto wants to merge 1 commit into
felipepessoto wants to merge 1 commit into
Conversation
Contributor
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Prevent Delta writes from crashing when per-file statistics aggregation cannot be offloaded to Velox by detecting non-offloadable plans on the driver and falling back to the row-based tracker.
Changes:
- Added a driver-side
canOffloadStatscheck to decide whether to use the native stats tracker or a safe fallback. - Added a warning + fallback path for non-offloadable per-file statistics aggregations.
- Added a regression test covering TIMESTAMP_NTZ stats write behavior.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala | Adds offloadability detection and fallback selection to avoid WholeStageTransformer cast failures. |
| backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala | Same offloadability detection + fallback behavior for the delta33 variant. |
| backends-velox/src-delta/test/scala/org/apache/gluten/execution/GlutenDeltaStatsSuite.scala | Adds regression coverage ensuring writes succeed when TIMESTAMP_NTZ stats aren’t offloadable. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+123
to
+126
| val aggregates = statsColExpr.collect { | ||
| case ae: AggregateExpression if ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] => | ||
| ae | ||
| } |
| statsResultAttrs, | ||
| StatisticsInputNode(dataCols)) | ||
| val projOp = ProjectExec(statsResultAttrs, aggOp) | ||
| val offloads = Seq(OffloadOthers()).map(_.toStrcitRule()) |
Comment on lines
+123
to
+126
| val aggregates = statsColExpr.collect { | ||
| case ae: AggregateExpression if ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] => | ||
| ae | ||
| } |
| statsResultAttrs, | ||
| StatisticsInputNode(dataCols)) | ||
| val projOp = ProjectExec(statsResultAttrs, aggOp) | ||
| val offloads = Seq(OffloadOthers()).map(_.toStrcitRule()) |
Comment on lines
+42
to
+47
| // The maxValue statistic for a TIMESTAMP_NTZ near Long.MaxValue triggers the per-file | ||
| // statistics aggregation that cannot be offloaded to Velox. | ||
| val nearMaxMicros = Long.MaxValue - 999L | ||
| val data = Seq(nearMaxMicros) | ||
| .toDF("micros") | ||
| .selectExpr("micros AS id", "CAST(TIMESTAMP_MICROS(micros) AS TIMESTAMP_NTZ) AS ts") |
…ations GlutenDeltaJobStatsTracker builds a SortAggregateExec -> ProjectExec plan for the per-file statistics aggregation, runs Gluten's HeuristicTransform, then unconditionally casts the result to a WholeStageTransformer. When the stats aggregation cannot be offloaded to Velox -- e.g. min/max over TIMESTAMP_NTZ, as exercised by Delta's DataSkippingDeltaV1Suite "data skipping on TIMESTAMP_NTZ near Long.MaxValue" -- the projection stays a vanilla ProjectExec and the cast throws java.lang.ClassCastException: ProjectExec cannot be cast to WholeStageTransformer in the per-task tracker constructor, failing the write. Decide on the driver whether the aggregation actually offloads: add canOffloadStats(), which dry-runs the same transform pipeline once and checks whether it collapses into a WholeStageTransformer. If it does not, route the DeltaJobStatisticsTracker to the existing GlutenDeltaJobStatsFallbackTracker (columnar-to-row + the original Delta tracker, which produces correct stats for any type) instead of the native tracker. Evaluating this on the driver also avoids the per-task constructor allocating a single-thread executor and a NativePlanEvaluator before the cast. Applied to both the Delta 3.x (src-delta33) and Delta 4.x (src-delta40) copies. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
626cc18 to
e8a84f3
Compare
Contributor
Author
|
@zhztheplayer @philo-he similar to #12290, I found the issue when running the Delta CI #12278. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes are proposed in this pull request?
Uncovered by the Delta Spark UT pipeline (#12278).
GlutenDeltaJobStatsTrackerbuilds the per-file statistics aggregation as aSortAggregateExec -> ProjectExecplan, runs Gluten'sHeuristicTransform, then unconditionally casts the result to aWholeStageTransformer. When the statistics aggregation cannot be offloaded to Velox -- for examplemin/maxover aTIMESTAMP_NTZcolumn, as exercised by Delta'sDataSkippingDeltaV1Suite"data skipping on TIMESTAMP_NTZ near Long.MaxValue" -- the projection stays a vanillaProjectExecand the cast throws:in the per-task tracker constructor (
GlutenDeltaJobStatsTracker.scala), failing the write.This PR decides on the driver whether the aggregation actually offloads: a new
canOffloadStats()dry-runs the same transform pipeline once and checks whether it collapses into aWholeStageTransformer. If it does not, theDeltaJobStatisticsTrackeris routed to the existingGlutenDeltaJobStatsFallbackTracker(columnar-to-row + the original Delta tracker, which produces correct statistics for any type) instead of the native tracker. Evaluating this on the driver also avoids the per-task constructor allocating a single-thread executor and aNativePlanEvaluatorbefore the cast. The fix is applied to both the Delta 3.x (src-delta33) and Delta 4.x (src-delta40) copies.How was this patch tested?
Added
GlutenDeltaStatsSuite, which writes a Delta table whoseTIMESTAMP_NTZmin/max statistics cannot be offloaded to Velox. Before this change the write crashes with theClassCastExceptionabove; after it, the write succeeds via the row-based fallback tracker.Locally verified (Spark 3.5, Scala 2.12): the new suite fails without the fix (
Tests: succeeded 0, failed 1, ClassCastException) and passes with it (succeeded 1, failed 0). A companion test-only PR (#12293) demonstrates the same red/green contrast on CI. Also confirmed end-to-end against Delta'sDataSkippingDeltaV1Suite"TIMESTAMP_NTZ near Long.MaxValue" (succeeded 2, failed 0 with the fix).scalafmt/spotless report no changes.PR/CI unit test before the fix: #12293 / https://github.com/apache/gluten/actions/runs/27509527767/job/81307865870?pr=12293
Was this patch authored or co-authored using generative AI tooling?
Generated-by: GitHub Copilot CLI (claude-opus-4.8)