Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pinot-common/src/main/proto/plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,15 @@ enum WindowFrameType {
RANGE = 1;
}

// SQL standard `<window exclusion>` clause. Names are wire-stable; renumbering breaks mixed-version brokers/servers.
// `EXCLUDE_NO_OTHERS = 0` so that nodes serialized before this field existed deserialize to the default behavior.
enum WindowExclusion {
EXCLUDE_NO_OTHERS = 0;
EXCLUDE_CURRENT_ROW = 1;
EXCLUDE_GROUP = 2;
EXCLUDE_TIES = 3;
}

message WindowNode {
repeated int32 keys = 1;
repeated Collation collations = 2;
Expand All @@ -251,6 +260,7 @@ message WindowNode {
int32 lowerBound = 5;
int32 upperBound = 6;
repeated Literal constants = 7;
WindowExclusion exclude = 8;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This extends the broker-to-server plan wire format without any mixed-version guard. A newer broker can now serialize exclude != EXCLUDE_NO_OTHERS to an older server, and that server will just ignore field 8 / the new enum and execute legacy NO_OTHERS semantics. During a rolling upgrade that turns EXCLUDE CURRENT ROW/GROUP/TIES into silently wrong results instead of a rejected query, so this needs a capability/version gate or broker-side fallback before merge.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is required since the EXCLUDE clause isn't currently supported. So the expectation would be to only use the new feature after the entire cluster is upgraded to the newer version.

}

// A node that doesn't carry semantic information.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,9 @@ public PlanNode visitWindow(WindowNode node, PlanNode context) {
if (node.getUpperBound() != otherNode.getUpperBound()) {
return null;
}
if (node.getExclude() != otherNode.getExclude()) {
return null;
}
if (!node.getConstants().equals(otherNode.getConstants())) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ public Void visitWindow(WindowNode node, Void context) {

Window.Group group =
new Window.Group(keys, isRow, getWindowBound(node.getLowerBound()), getWindowBound(node.getUpperBound()),
RexWindowExclusion.EXCLUDE_NO_OTHER, orderKeys, aggCalls);
toRexWindowExclusion(node.getExclude()), orderKeys, aggCalls);

List<RexLiteral> constants =
node.getConstants().stream().map(constant -> RexExpressionUtils.toRexLiteral(_builder, constant))
Expand All @@ -395,6 +395,21 @@ public Void visitWindow(WindowNode node, Void context) {
return null;
}

private static RexWindowExclusion toRexWindowExclusion(WindowNode.WindowExclusion exclude) {
switch (exclude) {
case NO_OTHERS:
return RexWindowExclusion.EXCLUDE_NO_OTHER;
case CURRENT_ROW:
return RexWindowExclusion.EXCLUDE_CURRENT_ROW;
case GROUP:
return RexWindowExclusion.EXCLUDE_GROUP;
case TIES:
return RexWindowExclusion.EXCLUDE_TIES;
default:
throw new IllegalStateException("Unsupported WindowExclusion: " + exclude);
}
}

private RexWindowBound getWindowBound(int bound) {
if (bound == Integer.MIN_VALUE) {
return RexWindowBounds.UNBOUNDED_PRECEDING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,18 +629,12 @@ private ValueNode convertLogicalValues(LogicalValues node) {
convertInputs(node.getInputs()), literalRows);
}

/**
* TODO: Add support for exclude clauses ({@link org.apache.calcite.rex.RexWindowExclusion})
*/
private WindowNode convertLogicalWindow(LogicalWindow node) {
// Only a single Window Group should exist per WindowNode.
Preconditions.checkState(node.groups.size() == 1, "Only a single window group is allowed, got: %s",
node.groups.size());
Window.Group windowGroup = node.groups.get(0);

Preconditions.checkState(windowGroup.exclude == RexWindowExclusion.EXCLUDE_NO_OTHER,
"EXCLUDE clauses for window functions are not currently supported");

int numAggregates = windowGroup.aggCalls.size();
List<RexExpression.FunctionCall> aggCalls = new ArrayList<>(numAggregates);
for (int i = 0; i < numAggregates; i++) {
Expand Down Expand Up @@ -684,7 +678,18 @@ private WindowNode convertLogicalWindow(LogicalWindow node) {
}
return new WindowNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), NodeHint.fromRelHints(node.getHints()),
convertInputs(node.getInputs()), windowGroup.keys.asList(), windowGroup.orderKeys.getFieldCollations(),
aggCalls, windowFrameType, lowerBound, upperBound, constants);
aggCalls, windowFrameType, lowerBound, upperBound, fromRexWindowExclusion(windowGroup.exclude), constants);
}

public static WindowNode.WindowExclusion fromRexWindowExclusion(RexWindowExclusion exclude) {
if (exclude == RexWindowExclusion.EXCLUDE_CURRENT_ROW) {
return WindowNode.WindowExclusion.CURRENT_ROW;
} else if (exclude == RexWindowExclusion.EXCLUDE_GROUP) {
return WindowNode.WindowExclusion.GROUP;
} else if (exclude == RexWindowExclusion.EXCLUDE_TIES) {
return WindowNode.WindowExclusion.TIES;
}
return WindowNode.WindowExclusion.NO_OTHERS;
}

private SortNode convertLogicalSort(LogicalSort node) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.query.planner.logical.RelToPlanNodeConverter;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.logical.RexExpressionUtils;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAggregate;
Expand Down Expand Up @@ -188,7 +189,8 @@ public static WindowNode convertWindow(Window node) {
}
return new WindowNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), NodeHint.fromRelHints(node.getHints()),
new ArrayList<>(), windowGroup.keys.asList(), windowGroup.orderKeys.getFieldCollations(),
aggCalls, windowFrameType, lowerBound, upperBound, constants);
aggCalls, windowFrameType, lowerBound, upperBound,
RelToPlanNodeConverter.fromRexWindowExclusion(windowGroup.exclude), constants);
}

public static SortNode convertSort(Sort node) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,20 @@ public class WindowNode extends BasePlanNode {
// Integer.MAX_VALUE represents UNBOUNDED FOLLOWING which is only allowed for the upper bound (ensured by Calcite).
private final int _lowerBound;
private final int _upperBound;
private final WindowExclusion _exclude;
private final List<RexExpression.Literal> _constants;

public WindowNode(int stageId, DataSchema dataSchema, NodeHint nodeHint, List<PlanNode> inputs, List<Integer> keys,
List<RelFieldCollation> collations, List<RexExpression.FunctionCall> aggCalls, WindowFrameType windowFrameType,
int lowerBound, int upperBound, List<RexExpression.Literal> constants) {
int lowerBound, int upperBound, WindowExclusion exclude, List<RexExpression.Literal> constants) {
super(stageId, dataSchema, nodeHint, inputs);
_keys = keys;
_collations = collations;
_aggCalls = aggCalls;
_windowFrameType = windowFrameType;
_lowerBound = lowerBound;
_upperBound = upperBound;
_exclude = exclude;
_constants = constants;
}

Expand Down Expand Up @@ -74,6 +76,10 @@ public int getUpperBound() {
return _upperBound;
}

public WindowExclusion getExclude() {
return _exclude;
}

public List<RexExpression.Literal> getConstants() {
return _constants;
}
Expand All @@ -91,7 +97,7 @@ public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {
@Override
public PlanNode withInputs(List<PlanNode> inputs) {
return new WindowNode(_stageId, _dataSchema, _nodeHint, inputs, _keys, _collations, _aggCalls, _windowFrameType,
_lowerBound, _upperBound, _constants);
_lowerBound, _upperBound, _exclude, _constants);
}

@Override
Expand All @@ -108,13 +114,14 @@ public boolean equals(Object o) {
WindowNode that = (WindowNode) o;
return _lowerBound == that._lowerBound && _upperBound == that._upperBound && Objects.equals(_aggCalls,
that._aggCalls) && Objects.equals(_keys, that._keys) && Objects.equals(_collations, that._collations)
&& _windowFrameType == that._windowFrameType && Objects.equals(_constants, that._constants);
&& _windowFrameType == that._windowFrameType && _exclude == that._exclude
&& Objects.equals(_constants, that._constants);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), _aggCalls, _keys, _collations, _windowFrameType, _lowerBound, _upperBound,
_constants);
_exclude, _constants);
}

/**
Expand All @@ -125,4 +132,18 @@ public int hashCode() {
public enum WindowFrameType {
ROWS, RANGE
}

/**
* Enum to denote the frame exclusion option (SQL standard {@code EXCLUDE} clause).
* {@link #NO_OTHERS} is the default and means no rows are excluded.
* {@link #CURRENT_ROW} excludes only the current row from the frame.
* {@link #GROUP} excludes the current row and all its ordering peers.
* {@link #TIES} excludes the ordering peers of the current row but keeps the current row.
*
* <p>The constant names are part of the wire protocol via {@code Plan.WindowExclusion} and must remain stable across
* mixed-version brokers and servers.
*/
public enum WindowExclusion {
NO_OTHERS, CURRENT_ROW, GROUP, TIES
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ private static WindowNode deserializeWindowNode(Plan.PlanNode protoNode) {
extractInputs(protoNode), protoWindowNode.getKeysList(), convertCollations(protoWindowNode.getCollationsList()),
convertFunctionCalls(protoWindowNode.getAggCallsList()),
convertWindowFrameType(protoWindowNode.getWindowFrameType()), protoWindowNode.getLowerBound(),
protoWindowNode.getUpperBound(), convertLiterals(protoWindowNode.getConstantsList()));
protoWindowNode.getUpperBound(), convertWindowExclusion(protoWindowNode.getExclude()),
convertLiterals(protoWindowNode.getConstantsList()));
}

private static ExplainedNode deserializeExplainedNode(Plan.PlanNode protoNode) {
Expand Down Expand Up @@ -496,4 +497,19 @@ private static WindowNode.WindowFrameType convertWindowFrameType(Plan.WindowFram
throw new IllegalStateException("Unsupported WindowFrameType: " + windowFrameType);
}
}

private static WindowNode.WindowExclusion convertWindowExclusion(Plan.WindowExclusion exclude) {
switch (exclude) {
case EXCLUDE_NO_OTHERS:
return WindowNode.WindowExclusion.NO_OTHERS;
case EXCLUDE_CURRENT_ROW:
return WindowNode.WindowExclusion.CURRENT_ROW;
case EXCLUDE_GROUP:
return WindowNode.WindowExclusion.GROUP;
case EXCLUDE_TIES:
return WindowNode.WindowExclusion.TIES;
default:
throw new IllegalStateException("Unsupported WindowExclusion: " + exclude);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ public Void visitWindow(WindowNode node, Plan.PlanNode.Builder builder) {
.setWindowFrameType(convertWindowFrameType(node.getWindowFrameType()))
.setLowerBound(node.getLowerBound())
.setUpperBound(node.getUpperBound())
.setExclude(convertWindowExclusion(node.getExclude()))
.addAllConstants(convertLiterals(node.getConstants()))
.build();
builder.setWindowNode(windowNode);
Expand Down Expand Up @@ -477,5 +478,20 @@ private static Plan.WindowFrameType convertWindowFrameType(WindowNode.WindowFram
throw new IllegalStateException("Unsupported WindowFrameType: " + windowFrameType);
}
}

private static Plan.WindowExclusion convertWindowExclusion(WindowNode.WindowExclusion exclude) {
switch (exclude) {
case NO_OTHERS:
return Plan.WindowExclusion.EXCLUDE_NO_OTHERS;
case CURRENT_ROW:
return Plan.WindowExclusion.EXCLUDE_CURRENT_ROW;
case GROUP:
return Plan.WindowExclusion.EXCLUDE_GROUP;
case TIES:
return Plan.WindowExclusion.EXCLUDE_TIES;
default:
throw new IllegalStateException("Unsupported WindowExclusion: " + exclude);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -624,12 +624,6 @@ public void testWindowFunctions() {
+ "CURRENT ROW) FROM a";
e = expectThrows(RuntimeException.class, () -> _queryEnvironment.planQuery(ntileQueryWithNoArg));
assertTrue(e.getMessage().contains("expecting 1 argument"));

String excludeCurrentRowQuery =
"SELECT col1, col2, SUM(col3) OVER (PARTITION BY col1 ORDER BY col2 ROWS BETWEEN UNBOUNDED PRECEDING AND "
+ "CURRENT ROW EXCLUDE CURRENT ROW) FROM a";
e = expectThrows(RuntimeException.class, () -> _queryEnvironment.planQuery(excludeCurrentRowQuery));
assertTrue(e.getMessage().contains("EXCLUDE clauses for window functions are not currently supported"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ public WindowAggregateOperator(OpChainExecutionContext context, MultiStageOperat
for (int i = 0; i < numKeys; i++) {
_keys[i] = keys.get(i);
}
WindowFrame windowFrame = new WindowFrame(node.getWindowFrameType(), node.getLowerBound(), node.getUpperBound());
WindowFrame windowFrame =
new WindowFrame(node.getWindowFrameType(), node.getLowerBound(), node.getUpperBound(), node.getExclude());
Preconditions.checkState(
windowFrame.isRowType() || ((windowFrame.isUnboundedPreceding() || windowFrame.isLowerBoundCurrentRow()) && (
windowFrame.isUnboundedFollowing() || windowFrame.isUpperBoundCurrentRow())),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@

/**
* Defines the window frame to be used for a window function. The 'lowerBound' and 'upperBound' indicate the frame
* boundaries to be used. The frame can be of two types: ROWS or RANGE.
* boundaries to be used. The frame can be of two types: ROWS or RANGE. Optionally an {@code EXCLUDE} clause may
* specify a subset of rows around the current row to be excluded from the frame.
*/
public class WindowFrame {
// Enum to denote the FRAME type, can be either ROWS or RANGE types
Expand All @@ -33,11 +34,14 @@ public class WindowFrame {
// Integer.MAX_VALUE represents UNBOUNDED FOLLOWING which is only allowed for the upper bound (ensured by Calcite).
private final int _lowerBound;
private final int _upperBound;
private final WindowNode.WindowExclusion _exclude;

public WindowFrame(WindowNode.WindowFrameType type, int lowerBound, int upperBound) {
public WindowFrame(WindowNode.WindowFrameType type, int lowerBound, int upperBound,
WindowNode.WindowExclusion exclude) {
_type = type;
_lowerBound = lowerBound;
_upperBound = upperBound;
_exclude = exclude;
}

public boolean isUnboundedPreceding() {
Expand Down Expand Up @@ -68,8 +72,17 @@ public int getUpperBound() {
return _upperBound;
}

public WindowNode.WindowExclusion getExclude() {
return _exclude;
}

public boolean isExcludeNoOthers() {
return _exclude == WindowNode.WindowExclusion.NO_OTHERS;
}

@Override
public String toString() {
return "WindowFrame{" + "type=" + _type + ", lowerBound=" + _lowerBound + ", upperBound=" + _upperBound + '}';
return "WindowFrame{type=" + _type + ", lowerBound=" + _lowerBound + ", upperBound=" + _upperBound + ", exclude="
+ _exclude + '}';
}
}
Loading
Loading