diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index f1e162a6260f7..ad768d83b9075 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -230,6 +230,60 @@ ], "sqlState" : "22003" }, + "BIN_BY_ALIGN_TO_TYPE_MISMATCH" : { + "message" : [ + "The ALIGN TO expression type must match the range column type ." + ], + "sqlState" : "42804" + }, + "BIN_BY_COLUMN_NOT_FOUND" : { + "message" : [ + "The column referenced in BIN BY was not found in the input relation." + ], + "sqlState" : "42703" + }, + "BIN_BY_DISTRIBUTE_TYPE_MISMATCH" : { + "message" : [ + "The DISTRIBUTE UNIFORM columns in BIN BY must be numeric or DAY-TIME INTERVAL. Column has type ." + ], + "sqlState" : "42804" + }, + "BIN_BY_DUPLICATE_DISTRIBUTE_COLUMN" : { + "message" : [ + "Column appears multiple times in the BIN BY DISTRIBUTE UNIFORM clause." + ], + "sqlState" : "42701" + }, + "BIN_BY_INVALID_BIN_WIDTH" : { + "message" : [ + "The BIN WIDTH expression must be a positive DAY-TIME INTERVAL. Got: ." + ], + "sqlState" : "42K09" + }, + "BIN_BY_INVALID_RANGE" : { + "message" : [ + "The input row to BIN BY has range_start () greater than range_end (). BIN BY requires range_start <= range_end." + ], + "sqlState" : "22023" + }, + "BIN_BY_MISSING_DISTRIBUTE" : { + "message" : [ + "BIN BY requires at least one column in DISTRIBUTE UNIFORM (...)." + ], + "sqlState" : "42601" + }, + "BIN_BY_RANGE_TYPE_MISMATCH" : { + "message" : [ + "The range columns in BIN BY must be TIMESTAMP or TIMESTAMP_NTZ. Column has type . Both range columns must have the same type." + ], + "sqlState" : "42804" + }, + "BIN_BY_REQUIRES_TOP_LEVEL_COLUMN" : { + "message" : [ + "BIN BY requires a top-level column, but is a nested or computed field. Alias it to a top-level column in a select before BIN BY." + ], + "sqlState" : "42K09" + }, "CALL_ON_STREAMING_DATASET_UNSUPPORTED" : { "message" : [ "The method can not be called on streaming Dataset/DataFrame." @@ -7799,6 +7853,11 @@ "The ANALYZE TABLE command does not support views." ] }, + "BIN_BY" : { + "message" : [ + "Physical execution of BIN BY is not yet implemented." + ] + }, "CATALOG_INTERFACE_METHOD" : { "message" : [ "Catalog API is not supported by ." diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md index 4f21b7b4b3c79..bd449524895ce 100644 --- a/docs/sql-ref-ansi-compliance.md +++ b/docs/sql-ref-ansi-compliance.md @@ -412,6 +412,7 @@ Below is a list of all the keywords in Spark SQL. |ADD|non-reserved|non-reserved|non-reserved| |AFTER|non-reserved|non-reserved|non-reserved| |AGGREGATE|non-reserved|non-reserved|non-reserved| +|ALIGN|non-reserved|non-reserved|non-reserved| |ALL|reserved|non-reserved|reserved| |ALTER|non-reserved|non-reserved|reserved| |ALWAYS|non-reserved|non-reserved|non-reserved| @@ -433,8 +434,12 @@ Below is a list of all the keywords in Spark SQL. |BERNOULLI|non-reserved|non-reserved|non-reserved| |BETWEEN|non-reserved|non-reserved|reserved| |BIGINT|non-reserved|non-reserved|reserved| +|BIN|non-reserved|non-reserved|non-reserved| |BINARY|non-reserved|non-reserved|reserved| |BINDING|non-reserved|non-reserved|non-reserved| +|BIN_DISTRIBUTE_RATIO|non-reserved|non-reserved|non-reserved| +|BIN_END|non-reserved|non-reserved|non-reserved| +|BIN_START|non-reserved|non-reserved|non-reserved| |BOOLEAN|non-reserved|non-reserved|reserved| |BOTH|reserved|non-reserved|reserved| |BUCKET|non-reserved|non-reserved|non-reserved| @@ -801,6 +806,7 @@ Below is a list of all the keywords in Spark SQL. |UNARCHIVE|non-reserved|non-reserved|non-reserved| |UNBOUNDED|non-reserved|non-reserved|non-reserved| |UNCACHE|non-reserved|non-reserved|non-reserved| +|UNIFORM|non-reserved|non-reserved|non-reserved| |UNION|reserved|strict-non-reserved|reserved| |UNIQUE|reserved|non-reserved|reserved| |UNKNOWN|reserved|non-reserved|reserved| @@ -828,6 +834,7 @@ Below is a list of all the keywords in Spark SQL. |WHEN|reserved|non-reserved|reserved| |WHERE|reserved|non-reserved|reserved| |WHILE|non-reserved|non-reserved|non-reserved| +|WIDTH|non-reserved|non-reserved|non-reserved| |WINDOW|non-reserved|non-reserved|reserved| |WITH|reserved|non-reserved|reserved| |WITHIN|reserved|non-reserved|reserved| diff --git a/python/pyspark/sql/tvf.py b/python/pyspark/sql/tvf.py index 97d8ce59913be..3f20fc3501bb2 100644 --- a/python/pyspark/sql/tvf.py +++ b/python/pyspark/sql/tvf.py @@ -551,11 +551,11 @@ def sql_keywords(self) -> DataFrame: Examples -------- >>> spark.tvf.sql_keywords().show() - +-------------+--------+ - | keyword|reserved| - +-------------+--------+ + +----------+--------+ + | keyword|reserved| + +----------+--------+ ... - +-------------+--------+... + +----------+--------+... """ return self._fn("sql_keywords") diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 index af71f441012c1..bf759e501db9a 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 @@ -131,6 +131,7 @@ BANG: '!'; ADD: 'ADD'; AFTER: 'AFTER'; AGGREGATE: 'AGGREGATE'; +ALIGN: 'ALIGN'; ALL: 'ALL'; ALTER: 'ALTER'; ALWAYS: 'ALWAYS'; @@ -152,6 +153,10 @@ BEGIN: 'BEGIN'; BERNOULLI: 'BERNOULLI'; BETWEEN: 'BETWEEN'; BIGINT: 'BIGINT'; +BIN: 'BIN'; +BIN_DISTRIBUTE_RATIO: 'BIN_DISTRIBUTE_RATIO'; +BIN_END: 'BIN_END'; +BIN_START: 'BIN_START'; BINARY: 'BINARY'; BINDING: 'BINDING'; BOOLEAN: 'BOOLEAN'; @@ -519,6 +524,7 @@ TYPE: 'TYPE'; UNARCHIVE: 'UNARCHIVE'; UNBOUNDED: 'UNBOUNDED'; UNCACHE: 'UNCACHE'; +UNIFORM: 'UNIFORM'; UNION: 'UNION'; UNIQUE: 'UNIQUE'; UNKNOWN: 'UNKNOWN'; @@ -546,6 +552,7 @@ WEEKS: 'WEEKS'; WHEN: 'WHEN'; WHERE: 'WHERE'; WHILE: 'WHILE'; +WIDTH: 'WIDTH'; WINDOW: 'WINDOW'; WITH: 'WITH'; WITHIN: 'WITHIN'; diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index 5761028f60234..28a76d30f073d 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -78,7 +78,7 @@ options { tokenVocab = SqlBaseLexer; } la == ANTI || la == JOIN || la == UNION || la == EXCEPT || la == SETMINUS || la == INTERSECT || la == ORDER || la == CLUSTER || la == DISTRIBUTE || la == SORT || la == LIMIT || la == OFFSET || - la == AGGREGATE || la == WINDOW || la == LATERAL; + la == AGGREGATE || la == WINDOW || la == LATERAL || la == BIN; } } @@ -1029,6 +1029,20 @@ unpivotAlias : AS? errorCapturingIdentifier ; +binByClause + : BIN BY LEFT_PAREN + RANGE rangeStart=multipartIdentifier TO rangeEnd=multipartIdentifier + BIN WIDTH binWidth=expression + (ALIGN TO origin=expression)? + DISTRIBUTE UNIFORM LEFT_PAREN + distributeCol+=multipartIdentifier + (COMMA distributeCol+=multipartIdentifier)* RIGHT_PAREN + (BIN_START AS binStartAlias=errorCapturingIdentifier)? + (BIN_END AS binEndAlias=errorCapturingIdentifier)? + (BIN_DISTRIBUTE_RATIO AS binRatioAlias=errorCapturingIdentifier)? + RIGHT_PAREN (AS? tblAlias=errorCapturingIdentifier)? + ; + lateralView : LATERAL VIEW (OUTER)? qualifiedName LEFT_PAREN (expression (COMMA expression)*)? RIGHT_PAREN tblName=identifier (AS? colName+=identifier (COMMA colName+=identifier)*)? ; @@ -1050,6 +1064,7 @@ relationExtension : joinRelation | pivotClause | unpivotClause + | binByClause ; joinRelation @@ -1904,6 +1919,7 @@ operatorPipeRightSide // messages in the event that both are present (this is not allowed). | pivotClause unpivotClause? | unpivotClause pivotClause? + | binByClause | sample | joinRelation | operator=(UNION | EXCEPT | SETMINUS | INTERSECT) setQuantifier? right=queryPrimary @@ -1935,6 +1951,7 @@ ansiNonReserved : ADD | AFTER | AGGREGATE + | ALIGN | ALTER | ALWAYS | ANALYZE @@ -1951,6 +1968,10 @@ ansiNonReserved | BERNOULLI | BETWEEN | BIGINT + | BIN + | BIN_DISTRIBUTE_RATIO + | BIN_END + | BIN_START | BINARY | BINARY_HEX | BINDING @@ -2252,6 +2273,7 @@ ansiNonReserved | UNARCHIVE | UNBOUNDED | UNCACHE + | UNIFORM | UNLOCK | UNPIVOT | UNSET @@ -2272,6 +2294,7 @@ ansiNonReserved | WEEKS | WHILE | WATERMARK + | WIDTH | WINDOW | WITHOUT | YEAR @@ -2313,6 +2336,7 @@ nonReserved : ADD | AFTER | AGGREGATE + | ALIGN | ALL | ALTER | ALWAYS @@ -2333,6 +2357,10 @@ nonReserved | BERNOULLI | BETWEEN | BIGINT + | BIN + | BIN_DISTRIBUTE_RATIO + | BIN_END + | BIN_START | BINARY | BINARY_HEX | BINDING @@ -2688,6 +2716,7 @@ nonReserved | UNARCHIVE | UNBOUNDED | UNCACHE + | UNIFORM | UNIQUE | UNKNOWN | UNLOCK @@ -2713,6 +2742,7 @@ nonReserved | WHILE | WHEN | WHERE + | WIDTH | WINDOW | WITH | WITHIN diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a27b6a19b4c65..62ac32d51e060 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -527,6 +527,7 @@ class Analyzer( ResolveGroupingAnalytics :: ResolvePivot :: ResolveUnpivot :: + ResolveBinBy :: ResolveOrdinalInOrderByAndGroupBy :: ExtractGenerator :: ResolveGenerate :: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala index ec2ba4f692216..57aede9805d7d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala @@ -197,6 +197,14 @@ object DeduplicateRelations extends Rule[LogicalPlan] { _.generatorOutput.map(_.exprId.id), newGenerate => newGenerate.copy(generatorOutput = newGenerate.generatorOutput.map(_.newInstance()))) + case b: BinBy => + deduplicateAndRenew[BinBy]( + existingRelations, + b, + _.producedAttributes.map(_.exprId.id).toSeq, + newBinBy => newBinBy.copy(appendedAttributes = + newBinBy.appendedAttributes.map(_.newInstance()))) + case e: Expand => deduplicateAndRenew[Expand]( existingRelations, @@ -459,6 +467,13 @@ object DeduplicateRelations extends Rule[LogicalPlan] { newVersion.copyTagsFrom(oldVersion) Seq((oldVersion, newVersion)) + case oldVersion: BinBy + if oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty => + val newVersion = oldVersion.copy( + appendedAttributes = oldVersion.appendedAttributes.map(_.newInstance())) + newVersion.copyTagsFrom(oldVersion) + Seq((oldVersion, newVersion)) + case oldVersion: Expand if oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty => val producedAttributes = oldVersion.producedAttributes diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveBinBy.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveBinBy.scala new file mode 100644 index 0000000000000..b4d18a3f1b709 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveBinBy.scala @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import scala.collection.mutable +import scala.util.control.NonFatal + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.expressions.{Attribute, EmptyRow, Expression, ExprId, Literal} +import org.apache.spark.sql.catalyst.plans.logical.{BinBy, LogicalPlan, UnresolvedBinBy} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_BIN_BY +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{AnyTimestampType, DayTimeIntervalType, NumericType, TimestampNTZType, TimestampType} + +/** + * Resolves [[UnresolvedBinBy]] into [[BinBy]]: looks up column references against the child's + * output, validates types and foldability, and captures the session local time zone for the + * physical execution to use. + */ +object ResolveBinBy extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning( + _.containsPattern(UNRESOLVED_BIN_BY), ruleId) { + case b: UnresolvedBinBy if !readyToResolve(b) => b + case b: UnresolvedBinBy => resolve(b) + } + + // `binWidthExpr` must be fully resolved before validation can proceed. The optional + // `originExpr` must also be resolved when present; an absent `ALIGN TO` clause defaults to + // `1970-01-01 00:00:00` and bypasses user-facing foldability and type checks. The + // range/distribute column references can stay as `UnresolvedAttribute`; `resolveColumn` below + // converts a missing name to `BIN_BY_COLUMN_NOT_FOUND` rather than letting the default + // resolution pass leave them unresolved indefinitely. + private def readyToResolve(b: UnresolvedBinBy): Boolean = { + b.childrenResolved && b.binWidthExpr.resolved && b.originExpr.forall(_.resolved) + } + + private def resolve(b: UnresolvedBinBy): LogicalPlan = { + val resolver = SQLConf.get.resolver + val child = b.child + + val rangeStart = resolveColumn(b.rangeStartCol, child, resolver) + val rangeEnd = resolveColumn(b.rangeEndCol, child, resolver) + val distributeAttrs = b.distributeColumns.map(c => resolveColumn(c, child, resolver)) + + val rangeType = rangeStart.dataType + if (!AnyTimestampType.acceptsType(rangeType)) { + throw QueryCompilationErrors.binByRangeTypeMismatchError(rangeStart.name, rangeType) + } + if (rangeEnd.dataType != rangeType) { + throw QueryCompilationErrors.binByRangeTypeMismatchError(rangeEnd.name, rangeEnd.dataType) + } + + if (!b.binWidthExpr.foldable) { + throw QueryCompilationErrors.binByNonFoldableInputError("BIN WIDTH", b.binWidthExpr) + } + // Evaluating a foldable expression can still throw (e.g., a CAST that fails under ANSI + // mode, integer overflow inside a constant fold). Surface any such failure as a clean + // analysis-time BIN_BY_INVALID_BIN_WIDTH rather than letting the raw exception escape. + val binWidthValid = b.binWidthExpr.dataType match { + case _: DayTimeIntervalType => + try { + val v = b.binWidthExpr.eval(EmptyRow) + v != null && v.asInstanceOf[Long] > 0L + } catch { + case NonFatal(_) => false + } + case _ => false + } + if (!binWidthValid) { + throw QueryCompilationErrors.binByInvalidBinWidthError(b.binWidthExpr) + } + + val sessionZone = SQLConf.get.sessionLocalTimeZone + val isLTZ = rangeType.isInstanceOf[TimestampType] + + // `ALIGN TO` is optional. When omitted, default the resolved plan's origin to + // `1970-01-01 00:00:00` in the session zone for `TIMESTAMP` (LTZ) and epoch for + // `TIMESTAMP_NTZ`. + val resolvedOrigin: Expression = b.originExpr match { + case Some(o) => + if (!o.foldable) { + throw QueryCompilationErrors.binByNonFoldableInputError("ALIGN TO", o) + } + if (o.dataType != rangeType) { + throw QueryCompilationErrors.binByAlignToTypeMismatchError(o.dataType, rangeType) + } + o + case None if isLTZ => + Literal(DateTimeUtils.daysToMicros(0, DateTimeUtils.getZoneId(sessionZone)), TimestampType) + case None => + Literal(0L, TimestampNTZType) + } + + if (distributeAttrs.isEmpty) { + throw QueryCompilationErrors.binByMissingDistributeError() + } + distributeAttrs.foreach { attr => + attr.dataType match { + case _: NumericType | _: DayTimeIntervalType => // ok + case other => + throw QueryCompilationErrors.binByDistributeTypeMismatchError(attr.name, other) + } + } + val seen = mutable.HashSet.empty[ExprId] + distributeAttrs.foreach { attr => + if (!seen.add(attr.exprId)) { + throw QueryCompilationErrors.binByDuplicateDistributeColumnError(attr.name) + } + } + + val appendedAttributes = BinBy.appendedAttributesWithAliases(rangeType, b.outputAliases) + + BinBy( + binWidthExpr = b.binWidthExpr, + rangeStart = rangeStart, + rangeEnd = rangeEnd, + originExpr = resolvedOrigin, + distributeColumns = distributeAttrs, + appendedAttributes = appendedAttributes, + child = child, + timeZoneId = if (isLTZ) Some(sessionZone) else None) + } + + private def resolveColumn( + expr: Expression, + child: LogicalPlan, + resolver: Resolver): Attribute = expr match { + case u: UnresolvedAttribute => + child.resolve(u.nameParts, resolver) match { + case Some(a: Attribute) => a + case Some(_) => + // Resolved to a NamedExpression that is not a top-level Attribute (e.g., + // `RANGE struct_col.field TO ...` resolves to an Alias wrapping GetStructField). + throw QueryCompilationErrors.binByRequiresTopLevelColumnError(u.name) + case None => throw QueryCompilationErrors.binByColumnNotFoundError(u.name) + } + case a: Attribute => a + case other => + // This branch is unreachable from user SQL: AstBuilder always builds UnresolvedAttribute, + // and resolved Attributes are handled above. + throw SparkException.internalError( + s"Unexpected expression in BIN BY column position: $other") + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 9a1593cababb8..5792f38986897 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2318,6 +2318,40 @@ class AstBuilder extends DataTypeAstBuilder (exprs, alias) } + /** + * Add an [[UnresolvedBinBy]] to a logical plan; the analyzer rewrites it to [[BinBy]] during + * analysis. + */ + private def withBinBy( + ctx: BinByClauseContext, + query: LogicalPlan): LogicalPlan = withOrigin(ctx) { + val binWidth = expression(ctx.binWidth) + val originExpr = Option(ctx.origin).map(expression) + val rangeStart = UnresolvedAttribute(visitMultipartIdentifier(ctx.rangeStart)) + val rangeEnd = UnresolvedAttribute(visitMultipartIdentifier(ctx.rangeEnd)) + val distributeColumns = ctx.distributeCol.asScala + .map(c => UnresolvedAttribute(visitMultipartIdentifier(c))).toSeq + val outputAliases = BinByOutputAliases( + binStart = Option(ctx.binStartAlias).map(getIdentifierText), + binEnd = Option(ctx.binEndAlias).map(getIdentifierText), + binRatio = Option(ctx.binRatioAlias).map(getIdentifierText)) + + val binBy = UnresolvedBinBy( + binWidthExpr = binWidth, + rangeStartCol = rangeStart, + rangeEndCol = rangeEnd, + originExpr = originExpr, + distributeColumns = distributeColumns, + outputAliases = outputAliases, + child = query) + + if (ctx.tblAlias != null) { + SubqueryAlias(getIdentifierText(ctx.tblAlias), binBy) + } else { + binBy + } + } + /** * Add a [[Generate]] (Lateral View) to a logical plan. */ @@ -2381,9 +2415,11 @@ class AstBuilder extends DataTypeAstBuilder withJoinRelation(extension.joinRelation(), left) } else if (extension.pivotClause() != null) { withPivot(extension.pivotClause(), left) - } else { - assert(extension.unpivotClause() != null) + } else if (extension.unpivotClause() != null) { withUnpivot(extension.unpivotClause(), left) + } else { + assert(extension.binByClause() != null) + withBinBy(extension.binByClause(), left) } } } @@ -7424,6 +7460,8 @@ class AstBuilder extends DataTypeAstBuilder throw QueryParsingErrors.unpivotWithPivotInFromClauseNotAllowedError(ctx) } withUnpivot(c, left) + }.getOrElse(Option(ctx.binByClause()).map { c => + withBinBy(c, left) }.getOrElse(Option(ctx.sample).map { c => withSample(c, left) }.getOrElse(Option(ctx.joinRelation()).map { c => @@ -7435,7 +7473,7 @@ class AstBuilder extends DataTypeAstBuilder withQueryResultClauses(c, PipeOperator(left), forPipeOperators = true) }.getOrElse( visitOperatorPipeAggregate(ctx, left) - )))))))))))) + ))))))))))))) } private def visitOperatorPipeSet( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index a27d9e5269745..cb8b69a59ff06 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.{AliasIdentifier, InternalRow, SQLConfHelper} import org.apache.spark.sql.catalyst.analysis.{Analyzer, AnsiTypeCoercion, MultiInstanceRelation, Resolver, TypeCoercion, TypeCoercionBase, UnresolvedUnaryNode} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable} @@ -1700,6 +1701,122 @@ case class Unpivot( } +/** + * Optional user-supplied names for the three columns appended by [[BinBy]]. A `None` falls back + * to the default name (`bin_start`, `bin_end`, `bin_distribute_ratio`) when the output schema is + * constructed during analysis. + */ +case class BinByOutputAliases( + binStart: Option[String] = None, + binEnd: Option[String] = None, + binRatio: Option[String] = None) { + def effectiveBinStart: String = binStart.getOrElse("bin_start") + def effectiveBinEnd: String = binEnd.getOrElse("bin_end") + def effectiveBinRatio: String = binRatio.getOrElse("bin_distribute_ratio") +} + +object BinByOutputAliases { + val empty: BinByOutputAliases = BinByOutputAliases() +} + +/** + * Unresolved counterpart of [[BinBy]] produced by the parser. `ResolveBinBy` enforces type and + * foldability constraints and rewrites this into a resolved [[BinBy]]. + * + * @param binWidthExpr Bin-width expression (DAY-TIME INTERVAL). + * @param rangeStartCol Reference to the row's measurement-window start column. + * @param rangeEndCol Reference to the row's measurement-window end column. + * @param originExpr Optional alignment anchor expression (`ALIGN TO` clause). `None` + * when the clause is omitted; `ResolveBinBy` defaults the resolved + * plan's origin to `1970-01-01 00:00:00`. + * @param distributeColumns Columns whose values are proportionally redistributed across sub-rows. + * @param outputAliases Optional renames for the three appended output columns. + * @param child Input relation. + */ +case class UnresolvedBinBy( + binWidthExpr: Expression, + rangeStartCol: Expression, + rangeEndCol: Expression, + originExpr: Option[Expression], + distributeColumns: Seq[Expression], + outputAliases: BinByOutputAliases, + child: LogicalPlan) extends UnresolvedUnaryNode { + + final override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_BIN_BY) + + override protected def withNewChildInternal(newChild: LogicalPlan): UnresolvedBinBy = + copy(child = newChild) +} + +/** + * Aligns range-typed rows to fixed-width bin boundaries by splitting any row whose + * `[range_start, range_end)` crosses a boundary and proportionally redistributing values in + * `distributeColumns` across the resulting sub-ranges. Emits one or more output rows per input row + * plus three appended columns with default names `bin_start`, `bin_end`, `bin_distribute_ratio`. + * The names come from the resolved `appendedAttributes`. + * + * Bin boundaries align to `originExpr + k * binWidthExpr` for integer `k`. For `TIMESTAMP` (LTZ) + * inputs the boundary arithmetic uses civil-time in the session zone for multi-day bins; sub-day + * LTZ bins and `TIMESTAMP_NTZ` bins use UTC microsecond arithmetic. + * + * Construction invariants (bin width is positive day-time, range columns are timestamps and share + * a type, origin matches the range column type, distribute columns are numeric and distinct) are + * enforced by `ResolveBinBy`. Code paths that construct `BinBy` outside the analyzer are + * responsible for upholding the same invariants; only the `timeZoneId`-vs-range-type pairing is + * checked in the constructor itself (raises `INTERNAL_ERROR`). The three appended-column names + * are not checked for collisions with `child.output`; a colliding name surfaces as + * `AMBIGUOUS_REFERENCE` when the output is referenced downstream rather than at the BIN BY clause + * itself. + * + * @param binWidthExpr Resolved day-time interval literal (or foldable expression). + * @param rangeStart Resolved attribute holding each row's window-start timestamp. + * @param rangeEnd Resolved attribute holding each row's window-end timestamp. + * @param originExpr Resolved alignment-anchor expression. + * @param distributeColumns Resolved columns to proportionally redistribute. + * @param appendedAttributes The three output attributes appended after `child.output`. Provided + * by the analyzer; held in the case class so the attribute `ExprId`s + * remain stable across `.output` calls and `withNewChildInternal`. + * @param child Input relation. + * @param timeZoneId Captured session local time zone for LTZ inputs; `None` for NTZ. + * Required when `rangeStart.dataType` is `TimestampType`; must be + * `None` when it is `TimestampNTZType`. + */ +case class BinBy( + binWidthExpr: Expression, + rangeStart: Attribute, + rangeEnd: Attribute, + originExpr: Expression, + distributeColumns: Seq[Attribute], + appendedAttributes: Seq[Attribute], + child: LogicalPlan, + timeZoneId: Option[String]) + extends UnaryNode { + + if (timeZoneId.isDefined != rangeStart.dataType.isInstanceOf[TimestampType]) { + throw SparkException.internalError( + s"timeZoneId must be set iff rangeStart is TIMESTAMP (LTZ); got rangeStart.dataType=" + + s"${rangeStart.dataType}, timeZoneId=$timeZoneId") + } + + override def output: Seq[Attribute] = child.output ++ appendedAttributes + + override def producedAttributes: AttributeSet = AttributeSet(appendedAttributes) + + final override val nodePatterns: Seq[TreePattern] = Seq(BIN_BY) + + override protected def withNewChildInternal(newChild: LogicalPlan): BinBy = + copy(child = newChild) +} + +object BinBy { + def appendedAttributesWithAliases( + rangeType: DataType, + aliases: BinByOutputAliases): Seq[Attribute] = Seq( + AttributeReference(aliases.effectiveBinStart, rangeType, nullable = true)(), + AttributeReference(aliases.effectiveBinEnd, rangeType, nullable = true)(), + AttributeReference(aliases.effectiveBinRatio, DoubleType, nullable = true)()) +} + /** * A logical plan node for creating a logical limit, which is split into two separate logical nodes: * a [[LocalLimit]], which is a partition local limit, followed by a [[GlobalLimit]]. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala index a890d43f0672c..e6f32bad65675 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala @@ -100,6 +100,7 @@ object RuleIdCollection { "org.apache.spark.sql.catalyst.analysis.ResolveRowLevelCommandAssignments" :: "org.apache.spark.sql.catalyst.analysis.ResolveCursors" :: "org.apache.spark.sql.catalyst.analysis.ResolveFetchCursor" :: + "org.apache.spark.sql.catalyst.analysis.ResolveBinBy" :: "org.apache.spark.sql.catalyst.analysis.ResolveSetVariable" :: "org.apache.spark.sql.catalyst.analysis.ResolveTableConstraints" :: "org.apache.spark.sql.catalyst.analysis.ResolveTableSpec" :: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala index d94a506da82d1..9d1f26757f6ec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala @@ -130,6 +130,7 @@ object TreePattern extends Enumeration { val AGGREGATE: Value = Value val APPEND_COLUMNS: Value = Value val AS_OF_JOIN: Value = Value + val BIN_BY: Value = Value val COLLECT_METRICS: Value = Value val COMMAND: Value = Value val CTE: Value = Value @@ -186,6 +187,7 @@ object TreePattern extends Enumeration { // Unresolved Plan patterns (Alphabetically ordered) val NAMED_STREAMING_RELATION: Value = Value val PLAN_WITH_UNRESOLVED_IDENTIFIER: Value = Value + val UNRESOLVED_BIN_BY: Value = Value val UNRESOLVED_EVENT_TIME_WATERMARK: Value = Value val UNRESOLVED_HAVING: Value = Value val UNRESOLVED_HINT: Value = Value diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 9b899867a9e37..b89610e08b1de 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -297,6 +297,76 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map("types" -> dataTypes.mkString(", "))) } + def binByAlignToTypeMismatchError( + originType: DataType, + rangeType: DataType): Throwable = { + new AnalysisException( + errorClass = "BIN_BY_ALIGN_TO_TYPE_MISMATCH", + messageParameters = Map( + "originType" -> toSQLType(originType), + "rangeType" -> toSQLType(rangeType))) + } + + def binByColumnNotFoundError(columnName: String): Throwable = { + new AnalysisException( + errorClass = "BIN_BY_COLUMN_NOT_FOUND", + messageParameters = Map("columnName" -> toSQLId(columnName))) + } + + def binByRequiresTopLevelColumnError(columnName: String): Throwable = { + new AnalysisException( + errorClass = "BIN_BY_REQUIRES_TOP_LEVEL_COLUMN", + messageParameters = Map("columnName" -> toSQLId(columnName))) + } + + def binByDistributeTypeMismatchError( + columnName: String, + columnType: DataType): Throwable = { + new AnalysisException( + errorClass = "BIN_BY_DISTRIBUTE_TYPE_MISMATCH", + messageParameters = Map( + "columnName" -> toSQLId(columnName), + "columnType" -> toSQLType(columnType))) + } + + def binByDuplicateDistributeColumnError(columnName: String): Throwable = { + new AnalysisException( + errorClass = "BIN_BY_DUPLICATE_DISTRIBUTE_COLUMN", + messageParameters = Map("columnName" -> toSQLId(columnName))) + } + + def binByInvalidBinWidthError(expr: Expression): Throwable = { + new AnalysisException( + errorClass = "BIN_BY_INVALID_BIN_WIDTH", + messageParameters = Map("expr" -> toSQLExpr(expr))) + } + + def binByMissingDistributeError(): Throwable = { + new AnalysisException( + errorClass = "BIN_BY_MISSING_DISTRIBUTE", + messageParameters = Map.empty) + } + + def binByNonFoldableInputError(inputName: String, expr: Expression): Throwable = { + new AnalysisException( + errorClass = "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT", + messageParameters = Map( + "sqlExpr" -> toSQLExpr(expr), + "inputName" -> toSQLId(inputName), + "inputType" -> toSQLType(expr.dataType), + "inputExpr" -> toSQLExpr(expr))) + } + + def binByRangeTypeMismatchError( + columnName: String, + columnType: DataType): Throwable = { + new AnalysisException( + errorClass = "BIN_BY_RANGE_TYPE_MISMATCH", + messageParameters = Map( + "columnName" -> toSQLId(columnName), + "columnType" -> toSQLType(columnType))) + } + def unsupportedIfNotExistsError(tableName: String): Throwable = { new AnalysisException( errorClass = "UNSUPPORTED_FEATURE.INSERT_PARTITION_SPEC_IF_NOT_EXISTS", diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveBinBySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveBinBySuite.scala new file mode 100644 index 0000000000000..b77243be38e33 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveBinBySuite.scala @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.SparkThrowable +import org.apache.spark.sql.catalyst.QueryPlanningTracker +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Expression, Literal} +import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.catalyst.plans.logical.{BinBy, BinByOutputAliases, Join, JoinHint, LocalRelation, LogicalPlan, SubqueryAlias, UnresolvedBinBy} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ + +class ResolveBinBySuite extends AnalysisTest { + + private val tsStart = $"ts_start".timestamp + private val tsEnd = $"ts_end".timestamp + private val tsStartNtz = $"ts_start".timestampNTZ + private val tsEndNtz = $"ts_end".timestampNTZ + private val value = $"value".long + private val label = $"label".string + + private val ltzChild: LogicalPlan = LocalRelation(tsStart, tsEnd, value, label) + private val ntzChild: LogicalPlan = LocalRelation(tsStartNtz, tsEndNtz, value) + + private val fiveMinutes: Expression = Literal(5L * 60L * 1000000L, DayTimeIntervalType()) + private val ltzOrigin: Expression = Literal(0L, TimestampType) + private val ntzOrigin: Expression = Literal(0L, TimestampNTZType) + + private def unresolved( + child: LogicalPlan = ltzChild, + binWidth: Expression = fiveMinutes, + rangeStart: Expression = tsStart, + rangeEnd: Expression = tsEnd, + originExpr: Option[Expression] = Some(ltzOrigin), + distribute: Seq[Expression] = Seq(value), + aliases: BinByOutputAliases = BinByOutputAliases.empty): UnresolvedBinBy = + UnresolvedBinBy(binWidth, rangeStart, rangeEnd, originExpr, distribute, aliases, child) + + private def expectError(u: UnresolvedBinBy, condition: String): Unit = { + val ex = intercept[SparkThrowable](ResolveBinBy.apply(u)) + assert(ex.getCondition == condition, + s"expected condition '$condition' but got '${ex.getCondition}'") + } + + test("user-supplied foldable ALIGN TO of matching type is preserved") { + val originExpr: Expression = Literal(123456L, TimestampType) + val resolved = ResolveBinBy.apply(unresolved(originExpr = Some(originExpr))) + assert(resolved.asInstanceOf[BinBy].originExpr == originExpr) + } + + test("omitted ALIGN TO fills the default origin per type and session zone") { + // LTZ, UTC: epoch. + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") { + val utc = ResolveBinBy.apply(unresolved(originExpr = None)).asInstanceOf[BinBy] + assert(utc.originExpr == Literal(0L, TimestampType)) + } + + // LTZ, non-UTC: shifted by the zone offset. LA is UTC-8 on 1970-01-01, so local + // midnight is UTC 08:00 = 28800000000 micros. + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") { + val la = ResolveBinBy.apply(unresolved(originExpr = None)).asInstanceOf[BinBy] + assert(la.originExpr == Literal(8L * 3600L * 1000000L, TimestampType)) + } + + // NTZ: epoch, zone-independent. + val ntz = ResolveBinBy.apply( + unresolved(child = ntzChild, rangeStart = tsStartNtz, rangeEnd = tsEndNtz, + originExpr = None)).asInstanceOf[BinBy] + assert(ntz.originExpr == Literal(0L, TimestampNTZType)) + } + + test("rejects invalid ALIGN TO") { + // Type must match the range columns, both directions. + expectError(unresolved(originExpr = Some(ntzOrigin)), "BIN_BY_ALIGN_TO_TYPE_MISMATCH") + expectError( + unresolved(child = ntzChild, rangeStart = tsStartNtz, rangeEnd = tsEndNtz, + originExpr = Some(ltzOrigin)), + "BIN_BY_ALIGN_TO_TYPE_MISMATCH") + // Must be foldable. + val originAttr = AttributeReference("o", TimestampType, nullable = true)() + val childWithOrigin = LocalRelation(tsStart, tsEnd, value, originAttr) + expectError( + unresolved(child = childWithOrigin, originExpr = Some(originAttr)), + "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT") + } + + test("rejects invalid BIN WIDTH") { + def invalid(w: Expression): Unit = + expectError(unresolved(binWidth = w), "BIN_BY_INVALID_BIN_WIDTH") + invalid(Literal(1, YearMonthIntervalType())) // year-month + invalid(Literal(0L, DayTimeIntervalType())) // zero + invalid(Literal(-1L, DayTimeIntervalType())) // negative + invalid(Literal(null, DayTimeIntervalType())) // null + // A foldable CAST that throws on eval (ANSI) surfaces cleanly, not as a raw exception. + withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { + invalid(Cast(Literal.create("not an interval", StringType), DayTimeIntervalType())) + } + // Non-foldable is rejected before evaluation. + val widthAttr = AttributeReference("w", DayTimeIntervalType(), nullable = true)() + val childWithWidth = LocalRelation(tsStart, tsEnd, value, widthAttr) + expectError( + unresolved(child = childWithWidth, binWidth = widthAttr), + "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT") + } + + test("captures the session zone for LTZ inputs and none for NTZ") { + // LTZ: columns resolve and a zone is captured. + val ltz = ResolveBinBy.apply(unresolved()).asInstanceOf[BinBy] + assert(ltz.rangeStart == tsStart) + assert(ltz.rangeEnd == tsEnd) + assert(ltz.distributeColumns == Seq(value)) + assert(ltz.timeZoneId.isDefined) + + // The configured zone is the one captured. + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") { + val la = ResolveBinBy.apply(unresolved()).asInstanceOf[BinBy] + assert(la.timeZoneId.contains("America/Los_Angeles")) + } + + // NTZ: no zone captured. + val ntz = ResolveBinBy.apply( + unresolved(child = ntzChild, rangeStart = tsStartNtz, rangeEnd = tsEndNtz, + originExpr = Some(ntzOrigin))).asInstanceOf[BinBy] + assert(ntz.timeZoneId.isEmpty) + } + + test("resolved BinBy output schema appends three columns, honoring renames") { + // Default names and types. + val default = ResolveBinBy.apply(unresolved()).asInstanceOf[BinBy] + assert(default.output.length == ltzChild.output.length + 3) + val appended = default.output.takeRight(3) + assert(appended.map(_.name) == Seq("bin_start", "bin_end", "bin_distribute_ratio")) + assert(appended.map(_.dataType) == Seq(TimestampType, TimestampType, DoubleType)) + + // All three renamed. + val full = ResolveBinBy.apply( + unresolved(aliases = BinByOutputAliases(Some("ws"), Some("we"), Some("frac")))) + .asInstanceOf[BinBy] + assert(full.output.takeRight(3).map(_.name) == Seq("ws", "we", "frac")) + + // Partial rename; the other two keep defaults. + val partial = ResolveBinBy.apply( + unresolved(aliases = BinByOutputAliases(binStart = Some("ws")))).asInstanceOf[BinBy] + assert(partial.output.takeRight(3).map(_.name) == Seq("ws", "bin_end", "bin_distribute_ratio")) + } + + test("resolves UnresolvedAttribute references against child output") { + val tsStartU = UnresolvedAttribute("ts_start") + val tsEndU = UnresolvedAttribute("ts_end") + val valueU = UnresolvedAttribute("value") + val result = ResolveBinBy.apply( + unresolved(rangeStart = tsStartU, rangeEnd = tsEndU, distribute = Seq(valueU))) + val bi = result.asInstanceOf[BinBy] + assert(bi.rangeStart.exprId == tsStart.exprId) + assert(bi.rangeEnd.exprId == tsEnd.exprId) + assert(bi.distributeColumns.map(_.exprId) == Seq(value.exprId)) + } + + test("multipart identifiers disambiguate same-name columns across a JOIN") { + val t1Start = AttributeReference("ts_start", TimestampType, nullable = true)() + val t1End = AttributeReference("ts_end", TimestampType, nullable = true)() + val t2Start = AttributeReference("ts_start", TimestampType, nullable = true)() + val t2End = AttributeReference("ts_end", TimestampType, nullable = true)() + val t2Value = AttributeReference("value", LongType, nullable = true)() + val t1 = SubqueryAlias("t1", LocalRelation(t1Start, t1End)) + val t2 = SubqueryAlias("t2", LocalRelation(t2Start, t2End, t2Value)) + val join = Join(t1, t2, Inner, None, JoinHint.NONE) + + // t1. picks t1's columns. + val u1 = UnresolvedBinBy( + binWidthExpr = fiveMinutes, + rangeStartCol = UnresolvedAttribute(Seq("t1", "ts_start")), + rangeEndCol = UnresolvedAttribute(Seq("t1", "ts_end")), + originExpr = Some(ltzOrigin), + distributeColumns = Seq(UnresolvedAttribute(Seq("t2", "value"))), + outputAliases = BinByOutputAliases.empty, + child = join) + val resolved1 = ResolveBinBy.apply(u1).asInstanceOf[BinBy] + assert(resolved1.rangeStart.exprId == t1Start.exprId) + assert(resolved1.rangeEnd.exprId == t1End.exprId) + + // t2. picks t2's columns. + val u2 = UnresolvedBinBy( + binWidthExpr = fiveMinutes, + rangeStartCol = UnresolvedAttribute(Seq("t2", "ts_start")), + rangeEndCol = UnresolvedAttribute(Seq("t2", "ts_end")), + originExpr = Some(ltzOrigin), + distributeColumns = Seq(UnresolvedAttribute(Seq("t2", "value"))), + outputAliases = BinByOutputAliases.empty, + child = join) + val resolved2 = ResolveBinBy.apply(u2).asInstanceOf[BinBy] + assert(resolved2.rangeStart.exprId == t2Start.exprId) + assert(resolved2.rangeEnd.exprId == t2End.exprId) + } + + test("rejects unresolvable column references with BIN_BY_COLUMN_NOT_FOUND") { + expectError( + unresolved(rangeStart = UnresolvedAttribute("nonexistent")), "BIN_BY_COLUMN_NOT_FOUND") + expectError( + unresolved(distribute = Seq(UnresolvedAttribute("nonexistent"))), "BIN_BY_COLUMN_NOT_FOUND") + } + + test("rejects nested/computed column references with BIN_BY_REQUIRES_TOP_LEVEL_COLUMN") { + // A struct-field access resolves to a non-Attribute (Alias(GetStructField)). The column + // exists, so this is distinct from BIN_BY_COLUMN_NOT_FOUND. + val structField = AttributeReference( + "outer", + StructType(Seq( + StructField("ts_start", TimestampType, nullable = true), + StructField("ts_end", TimestampType, nullable = true))), + nullable = true)() + val numeric = AttributeReference("value", LongType, nullable = true)() + expectError( + UnresolvedBinBy( + binWidthExpr = fiveMinutes, + rangeStartCol = UnresolvedAttribute(Seq("outer", "ts_start")), + rangeEndCol = UnresolvedAttribute(Seq("outer", "ts_end")), + originExpr = Some(ltzOrigin), + distributeColumns = Seq(numeric), + outputAliases = BinByOutputAliases.empty, + child = LocalRelation(structField, numeric)), + "BIN_BY_REQUIRES_TOP_LEVEL_COLUMN") + } + + test("rejects non-timestamp or mismatched RANGE columns") { + // Range columns must be TIMESTAMP/TIMESTAMP_NTZ... + expectError(unresolved(rangeStart = value), "BIN_BY_RANGE_TYPE_MISMATCH") + // ...and both must share the same type. + expectError(unresolved(rangeEnd = tsEndNtz), "BIN_BY_RANGE_TYPE_MISMATCH") + } + + test("rejects invalid DISTRIBUTE UNIFORM columns") { + // Empty (defensive: the parser also rejects this), non-numeric, and duplicate columns. + expectError(unresolved(distribute = Seq.empty), "BIN_BY_MISSING_DISTRIBUTE") + expectError(unresolved(distribute = Seq(label)), "BIN_BY_DISTRIBUTE_TYPE_MISMATCH") + expectError(unresolved(distribute = Seq(value, value)), "BIN_BY_DUPLICATE_DISTRIBUTE_COLUMN") + } + + test("BinBy survives full Analyzer + CheckAnalysis (regression for producedAttributes)") { + // Without BinBy.producedAttributes, CheckAnalysis would flag MISSING_ATTRIBUTES. + assertAnalysisSuccess(unresolved()) + assertAnalysisSuccess( + unresolved(aliases = BinByOutputAliases(Some("ws"), Some("we"), Some("frac")))) + } + + test("self-join over a shared BinBy subtree is deduplicated") { + // Reference the same resolved BinBy subtree on both sides of a join: the appended + // bin_start/bin_end/bin_distribute_ratio attributes start with identical exprIds. + val binBy = ResolveBinBy.apply(unresolved()).asInstanceOf[BinBy] + val selfJoin = Join(binBy, binBy, Inner, None, JoinHint.NONE) + + // DeduplicateRelations must renew the right side's appended attributes; without the + // BinBy cases in both dedup phases the conflicting exprIds make analysis fail. + val analyzed = getAnalyzer.executeAndCheck(selfJoin, new QueryPlanningTracker) + + val binBys = analyzed.collect { case b: BinBy => b } + assert(binBys.size == 2, s"expected two BinBy nodes, got ${binBys.size}") + val appendedExprIds = binBys.flatMap(_.appendedAttributes.map(_.exprId)) + assert(appendedExprIds.distinct.size == appendedExprIds.size, + "appended BinBy attributes must have distinct exprIds across the two join sides") + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index c0495a47ce698..514aa51f1d4b9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -2746,6 +2746,174 @@ class PlanParserSuite extends AnalysisTest { parameters = Map("error" -> "'ts'", "hint" -> "")) } + test("BIN BY: clause parses into the expected UnresolvedBinBy") { + // Minimal clause: all optionals (ALIGN TO, output renames) omitted. + val minimal = singleBinBy(parsePlan( + """SELECT * FROM metrics BIN BY ( + | RANGE ts_start TO ts_end + | BIN WIDTH INTERVAL '5' MINUTE + | DISTRIBUTE UNIFORM (value) + |)""".stripMargin)) + assert(minimal.rangeStartCol == UnresolvedAttribute(Seq("ts_start"))) + assert(minimal.rangeEndCol == UnresolvedAttribute(Seq("ts_end"))) + assert(minimal.originExpr.isEmpty) + assert(minimal.distributeColumns == Seq(UnresolvedAttribute(Seq("value")))) + assert(minimal.outputAliases == BinByOutputAliases.empty) + + // Qualified column references and a partial HOUR TO MINUTE interval as BIN WIDTH. + val qualified = singleBinBy(parsePlan( + """SELECT * FROM t1 JOIN t2 ON t1.id = t2.id BIN BY ( + | RANGE t1.ts_start TO t1.ts_end + | BIN WIDTH INTERVAL '5:30' HOUR TO MINUTE + | DISTRIBUTE UNIFORM (t1.value) + |)""".stripMargin)) + assert(qualified.rangeStartCol == UnresolvedAttribute(Seq("t1", "ts_start"))) + assert(qualified.rangeEndCol == UnresolvedAttribute(Seq("t1", "ts_end"))) + assert(qualified.distributeColumns == Seq(UnresolvedAttribute(Seq("t1", "value")))) + + // Maximal clause: explicit ALIGN TO, multiple DISTRIBUTE UNIFORM columns, all three renames. + val maximal = singleBinBy(parsePlan( + """SELECT * FROM metrics BIN BY ( + | RANGE ts_start TO ts_end + | BIN WIDTH INTERVAL '1' HOUR + | ALIGN TO TIMESTAMP '2024-01-01 00:30:00' + | DISTRIBUTE UNIFORM (bytes_sent, requests) + | BIN_START AS ws + | BIN_END AS we + | BIN_DISTRIBUTE_RATIO AS frac + |)""".stripMargin)) + assert(maximal.originExpr.contains(parseExpression("TIMESTAMP '2024-01-01 00:30:00'"))) + assert(maximal.distributeColumns == Seq( + UnresolvedAttribute(Seq("bytes_sent")), + UnresolvedAttribute(Seq("requests")))) + assert(maximal.outputAliases.binStart.contains("ws")) + assert(maximal.outputAliases.binEnd.contains("we")) + assert(maximal.outputAliases.binRatio.contains("frac")) + } + + test("BIN BY: malformed clauses raise PARSE_SYNTAX_ERROR") { + def assertSyntaxError(query: String): Unit = + assert(parseException(query).getCondition == "PARSE_SYNTAX_ERROR") + + // BIN WIDTH missing. + assertSyntaxError( + """SELECT * FROM metrics BIN BY ( + | RANGE ts_start TO ts_end + | DISTRIBUTE UNIFORM (value) + |)""".stripMargin) + + // DISTRIBUTE UNIFORM missing. + assertSyntaxError( + """SELECT * FROM metrics BIN BY ( + | RANGE ts_start TO ts_end + | BIN WIDTH INTERVAL '5' MINUTE + |)""".stripMargin) + + // DISTRIBUTE UNIFORM with an empty column list. + assertSyntaxError( + """SELECT * FROM metrics BIN BY ( + | RANGE ts_start TO ts_end + | BIN WIDTH INTERVAL '5' MINUTE + | DISTRIBUTE UNIFORM () + |)""".stripMargin) + + // RANGE missing. + assertSyntaxError( + """SELECT * FROM metrics BIN BY ( + | BIN WIDTH INTERVAL '5' MINUTE + | DISTRIBUTE UNIFORM (value) + |)""".stripMargin) + + // Clauses out of order: BIN WIDTH must come after RANGE. + assertSyntaxError( + """SELECT * FROM metrics BIN BY ( + | BIN WIDTH INTERVAL '5' MINUTE + | RANGE ts_start TO ts_end + | DISTRIBUTE UNIFORM (value) + |)""".stripMargin) + } + + test("BIN BY: new keywords stay non-reserved and `bin` still parses as a table alias") { + // The seven new keywords are in `ansiNonReserved`, so they stay usable as plain identifiers + // outside a BIN BY clause. + assert(binByNodes(parsePlan( + "SELECT bin, width, uniform, align, bin_start, bin_end, bin_distribute_ratio " + + "FROM t WHERE bin > 0")).isEmpty) + + // `bin` not followed by BY stays a table alias, both bare and when referenced downstream. + assert(binByNodes(parsePlan("SELECT * FROM t bin")).isEmpty) + assert(binByNodes(parsePlan("SELECT bin.x FROM t bin WHERE bin.x > 0")).isEmpty) + } + + test("BIN BY: composition, chaining, trailing alias, and pipe operator") { + // Composes with a subquery in FROM and a downstream WHERE. + assert(binByNodes(parsePlan( + """SELECT * FROM (SELECT * FROM metrics) BIN BY ( + | RANGE ts_start TO ts_end + | BIN WIDTH INTERVAL '5' MINUTE + | DISTRIBUTE UNIFORM (value) + |) WHERE value > 0""".stripMargin)).size == 1) + + // Composes after PIVOT. + assert(binByNodes(parsePlan( + """SELECT * FROM events + |PIVOT (sum(amount) FOR category IN ('a', 'b')) + |BIN BY ( + | RANGE ts_start TO ts_end + | BIN WIDTH INTERVAL '5' MINUTE + | DISTRIBUTE UNIFORM (a, b) + |)""".stripMargin)).size == 1) + + // Two BIN BY clauses chain on the same relation. + assert(binByNodes(parsePlan( + """SELECT * FROM metrics + |BIN BY ( + | RANGE ts_start TO ts_end + | BIN WIDTH INTERVAL '5' MINUTE + | DISTRIBUTE UNIFORM (value) + |) + |BIN BY ( + | RANGE bin_start TO bin_end + | BIN WIDTH INTERVAL '1' HOUR + | DISTRIBUTE UNIFORM (value) + | BIN_START AS hr_start + | BIN_END AS hr_end + | BIN_DISTRIBUTE_RATIO AS hr_ratio + |)""".stripMargin)).size == 2) + + // Trailing alias, with and without the AS keyword, wraps the BinBy in a SubqueryAlias. + Seq(") AS binned", ") binned").foreach { tail => + val parsed = parsePlan( + s"""SELECT * FROM metrics BIN BY ( + | RANGE ts_start TO ts_end + | BIN WIDTH INTERVAL '5' MINUTE + | DISTRIBUTE UNIFORM (value) + |$tail""".stripMargin) + val aliases = parsed.collect { case s: SubqueryAlias if s.alias == "binned" => s } + assert(aliases.size == 1, s"alias tail '$tail' did not produce a SubqueryAlias") + assert(aliases.head.child.isInstanceOf[UnresolvedBinBy]) + } + + // Works as a SQL pipe-operator stage with |>. + assert(binByNodes(parsePlan( + """FROM metrics + ||> BIN BY ( + | RANGE ts_start TO ts_end + | BIN WIDTH INTERVAL '5' MINUTE + | DISTRIBUTE UNIFORM (value) + |)""".stripMargin)).size == 1) + } + private def intercept(sqlCommand: String, messages: String*): Unit = interceptParseException(parsePlan)(sqlCommand, messages: _*)() + + // Helpers shared by the BIN BY parser tests. + private def binByNodes(plan: LogicalPlan): Seq[UnresolvedBinBy] = + plan.collect { case b: UnresolvedBinBy => b } + + private def singleBinBy(plan: LogicalPlan): UnresolvedBinBy = { + val nodes = binByNodes(plan) + assert(nodes.size == 1, s"expected exactly one BIN BY node, got ${nodes.size}") + nodes.head + } } diff --git a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala index 1f525a541daae..747d9dc5d0ff6 100644 --- a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala +++ b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala @@ -210,7 +210,7 @@ class SparkConnectDatabaseMetaDataSuite extends ConnectFunSuite with RemoteSpark val metadata = conn.getMetaData // scalastyle:off line.size.limit // CURRENT_PATH and SYSTEM are excluded: getSQLKeywords drops SQL:2003 reserved words (see companion). - assert(metadata.getSQLKeywords === "ADD,AFTER,AGGREGATE,ALWAYS,ANALYZE,ANTI,ANY_VALUE,APPROX,ARCHIVE,ASC,BERNOULLI,BINDING,BUCKET,BUCKETS,BYTE,CACHE,CASCADE,CATALOG,CATALOGS,CHANGE,CHANGES,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATION,COLLATIONS,COLLECTION,COLUMNS,COMMENT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONTAINS,CONTINUE,COST,CURRENT_DATABASE,CURRENT_SCHEMA,DATA,DATABASE,DATABASES,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAYOFYEAR,DAYS,DBPROPERTIES,DEFAULT_PATH,DEFINED,DEFINER,DELAY,DELIMITED,DESC,DFS,DIRECTORIES,DIRECTORY,DISTANCE,DISTRIBUTE,DIV,DO,ELSEIF,ENFORCED,ESCAPED,EVOLUTION,EXACT,EXCHANGE,EXCLUDE,EXCLUSIVE,EXIT,EXPLAIN,EXPORT,EXTEND,EXTENDED,FIELDS,FILEFORMAT,FIRST,FLOW,FOLLOWING,FORMAT,FORMATTED,FOUND,FUNCTIONS,GENERATED,GEOGRAPHY,GEOMETRY,HANDLER,HOURS,IDENTIFIED,IDENTIFIER,IF,IGNORE,ILIKE,IMMEDIATE,INCLUDE,INCLUSIVE,INCREMENT,INDEX,INDEXES,INPATH,INPUT,INPUTFORMAT,INVOKER,ITEMS,ITERATE,JSON,KEY,KEYS,LAST,LAZY,LEAVE,LEVEL,LIMIT,LINES,LIST,LOAD,LOCATION,LOCK,LOCKS,LOGICAL,LONG,LOOP,MACRO,MAP,MATCHED,MATERIALIZED,MEASURE,METRICS,MICROSECOND,MICROSECONDS,MILLISECOND,MILLISECONDS,MINUS,MINUTES,MONTHS,MSCK,NAME,NAMESPACE,NAMESPACES,NANOSECOND,NANOSECONDS,NEAREST,NORELY,NULLS,OFFSET,OPTION,OPTIONS,OUTPUTFORMAT,OVERWRITE,PARTITIONED,PARTITIONS,PATH,PERCENT,PIVOT,PLACING,PRECEDING,PRINCIPALS,PROCEDURES,PROPERTIES,PURGE,QUALIFY,QUARTER,QUERY,RECORDREADER,RECORDWRITER,RECOVER,RECURSION,REDUCE,REFRESH,RELY,RENAME,REPAIR,REPEAT,REPEATABLE,REPLACE,RESET,RESPECT,RESTRICT,ROLE,ROLES,SCHEMA,SCHEMAS,SECONDS,SECURITY,SEMI,SEPARATED,SERDE,SERDEPROPERTIES,SETS,SHORT,SHOW,SIMILARITY,SINGLE,SKEWED,SORT,SORTED,SOURCE,STATISTICS,STORED,STRATIFY,STREAM,STREAMING,STRING,STRUCT,SUBSTR,SYNC,SYSTEM_PATH,SYSTEM_TIME,SYSTEM_VERSION,TABLES,TARGET,TBLPROPERTIES,TERMINATED,TIMEDIFF,TIMESTAMPADD,TIMESTAMPDIFF,TIMESTAMP_LTZ,TIMESTAMP_NTZ,TINYINT,TOUCH,TRANSACTION,TRANSACTIONS,TRANSFORM,TRUNCATE,TRY_CAST,TYPE,UNARCHIVE,UNBOUNDED,UNCACHE,UNLOCK,UNPIVOT,UNSET,UNTIL,USE,VAR,VARIABLE,VARIANT,VERSION,VIEW,VIEWS,VOID,WATERMARK,WEEK,WEEKS,WHILE,X,YEARS,ZONE") + assert(metadata.getSQLKeywords === "ADD,AFTER,AGGREGATE,ALIGN,ALWAYS,ANALYZE,ANTI,ANY_VALUE,APPROX,ARCHIVE,ASC,BERNOULLI,BIN,BINDING,BIN_DISTRIBUTE_RATIO,BIN_END,BIN_START,BUCKET,BUCKETS,BYTE,CACHE,CASCADE,CATALOG,CATALOGS,CHANGE,CHANGES,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATION,COLLATIONS,COLLECTION,COLUMNS,COMMENT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONTAINS,CONTINUE,COST,CURRENT_DATABASE,CURRENT_SCHEMA,DATA,DATABASE,DATABASES,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAYOFYEAR,DAYS,DBPROPERTIES,DEFAULT_PATH,DEFINED,DEFINER,DELAY,DELIMITED,DESC,DFS,DIRECTORIES,DIRECTORY,DISTANCE,DISTRIBUTE,DIV,DO,ELSEIF,ENFORCED,ESCAPED,EVOLUTION,EXACT,EXCHANGE,EXCLUDE,EXCLUSIVE,EXIT,EXPLAIN,EXPORT,EXTEND,EXTENDED,FIELDS,FILEFORMAT,FIRST,FLOW,FOLLOWING,FORMAT,FORMATTED,FOUND,FUNCTIONS,GENERATED,GEOGRAPHY,GEOMETRY,HANDLER,HOURS,IDENTIFIED,IDENTIFIER,IF,IGNORE,ILIKE,IMMEDIATE,INCLUDE,INCLUSIVE,INCREMENT,INDEX,INDEXES,INPATH,INPUT,INPUTFORMAT,INVOKER,ITEMS,ITERATE,JSON,KEY,KEYS,LAST,LAZY,LEAVE,LEVEL,LIMIT,LINES,LIST,LOAD,LOCATION,LOCK,LOCKS,LOGICAL,LONG,LOOP,MACRO,MAP,MATCHED,MATERIALIZED,MEASURE,METRICS,MICROSECOND,MICROSECONDS,MILLISECOND,MILLISECONDS,MINUS,MINUTES,MONTHS,MSCK,NAME,NAMESPACE,NAMESPACES,NANOSECOND,NANOSECONDS,NEAREST,NORELY,NULLS,OFFSET,OPTION,OPTIONS,OUTPUTFORMAT,OVERWRITE,PARTITIONED,PARTITIONS,PATH,PERCENT,PIVOT,PLACING,PRECEDING,PRINCIPALS,PROCEDURES,PROPERTIES,PURGE,QUALIFY,QUARTER,QUERY,RECORDREADER,RECORDWRITER,RECOVER,RECURSION,REDUCE,REFRESH,RELY,RENAME,REPAIR,REPEAT,REPEATABLE,REPLACE,RESET,RESPECT,RESTRICT,ROLE,ROLES,SCHEMA,SCHEMAS,SECONDS,SECURITY,SEMI,SEPARATED,SERDE,SERDEPROPERTIES,SETS,SHORT,SHOW,SIMILARITY,SINGLE,SKEWED,SORT,SORTED,SOURCE,STATISTICS,STORED,STRATIFY,STREAM,STREAMING,STRING,STRUCT,SUBSTR,SYNC,SYSTEM_PATH,SYSTEM_TIME,SYSTEM_VERSION,TABLES,TARGET,TBLPROPERTIES,TERMINATED,TIMEDIFF,TIMESTAMPADD,TIMESTAMPDIFF,TIMESTAMP_LTZ,TIMESTAMP_NTZ,TINYINT,TOUCH,TRANSACTION,TRANSACTIONS,TRANSFORM,TRUNCATE,TRY_CAST,TYPE,UNARCHIVE,UNBOUNDED,UNCACHE,UNIFORM,UNLOCK,UNPIVOT,UNSET,UNTIL,USE,VAR,VARIABLE,VARIANT,VERSION,VIEW,VIEWS,VOID,WATERMARK,WEEK,WEEKS,WHILE,WIDTH,X,YEARS,ZONE") // scalastyle:on line.size.limit } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 92818c12bfa09..6f904bc690c51 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -937,6 +937,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { throw SparkException.internalError( "Deduplicate operator for non streaming data source should have been replaced " + "by aggregate in the optimizer") + case _: logical.BinBy => + throw new SparkUnsupportedOperationException("UNSUPPORTED_FEATURE.BIN_BY") case logical.DeserializeToObject(deserializer, objAttr, child) => execution.DeserializeToObjectExec(deserializer, objAttr, planLater(child)) :: Nil diff --git a/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out b/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out index 6bcbdd2840f90..8482a10b22922 100644 --- a/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out @@ -7,6 +7,7 @@ struct ADD false AFTER false AGGREGATE false +ALIGN false ALL true ALTER false ALWAYS false @@ -28,8 +29,12 @@ BEGIN false BERNOULLI false BETWEEN false BIGINT false +BIN false BINARY false BINDING false +BIN_DISTRIBUTE_RATIO false +BIN_END false +BIN_START false BOOLEAN false BOTH true BUCKET false @@ -392,6 +397,7 @@ TYPE false UNARCHIVE false UNBOUNDED false UNCACHE false +UNIFORM false UNION true UNIQUE true UNKNOWN true @@ -419,6 +425,7 @@ WEEKS false WHEN true WHERE true WHILE false +WIDTH false WINDOW false WITH true WITHIN true diff --git a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out index a010343264469..4a48c1c01d7b7 100644 --- a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out @@ -7,6 +7,7 @@ struct ADD false AFTER false AGGREGATE false +ALIGN false ALL false ALTER false ALWAYS false @@ -28,8 +29,12 @@ BEGIN false BERNOULLI false BETWEEN false BIGINT false +BIN false BINARY false BINDING false +BIN_DISTRIBUTE_RATIO false +BIN_END false +BIN_START false BOOLEAN false BOTH false BUCKET false @@ -392,6 +397,7 @@ TYPE false UNARCHIVE false UNBOUNDED false UNCACHE false +UNIFORM false UNION false UNIQUE false UNKNOWN false @@ -419,6 +425,7 @@ WEEKS false WHEN false WHERE false WHILE false +WIDTH false WINDOW false WITH false WITHIN false diff --git a/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out index a010343264469..4a48c1c01d7b7 100644 --- a/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out @@ -7,6 +7,7 @@ struct ADD false AFTER false AGGREGATE false +ALIGN false ALL false ALTER false ALWAYS false @@ -28,8 +29,12 @@ BEGIN false BERNOULLI false BETWEEN false BIGINT false +BIN false BINARY false BINDING false +BIN_DISTRIBUTE_RATIO false +BIN_END false +BIN_START false BOOLEAN false BOTH false BUCKET false @@ -392,6 +397,7 @@ TYPE false UNARCHIVE false UNBOUNDED false UNCACHE false +UNIFORM false UNION false UNIQUE false UNKNOWN false @@ -419,6 +425,7 @@ WEEKS false WHEN false WHERE false WHILE false +WIDTH false WINDOW false WITH false WITHIN false diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala index 1ecf5b3dae4a0..e20beaa151817 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala @@ -214,7 +214,7 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer { val sessionHandle = client.openSession(user, "") val infoValue = client.getInfo(sessionHandle, GetInfoType.CLI_ODBC_KEYWORDS) // scalastyle:off line.size.limit - assert(infoValue.getStringValue == "ADD,AFTER,AGGREGATE,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,APPROX,ARCHIVE,ARRAY,AS,ASC,ASENSITIVE,AT,ATOMIC,AUTHORIZATION,BEGIN,BERNOULLI,BETWEEN,BIGINT,BINARY,BINDING,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALL,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHANGES,CHAR,CHARACTER,CHECK,CLEAR,CLOSE,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLATIONS,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONDITION,CONSTRAINT,CONTAINS,CONTINUE,COST,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATABASE,CURRENT_DATE,CURRENT_PATH,CURRENT_SCHEMA,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,CURSOR,DATA,DATABASE,DATABASES,DATE,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAY,DAYOFYEAR,DAYS,DBPROPERTIES,DEC,DECIMAL,DECLARE,DEFAULT,DEFAULT_PATH,DEFINED,DEFINER,DELAY,DELETE,DELIMITED,DESC,DESCRIBE,DETERMINISTIC,DFS,DIRECTORIES,DIRECTORY,DISTANCE,DISTINCT,DISTRIBUTE,DIV,DO,DOUBLE,DROP,ELSE,ELSEIF,END,ENFORCED,ESCAPE,ESCAPED,EVOLUTION,EXACT,EXCEPT,EXCHANGE,EXCLUDE,EXCLUSIVE,EXECUTE,EXISTS,EXIT,EXPLAIN,EXPORT,EXTEND,EXTENDED,EXTERNAL,EXTRACT,FALSE,FETCH,FIELDS,FILEFORMAT,FILTER,FIRST,FLOAT,FLOW,FOLLOWING,FOR,FOREIGN,FORMAT,FORMATTED,FOUND,FROM,FULL,FUNCTION,FUNCTIONS,GENERATED,GEOGRAPHY,GEOMETRY,GLOBAL,GRANT,GROUP,GROUPING,HANDLER,HAVING,HOUR,HOURS,IDENTIFIED,IDENTIFIER,IDENTITY,IF,IGNORE,ILIKE,IMMEDIATE,IMPORT,IN,INCLUDE,INCLUSIVE,INCREMENT,INDEX,INDEXES,INNER,INPATH,INPUT,INPUTFORMAT,INSENSITIVE,INSERT,INT,INTEGER,INTERSECT,INTERVAL,INTO,INVOKER,IS,ITEMS,ITERATE,JOIN,JSON,KEY,KEYS,LANGUAGE,LAST,LATERAL,LAZY,LEADING,LEAVE,LEFT,LEVEL,LIKE,LIMIT,LINES,LIST,LOAD,LOCAL,LOCATION,LOCK,LOCKS,LOGICAL,LONG,LOOP,MACRO,MAP,MATCHED,MATERIALIZED,MAX,MEASURE,MERGE,METRICS,MICROSECOND,MICROSECONDS,MILLISECOND,MILLISECONDS,MINUS,MINUTE,MINUTES,MODIFIES,MONTH,MONTHS,MSCK,NAME,NAMESPACE,NAMESPACES,NANOSECOND,NANOSECONDS,NATURAL,NEAREST,NEXT,NO,NONE,NORELY,NOT,NULL,NULLS,NUMERIC,OF,OFFSET,ON,ONLY,OPEN,OPTION,OPTIONS,OR,ORDER,OUT,OUTER,OUTPUTFORMAT,OVER,OVERLAPS,OVERLAY,OVERWRITE,PARTITION,PARTITIONED,PARTITIONS,PATH,PERCENT,PIVOT,PLACING,POSITION,PRECEDING,PRIMARY,PRINCIPALS,PROCEDURE,PROCEDURES,PROPERTIES,PURGE,QUALIFY,QUARTER,QUERY,RANGE,READ,READS,REAL,RECORDREADER,RECORDWRITER,RECOVER,RECURSION,RECURSIVE,REDUCE,REFERENCES,REFRESH,RELY,RENAME,REPAIR,REPEAT,REPEATABLE,REPLACE,RESET,RESPECT,RESTRICT,RETURN,RETURNS,REVOKE,RIGHT,ROLE,ROLES,ROLLBACK,ROLLUP,ROW,ROWS,SCHEMA,SCHEMAS,SECOND,SECONDS,SECURITY,SELECT,SEMI,SEPARATED,SERDE,SERDEPROPERTIES,SESSION_USER,SET,SETS,SHORT,SHOW,SIMILARITY,SINGLE,SKEWED,SMALLINT,SOME,SORT,SORTED,SOURCE,SPECIFIC,SQL,SQLEXCEPTION,SQLSTATE,START,STATISTICS,STORED,STRATIFY,STREAM,STREAMING,STRING,STRUCT,SUBSTR,SUBSTRING,SYNC,SYSTEM,SYSTEM_PATH,SYSTEM_TIME,SYSTEM_VERSION,TABLE,TABLES,TABLESAMPLE,TARGET,TBLPROPERTIES,TERMINATED,THEN,TIME,TIMEDIFF,TIMESTAMP,TIMESTAMPADD,TIMESTAMPDIFF,TIMESTAMP_LTZ,TIMESTAMP_NTZ,TINYINT,TO,TOUCH,TRAILING,TRANSACTION,TRANSACTIONS,TRANSFORM,TRIM,TRUE,TRUNCATE,TRY_CAST,TYPE,UNARCHIVE,UNBOUNDED,UNCACHE,UNION,UNIQUE,UNKNOWN,UNLOCK,UNPIVOT,UNSET,UNTIL,UPDATE,USE,USER,USING,VALUE,VALUES,VAR,VARCHAR,VARIABLE,VARIANT,VERSION,VIEW,VIEWS,VOID,WATERMARK,WEEK,WEEKS,WHEN,WHERE,WHILE,WINDOW,WITH,WITHIN,WITHOUT,X,YEAR,YEARS,ZONE") + assert(infoValue.getStringValue == "ADD,AFTER,AGGREGATE,ALIGN,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,APPROX,ARCHIVE,ARRAY,AS,ASC,ASENSITIVE,AT,ATOMIC,AUTHORIZATION,BEGIN,BERNOULLI,BETWEEN,BIGINT,BIN,BINARY,BINDING,BIN_DISTRIBUTE_RATIO,BIN_END,BIN_START,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALL,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHANGES,CHAR,CHARACTER,CHECK,CLEAR,CLOSE,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLATIONS,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONDITION,CONSTRAINT,CONTAINS,CONTINUE,COST,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATABASE,CURRENT_DATE,CURRENT_PATH,CURRENT_SCHEMA,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,CURSOR,DATA,DATABASE,DATABASES,DATE,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAY,DAYOFYEAR,DAYS,DBPROPERTIES,DEC,DECIMAL,DECLARE,DEFAULT,DEFAULT_PATH,DEFINED,DEFINER,DELAY,DELETE,DELIMITED,DESC,DESCRIBE,DETERMINISTIC,DFS,DIRECTORIES,DIRECTORY,DISTANCE,DISTINCT,DISTRIBUTE,DIV,DO,DOUBLE,DROP,ELSE,ELSEIF,END,ENFORCED,ESCAPE,ESCAPED,EVOLUTION,EXACT,EXCEPT,EXCHANGE,EXCLUDE,EXCLUSIVE,EXECUTE,EXISTS,EXIT,EXPLAIN,EXPORT,EXTEND,EXTENDED,EXTERNAL,EXTRACT,FALSE,FETCH,FIELDS,FILEFORMAT,FILTER,FIRST,FLOAT,FLOW,FOLLOWING,FOR,FOREIGN,FORMAT,FORMATTED,FOUND,FROM,FULL,FUNCTION,FUNCTIONS,GENERATED,GEOGRAPHY,GEOMETRY,GLOBAL,GRANT,GROUP,GROUPING,HANDLER,HAVING,HOUR,HOURS,IDENTIFIED,IDENTIFIER,IDENTITY,IF,IGNORE,ILIKE,IMMEDIATE,IMPORT,IN,INCLUDE,INCLUSIVE,INCREMENT,INDEX,INDEXES,INNER,INPATH,INPUT,INPUTFORMAT,INSENSITIVE,INSERT,INT,INTEGER,INTERSECT,INTERVAL,INTO,INVOKER,IS,ITEMS,ITERATE,JOIN,JSON,KEY,KEYS,LANGUAGE,LAST,LATERAL,LAZY,LEADING,LEAVE,LEFT,LEVEL,LIKE,LIMIT,LINES,LIST,LOAD,LOCAL,LOCATION,LOCK,LOCKS,LOGICAL,LONG,LOOP,MACRO,MAP,MATCHED,MATERIALIZED,MAX,MEASURE,MERGE,METRICS,MICROSECOND,MICROSECONDS,MILLISECOND,MILLISECONDS,MINUS,MINUTE,MINUTES,MODIFIES,MONTH,MONTHS,MSCK,NAME,NAMESPACE,NAMESPACES,NANOSECOND,NANOSECONDS,NATURAL,NEAREST,NEXT,NO,NONE,NORELY,NOT,NULL,NULLS,NUMERIC,OF,OFFSET,ON,ONLY,OPEN,OPTION,OPTIONS,OR,ORDER,OUT,OUTER,OUTPUTFORMAT,OVER,OVERLAPS,OVERLAY,OVERWRITE,PARTITION,PARTITIONED,PARTITIONS,PATH,PERCENT,PIVOT,PLACING,POSITION,PRECEDING,PRIMARY,PRINCIPALS,PROCEDURE,PROCEDURES,PROPERTIES,PURGE,QUALIFY,QUARTER,QUERY,RANGE,READ,READS,REAL,RECORDREADER,RECORDWRITER,RECOVER,RECURSION,RECURSIVE,REDUCE,REFERENCES,REFRESH,RELY,RENAME,REPAIR,REPEAT,REPEATABLE,REPLACE,RESET,RESPECT,RESTRICT,RETURN,RETURNS,REVOKE,RIGHT,ROLE,ROLES,ROLLBACK,ROLLUP,ROW,ROWS,SCHEMA,SCHEMAS,SECOND,SECONDS,SECURITY,SELECT,SEMI,SEPARATED,SERDE,SERDEPROPERTIES,SESSION_USER,SET,SETS,SHORT,SHOW,SIMILARITY,SINGLE,SKEWED,SMALLINT,SOME,SORT,SORTED,SOURCE,SPECIFIC,SQL,SQLEXCEPTION,SQLSTATE,START,STATISTICS,STORED,STRATIFY,STREAM,STREAMING,STRING,STRUCT,SUBSTR,SUBSTRING,SYNC,SYSTEM,SYSTEM_PATH,SYSTEM_TIME,SYSTEM_VERSION,TABLE,TABLES,TABLESAMPLE,TARGET,TBLPROPERTIES,TERMINATED,THEN,TIME,TIMEDIFF,TIMESTAMP,TIMESTAMPADD,TIMESTAMPDIFF,TIMESTAMP_LTZ,TIMESTAMP_NTZ,TINYINT,TO,TOUCH,TRAILING,TRANSACTION,TRANSACTIONS,TRANSFORM,TRIM,TRUE,TRUNCATE,TRY_CAST,TYPE,UNARCHIVE,UNBOUNDED,UNCACHE,UNIFORM,UNION,UNIQUE,UNKNOWN,UNLOCK,UNPIVOT,UNSET,UNTIL,UPDATE,USE,USER,USING,VALUE,VALUES,VAR,VARCHAR,VARIABLE,VARIANT,VERSION,VIEW,VIEWS,VOID,WATERMARK,WEEK,WEEKS,WHEN,WHERE,WHILE,WIDTH,WINDOW,WITH,WITHIN,WITHOUT,X,YEAR,YEARS,ZONE") // scalastyle:on line.size.limit } }