[multistage] framework to back-propagate metadata across opChains#11746
[multistage] framework to back-propagate metadata across opChains#11746walterddr merged 4 commits intoapache:masterfrom
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #11746 +/- ##
=============================================
+ Coverage 14.41% 63.13% +48.72%
- Complexity 201 1140 +939
=============================================
Files 2342 2343 +1
Lines 125896 126100 +204
Branches 19362 19403 +41
=============================================
+ Hits 18146 79619 +61473
+ Misses 106213 40822 -65391
- Partials 1537 5659 +4122
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
77cd516 to
fc4185d
Compare
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
Outdated
Show resolved
Hide resolved
053450a to
76fa636
Compare
Jackie-Jiang
left a comment
There was a problem hiding this comment.
The logic looks good
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
Show resolved
Hide resolved
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
Outdated
Show resolved
Hide resolved
...ery-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
Outdated
Show resolved
Hide resolved
...uery-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java
Outdated
Show resolved
Hide resolved
| private static final ExpressionContext PLACEHOLDER_IDENTIFIER = ExpressionContext.forIdentifier("__PLACEHOLDER__"); | ||
|
|
||
| private final MultiStageOperator _inputOperator; | ||
| private final MultiStageOperator _upstreamOperator; |
There was a problem hiding this comment.
This is confusing. In the mailbox, upstream is the next operator (e.g. receiver is upstream for sender), but here upstream is input operator. Suggest reverting this change because input is more clear
There was a problem hiding this comment.
yes some are named upstream some are named input. will revert this change and make another PR to change the naming
| TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock( | ||
| OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap())); | ||
| sendTransferableBlock(eosBlockWithStats); | ||
| isEarlyTerminated = sendTransferableBlock(eosBlockWithStats); |
There was a problem hiding this comment.
We don't need to set early terminate when it is already EOS
| // acquire extra metadata block | ||
| block = _upstreamOperator.nextBlock(); |
There was a problem hiding this comment.
This is not very robust. Simply remove the break and let it follow the regular execution flow
| private static final Set<String> RANKING_FUNCTION_NAMES = ImmutableSet.of("RANK", "DENSE_RANK"); | ||
|
|
||
| private final MultiStageOperator _inputOperator; | ||
| private final MultiStageOperator _upstreamOperator; |
There was a problem hiding this comment.
Same here, suggest not changing it
| /** | ||
| * API to send a block to the destination mailboxes. | ||
| * @param block the block to be transferred | ||
| * @return true if any of the upstream mailboxes requested EOS (e.g. early termination) |
There was a problem hiding this comment.
This comment is incorrect. It returns true only if all receiving mailboxes are early terminated.
Side comment: Do you realize this upstream is the opposite of other upstream changes in the PR (in this place it refers to the next operator in the data flow, but in other places it refers to the previous operator in the data flow). Thus suggest not using upstream because it can be confusing
There was a problem hiding this comment.
yeah it is confusing, will create a separate PR for naming fix
| boolean isEarlyTerminated = true; | ||
| for (SendingMailbox sendingMailbox : _sendingMailboxes) { | ||
| if (!sendingMailbox.isTerminated()) { | ||
| if (!sendingMailbox.isEarlyTerminated()) { |
There was a problem hiding this comment.
We can do the block.isEndOfStreamBlock() check first, then check if all mailboxes are early terminated
bd61449 to
48ed081
Compare
| @Override | ||
| public void cancel(Throwable t) { | ||
| if (_isTerminated) { | ||
| if (_isEarlyTerminated || _isTerminated) { |
There was a problem hiding this comment.
We should not change this. Cancel should be applied even if early terminate is called
| // TODO: Revisit if this is the correct way to apply back pressure | ||
| private final BlockingQueue<TransferableBlock> _blocks = new ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS); | ||
| private final AtomicReference<TransferableBlock> _errorBlock = new AtomicReference<>(); | ||
| private final AtomicBoolean _isEarlyTerminated = new AtomicBoolean(false); |
There was a problem hiding this comment.
(minor) This can be defined as a volatile boolean
| private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5; | ||
|
|
||
| private final AtomicInteger _bufferSize = new AtomicInteger(DEFAULT_MAILBOX_QUEUE_CAPACITY); | ||
| private final AtomicBoolean _isEarlyTerminated = new AtomicBoolean(); |
There was a problem hiding this comment.
(minor) This can be defined as a volatile boolean
| isEarlyTerminated = false; | ||
| } else { | ||
| sendTransferableBlock(block); | ||
| isEarlyTerminated = sendTransferableBlock(block); |
There was a problem hiding this comment.
Can be simplified (remove isEarlyTerminated)
| isEarlyTerminated = sendTransferableBlock(block); | |
| if (sendTransferableBlock(block)) { | |
| earlyTerminate(); | |
| } |
| earlyTerminateMailboxes(); | ||
| } | ||
|
|
||
| protected void earlyTerminateMailboxes() { |
There was a problem hiding this comment.
Any specific reason why making this a separate method?
Changes
MultiStageOperatorsetEarlyTerminate()API which sets the early termination flag to true. This API is meant to populate upstream_isEarlyTerminateis set to true, the next call tonextBlock()should return EOS if it has not already done so.MailboxesSendingMailboxandReceivingMailboxnow bear a contract to indicate whether early termination has occurred from upstream (receiving --> sending)ReceivingMailboxes are set earlyTerminaton flag byReceiveOperatorReceivingMailboxpopulate this info back toSendingMailboxSendingMailboxreturns the boolean flag toSendingOperator, andSendingOperatorchecks this boolean flag and subsequently call its opChain'ssetEarlyTermination()method.TODO
operator.nextBlock()should occur if theoperatorin question has already returned EOS.