Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions pinot-query-planner/src/test/resources/queries/JoinPlans.json
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,25 @@
"\n LogicalTableScan(table=[[b]])",
"\n"
]
},
{
"description": "Inner join with limit",
"sql": "EXPLAIN PLAN FOR SELECT a.col1, a.ts, b.col3 FROM a JOIN b ON a.col1 = b.col2 LIMIT 100",
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[100])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[100])",
"\n LogicalProject(col1=[$0], ts=[$1], col3=[$3])",
"\n LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col1=[$0], ts=[$6])",
"\n LogicalTableScan(table=[[a]])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[b]])",
"\n"
]
}
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pinot.query.mailbox;

import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -61,35 +60,47 @@ public GrpcSendingMailbox(String id, ChannelManager channelManager, String hostn
@Override
public void send(TransferableBlock block)
throws IOException {
if (isTerminated()) {
return;
}
if (_contentObserver == null) {
_contentObserver = getContentObserver();
}
Preconditions.checkState(!_statusObserver.isFinished(), "Mailbox: %s is already closed", _id);
_contentObserver.onNext(toMailboxContent(block));
}

@Override
public void complete() {
if (isTerminated()) {
return;
}
_contentObserver.onCompleted();
}

@Override
public void cancel(Throwable t) {
if (!_statusObserver.isFinished()) {
LOGGER.debug("Cancelling mailbox: {}", _id);
if (_contentObserver == null) {
_contentObserver = getContentObserver();
}
try {
// NOTE: DO NOT use onError() because it will terminate the stream, and receiver might not get the callback
_contentObserver.onNext(toMailboxContent(TransferableBlockUtils.getErrorTransferableBlock(
new RuntimeException("Cancelled by sender with exception: " + t.getMessage(), t))));
_contentObserver.onCompleted();
} catch (Exception e) {
// Exception can be thrown if the stream is already closed, so we simply ignore it
LOGGER.debug("Caught exception cancelling mailbox: {}", _id, e);
}
if (isTerminated()) {
return;
}
LOGGER.debug("Cancelling mailbox: {}", _id);
if (_contentObserver == null) {
_contentObserver = getContentObserver();
}
try {
// NOTE: DO NOT use onError() because it will terminate the stream, and receiver might not get the callback
_contentObserver.onNext(toMailboxContent(TransferableBlockUtils.getErrorTransferableBlock(
new RuntimeException("Cancelled by sender with exception: " + t.getMessage(), t))));
_contentObserver.onCompleted();
} catch (Exception e) {
// Exception can be thrown if the stream is already closed, so we simply ignore it
LOGGER.debug("Caught exception cancelling mailbox: {}", _id, e);
}
}

@Override
public boolean isTerminated() {
// TODO: We cannot differentiate early termination vs stream error
return _statusObserver.isFinished();
}

private StreamObserver<MailboxContent> getContentObserver() {
Expand All @@ -102,7 +113,6 @@ private MailboxContent toMailboxContent(TransferableBlock block)
DataBlock dataBlock = block.getDataBlock();
byte[] bytes = dataBlock.toBytes();
ByteString byteString = UnsafeByteOperations.unsafeWrap(bytes);
return MailboxContent.newBuilder().setMailboxId(_id).setPayload(byteString)
.build();
return MailboxContent.newBuilder().setMailboxId(_id).setPayload(byteString).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class InMemorySendingMailbox implements SendingMailbox {
private final long _deadlineMs;

private ReceivingMailbox _receivingMailbox;
private volatile boolean _isTerminated;

public InMemorySendingMailbox(String id, MailboxService mailboxService, long deadlineMs) {
_id = id;
Expand All @@ -41,12 +42,27 @@ public InMemorySendingMailbox(String id, MailboxService mailboxService, long dea

@Override
public void send(TransferableBlock block) {
if (_isTerminated) {
return;
}
if (_receivingMailbox == null) {
_receivingMailbox = _mailboxService.getReceivingMailbox(_id);
}
long timeoutMs = _deadlineMs - System.currentTimeMillis();
if (!_receivingMailbox.offer(block, timeoutMs)) {
throw new RuntimeException(String.format("Failed to offer block into mailbox: %s within: %dms", _id, timeoutMs));
ReceivingMailbox.ReceivingMailboxStatus status = _receivingMailbox.offer(block, timeoutMs);
switch (status) {
case SUCCESS:
break;
case ERROR:
throw new RuntimeException(String.format("Mailbox: %s already errored out (received error block before)", _id));
case TIMEOUT:
throw new RuntimeException(
String.format("Timed out adding block into mailbox: %s with timeout: %dms", _id, timeoutMs));
case EARLY_TERMINATED:
_isTerminated = true;
break;
default:
throw new IllegalStateException("Unsupported mailbox status: " + status);
}
}

Expand All @@ -56,11 +72,19 @@ public void complete() {

@Override
public void cancel(Throwable t) {
if (_isTerminated) {
return;
}
LOGGER.debug("Cancelling mailbox: {}", _id);
if (_receivingMailbox == null) {
_receivingMailbox = _mailboxService.getReceivingMailbox(_id);
}
_receivingMailbox.setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(
new RuntimeException("Cancelled by sender with exception: " + t.getMessage(), t)));
}

@Override
public boolean isTerminated() {
return _isTerminated;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,37 +65,41 @@ public String getId() {
* Offers a non-error block into the mailbox within the timeout specified, returns whether the block is successfully
* added. If the block is not added, an error block is added to the mailbox.
*/
public boolean offer(TransferableBlock block, long timeoutMs) {
if (_errorBlock.get() != null) {
public ReceivingMailboxStatus offer(TransferableBlock block, long timeoutMs) {
TransferableBlock errorBlock = _errorBlock.get();
if (errorBlock != null) {
LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring the late block", _id);
return false;
return errorBlock == CANCELLED_ERROR_BLOCK ? ReceivingMailboxStatus.EARLY_TERMINATED
: ReceivingMailboxStatus.ERROR;
}
if (timeoutMs < 0) {
if (timeoutMs <= 0) {
LOGGER.debug("Mailbox: {} is already timed out", _id);
setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(
new TimeoutException("Timed out while offering data to mailbox: " + _id)));
return false;
return ReceivingMailboxStatus.TIMEOUT;
}
try {
if (_blocks.offer(block, timeoutMs, TimeUnit.MILLISECONDS)) {
if (_errorBlock.get() == null) {
errorBlock = _errorBlock.get();
if (errorBlock == null) {
_receiveMailCallback.accept(MailboxIdUtils.toOpChainId(_id));
return true;
return ReceivingMailboxStatus.SUCCESS;
} else {
LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring the late block", _id);
_blocks.clear();
return false;
return errorBlock == CANCELLED_ERROR_BLOCK ? ReceivingMailboxStatus.EARLY_TERMINATED
: ReceivingMailboxStatus.ERROR;
}
} else {
LOGGER.debug("Failed to offer block into mailbox: {} within: {}ms", _id, timeoutMs);
setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(
new TimeoutException("Timed out while waiting for receive operator to consume data from mailbox: " + _id)));
return false;
return ReceivingMailboxStatus.TIMEOUT;
}
} catch (InterruptedException e) {
LOGGER.error("Interrupted while offering block into mailbox: {}", _id);
setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(e));
return false;
return ReceivingMailboxStatus.ERROR;
}
}

Expand Down Expand Up @@ -133,4 +137,8 @@ public void cancel() {
public int getNumPendingBlocks() {
return _blocks.size();
}

public enum ReceivingMailboxStatus {
SUCCESS, ERROR, TIMEOUT, EARLY_TERMINATED
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,10 @@ void send(TransferableBlock block)
* No more blocks can be sent after calling this method.
*/
void cancel(Throwable t);

/**
* Returns whether the {@link ReceivingMailbox} is already closed. There is no need to send more blocks after the
* mailbox is terminated.
*/
boolean isTerminated();
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,27 @@ public void onNext(MailboxContent mailboxContent) {
}

long timeoutMs = Context.current().getDeadline().timeRemaining(TimeUnit.MILLISECONDS);
if (_mailbox.offer(block, timeoutMs)) {
_responseObserver.onNext(MailboxStatus.newBuilder().setMailboxId(mailboxId)
.putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY,
Integer.toString(_mailbox.getNumPendingBlocks())).build());
} else {
LOGGER.warn("Failed to add block into mailbox: {} within timeout: {}ms", mailboxId, timeoutMs);
cancelStream();
ReceivingMailbox.ReceivingMailboxStatus status = _mailbox.offer(block, timeoutMs);
switch (status) {
case SUCCESS:
_responseObserver.onNext(MailboxStatus.newBuilder().setMailboxId(mailboxId)
.putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY,
Integer.toString(_mailbox.getNumPendingBlocks())).build());
break;
case ERROR:
LOGGER.warn("Mailbox: {} already errored out (received error block before)", mailboxId);
cancelStream();
break;
case TIMEOUT:
LOGGER.warn("Timed out adding block into mailbox: {} with timeout: {}ms", mailboxId, timeoutMs);
cancelStream();
break;
case EARLY_TERMINATED:
LOGGER.debug("Mailbox: {} has been early terminated", mailboxId);
onCompleted();
break;
default:
throw new IllegalStateException("Unsupported mailbox status: " + status);
}
} catch (Exception e) {
String errorMessage = "Caught exception while processing blocks for mailbox: " + mailboxId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pinot.query.runtime.blocks;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -47,12 +46,6 @@ public class TransferableBlock implements Block {
private List<Object[]> _container;

public TransferableBlock(List<Object[]> container, DataSchema dataSchema, DataBlock.Type containerType) {
this(container, dataSchema, containerType, false);
}

@VisibleForTesting
TransferableBlock(List<Object[]> container, DataSchema dataSchema, DataBlock.Type containerType,
boolean isErrorBlock) {
_container = container;
_dataSchema = dataSchema;
_type = containerType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -118,37 +119,40 @@ public String toExplainString() {

@Override
protected TransferableBlock getNextBlock() {
boolean canContinue = true;
TransferableBlock transferableBlock;
try {
transferableBlock = _sourceOperator.nextBlock();
if (transferableBlock.isNoOpBlock()) {
return transferableBlock;
} else if (transferableBlock.isEndOfStreamBlock()) {
if (transferableBlock.isSuccessfulEndOfStreamBlock()) {
// Stats need to be populated here because the block is being sent to the mailbox
// and the receiving opChain will not be able to access the stats from the previous opChain
TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock(
OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
sendTransferableBlock(eosBlockWithStats);
} else {
sendTransferableBlock(transferableBlock);
}
} else { // normal blocks
// check whether we should continue depending on exchange queue condition.
canContinue = sendTransferableBlock(transferableBlock);
TransferableBlock block = _sourceOperator.nextBlock();
if (block.isNoOpBlock()) {
return block;
} else if (block.isErrorBlock()) {
sendTransferableBlock(block);
return block;
} else if (block.isSuccessfulEndOfStreamBlock()) {
// Stats need to be populated here because the block is being sent to the mailbox
// and the receiving opChain will not be able to access the stats from the previous opChain
TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock(
OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
sendTransferableBlock(eosBlockWithStats);
return block;
} else {
// Data block
boolean canContinue = sendTransferableBlock(block);
// Yield if we cannot continue to put transferable block into the sending queue
return canContinue ? block : TransferableBlockUtils.getNoOpTransferableBlock();
}
} catch (EarlyTerminationException e) {
// TODO: Query stats are not sent when opChain is early terminated
LOGGER.debug("Early terminating opChain: " + _context.getId());
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
} catch (Exception e) {
transferableBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
TransferableBlock errorBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
try {
LOGGER.error("Exception while transferring data on opChain: " + _context.getId(), e);
sendTransferableBlock(transferableBlock);
sendTransferableBlock(errorBlock);
} catch (Exception e2) {
LOGGER.error("Exception while sending error block.", e2);
}
return errorBlock;
}
// yield if we cannot continue to put transferable block into the sending queue
return canContinue ? transferableBlock : TransferableBlockUtils.getNoOpTransferableBlock();
}

private boolean sendTransferableBlock(TransferableBlock block)
Expand All @@ -157,7 +161,8 @@ private boolean sendTransferableBlock(TransferableBlock block)
if (_exchange.offerBlock(block, timeoutMs)) {
return _exchange.getRemainingCapacity() > 0;
} else {
throw new TimeoutException("Timeout while offering data block into the sending queue.");
throw new TimeoutException(
String.format("Timed out while offering block into the sending queue after %dms", timeoutMs));
}
}

Expand Down
Loading