Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public abstract class BaseMailboxReceiveOperator extends MultiStageOperator {
protected final MailboxService _mailboxService;
protected final RelDistribution.Type _exchangeType;
protected final List<String> _mailboxIds;
private final BlockingMultiStreamConsumer.OfTransferableBlock _multiConsumer;
protected final BlockingMultiStreamConsumer.OfTransferableBlock _multiConsumer;

public BaseMailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType,
int senderStageId) {
Expand All @@ -73,14 +73,6 @@ public BaseMailboxReceiveOperator(OpChainExecutionContext context, RelDistributi
new BlockingMultiStreamConsumer.OfTransferableBlock(context.getId(), context.getDeadlineMs(), asyncStreams);
}

protected BlockingMultiStreamConsumer.OfTransferableBlock getMultiConsumer() {
return _multiConsumer;
}

public List<String> getMailboxIds() {
return _mailboxIds;
}

@Override
protected void earlyTerminate() {
_isEarlyTerminated = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ public String toExplainString() {

@Override
protected TransferableBlock getNextBlock() {
TransferableBlock block = getMultiConsumer().readBlockBlocking();
TransferableBlock block = _multiConsumer.readBlockBlocking();
// When early termination flag is set, caller is expecting an EOS block to be returned, however since the 2 stages
// between sending/receiving mailbox are setting early termination flag asynchronously, there's chances that the
// next block pulled out of the ReceivingMailbox to be an already buffered normal data block. This requires the
// MailboxReceiveOperator to continue pulling and dropping data block until an EOS block is observed.
while (_isEarlyTerminated && !block.isEndOfStreamBlock()) {
block = getMultiConsumer().readBlockBlocking();
block = _multiConsumer.readBlockBlocking();
}
return block;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class SortedMailboxReceiveOperator extends BaseMailboxReceiveOperator {
private final boolean _isSortOnSender;
private final List<Object[]> _rows = new ArrayList<>();

private boolean _isSortedBlockConstructed;
private TransferableBlock _eosBlock;

public SortedMailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType,
DataSchema dataSchema, List<RexExpression> collationKeys, List<Direction> collationDirections,
Expand All @@ -74,20 +74,25 @@ public String toExplainString() {

@Override
protected TransferableBlock getNextBlock() {
while (true) { // loop in order to keep asking if we receive data blocks
TransferableBlock block = getMultiConsumer().readBlockBlocking();
if (_eosBlock != null) {
return _eosBlock;
}
// Collect all the rows from the mailbox and sort them
while (true) {
TransferableBlock block = _multiConsumer.readBlockBlocking();
if (block.isDataBlock()) {
_rows.addAll(block.getContainer());
} else if (block.isErrorBlock()) {
return block;
} else {
assert block.isSuccessfulEndOfStreamBlock();

if (!_isSortedBlockConstructed && !_rows.isEmpty()) {
if (!_rows.isEmpty()) {
_eosBlock = block;
// TODO: This might not be efficient because we are sorting all the received rows. We should use a k-way merge
// when sender side is sorted.
_rows.sort(
new SortUtils.SortComparator(_collationKeys, _collationDirections, _collationNullDirections, _dataSchema,
false));
_isSortedBlockConstructed = true;
return new TransferableBlock(_rows, _dataSchema, DataBlock.Type.ROW);
} else {
return block;
Expand Down