-
Notifications
You must be signed in to change notification settings - Fork 1.5k
[multistage] Enable Tests for New Optimizer + Bug Fixes #15958
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,12 @@ | |
| */ | ||
| package org.apache.pinot.calcite.rel.rules; | ||
|
|
||
| import com.google.common.base.Preconditions; | ||
| import com.google.common.collect.ImmutableList; | ||
| import java.util.ArrayList; | ||
| import java.util.EnumSet; | ||
| import java.util.List; | ||
| import javax.annotation.Nullable; | ||
| import org.apache.calcite.plan.Contexts; | ||
| import org.apache.calcite.plan.hep.HepRelVertex; | ||
| import org.apache.calcite.rel.RelNode; | ||
|
|
@@ -28,7 +34,14 @@ | |
| import org.apache.calcite.rel.core.Project; | ||
| import org.apache.calcite.rel.core.RelFactories; | ||
| import org.apache.calcite.rel.core.TableScan; | ||
| import org.apache.calcite.rel.core.Window; | ||
| import org.apache.calcite.rex.RexCall; | ||
| import org.apache.calcite.rex.RexInputRef; | ||
| import org.apache.calcite.rex.RexLiteral; | ||
| import org.apache.calcite.rex.RexNode; | ||
| import org.apache.calcite.rex.RexWindowBound; | ||
| import org.apache.calcite.rex.RexWindowBounds; | ||
| import org.apache.calcite.sql.SqlAggFunction; | ||
| import org.apache.calcite.sql.SqlKind; | ||
| import org.apache.calcite.sql2rel.SqlToRelConverter; | ||
| import org.apache.calcite.tools.RelBuilder; | ||
|
|
@@ -122,4 +135,129 @@ public static String extractFunctionName(RexCall function) { | |
| SqlKind funcSqlKind = function.getOperator().getKind(); | ||
| return funcSqlKind == SqlKind.OTHER_FUNCTION ? function.getOperator().getName() : funcSqlKind.name(); | ||
| } | ||
|
|
||
| public static class WindowUtils { | ||
| // Supported window functions | ||
| // OTHER_FUNCTION supported are: BOOL_AND, BOOL_OR | ||
| private static final EnumSet<SqlKind> SUPPORTED_WINDOW_FUNCTION_KIND = | ||
| EnumSet.of(SqlKind.SUM, SqlKind.SUM0, SqlKind.MIN, SqlKind.MAX, SqlKind.COUNT, SqlKind.ROW_NUMBER, SqlKind.RANK, | ||
| SqlKind.DENSE_RANK, SqlKind.NTILE, SqlKind.LAG, SqlKind.LEAD, SqlKind.FIRST_VALUE, SqlKind.LAST_VALUE, | ||
| SqlKind.OTHER_FUNCTION); | ||
|
|
||
| public static void validateWindows(Window window) { | ||
| int numGroups = window.groups.size(); | ||
| // For Phase 1 we only handle single window groups | ||
| Preconditions.checkState(numGroups == 1, | ||
| String.format("Currently only 1 window group is supported, query has %d groups", numGroups)); | ||
|
|
||
| // Validate that only supported window aggregation functions are present | ||
| Window.Group windowGroup = window.groups.get(0); | ||
| validateWindowAggCallsSupported(windowGroup); | ||
|
|
||
| // Validate the frame | ||
| validateWindowFrames(windowGroup); | ||
| } | ||
|
|
||
| /** | ||
| * Replaces the reference to literal arguments in the window group with the actual literal values. | ||
| * NOTE: {@link Window} has a field called "constants" which contains the literal values. If the input reference is | ||
| * beyond the window input size, it is a reference to the constants. | ||
| */ | ||
| public static Window.Group updateLiteralArgumentsInWindowGroup(Window window) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add more comments about how does RexInputRef(x) find the element in constant list with more details? That's what I get by using copilot. 1. Understanding the InputsWhen Calcite constructs a
So, if your input row has N columns, and your window function needs K constants, the total "input" to the Window node is N + K. 2. How RexInputRef Works in This Context
Example: Suppose your table has 2 columns: SQL
Input to the Window node: employee_id | salary | 2 | 0 -- | -- | -- | -- ... | ... | 2 | 0So:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is lifted as is from |
||
| Window.Group oldWindowGroup = window.groups.get(0); | ||
| RelNode input = unboxRel(window.getInput()); | ||
| int numInputFields = input.getRowType().getFieldCount(); | ||
| List<RexNode> projects = input instanceof Project ? ((Project) input).getProjects() : null; | ||
|
|
||
| List<Window.RexWinAggCall> newAggCallWindow = new ArrayList<>(oldWindowGroup.aggCalls.size()); | ||
| boolean windowChanged = false; | ||
| for (Window.RexWinAggCall oldAggCall : oldWindowGroup.aggCalls) { | ||
| boolean changed = false; | ||
| List<RexNode> oldOperands = oldAggCall.getOperands(); | ||
| List<RexNode> newOperands = new ArrayList<>(oldOperands.size()); | ||
| for (RexNode oldOperand : oldOperands) { | ||
| RexLiteral literal = getLiteral(oldOperand, numInputFields, window.constants, projects); | ||
| if (literal != null) { | ||
| newOperands.add(literal); | ||
| changed = true; | ||
| windowChanged = true; | ||
| } else { | ||
| newOperands.add(oldOperand); | ||
| } | ||
| } | ||
| if (changed) { | ||
| newAggCallWindow.add( | ||
| new Window.RexWinAggCall((SqlAggFunction) oldAggCall.getOperator(), oldAggCall.type, newOperands, | ||
| oldAggCall.ordinal, oldAggCall.distinct, oldAggCall.ignoreNulls)); | ||
| } else { | ||
| newAggCallWindow.add(oldAggCall); | ||
| } | ||
| } | ||
|
|
||
| RexWindowBound lowerBound = oldWindowGroup.lowerBound; | ||
| RexNode offset = lowerBound.getOffset(); | ||
| if (offset != null) { | ||
| RexLiteral literal = getLiteral(offset, numInputFields, window.constants, projects); | ||
| if (literal == null) { | ||
| throw new IllegalStateException( | ||
| "Could not read window lower bound literal value from window group: " + oldWindowGroup); | ||
| } | ||
| lowerBound = lowerBound.isPreceding() ? RexWindowBounds.preceding(literal) : RexWindowBounds.following(literal); | ||
| windowChanged = true; | ||
| } | ||
| RexWindowBound upperBound = oldWindowGroup.upperBound; | ||
| offset = upperBound.getOffset(); | ||
| if (offset != null) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add comment on when the offset is null; unbounded preceding / unbounded following / current row
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lifted as is from |
||
| RexLiteral literal = getLiteral(offset, numInputFields, window.constants, projects); | ||
| if (literal == null) { | ||
| throw new IllegalStateException( | ||
| "Could not read window upper bound literal value from window group: " + oldWindowGroup); | ||
| } | ||
| upperBound = upperBound.isFollowing() ? RexWindowBounds.following(literal) : RexWindowBounds.preceding(literal); | ||
| windowChanged = true; | ||
| } | ||
|
|
||
| return windowChanged ? new Window.Group(oldWindowGroup.keys, oldWindowGroup.isRows, lowerBound, upperBound, | ||
| oldWindowGroup.exclude, oldWindowGroup.orderKeys, newAggCallWindow) : oldWindowGroup; | ||
| } | ||
|
|
||
| private static void validateWindowAggCallsSupported(Window.Group windowGroup) { | ||
| for (Window.RexWinAggCall aggCall : windowGroup.aggCalls) { | ||
| SqlKind aggKind = aggCall.getKind(); | ||
| Preconditions.checkState(SUPPORTED_WINDOW_FUNCTION_KIND.contains(aggKind), | ||
| String.format("Unsupported Window function kind: %s. Only aggregation functions are supported!", aggKind)); | ||
| } | ||
| } | ||
|
|
||
| private static void validateWindowFrames(Window.Group windowGroup) { | ||
| RexWindowBound lowerBound = windowGroup.lowerBound; | ||
| RexWindowBound upperBound = windowGroup.upperBound; | ||
|
|
||
| boolean hasOffset = (lowerBound.isPreceding() && !lowerBound.isUnbounded()) || (upperBound.isFollowing() | ||
| && !upperBound.isUnbounded()); | ||
|
|
||
| if (!windowGroup.isRows) { | ||
| Preconditions.checkState(!hasOffset, "RANGE window frame with offset PRECEDING / FOLLOWING is not supported"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. https://docs.pinot.apache.org/windows-functions The range frame clause with offset is claimed to be supported in the doc. Do I miss the content?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. my bad. |
||
| } | ||
| } | ||
|
|
||
| @Nullable | ||
| private static RexLiteral getLiteral(RexNode rexNode, int numInputFields, ImmutableList<RexLiteral> constants, | ||
| @Nullable List<RexNode> projects) { | ||
| if (!(rexNode instanceof RexInputRef)) { | ||
| return null; | ||
| } | ||
| int index = ((RexInputRef) rexNode).getIndex(); | ||
| if (index >= numInputFields) { | ||
| return constants.get(index - numInputFields); | ||
| } | ||
| if (projects != null) { | ||
| RexNode project = projects.get(index); | ||
| if (project instanceof RexLiteral) { | ||
| return (RexLiteral) project; | ||
| } | ||
| } | ||
| return null; | ||
| } | ||
| } | ||
| } | ||

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note: lifted as is from the window exchange rule