
Enforce max rows in join limit on joined rows with left input as well (#13922)

Merged
gortiz merged 1 commit into apache:master from yashmayya:max-rows-in-join-left-input
Sep 2, 2024

Conversation

yashmayya (Contributor) commented Sep 2, 2024

  • Currently, the maxRowsInJoin query option (https://docs.pinot.apache.org/users/user-guide-query/query-options#supported-query-options) / max_rows_in_join join hint (https://docs.pinot.apache.org/users/user-guide-query/multi-stage-query/operator-types/hash_join#max_rows_in_join) is only applied to the in-memory hash table built from the join's right input.
  • This patch applies the limit to the actual joined rows as well (i.e., after matching rows from the left input against the hash table). This is important because, without a default limit, an accidental large CROSS JOIN-like query can cause servers to crash.
  • If the limit is reached while joining rows from the left input with the hash table built from the right input, the join overflow mode is checked.
  • By default (THROW), an exception is thrown and the query fails. If the join overflow mode is BREAK, then we early terminate the left input and return the joined rows.
  • Note that this enforcement is done on a per-operator basis, so the total number of rows emitted by a join operation across all workers can still exceed the limit.
  • The documentation will also be updated to reflect this change in the enforcement of the max rows in join limit.
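The enforcement described above can be illustrated with a minimal, standalone sketch. This is not the actual Pinot implementation — the class, enum, and method names below are illustrative, and a trivial equi-join on integer values stands in for the real hash-join on row keys — but the limit check and the THROW/BREAK overflow behavior follow the description in the bullets:

```java
import java.util.ArrayList;
import java.util.List;

public class JoinLimitSketch {
  // Illustrative stand-in for Pinot's join overflow mode query option.
  enum JoinOverflowMode { THROW, BREAK }

  // Joins two lists on equal values, enforcing maxRowsInJoin on the joined rows.
  static List<int[]> joinWithLimit(List<Integer> left, List<Integer> right,
      int maxRowsInJoin, JoinOverflowMode mode) {
    List<int[]> joined = new ArrayList<>();
    outer:
    for (int l : left) {
      for (int r : right) {
        if (l == r) {  // simple equi-join on value, in place of a hash lookup
          if (joined.size() + 1 > maxRowsInJoin) {
            if (mode == JoinOverflowMode.THROW) {
              // Default behavior: fail the query once the limit is breached.
              throw new IllegalStateException(
                  "Cannot process join, reached number of rows limit: " + maxRowsInJoin);
            }
            // BREAK: stop consuming the left input and return the rows joined so far.
            break outer;
          }
          joined.add(new int[]{l, r});
        }
      }
    }
    return joined;
  }
}
```

With `left = [1, 1, 2, 3]` and `right = [1, 2, 2]` the full join produces 4 rows; with a limit of 3 and BREAK mode, the sketch returns the first 3 joined rows and terminates the left input early.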

codecov-commenter commented Sep 2, 2024

Codecov Report

Attention: Patch coverage is 62.26415% with 20 lines in your changes missing coverage. Please review.

Project coverage is 57.81%. Comparing base (59551e4) to head (b64c842).
Report is 973 commits behind head on master.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| ...pinot/query/runtime/operator/HashJoinOperator.java | 62.26% | 9 Missing and 11 partials ⚠️ |
Additional details and impacted files
```
@@             Coverage Diff              @@
##             master   #13922      +/-   ##
============================================
- Coverage     61.75%   57.81%   -3.94%     
+ Complexity      207      197      -10     
============================================
  Files          2436     2586     +150     
  Lines        133233   142465    +9232     
  Branches      20636    21885    +1249     
============================================
+ Hits          82274    82362      +88     
- Misses        44911    53651    +8740     
- Partials       6048     6452     +404     
```
| Flag | Coverage Δ |
| --- | --- |
| custom-integration1 | <0.01% <0.00%> (-0.01%) ⬇️ |
| integration | <0.01% <0.00%> (-0.01%) ⬇️ |
| integration1 | <0.01% <0.00%> (-0.01%) ⬇️ |
| integration2 | 0.00% <0.00%> (ø) |
| java-11 | 57.76% <62.26%> (-3.94%) ⬇️ |
| java-21 | 57.70% <62.26%> (-3.93%) ⬇️ |
| skip-bytebuffers-false | 57.80% <62.26%> (-3.95%) ⬇️ |
| skip-bytebuffers-true | 57.68% <62.26%> (+29.95%) ⬆️ |
| temurin | 57.81% <62.26%> (-3.94%) ⬇️ |
| unittests | 57.80% <62.26%> (-3.94%) ⬇️ |
| unittests1 | 40.51% <62.26%> (-6.38%) ⬇️ |
| unittests2 | 27.93% <0.00%> (+0.20%) ⬆️ |

Flags with carried forward coverage won't be shown.


@yashmayya yashmayya marked this pull request as ready for review September 2, 2024 11:20
Comment on lines +476 to +493
```java
private boolean incrementJoinedRowsAndCheckLimit() throws ProcessingException {
  _currentJoinedRows++;
  if (_currentJoinedRows > _maxRowsInJoin) {
    if (_joinOverflowMode == JoinOverFlowMode.THROW) {
      throwProcessingExceptionForJoinRowLimitExceeded("Cannot process join, reached number of rows limit: "
          + _maxRowsInJoin);
    } else {
      // Skip over remaining blocks until we reach the end of stream since we already breached the rows limit.
      logger().info("Terminating join operator early as the maximum number of rows limit was reached: {}",
          _maxRowsInJoin);
      earlyTerminateLeftInput();
      _statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true);
      return true;
    }
  }

  return false;
}
```
Contributor commented:

Isn't _currentJoinedRows always equal to _rows.size()?

yashmayya (Contributor, Author) replied:

We're incrementing _currentJoinedRows for every matched row and checking it against the max rows limit so that we can exit as soon as the limit is breached. Alternatively, we could update it once after processing all the rows from a block. That would be slightly more efficient since there would be fewer checks and increments, but it would also be less accurate: depending on the block size, a very large number of joined rows could still be emitted past the limit. I don't think the overhead of the integer increment and limit check should be too concerning given the complexity of the other operations in each iteration of the main join loops, WDYT?
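The trade-off in this exchange — per-row checks versus one check per block — can be shown with a toy sketch. This is illustrative code, not Pinot's implementation; it models emitted rows as simple counters and assumes hypothetical method names:

```java
public class LimitCheckGranularity {
  // Per-row check (the approach taken in the PR): never emits more than 'limit' rows,
  // because the limit is tested before every single joined row is added.
  static int emitPerRowCheck(int[] blockSizes, int limit) {
    int emitted = 0;
    for (int blockSize : blockSizes) {
      for (int i = 0; i < blockSize; i++) {
        if (emitted + 1 > limit) {
          return emitted;  // early terminate mid-block
        }
        emitted++;
      }
    }
    return emitted;
  }

  // Per-block check (the cheaper alternative discussed above): the limit is only
  // tested after a whole block has been joined, so the overshoot can be up to
  // (blockSize - 1) rows beyond the limit.
  static int emitPerBlockCheck(int[] blockSizes, int limit) {
    int emitted = 0;
    for (int blockSize : blockSizes) {
      emitted += blockSize;
      if (emitted >= limit) {
        return emitted;
      }
    }
    return emitted;
  }
}
```

With two blocks of 10,000 rows and a limit of 5, the per-row variant emits exactly 5 rows, while the per-block variant emits the full first block of 10,000 — which is the "very large number of joined rows" concern raised in the reply.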

@gortiz gortiz merged commit e8f5e9b into apache:master Sep 2, 2024
@Jackie-Jiang Jackie-Jiang added multi-stage Related to the multi-stage query engine documentation labels Sep 3, 2024