Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,17 @@
* Special rule for Pinot: this rule always inserts an exchange or sort exchange below the WINDOW node.
* TODO:
* 1. Add support for more than one window group
* 2. Add support for functions other than aggregation functions (AVG, COUNT, MAX, MIN, SUM)
* 2. Add support for functions other than aggregation functions (AVG, COUNT, MAX, MIN, SUM, BOOL_AND, BOOL_OR)
* 3. Add support for custom frames
*/
public class PinotWindowExchangeNodeInsertRule extends RelOptRule {
public static final PinotWindowExchangeNodeInsertRule INSTANCE =
new PinotWindowExchangeNodeInsertRule(PinotRuleUtils.PINOT_REL_FACTORY);

// Supported window functions
// The supported OTHER_FUNCTION functions are: BOOL_AND, BOOL_OR
private static final Set<SqlKind> SUPPORTED_WINDOW_FUNCTION_KIND = ImmutableSet.of(SqlKind.SUM, SqlKind.SUM0,
SqlKind.MIN, SqlKind.MAX, SqlKind.COUNT);
SqlKind.MIN, SqlKind.MAX, SqlKind.COUNT, SqlKind.OTHER_FUNCTION);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which OTHER_FUNCTION functions do we support here? Can we add a comment listing them? (I think they are BOOL_AND and BOOL_OR.)

But IMO, BOOL_AND and BOOL_OR are just MIN(booleanCol) and MAX(booleanCol) :-P I didn't understand why we needed them in the first place, lol.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah good point, I think we just saw that PostgreSQL has support for all aggregation functions and thought why not add what we have (at least the basic ones). Hope it's alright to leave this here. I've updated to add a comment here.


public PinotWindowExchangeNodeInsertRule(RelBuilderFactory factory) {
super(operand(LogicalWindow.class, any()), factory, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pinot.query.planner.stage.SortNode;
import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.planner.stage.TableScanNode;
import org.apache.pinot.query.planner.stage.WindowNode;
import org.apache.pinot.query.routing.VirtualServer;


Expand Down Expand Up @@ -81,6 +82,14 @@ public void attach(StageNode stageNode) {
_requiresSingletonInstance = _requiresSingletonInstance || (sortNode.getCollationKeys().size() > 0
&& sortNode.getOffset() != -1);
}
if (stageNode instanceof WindowNode) {
WindowNode windowNode = (WindowNode) stageNode;
// TODO: Figure out a way to parallelize Empty OVER() and OVER(ORDER BY) so the computation can be done across
// multiple nodes.
// Empty OVER() and OVER(ORDER BY) need to be processed on a singleton node. OVER() with PARTITION BY can be
// distributed as no global ordering is required across partitions.
_requiresSingletonInstance = _requiresSingletonInstance || (windowNode.getGroupSet().size() == 0);
}
}

public List<String> getScannedTables() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,36 +20,46 @@

import com.clearspring.analytics.util.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexWindowBound;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.serde.ProtoProperties;


public class WindowNode extends AbstractStageNode {
@ProtoProperties
public List<RexExpression> _groupSet;
private List<RexExpression> _groupSet;
@ProtoProperties
public List<RexExpression> _orderSet;
private List<RexExpression> _orderSet;
@ProtoProperties
public List<RelFieldCollation.Direction> _orderSetDirection;
private List<RelFieldCollation.Direction> _orderSetDirection;
@ProtoProperties
public List<RelFieldCollation.NullDirection> _orderSetNullDirection;
private List<RelFieldCollation.NullDirection> _orderSetNullDirection;
@ProtoProperties
public List<RexExpression> _aggCalls;
private List<RexExpression> _aggCalls;
@ProtoProperties
public int _lowerBound;
private int _lowerBound;
@ProtoProperties
public int _upperBound;
@ProtoProperties
public boolean _isRows;
private int _upperBound;
@ProtoProperties
private List<RexExpression> _constants;
@ProtoProperties
private WindowFrameType _windowFrameType;

/**
 * Kind of window frame carried by a window node.
 */
public enum WindowFrameType {
  /** ROW type window frame. */
  ROW,

  /** RANGE type window frame. */
  RANGE
}

public WindowNode(int stageId) {
super(stageId);
Expand All @@ -62,7 +72,7 @@ public WindowNode(int stageId, List<Window.Group> windowGroups, List<RexLiteral>
String.format("Only a single window group is allowed! Number of window groups: %d", windowGroups.size()));
Window.Group windowGroup = windowGroups.get(0);

_groupSet = windowGroup.keys == null ? new ArrayList<>() : RexExpression.toRexInputRefs(windowGroup.keys);
_groupSet = windowGroup.keys == null ? Collections.emptyList() : RexExpression.toRexInputRefs(windowGroup.keys);
List<RelFieldCollation> relFieldCollations = windowGroup.orderKeys == null ? new ArrayList<>()
: windowGroup.orderKeys.getFieldCollations();
_orderSet = new ArrayList<>(relFieldCollations.size());
Expand All @@ -79,12 +89,12 @@ public WindowNode(int stageId, List<Window.Group> windowGroups, List<RexLiteral>
// Frame literals come in the constants from the LogicalWindow and the bound.getOffset() stores the
// InputRef to the constants array offset by the input array length. These need to be extracted here and
// set to the bounds.
validateFrameBounds(windowGroup.lowerBound, windowGroup.upperBound, windowGroup.isRows);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I commented previously, I'm not sure isRows is a clear enough flag. Let's name it isRowBased or use an enum:

enum WindowBase {
  ROW, RANGE
}

Copy link
Contributor Author

@somandal somandal Mar 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done - I renamed it here, and store it as an enum in WindowAggregateOperator. If it's okay I'll create a util to store the enum in a common place for use in both places later as a followup?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

never mind, updated this to add the enum directly into WindowNode and access it in WindowAggregateOperator.


// Lower bound can only be unbounded preceding for now, set to Integer.MIN_VALUE
_lowerBound = Integer.MIN_VALUE;
// Upper bound can only be unbounded following or current row for now
_upperBound = windowGroup.upperBound.isUnbounded() ? Integer.MAX_VALUE : 0;
_isRows = windowGroup.isRows;
_windowFrameType = windowGroup.isRows ? WindowFrameType.ROW : WindowFrameType.RANGE;

// TODO: Constants are used to store constants needed such as the frame literals. For now just save this, need to
// extract the constant values into bounds as a part of frame support.
Expand Down Expand Up @@ -132,26 +142,11 @@ public int getUpperBound() {
return _upperBound;
}

public boolean isRows() {
return _isRows;
public WindowFrameType getWindowFrameType() {
return _windowFrameType;
}

/**
 * Returns the constants stored on this node.
 *
 * @return the list held in {@code _constants}
 */
public List<RexExpression> getConstants() {
  return this._constants;
}

/**
 * Verifies that the requested window frame is the default frame, failing a
 * {@code Preconditions.checkState} check otherwise.
 *
 * <p>The accepted frame is a RANGE (not ROWS) frame whose lower bound is UNBOUNDED PRECEDING with no offset.
 * The accepted upper bound depends on {@code _orderSet}: UNBOUNDED FOLLOWING with no offset when the order set
 * is empty, CURRENT ROW with no offset otherwise.
 *
 * @param lowerBound lower bound of the window frame
 * @param upperBound upper bound of the window frame
 * @param isRows whether the frame was declared as ROWS; must be {@code false}
 */
private void validateFrameBounds(RexWindowBound lowerBound, RexWindowBound upperBound, boolean isRows) {
  Preconditions.checkState(!isRows, "Only default frame is supported which must be RANGE and not ROWS");

  // Lower bound: only UNBOUNDED PRECEDING without an explicit offset is accepted.
  boolean lowerIsUnboundedPreceding =
      lowerBound.isPreceding() && lowerBound.isUnbounded() && lowerBound.getOffset() == null;
  Preconditions.checkState(lowerIsUnboundedPreceding,
      String.format("Only default frame is supported, actual lower bound frame provided: %s", lowerBound));

  // Upper bound: the accepted default differs depending on whether an ORDER BY is present.
  boolean upperIsDefault;
  if (_orderSet.isEmpty()) {
    upperIsDefault = upperBound.isFollowing() && upperBound.isUnbounded() && upperBound.getOffset() == null;
  } else {
    upperIsDefault = upperBound.isCurrentRow() && upperBound.getOffset() == null;
  }
  Preconditions.checkState(upperIsDefault,
      String.format("Only default frame is supported, actual upper bound frame provided: %s", upperBound));
}
}
Loading