Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ default int getExceptionsSize() {
*/
boolean isMaxRowsInJoinReached();

/**
* Returns whether the limit for max rows in window has been reached.
*/
boolean isMaxRowsInWindowReached();

/**
* Returns the total time used for query execution in milliseconds.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@ public boolean isMaxRowsInJoinReached() {
return false;
}

@JsonIgnore
@Override
public boolean isMaxRowsInWindowReached() {
return false;
}

@Override
public long getTimeUsedMs() {
return _timeUsedMs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,24 @@
* TODO: Currently this class cannot be used to deserialize the JSON response.
*/
@JsonPropertyOrder({
"resultTable", "partialResult", "exceptions", "numGroupsLimitReached", "maxRowsInJoinReached", "timeUsedMs",
"stageStats", "maxRowsInOperator", "requestId", "brokerId", "numDocsScanned", "totalDocs",
"numEntriesScannedInFilter", "numEntriesScannedPostFilter", "numServersQueried", "numServersResponded",
"numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
"numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "minConsumingFreshnessTimeMs",
"numSegmentsPrunedByBroker", "numSegmentsPrunedByServer", "numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit",
"numSegmentsPrunedByValue", "brokerReduceTimeMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
"offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
"realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs",
"explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo"
"resultTable", "partialResult", "exceptions", "numGroupsLimitReached", "maxRowsInJoinReached",
"maxRowsInWindowReached", "timeUsedMs", "stageStats", "maxRowsInOperator", "requestId", "brokerId",
"numDocsScanned", "totalDocs", "numEntriesScannedInFilter", "numEntriesScannedPostFilter", "numServersQueried",
"numServersResponded", "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched",
"numConsumingSegmentsQueried", "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched",
"minConsumingFreshnessTimeMs", "numSegmentsPrunedByBroker", "numSegmentsPrunedByServer", "numSegmentsPrunedInvalid",
"numSegmentsPrunedByLimit", "numSegmentsPrunedByValue", "brokerReduceTimeMs", "offlineThreadCpuTimeNs",
"realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
"offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
"realtimeTotalCpuTimeNs", "explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo"
})
public class BrokerResponseNativeV2 implements BrokerResponse {
private final StatMap<StatKey> _brokerStats = new StatMap<>(StatKey.class);
private final List<QueryProcessingException> _exceptions = new ArrayList<>();

private ResultTable _resultTable;
private boolean _maxRowsInJoinReached;
private boolean _maxRowsInWindowReached;
private long _timeUsedMs;
/**
* Statistics for each stage of the query execution.
Expand Down Expand Up @@ -121,6 +122,15 @@ public void mergeMaxRowsInJoinReached(boolean maxRowsInJoinReached) {
_maxRowsInJoinReached |= maxRowsInJoinReached;
}

@Override
public boolean isMaxRowsInWindowReached() {
return _maxRowsInWindowReached;
}

public void mergeMaxRowsInWindowReached(boolean maxRowsInWindowReached) {
_maxRowsInWindowReached |= maxRowsInWindowReached;
}

/**
* Returns the stage statistics.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode;
import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.WindowOverFlowMode;


/**
Expand Down Expand Up @@ -280,4 +281,16 @@ public static JoinOverFlowMode getJoinOverflowMode(Map<String, String> queryOpti
String joinOverflowModeStr = queryOptions.get(QueryOptionKey.JOIN_OVERFLOW_MODE);
return joinOverflowModeStr != null ? JoinOverFlowMode.valueOf(joinOverflowModeStr) : null;
}

@Nullable
public static Integer getMaxRowsInWindow(Map<String, String> queryOptions) {
String maxRowsInWindow = queryOptions.get(QueryOptionKey.MAX_ROWS_IN_WINDOW);
return maxRowsInWindow != null ? Integer.parseInt(maxRowsInWindow) : null;
}

@Nullable
public static WindowOverFlowMode getWindowOverflowMode(Map<String, String> queryOptions) {
String windowOverflowModeStr = queryOptions.get(QueryOptionKey.WINDOW_OVERFLOW_MODE);
return windowOverflowModeStr != null ? WindowOverFlowMode.valueOf(windowOverflowModeStr) : null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class PinotHintOptions {
public static final String AGGREGATE_HINT_OPTIONS = "aggOptions";
public static final String JOIN_HINT_OPTIONS = "joinOptions";
public static final String TABLE_HINT_OPTIONS = "tableOptions";
public static final String WINDOW_HINT_OPTIONS = "windowOptions";

/**
* Hint to denote that the aggregation node is the final aggregation stage which extracts the final result.
Expand Down Expand Up @@ -68,6 +69,19 @@ public static class AggregateOptions {
public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY = "max_initial_result_holder_capacity";
}

public static class WindowHintOptions {
/**
* Max rows allowed to cache the rows in window for further processing.
*/
public static final String MAX_ROWS_IN_WINDOW = "max_rows_in_window";
/**
* Mode when window overflow happens, supported values: THROW or BREAK.
* THROW(default): Break window cache build process, and throw exception, no further WINDOW operation performed.
* BREAK: Break window cache build process, continue to perform WINDOW operation, results might be partial.
*/
public static final String WINDOW_OVERFLOW_MODE = "window_overflow_mode";
}

public static class JoinHintOptions {
public static final String JOIN_STRATEGY = "join_strategy";
public static final String DYNAMIC_BROADCAST_JOIN_STRATEGY = "dynamic_broadcast";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ private static PlanNode convertLogicalValues(LogicalValues node, int currentStag
}

private static PlanNode convertLogicalWindow(LogicalWindow node, int currentStageId) {
return new WindowNode(currentStageId, node.groups, node.constants, toDataSchema(node.getRowType()));
return new WindowNode(currentStageId, node.groups, node.constants, toDataSchema(node.getRowType()),
node.getHints());
}

private static PlanNode convertLogicalSort(LogicalSort node, int currentStageId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rex.RexLiteral;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
Expand All @@ -51,6 +52,8 @@ public class WindowNode extends AbstractPlanNode {
private List<RexExpression> _constants;
@ProtoProperties
private WindowFrameType _windowFrameType;
@ProtoProperties
private NodeHint _windowHints;

/**
* Enum to denote the type of window frame
Expand All @@ -66,7 +69,7 @@ public WindowNode(int planFragmentId) {
}

public WindowNode(int planFragmentId, List<Window.Group> windowGroups, List<RexLiteral> constants,
DataSchema dataSchema) {
DataSchema dataSchema, List<RelHint> windowHints) {
super(planFragmentId, dataSchema);
// Only a single Window Group should exist per WindowNode.
Preconditions.checkState(windowGroups.size() == 1,
Expand Down Expand Up @@ -103,6 +106,7 @@ public WindowNode(int planFragmentId, List<Window.Group> windowGroups, List<RexL
for (RexLiteral constant : constants) {
_constants.add(RexExpressionUtils.fromRexLiteral(constant));
}
_windowHints = new NodeHint(windowHints);
}

@Override
Expand Down Expand Up @@ -150,4 +154,8 @@ public WindowFrameType getWindowFrameType() {
public List<RexExpression> getConstants() {
return _constants;
}

public NodeHint getWindowHints() {
return _windowHints;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ public void mergeInto(BrokerResponseNativeV2 response, StatMap<?> map) {
@SuppressWarnings("unchecked")
StatMap<WindowAggregateOperator.StatKey> stats = (StatMap<WindowAggregateOperator.StatKey>) map;
response.mergeMaxRowsInOperator(stats.getLong(WindowAggregateOperator.StatKey.EMITTED_ROWS));
response.mergeMaxRowsInWindowReached(
stats.getBoolean(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW_REACHED));
}
},;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,17 @@
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.commons.collections.CollectionUtils;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
import org.apache.pinot.query.planner.plannode.WindowNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
Expand All @@ -46,6 +51,7 @@
import org.apache.pinot.query.runtime.operator.window.WindowFunction;
import org.apache.pinot.query.runtime.operator.window.WindowFunctionFactory;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.WindowOverFlowMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -82,6 +88,8 @@
public class WindowAggregateOperator extends MultiStageOperator {
private static final String EXPLAIN_NAME = "WINDOW";
private static final Logger LOGGER = LoggerFactory.getLogger(WindowAggregateOperator.class);
private static final int DEFAULT_MAX_ROWS_IN_WINDOW = 1024 * 1024; // 2^20, around 1MM rows
private static final WindowOverFlowMode DEFAULT_WINDOW_OVERFLOW_MODE = WindowOverFlowMode.THROW;

// List of window functions which can only be applied as ROWS window frame type
public static final Set<String> ROWS_ONLY_FUNCTION_NAMES = ImmutableSet.of("ROW_NUMBER");
Expand All @@ -99,6 +107,19 @@ public class WindowAggregateOperator extends MultiStageOperator {
private final Map<Key, List<Object[]>> _partitionRows;
private final boolean _isPartitionByOnly;

// Below are specific parameters to protect the window cache from growing too large.
// Once the window cache reaches the limit, we will throw exception or break the cache build process.
/**
* Max rows allowed to build the right table hash collection.
*/
private final int _maxRowsInWindowCache;
/**
* Mode when window overflow happens, supported values: THROW or BREAK.
* THROW(default): Break window cache build process, and throw exception, no WINDOW operation performed.
* BREAK: Break window cache build process, continue to perform WINDOW operation, results might be partial or wrong.
*/
private final WindowOverFlowMode _windowOverflowMode;

private int _numRows;
private boolean _hasReturnedWindowAggregateBlock;
@Nullable
Expand All @@ -110,7 +131,7 @@ public WindowAggregateOperator(OpChainExecutionContext context, MultiStageOperat
List<RexExpression> groupSet, List<RexExpression> orderSet, List<RelFieldCollation.Direction> orderSetDirection,
List<RelFieldCollation.NullDirection> orderSetNullDirection, List<RexExpression> aggCalls, int lowerBound,
int upperBound, WindowNode.WindowFrameType windowFrameType, List<RexExpression> constants,
DataSchema resultSchema, DataSchema inputSchema) {
DataSchema resultSchema, DataSchema inputSchema, AbstractPlanNode.NodeHint hints) {
super(context);

_inputOperator = inputOperator;
Expand Down Expand Up @@ -142,6 +163,9 @@ public WindowAggregateOperator(OpChainExecutionContext context, MultiStageOperat

_numRows = 0;
_hasReturnedWindowAggregateBlock = false;
Map<String, String> metadata = context.getOpChainMetadata();
_maxRowsInWindowCache = getMaxRowInWindow(metadata, hints);
_windowOverflowMode = getWindowOverflowMode(metadata, hints);
}

@Override
Expand All @@ -155,6 +179,36 @@ protected Logger logger() {
return LOGGER;
}

private int getMaxRowInWindow(Map<String, String> opChainMetadata, @Nullable AbstractPlanNode.NodeHint nodeHint) {
if (nodeHint != null) {
Map<String, String> windowOptions = nodeHint._hintOptions.get(PinotHintOptions.WINDOW_HINT_OPTIONS);
if (windowOptions != null) {
String maxRowsInWindowStr = windowOptions.get(PinotHintOptions.WindowHintOptions.MAX_ROWS_IN_WINDOW);
if (maxRowsInWindowStr != null) {
return Integer.parseInt(maxRowsInWindowStr);
}
}
}
Integer maxRowsInWindow = QueryOptionsUtils.getMaxRowsInWindow(opChainMetadata);
return maxRowsInWindow != null ? maxRowsInWindow : DEFAULT_MAX_ROWS_IN_WINDOW;
}

private WindowOverFlowMode getWindowOverflowMode(Map<String, String> contextMetadata,
@Nullable AbstractPlanNode.NodeHint nodeHint) {
if (nodeHint != null) {
Map<String, String> windowOptions = nodeHint._hintOptions.get(PinotHintOptions.WINDOW_HINT_OPTIONS);
if (windowOptions != null) {
String windowOverflowModeStr = windowOptions.get(PinotHintOptions.WindowHintOptions.WINDOW_OVERFLOW_MODE);
if (windowOverflowModeStr != null) {
return WindowOverFlowMode.valueOf(windowOverflowModeStr);
}
}
}
WindowOverFlowMode windowOverflowMode =
QueryOptionsUtils.getWindowOverflowMode(contextMetadata);
return windowOverflowMode != null ? windowOverflowMode : DEFAULT_WINDOW_OVERFLOW_MODE;
}

@Override
public List<MultiStageOperator> getChildOperators() {
return ImmutableList.of(_inputOperator);
Expand All @@ -172,7 +226,8 @@ public String toExplainString() {
}

@Override
protected TransferableBlock getNextBlock() {
protected TransferableBlock getNextBlock()
throws ProcessingException {
if (_hasReturnedWindowAggregateBlock) {
return _eosBlock;
}
Expand Down Expand Up @@ -213,16 +268,34 @@ private boolean isPartitionByOnlyQuery(List<RexExpression> groupSet, List<RexExp
/**
* @return the final block, which must be either an end of stream or an error.
*/
private TransferableBlock computeBlocks() {
private TransferableBlock computeBlocks() throws ProcessingException {
TransferableBlock block = _inputOperator.nextBlock();
while (!TransferableBlockUtils.isEndOfStream(block)) {
List<Object[]> container = block.getContainer();
int containerSize = container.size();
if (_numRows + containerSize > _maxRowsInWindowCache) {
if (_windowOverflowMode == WindowOverFlowMode.THROW) {
ProcessingException resourceLimitExceededException =
new ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
resourceLimitExceededException.setMessage(
"Cannot build in memory window cache for WINDOW operator, reach number of rows limit: "
+ _maxRowsInWindowCache);
throw resourceLimitExceededException;
} else {
// Just fill up the buffer.
int remainingRows = _maxRowsInWindowCache - _numRows;
container = container.subList(0, remainingRows);
_statMap.merge(StatKey.MAX_ROWS_IN_WINDOW_REACHED, true);
// setting the inputOperator to be early terminated and awaits EOS block next.
_inputOperator.earlyTerminate();
}
}
for (Object[] row : container) {
_numRows++;
// TODO: Revisit null direction handling for all query types
Key key = AggregationUtils.extractRowKey(row, _groupSet);
_partitionRows.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
}
_numRows += containerSize;
block = _inputOperator.nextBlock();
}
// Early termination if the block is an error block
Expand All @@ -240,8 +313,7 @@ private TransferableBlock computeBlocks() {
List<List<Object>> windowFunctionResults = new ArrayList<>();
for (WindowFunction windowFunction : _windowFunctions) {
List<Object> processRows = windowFunction.processRows(rowList);
Preconditions.checkState(processRows.size() == rowList.size(),
"Number of rows in the result set must match the number of rows in the input set");
assert processRows.size() == rowList.size();
windowFunctionResults.add(processRows);
}

Expand Down Expand Up @@ -359,7 +431,8 @@ public boolean includeDefaultInJson() {
public boolean includeDefaultInJson() {
return true;
}
};
},
MAX_ROWS_IN_WINDOW_REACHED(StatMap.Type.BOOLEAN);
private final StatMap.Type _type;

StatKey(StatMap.Type type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public MultiStageOperator visitWindow(WindowNode node, OpChainExecutionContext c
return new WindowAggregateOperator(context, nextOperator, node.getGroupSet(), node.getOrderSet(),
node.getOrderSetDirection(), node.getOrderSetNullDirection(), node.getAggCalls(), node.getLowerBound(),
node.getUpperBound(), node.getWindowFrameType(), node.getConstants(), node.getDataSchema(),
node.getInputs().get(0).getDataSchema());
node.getInputs().get(0).getDataSchema(), node.getWindowHints());
}

@Override
Expand Down
Loading