Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,14 @@ public class HashJoinOperator extends MultiStageOperator {
private final List<TransformOperand> _nonEquiEvaluators;
private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);

// Below are specific parameters to protect the hash table from growing too large.
// Below are specific parameters to protect the server from a very large join operation.
// Once the hash table reaches the limit, we will throw exception or break the right table build process.
// The limit also applies to the number of joined rows in blocks from the left table.
/**
* Max rows allowed to build the right table hash collection.
* Max rows allowed to build the right table hash collection. Also max rows emitted in each join with a block from
* the left table.
*/
private final int _maxRowsInHashTable;
private final int _maxRowsInJoin;
/**
* Mode when join overflow happens, supported values: THROW or BREAK.
* THROW(default): Break right table build process, and throw exception, no JOIN with left table performed.
Expand All @@ -109,6 +111,7 @@ public class HashJoinOperator extends MultiStageOperator {

private boolean _isHashTableBuilt;
private int _currentRowsInHashTable;
private int _currentJoinedRows;
private TransferableBlock _upstreamErrorBlock;
private MultiStageQueryStats _leftSideStats;
private MultiStageQueryStats _rightSideStats;
Expand Down Expand Up @@ -145,8 +148,10 @@ public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator left
}
Map<String, String> metadata = context.getOpChainMetadata();
PlanNode.NodeHint nodeHint = node.getNodeHint();
_maxRowsInHashTable = getMaxRowInJoin(metadata, nodeHint);
_maxRowsInJoin = getMaxRowsInJoin(metadata, nodeHint);
_joinOverflowMode = getJoinOverflowMode(metadata, nodeHint);
_currentRowsInHashTable = 0;
_currentJoinedRows = 0;
}

@Override
Expand All @@ -165,7 +170,7 @@ protected Logger logger() {
return LOGGER;
}

private int getMaxRowInJoin(Map<String, String> opChainMetadata, @Nullable PlanNode.NodeHint nodeHint) {
private int getMaxRowsInJoin(Map<String, String> opChainMetadata, @Nullable PlanNode.NodeHint nodeHint) {
if (nodeHint != null) {
Map<String, String> joinOptions = nodeHint.getHintOptions().get(PinotHintOptions.JOIN_HINT_OPTIONS);
if (joinOptions != null) {
Expand Down Expand Up @@ -224,25 +229,13 @@ private void buildBroadcastHashTable()
while (!TransferableBlockUtils.isEndOfStream(rightBlock)) {
List<Object[]> container = rightBlock.getContainer();
// Row based overflow check.
if (container.size() + _currentRowsInHashTable > _maxRowsInHashTable) {
if (container.size() + _currentRowsInHashTable > _maxRowsInJoin) {
if (_joinOverflowMode == JoinOverFlowMode.THROW) {
ProcessingException resourceLimitExceededException =
new ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
resourceLimitExceededException.setMessage(
"Cannot build in memory hash table for join operator, reach number of rows limit: " + _maxRowsInHashTable
+ ". Consider increasing the limit for the maximum number of rows in a join either via the query "
+ "option '" + CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN + "' or the '"
+ PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN + "' hint in the '"
+ PinotHintOptions.JOIN_HINT_OPTIONS + "'. Alternatively, if partial results are acceptable, the join"
+ " overflow mode can be set to '" + JoinOverFlowMode.BREAK.name() + "' either via the query option '"
+ CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE + "' or the '"
+ PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE + "' hint in the '"
+ PinotHintOptions.JOIN_HINT_OPTIONS + "'. Furthermore, if there is a large disparity in the size of "
+ "the two tables being joined, use the smaller table as the right input instead of the left.");
throw resourceLimitExceededException;
throwProcessingExceptionForJoinRowLimitExceeded("Cannot build in memory hash table for join operator, "
+ "reached number of rows limit: " + _maxRowsInJoin);
} else {
// Just fill up the buffer.
int remainingRows = _maxRowsInHashTable - _currentRowsInHashTable;
int remainingRows = _maxRowsInJoin - _currentRowsInHashTable;
container = container.subList(0, remainingRows);
_statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true);
// setting only the rightTableOperator to be early terminated and awaits EOS block next.
Expand All @@ -254,8 +247,8 @@ private void buildBroadcastHashTable()
ArrayList<Object[]> hashCollection = _broadcastRightTable.computeIfAbsent(_rightKeySelector.getKey(row),
k -> new ArrayList<>(INITIAL_HEURISTIC_SIZE));
int size = hashCollection.size();
if ((size & size - 1) == 0 && size < _maxRowsInHashTable && size < Integer.MAX_VALUE / 2) { // is power of 2
hashCollection.ensureCapacity(Math.min(size << 1, _maxRowsInHashTable));
if ((size & size - 1) == 0 && size < _maxRowsInJoin && size < Integer.MAX_VALUE / 2) { // is power of 2
hashCollection.ensureCapacity(Math.min(size << 1, _maxRowsInJoin));
}
hashCollection.add(row);
}
Expand All @@ -272,15 +265,18 @@ private void buildBroadcastHashTable()
_statMap.merge(StatKey.TIME_BUILDING_HASH_TABLE_MS, System.currentTimeMillis() - startTime);
}

private TransferableBlock buildJoinedDataBlock() {
if (_isTerminated) {
assert _leftSideStats != null;
return TransferableBlockUtils.getEndOfStreamTransferableBlock(_leftSideStats);
}

private TransferableBlock buildJoinedDataBlock() throws ProcessingException {
// Keep reading the input blocks until we find a match row or all blocks are processed.
// TODO: Consider batching the rows to improve performance.
while (true) {
if (_upstreamErrorBlock != null) {
return _upstreamErrorBlock;
}
if (_isTerminated) {
assert _leftSideStats != null;
return TransferableBlockUtils.getEndOfStreamTransferableBlock(_leftSideStats);
}

TransferableBlock leftBlock = _leftInput.nextBlock();
if (leftBlock.isErrorBlock()) {
return leftBlock;
Expand All @@ -307,7 +303,7 @@ private TransferableBlock buildJoinedDataBlock() {
}
}

private List<Object[]> buildJoinedRows(TransferableBlock leftBlock) {
private List<Object[]> buildJoinedRows(TransferableBlock leftBlock) throws ProcessingException {
switch (_joinType) {
case SEMI:
return buildJoinedDataBlockSemi(leftBlock);
Expand All @@ -319,22 +315,25 @@ private List<Object[]> buildJoinedRows(TransferableBlock leftBlock) {
}
}

private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock) {
/**
 * Builds the output rows of a SEMI join for a single block of the left input.
 *
 * <p>A left row is emitted, via {@code joinRow(leftRow, null)}, when its join key is present in the right-side
 * hash table. Before each row is emitted it is counted against the join row limit; scanning of the block stops as
 * soon as {@link #incrementJoinedRowsAndCheckLimit()} reports that the limit has been exceeded.
 *
 * @param leftBlock data block read from the left input
 * @return the rows produced for this block (only the rows collected before the limit was hit, if it was hit)
 * @throws ProcessingException if {@link #incrementJoinedRowsAndCheckLimit()} throws
 */
private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock) throws ProcessingException {
  List<Object[]> leftRows = leftBlock.getContainer();
  List<Object[]> joinedRows = new ArrayList<>(leftRows.size());

  for (Object[] leftRow : leftRows) {
    // SEMI-JOIN only cares whether the key exists on the right side; rows without a match are dropped.
    if (!_broadcastRightTable.containsKey(_leftKeySelector.getKey(leftRow))) {
      continue;
    }
    // Count the row first: once the limit is exceeded the current row is not emitted.
    if (incrementJoinedRowsAndCheckLimit()) {
      break;
    }
    joinedRows.add(joinRow(leftRow, null));
  }

  return joinedRows;
}

private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock) {
private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock) throws ProcessingException {
List<Object[]> container = leftBlock.getContainer();
ArrayList<Object[]> rows = new ArrayList<>(container.size());

Expand All @@ -344,6 +343,9 @@ private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock)
List<Object[]> rightRows = _broadcastRightTable.get(key);
if (rightRows == null) {
if (needUnmatchedLeftRows()) {
if (incrementJoinedRowsAndCheckLimit()) {
break;
}
rows.add(joinRow(leftRow, null));
}
continue;
Expand All @@ -357,32 +359,45 @@ private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock)
Object[] resultRow = joinRow(leftRow, rightRow);
if (_nonEquiEvaluators.isEmpty() || _nonEquiEvaluators.stream()
.allMatch(evaluator -> BooleanUtils.isTrueInternalValue(evaluator.apply(resultRow)))) {
if (incrementJoinedRowsAndCheckLimit()) {
break;
}
rows.add(resultRow);
hasMatchForLeftRow = true;
if (_matchedRightRows != null) {
_matchedRightRows.computeIfAbsent(key, k -> new BitSet(numRightRows)).set(i);
}
}
}
if (_currentJoinedRows > _maxRowsInJoin) {
break;
}
if (!hasMatchForLeftRow && needUnmatchedLeftRows()) {
if (incrementJoinedRowsAndCheckLimit()) {
break;
}
rows.add(joinRow(leftRow, null));
}
}

return rows;
}

private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock) {
/**
 * Builds the output rows of an ANTI join for a single block of the left input.
 *
 * <p>A left row is emitted, via {@code joinRow(leftRow, null)}, when its join key is absent from the right-side
 * hash table. Before each row is emitted it is counted against the join row limit; scanning of the block stops as
 * soon as {@link #incrementJoinedRowsAndCheckLimit()} reports that the limit has been exceeded.
 *
 * @param leftBlock data block read from the left input
 * @return the rows produced for this block (only the rows collected before the limit was hit, if it was hit)
 * @throws ProcessingException if {@link #incrementJoinedRowsAndCheckLimit()} throws
 */
private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock) throws ProcessingException {
  List<Object[]> leftRows = leftBlock.getContainer();
  List<Object[]> joinedRows = new ArrayList<>(leftRows.size());

  for (Object[] leftRow : leftRows) {
    // ANTI-JOIN only cares that the key does NOT exist on the right side; matched rows are dropped.
    if (_broadcastRightTable.containsKey(_leftKeySelector.getKey(leftRow))) {
      continue;
    }
    // Count the row first: once the limit is exceeded the current row is not emitted.
    if (incrementJoinedRowsAndCheckLimit()) {
      break;
    }
    joinedRows.add(joinRow(leftRow, null));
  }

  return joinedRows;
}

Expand Down Expand Up @@ -432,6 +447,66 @@ private boolean needUnmatchedLeftRows() {
return _joinType == JoinRelType.LEFT || _joinType == JoinRelType.FULL;
}

/**
 * Signals the left input to terminate early and then reads its blocks until a terminal block is seen.
 *
 * <p>If an error block is encountered it is stored in {@code _upstreamErrorBlock} and the method returns
 * immediately, without marking this operator as terminated. Otherwise, once the successful end-of-stream block
 * arrives, its query stats become {@code _leftSideStats}, the right side stats and this operator's stat map are
 * merged into them, and {@code _isTerminated} is set.
 */
private void earlyTerminateLeftInput() {
  _leftInput.earlyTerminate();

  // Discard every remaining block; only an error block or the successful EOS block is of interest.
  TransferableBlock block;
  for (block = _leftInput.nextBlock(); !block.isSuccessfulEndOfStreamBlock(); block = _leftInput.nextBlock()) {
    if (block.isErrorBlock()) {
      _upstreamErrorBlock = block;
      return;
    }
  }

  assert block.isSuccessfulEndOfStreamBlock();
  assert _rightSideStats != null;
  _leftSideStats = block.getQueryStats();
  assert _leftSideStats != null;
  _leftSideStats.mergeInOrder(_rightSideStats, getOperatorType(), _statMap);
  _isTerminated = true;
}

/**
 * Counts one more joined row in {@link #_currentJoinedRows} and enforces the {@code _maxRowsInJoin} limit.
 *
 * <p>While the counter stays within the limit nothing else happens. Once the counter exceeds the limit, the
 * behavior depends on {@link #_joinOverflowMode}: in {@code THROW} mode a {@link ProcessingException} is raised;
 * in any other mode the left input is early terminated and the {@code MAX_ROWS_IN_JOIN_REACHED} stat is recorded.
 *
 * @return {@code true} if the limit has been exceeded (the caller should stop producing rows), {@code false}
 *         otherwise
 * @throws ProcessingException if the limit has been exceeded and the overflow mode is {@code THROW}
 */
private boolean incrementJoinedRowsAndCheckLimit() throws ProcessingException {
  _currentJoinedRows++;
  if (_currentJoinedRows <= _maxRowsInJoin) {
    return false;
  }

  if (_joinOverflowMode != JoinOverFlowMode.THROW) {
    // Skip over remaining blocks until we reach the end of stream since we already breached the rows limit.
    logger().info("Terminating join operator early as the maximum number of rows limit was reached: {}",
        _maxRowsInJoin);
    earlyTerminateLeftInput();
    // NOTE(review): this stat is recorded after earlyTerminateLeftInput() has already passed _statMap to
    // mergeInOrder(); presumably the stats keep a live reference to the map rather than a snapshot - confirm.
    _statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true);
    return true;
  }

  throwProcessingExceptionForJoinRowLimitExceeded("Cannot process join, reached number of rows limit: "
      + _maxRowsInJoin);
  return false;
}
Comment on lines +476 to +493
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't _currentJoinedRows always be equal to _rows.size()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're incrementing _currentJoinedRows for every matched row and checking it against the max rows limit so that we can exit early as soon as the limit is breached. Alternatively, we could update it just once after processing all the rows from a block, but that would be less accurate. That alternative would be slightly more efficient, since there would be fewer checks and increments, but it could still result in a very large number of joined rows being emitted, depending on the block size. I don't think the overhead of the integer increment and limit check should be too concerning given the complexity of the other existing operations in each iteration of the main join loops. WDYT?


/**
 * Always throws a {@link ProcessingException} carrying the {@code SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE} error
 * code.
 *
 * <p>The exception message starts with {@code reason} and is followed by guidance on the two available remedies:
 * raising the maximum number of rows in a join, or switching the join overflow mode to {@code BREAK}. Each remedy
 * names both the query option and the join hint that control it.
 *
 * @param reason description of which limit was hit; used as the prefix of the exception message
 * @throws ProcessingException always
 */
private void throwProcessingExceptionForJoinRowLimitExceeded(String reason) throws ProcessingException {
  String message =
      reason + ". Consider increasing the limit for the maximum number of rows in a join either via the query "
          + "option '" + CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN + "' or the '"
          + PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN + "' hint in the '"
          + PinotHintOptions.JOIN_HINT_OPTIONS + "'. Alternatively, if partial results are acceptable, the join"
          + " overflow mode can be set to '" + JoinOverFlowMode.BREAK.name() + "' either via the query option '"
          + CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE + "' or the '"
          + PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE + "' hint in the '"
          + PinotHintOptions.JOIN_HINT_OPTIONS + "'.";

  ProcessingException limitExceeded =
      new ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
  limitExceeded.setMessage(message);
  throw limitExceeded;
}

public enum StatKey implements StatMap.Key {
//@formatter:off
EXECUTION_TIME_MS(StatMap.Type.LONG) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void shouldHandleInnerJoinOnInt() {
OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}))
.thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
DataSchema resultSchema =
new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_co2"}, new ColumnDataType[]{
new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_col2"}, new ColumnDataType[]{
ColumnDataType.INT, ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.STRING
});
HashJoinOperator operator =
Expand All @@ -136,7 +136,7 @@ public void shouldHandleJoinOnEmptySelector() {
OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}))
.thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
DataSchema resultSchema =
new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_co2"}, new ColumnDataType[]{
new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_col2"}, new ColumnDataType[]{
ColumnDataType.INT, ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.STRING
});
HashJoinOperator operator =
Expand Down Expand Up @@ -468,7 +468,7 @@ public void shouldPropagateLeftTableError() {
}

@Test
public void shouldPropagateJoinLimitError() {
public void shouldPropagateRightInputJoinLimitError() {
DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new ColumnDataType[]{
ColumnDataType.INT, ColumnDataType.STRING
});
Expand All @@ -493,11 +493,13 @@ public void shouldPropagateJoinLimitError() {
TransferableBlock block = operator.nextBlock();
assertTrue(block.isErrorBlock());
assertTrue(block.getExceptions().get(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE)
.contains("reach number of rows limit"));
.contains("reached number of rows limit"));
assertTrue(block.getExceptions().get(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE)
.contains("Cannot build in memory hash table"));
}

@Test
public void shouldHandleJoinWithPartialResultsWhenHitDataRowsLimit() {
public void shouldHandleJoinWithPartialResultsWhenHitDataRowsLimitOnRightInput() {
DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new ColumnDataType[]{
ColumnDataType.INT, ColumnDataType.STRING
});
Expand Down Expand Up @@ -529,6 +531,71 @@ public void shouldHandleJoinWithPartialResultsWhenHitDataRowsLimit() {
"Max rows in join should be reached");
}

/**
 * Verifies that, in THROW overflow mode, exceeding the join row limit while joining rows from the left input is
 * reported as an error block.
 *
 * <p>The join is on the string column (key index 1) with a limit of 2 rows. The right input holds 2 rows, so the
 * hash table build stays within the limit, while the left block produces a third match ("Aa" once, "BB" twice),
 * which exceeds it.
 */
@Test
public void shouldPropagateLeftInputJoinLimitError() {
  DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new ColumnDataType[]{
      ColumnDataType.INT, ColumnDataType.STRING
  });
  DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new ColumnDataType[]{
      ColumnDataType.INT, ColumnDataType.STRING
  });
  when(_leftInput.nextBlock()).thenReturn(
          OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}))
      .thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
  when(_rightInput.nextBlock()).thenReturn(
          OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}))
      .thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
  // Column name fixed from the typo "int_co2" to "int_col2", matching the other result schemas in this class.
  DataSchema resultSchema =
      new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_col2"}, new ColumnDataType[]{
          ColumnDataType.INT, ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.STRING
      });
  PlanNode.NodeHint nodeHint = new PlanNode.NodeHint(Map.of(PinotHintOptions.JOIN_HINT_OPTIONS,
      Map.of(PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE, "THROW",
          PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN, "2")));
  HashJoinOperator operator =
      getOperator(leftSchema, resultSchema, JoinRelType.INNER, List.of(1), List.of(1), List.of(), nodeHint);
  TransferableBlock block = operator.nextBlock();
  assertTrue(block.isErrorBlock());
  // The message must identify both the limit and the join phase (not the hash table build) as the source.
  assertTrue(block.getExceptions().get(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE)
      .contains("reached number of rows limit"));
  assertTrue(block.getExceptions().get(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE)
      .contains("Cannot process join"));
}

/**
 * Verifies that, in BREAK overflow mode, exceeding the join row limit while joining rows from the left input yields
 * partial results instead of an error.
 *
 * <p>The join is on the string column (key index 1) with a limit of 2 rows. Every left row matches the single
 * right row, so the third row of the first left block exceeds the limit. The test expects the left input to be
 * early terminated, exactly 2 rows in the first result block, a successful end-of-stream block next, and the
 * {@code MAX_ROWS_IN_JOIN_REACHED} stat to be set.
 */
@Test
public void shouldHandleJoinWithPartialResultsWhenHitDataRowsLimitOnLeftInput() {
  DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new ColumnDataType[]{
      ColumnDataType.INT, ColumnDataType.STRING
  });
  DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new ColumnDataType[]{
      ColumnDataType.INT, ColumnDataType.STRING
  });
  when(_leftInput.nextBlock()).thenReturn(
          OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "Aa"}, new Object[]{3, "Aa"}))
      .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{4, "Aa"}, new Object[]{5, "Aa"}))
      .thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
  when(_rightInput.nextBlock()).thenReturn(
          OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}))
      .thenReturn(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(0));
  // Column name fixed from the typo "int_co2" to "int_col2", matching the other result schemas in this class.
  DataSchema resultSchema =
      new DataSchema(new String[]{"int_col1", "string_col1", "int_col2", "string_col2"}, new ColumnDataType[]{
          ColumnDataType.INT, ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.STRING
      });
  PlanNode.NodeHint nodeHint = new PlanNode.NodeHint(Map.of(PinotHintOptions.JOIN_HINT_OPTIONS,
      Map.of(PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE, "BREAK",
          PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN, "2")));
  HashJoinOperator operator =
      getOperator(leftSchema, resultSchema, JoinRelType.INNER, List.of(1), List.of(1), List.of(), nodeHint);
  List<Object[]> resultRows = operator.nextBlock().getContainer();
  Mockito.verify(_leftInput).earlyTerminate();
  assertEquals(resultRows.size(), 2);
  // After the limit is hit the operator must finish cleanly and report the overflow through its stats.
  TransferableBlock block2 = operator.nextBlock();
  assertTrue(block2.isSuccessfulEndOfStreamBlock());
  StatMap<HashJoinOperator.StatKey> statMap = OperatorTestUtil.getStatMap(HashJoinOperator.StatKey.class, block2);
  assertTrue(statMap.getBoolean(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN_REACHED),
      "Max rows in join should be reached");
}

private HashJoinOperator getOperator(DataSchema leftSchema, DataSchema resultSchema, JoinRelType joinType,
List<Integer> leftKeys, List<Integer> rightKeys, List<RexExpression> nonEquiConditions,
PlanNode.NodeHint nodeHint) {
Expand Down