Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ public void runJob() {
try {
long timeOutMs = reducerContext.getReduceTimeOutMs() - (System.currentTimeMillis() - start);
if (!countDownLatch.await(timeOutMs, TimeUnit.MILLISECONDS)) {
throw new TimeoutException("Timed out in broker reduce phase");
throw new TimeoutException("Timed out on broker reduce phase");
}
Throwable t = exception.get();
if (t != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ public class ReceivingMailbox {
public static final int DEFAULT_MAX_PENDING_BLOCKS = 5;

private static final Logger LOGGER = LoggerFactory.getLogger(ReceivingMailbox.class);
private static final MseBlockWithStats CANCELLED_ERROR_BLOCK = new MseBlockWithStats(
ErrorMseBlock.fromException(new RuntimeException("Cancelled by receiver")), Collections.emptyList());
// This was previously a static final attribute, but now that includes server and stage, we cannot use constants
private volatile MseBlockWithStats _cancelledErrorBlock;

private final String _id;
// TODO: Make the queue size configurable
Expand Down Expand Up @@ -152,7 +152,7 @@ private ReceivingMailboxStatus offerPrivate(MseBlock block, List<DataBuffer> sta
MseBlockWithStats errorBlock = _errorBlock.get();
if (errorBlock != null) {
LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring the late block", _id);
return errorBlock == CANCELLED_ERROR_BLOCK ? ReceivingMailboxStatus.CANCELLED : ReceivingMailboxStatus.ERROR;
return errorBlock == _cancelledErrorBlock ? ReceivingMailboxStatus.CANCELLED : ReceivingMailboxStatus.ERROR;
}
if (timeoutMs <= 0) {
LOGGER.debug("Mailbox: {} is already timed out", _id);
Expand All @@ -177,7 +177,7 @@ private ReceivingMailboxStatus offerPrivate(MseBlock block, List<DataBuffer> sta
} else {
LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring the late block", _id);
_blocks.clear();
return errorBlock == CANCELLED_ERROR_BLOCK ? ReceivingMailboxStatus.CANCELLED : ReceivingMailboxStatus.ERROR;
return errorBlock == _cancelledErrorBlock ? ReceivingMailboxStatus.CANCELLED : ReceivingMailboxStatus.ERROR;
}
} else {
LOGGER.debug("Failed to offer block into mailbox: {} within: {}ms", _id, timeoutMs);
Expand Down Expand Up @@ -233,8 +233,14 @@ public void earlyTerminate() {
*/
public void cancel() {
LOGGER.debug("Cancelling mailbox: {}", _id);
if (_errorBlock.compareAndSet(null, CANCELLED_ERROR_BLOCK)) {
_blocks.clear();
if (_errorBlock.get() == null) {
MseBlockWithStats errorBlock = new MseBlockWithStats(
ErrorMseBlock.fromError(QueryErrorCode.EXECUTION_TIMEOUT, "Cancelled by receiver"),
Collections.emptyList());
if (_errorBlock.compareAndSet(null, errorBlock)) {
_cancelledErrorBlock = errorBlock;
_blocks.clear();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,13 @@ public ErrorMseBlock(int stageId, int workerId, String serverId, Map<QueryErrorC
public static ErrorMseBlock fromMap(Map<QueryErrorCode, String> errorMessages) {
int stage;
int worker;
String server;
String server = QueryThreadContext.isInitialized() ? QueryThreadContext.getInstanceId() : "unknown";
if (MseWorkerThreadContext.isInitialized()) {
stage = MseWorkerThreadContext.getStageId();
worker = MseWorkerThreadContext.getWorkerId();
server = QueryThreadContext.getInstanceId();
} else {
stage = -1; // Default value when not initialized
worker = -1; // Default value when not initialized
server = null; // Default value when not initialized
}
return new ErrorMseBlock(stage, worker, server, errorMessages);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.util.DataBlockExtractUtils;
import org.apache.pinot.core.util.trace.TracedThreadFactory;
import org.apache.pinot.query.MseWorkerThreadContext;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
Expand Down Expand Up @@ -614,7 +615,11 @@ public static QueryResult runReducer(long requestId,
ArrayList<Object[]> resultRows = new ArrayList<>();
MseBlock block;
MultiStageQueryStats queryStats;
try (OpChain opChain = PlanNodeToOpChain.convert(rootNode, executionContext, (a, b) -> { })) {
try (
QueryThreadContext.CloseableContext mseCloseableCtx = MseWorkerThreadContext.open();
OpChain opChain = PlanNodeToOpChain.convert(rootNode, executionContext, (a, b) -> { })) {
MseWorkerThreadContext.setStageId(0);
MseWorkerThreadContext.setWorkerId(0);
MultiStageOperator rootOperator = opChain.getRoot();
block = rootOperator.nextBlock();
while (block.isData()) {
Expand Down Expand Up @@ -648,21 +653,21 @@ public static QueryResult runReducer(long requestId,
Map.Entry<QueryErrorCode, String> error;
String from;
if (errorBlock.getStageId() >= 0) {
from = "from stage " + errorBlock.getStageId();
from = " from stage " + errorBlock.getStageId();
if (errorBlock.getServerId() != null) {
from += " on server " + errorBlock.getServerId();
from += " on " + errorBlock.getServerId();
}
} else {
from = "from servers";
from = "";
}
if (queryExceptions.size() == 1) {
error = queryExceptions.entrySet().iterator().next();
errorMessage = "Received 1 error " + from + ": " + error.getValue();
errorMessage = "Received 1 error" + from + ": " + error.getValue();
} else {
error = queryExceptions.entrySet().stream()
.max(QueryDispatcher::compareErrors)
.orElseThrow();
errorMessage = "Received " + queryExceptions.size() + " errors " + from + ". "
errorMessage = "Received " + queryExceptions.size() + " errors" + from + ". "
+ "The one with highest priority is: " + error.getValue();
}
QueryProcessingException processingEx = new QueryProcessingException(error.getKey().getId(), errorMessage);
Expand Down
Loading