-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Enforce max rows in join limit on joined rows with left input as well #13922
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -94,12 +94,14 @@ public class HashJoinOperator extends MultiStageOperator { | |
| private final List<TransformOperand> _nonEquiEvaluators; | ||
| private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class); | ||
|
|
||
| // Below are specific parameters to protect the hash table from growing too large. | ||
| // Below are specific parameters to protect the server from a very large join operation. | ||
| // Once the hash table reaches the limit, we will throw exception or break the right table build process. | ||
| // The limit also applies to the number of joined rows in blocks from the left table. | ||
| /** | ||
| * Max rows allowed to build the right table hash collection. | ||
| * Max rows allowed to build the right table hash collection. Also max rows emitted in each join with a block from | ||
| * the left table. | ||
| */ | ||
| private final int _maxRowsInHashTable; | ||
| private final int _maxRowsInJoin; | ||
| /** | ||
| * Mode when join overflow happens, supported values: THROW or BREAK. | ||
| * THROW(default): Break right table build process, and throw exception, no JOIN with left table performed. | ||
|
|
@@ -109,6 +111,7 @@ public class HashJoinOperator extends MultiStageOperator { | |
|
|
||
| private boolean _isHashTableBuilt; | ||
| private int _currentRowsInHashTable; | ||
| private int _currentJoinedRows; | ||
| private TransferableBlock _upstreamErrorBlock; | ||
| private MultiStageQueryStats _leftSideStats; | ||
| private MultiStageQueryStats _rightSideStats; | ||
|
|
@@ -145,8 +148,10 @@ public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator left | |
| } | ||
| Map<String, String> metadata = context.getOpChainMetadata(); | ||
| PlanNode.NodeHint nodeHint = node.getNodeHint(); | ||
| _maxRowsInHashTable = getMaxRowInJoin(metadata, nodeHint); | ||
| _maxRowsInJoin = getMaxRowsInJoin(metadata, nodeHint); | ||
| _joinOverflowMode = getJoinOverflowMode(metadata, nodeHint); | ||
| _currentRowsInHashTable = 0; | ||
| _currentJoinedRows = 0; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -165,7 +170,7 @@ protected Logger logger() { | |
| return LOGGER; | ||
| } | ||
|
|
||
| private int getMaxRowInJoin(Map<String, String> opChainMetadata, @Nullable PlanNode.NodeHint nodeHint) { | ||
| private int getMaxRowsInJoin(Map<String, String> opChainMetadata, @Nullable PlanNode.NodeHint nodeHint) { | ||
| if (nodeHint != null) { | ||
| Map<String, String> joinOptions = nodeHint.getHintOptions().get(PinotHintOptions.JOIN_HINT_OPTIONS); | ||
| if (joinOptions != null) { | ||
|
|
@@ -224,25 +229,13 @@ private void buildBroadcastHashTable() | |
| while (!TransferableBlockUtils.isEndOfStream(rightBlock)) { | ||
| List<Object[]> container = rightBlock.getContainer(); | ||
| // Row based overflow check. | ||
| if (container.size() + _currentRowsInHashTable > _maxRowsInHashTable) { | ||
| if (container.size() + _currentRowsInHashTable > _maxRowsInJoin) { | ||
| if (_joinOverflowMode == JoinOverFlowMode.THROW) { | ||
| ProcessingException resourceLimitExceededException = | ||
| new ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE); | ||
| resourceLimitExceededException.setMessage( | ||
| "Cannot build in memory hash table for join operator, reach number of rows limit: " + _maxRowsInHashTable | ||
| + ". Consider increasing the limit for the maximum number of rows in a join either via the query " | ||
| + "option '" + CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN + "' or the '" | ||
| + PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN + "' hint in the '" | ||
| + PinotHintOptions.JOIN_HINT_OPTIONS + "'. Alternatively, if partial results are acceptable, the join" | ||
| + " overflow mode can be set to '" + JoinOverFlowMode.BREAK.name() + "' either via the query option '" | ||
| + CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE + "' or the '" | ||
| + PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE + "' hint in the '" | ||
| + PinotHintOptions.JOIN_HINT_OPTIONS + "'. Furthermore, if there is a large disparity in the size of " | ||
| + "the two tables being joined, use the smaller table as the right input instead of the left."); | ||
| throw resourceLimitExceededException; | ||
| throwProcessingExceptionForJoinRowLimitExceeded("Cannot build in memory hash table for join operator, " | ||
| + "reached number of rows limit: " + _maxRowsInJoin); | ||
| } else { | ||
| // Just fill up the buffer. | ||
| int remainingRows = _maxRowsInHashTable - _currentRowsInHashTable; | ||
| int remainingRows = _maxRowsInJoin - _currentRowsInHashTable; | ||
| container = container.subList(0, remainingRows); | ||
| _statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true); | ||
| // setting only the rightTableOperator to be early terminated and awaits EOS block next. | ||
|
|
@@ -254,8 +247,8 @@ private void buildBroadcastHashTable() | |
| ArrayList<Object[]> hashCollection = _broadcastRightTable.computeIfAbsent(_rightKeySelector.getKey(row), | ||
| k -> new ArrayList<>(INITIAL_HEURISTIC_SIZE)); | ||
| int size = hashCollection.size(); | ||
| if ((size & size - 1) == 0 && size < _maxRowsInHashTable && size < Integer.MAX_VALUE / 2) { // is power of 2 | ||
| hashCollection.ensureCapacity(Math.min(size << 1, _maxRowsInHashTable)); | ||
| if ((size & size - 1) == 0 && size < _maxRowsInJoin && size < Integer.MAX_VALUE / 2) { // is power of 2 | ||
| hashCollection.ensureCapacity(Math.min(size << 1, _maxRowsInJoin)); | ||
| } | ||
| hashCollection.add(row); | ||
| } | ||
|
|
@@ -272,15 +265,18 @@ private void buildBroadcastHashTable() | |
| _statMap.merge(StatKey.TIME_BUILDING_HASH_TABLE_MS, System.currentTimeMillis() - startTime); | ||
| } | ||
|
|
||
| private TransferableBlock buildJoinedDataBlock() { | ||
| if (_isTerminated) { | ||
| assert _leftSideStats != null; | ||
| return TransferableBlockUtils.getEndOfStreamTransferableBlock(_leftSideStats); | ||
| } | ||
|
|
||
| private TransferableBlock buildJoinedDataBlock() throws ProcessingException { | ||
| // Keep reading the input blocks until we find a match row or all blocks are processed. | ||
| // TODO: Consider batching the rows to improve performance. | ||
| while (true) { | ||
| if (_upstreamErrorBlock != null) { | ||
| return _upstreamErrorBlock; | ||
| } | ||
| if (_isTerminated) { | ||
| assert _leftSideStats != null; | ||
| return TransferableBlockUtils.getEndOfStreamTransferableBlock(_leftSideStats); | ||
| } | ||
|
|
||
| TransferableBlock leftBlock = _leftInput.nextBlock(); | ||
| if (leftBlock.isErrorBlock()) { | ||
| return leftBlock; | ||
|
|
@@ -307,7 +303,7 @@ private TransferableBlock buildJoinedDataBlock() { | |
| } | ||
| } | ||
|
|
||
| private List<Object[]> buildJoinedRows(TransferableBlock leftBlock) { | ||
| private List<Object[]> buildJoinedRows(TransferableBlock leftBlock) throws ProcessingException { | ||
| switch (_joinType) { | ||
| case SEMI: | ||
| return buildJoinedDataBlockSemi(leftBlock); | ||
|
|
@@ -319,22 +315,25 @@ private List<Object[]> buildJoinedRows(TransferableBlock leftBlock) { | |
| } | ||
| } | ||
|
|
||
| private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock) { | ||
| private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock) throws ProcessingException { | ||
| List<Object[]> container = leftBlock.getContainer(); | ||
| List<Object[]> rows = new ArrayList<>(container.size()); | ||
|
|
||
| for (Object[] leftRow : container) { | ||
| Object key = _leftKeySelector.getKey(leftRow); | ||
| // SEMI-JOIN only checks existence of the key | ||
| if (_broadcastRightTable.containsKey(key)) { | ||
| if (incrementJoinedRowsAndCheckLimit()) { | ||
| break; | ||
| } | ||
| rows.add(joinRow(leftRow, null)); | ||
| } | ||
| } | ||
|
|
||
| return rows; | ||
| } | ||
|
|
||
| private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock) { | ||
| private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock) throws ProcessingException { | ||
| List<Object[]> container = leftBlock.getContainer(); | ||
| ArrayList<Object[]> rows = new ArrayList<>(container.size()); | ||
|
|
||
|
|
@@ -344,6 +343,9 @@ private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock) | |
| List<Object[]> rightRows = _broadcastRightTable.get(key); | ||
| if (rightRows == null) { | ||
| if (needUnmatchedLeftRows()) { | ||
| if (incrementJoinedRowsAndCheckLimit()) { | ||
| break; | ||
| } | ||
| rows.add(joinRow(leftRow, null)); | ||
| } | ||
| continue; | ||
|
|
@@ -357,32 +359,45 @@ private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock) | |
| Object[] resultRow = joinRow(leftRow, rightRow); | ||
| if (_nonEquiEvaluators.isEmpty() || _nonEquiEvaluators.stream() | ||
| .allMatch(evaluator -> BooleanUtils.isTrueInternalValue(evaluator.apply(resultRow)))) { | ||
| if (incrementJoinedRowsAndCheckLimit()) { | ||
| break; | ||
| } | ||
| rows.add(resultRow); | ||
| hasMatchForLeftRow = true; | ||
| if (_matchedRightRows != null) { | ||
| _matchedRightRows.computeIfAbsent(key, k -> new BitSet(numRightRows)).set(i); | ||
| } | ||
| } | ||
| } | ||
| if (_currentJoinedRows > _maxRowsInJoin) { | ||
| break; | ||
| } | ||
| if (!hasMatchForLeftRow && needUnmatchedLeftRows()) { | ||
| if (incrementJoinedRowsAndCheckLimit()) { | ||
| break; | ||
| } | ||
| rows.add(joinRow(leftRow, null)); | ||
| } | ||
| } | ||
|
|
||
| return rows; | ||
| } | ||
|
|
||
| private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock) { | ||
| private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock) throws ProcessingException { | ||
| List<Object[]> container = leftBlock.getContainer(); | ||
| List<Object[]> rows = new ArrayList<>(container.size()); | ||
|
|
||
| for (Object[] leftRow : container) { | ||
| Object key = _leftKeySelector.getKey(leftRow); | ||
| // ANTI-JOIN only checks non-existence of the key | ||
| if (!_broadcastRightTable.containsKey(key)) { | ||
| if (incrementJoinedRowsAndCheckLimit()) { | ||
| break; | ||
| } | ||
| rows.add(joinRow(leftRow, null)); | ||
| } | ||
| } | ||
|
|
||
| return rows; | ||
| } | ||
|
|
||
|
|
@@ -432,6 +447,66 @@ private boolean needUnmatchedLeftRows() { | |
| return _joinType == JoinRelType.LEFT || _joinType == JoinRelType.FULL; | ||
| } | ||
|
|
||
| private void earlyTerminateLeftInput() { | ||
| _leftInput.earlyTerminate(); | ||
| TransferableBlock leftBlock = _leftInput.nextBlock(); | ||
|
|
||
| while (!leftBlock.isSuccessfulEndOfStreamBlock()) { | ||
| if (leftBlock.isErrorBlock()) { | ||
| _upstreamErrorBlock = leftBlock; | ||
| return; | ||
| } | ||
| leftBlock = _leftInput.nextBlock(); | ||
| } | ||
|
|
||
| assert leftBlock.isSuccessfulEndOfStreamBlock(); | ||
| assert _rightSideStats != null; | ||
| _leftSideStats = leftBlock.getQueryStats(); | ||
| assert _leftSideStats != null; | ||
| _leftSideStats.mergeInOrder(_rightSideStats, getOperatorType(), _statMap); | ||
| _isTerminated = true; | ||
| } | ||
|
|
||
| /** | ||
| * Increments {@link #_currentJoinedRows} and checks if the limit has been exceeded. If the limit has been exceeded, | ||
| * either an exception is thrown or the left input is early terminated based on the {@link #_joinOverflowMode}. | ||
| * | ||
| * @return {@code true} if the limit has been exceeded, {@code false} otherwise | ||
| */ | ||
| private boolean incrementJoinedRowsAndCheckLimit() throws ProcessingException { | ||
| _currentJoinedRows++; | ||
| if (_currentJoinedRows > _maxRowsInJoin) { | ||
| if (_joinOverflowMode == JoinOverFlowMode.THROW) { | ||
| throwProcessingExceptionForJoinRowLimitExceeded("Cannot process join, reached number of rows limit: " | ||
| + _maxRowsInJoin); | ||
| } else { | ||
| // Skip over remaining blocks until we reach the end of stream since we already breached the rows limit. | ||
| logger().info("Terminating join operator early as the maximum number of rows limit was reached: {}", | ||
| _maxRowsInJoin); | ||
| earlyTerminateLeftInput(); | ||
| _statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true); | ||
| return true; | ||
| } | ||
| } | ||
|
|
||
| return false; | ||
| } | ||
|
Comment on lines
+476
to
+493
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. isn't
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We're incrementing |
||
|
|
||
| private void throwProcessingExceptionForJoinRowLimitExceeded(String reason) throws ProcessingException { | ||
| ProcessingException resourceLimitExceededException = | ||
| new ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE); | ||
| resourceLimitExceededException.setMessage( | ||
| reason + ". Consider increasing the limit for the maximum number of rows in a join either via the query " | ||
| + "option '" + CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN + "' or the '" | ||
| + PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN + "' hint in the '" | ||
| + PinotHintOptions.JOIN_HINT_OPTIONS + "'. Alternatively, if partial results are acceptable, the join" | ||
| + " overflow mode can be set to '" + JoinOverFlowMode.BREAK.name() + "' either via the query option '" | ||
| + CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE + "' or the '" | ||
| + PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE + "' hint in the '" | ||
| + PinotHintOptions.JOIN_HINT_OPTIONS + "'."); | ||
| throw resourceLimitExceededException; | ||
| } | ||
|
|
||
| public enum StatKey implements StatMap.Key { | ||
| //@formatter:off | ||
| EXECUTION_TIME_MS(StatMap.Type.LONG) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.