Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, errorMessage));
}

boolean traceEnabled = Boolean.parseBoolean(
sqlNodeAndOptions.getOptions().getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
Map<String, String> queryOptions = sqlNodeAndOptions.getOptions();
boolean traceEnabled = Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE));

ResultTable queryResults;
Map<Integer, ExecutionStatsAggregator> stageIdStatsMap = new HashMap<>();
Expand All @@ -177,8 +177,8 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S

long executionStartTimeNs = System.nanoTime();
try {
queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, queryTimeoutMs,
sqlNodeAndOptions.getOptions(), stageIdStatsMap, traceEnabled);
queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, queryTimeoutMs, queryOptions,
stageIdStatsMap);
} catch (Throwable t) {
String consolidatedMessage = ExceptionUtils.consolidateExceptionMessages(t);
LOGGER.error("Caught exception executing request {}: {}, {}", requestId, query, consolidatedMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,12 @@

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
Expand All @@ -39,7 +36,6 @@
import org.apache.pinot.query.mailbox.MailboxIdUtils;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.plannode.MailboxSendNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.MailboxMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils;
Expand All @@ -50,14 +46,12 @@
import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode;
Expand All @@ -72,17 +66,13 @@ public class QueryRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(QueryRunner.class);
private static final String PINOT_V1_SERVER_QUERY_CONFIG_PREFIX = "pinot.server.query.executor";

// This is a temporary before merging the 2 type of executor.
private ServerQueryExecutorV1Impl _serverExecutor;
private HelixManager _helixManager;
private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
private MailboxService _mailboxService;
private String _hostname;
private int _port;
private ServerMetrics _serverMetrics;

private ExecutorService _opChainExecutor;

private OpChainSchedulerService _scheduler;
private MailboxService _mailboxService;
private ServerQueryExecutorV1Impl _leafQueryExecutor;

// Group-by settings
@Nullable
Expand All @@ -102,12 +92,14 @@ public class QueryRunner {
*/
public void init(PinotConfiguration config, InstanceDataManager instanceDataManager, HelixManager helixManager,
ServerMetrics serverMetrics) {
_helixManager = helixManager;
_serverMetrics = serverMetrics;

String instanceName = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
_hostname = instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ? instanceName.substring(
String hostname = instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ? instanceName.substring(
CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceName;
_port = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
int port = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT);
_helixManager = helixManager;

// TODO: Consider using separate config for intermediate stage and leaf stage
String numGroupsLimitStr = config.getProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT);
Expand All @@ -121,29 +113,28 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana
String joinOverflowModeStr = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_JOIN_OVERFLOW_MODE);
_joinOverflowMode = joinOverflowModeStr != null ? JoinOverFlowMode.valueOf(joinOverflowModeStr) : null;

//TODO: make this configurable
_opChainExecutor =
ExecutorServiceUtils.create(config, "pinot.query.runner.opchain", "op_chain_worker_on_" + port + "_port");
_scheduler = new OpChainSchedulerService(getOpChainExecutorService());
_mailboxService = new MailboxService(hostname, port, config);
try {
//TODO: make this configurable
_opChainExecutor = ExecutorServiceUtils.create(config, "pinot.query.runner.opchain",
"op_chain_worker_on_" + _port + "_port");
_scheduler = new OpChainSchedulerService(getOpChainExecutorService());
_mailboxService = new MailboxService(_hostname, _port, config);
_serverExecutor = new ServerQueryExecutorV1Impl();
_serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics);
_leafQueryExecutor = new ServerQueryExecutorV1Impl();
_leafQueryExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics);
} catch (Exception e) {
throw new RuntimeException(e);
}

LOGGER.info("Initialized QueryRunner with hostname: {}, port: {}", hostname, port);
}

public void start()
throws TimeoutException {
_helixPropertyStore = _helixManager.getHelixPropertyStore();
public void start() {
_mailboxService.start();
_serverExecutor.start();
_leafQueryExecutor.start();
}

public void shutDown()
throws TimeoutException {
_serverExecutor.shutDown();
public void shutDown() {
_leafQueryExecutor.shutDown();
_mailboxService.shutdown();
ExecutorServiceUtils.close(_opChainExecutor);
}
Expand All @@ -156,17 +147,15 @@ public void shutDown()
*/
public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadata) {
long requestId = Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
long timeoutMs = Long.parseLong(requestMetadata.get(QueryOptionKey.TIMEOUT_MS));
boolean isTraceEnabled =
Boolean.parseBoolean(requestMetadata.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
long timeoutMs = Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
long deadlineMs = System.currentTimeMillis() + timeoutMs;

setStageCustomProperties(distributedStagePlan.getStageMetadata().getCustomProperties(), requestMetadata);

// run pre-stage execution for all pipeline breakers
PipelineBreakerResult pipelineBreakerResult =
PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan, deadlineMs,
requestId, isTraceEnabled);
PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
requestMetadata, requestId, deadlineMs);

// Send error block to all the receivers if pipeline breaker fails
if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock() != null) {
Expand All @@ -193,13 +182,15 @@ public void processQuery(DistributedStagePlan distributedStagePlan, Map<String,
}

// run OpChain
OpChainExecutionContext executionContext =
new OpChainExecutionContext(_mailboxService, requestId, distributedStagePlan.getStageId(),
distributedStagePlan.getServer(), deadlineMs, requestMetadata, distributedStagePlan.getStageMetadata(),
pipelineBreakerResult);
OpChain opChain;
if (DistributedStagePlan.isLeafStage(distributedStagePlan)) {
opChain = compileLeafStage(requestId, distributedStagePlan, requestMetadata, pipelineBreakerResult, deadlineMs,
isTraceEnabled);
opChain = compileLeafStage(executionContext, distributedStagePlan);
} else {
opChain = compileIntermediateStage(requestId, distributedStagePlan, requestMetadata, pipelineBreakerResult,
deadlineMs, isTraceEnabled);
opChain = PhysicalPlanVisitor.walkPlanNode(distributedStagePlan.getStageRoot(), executionContext);
}
_scheduler.register(opChain);
}
Expand Down Expand Up @@ -248,51 +239,36 @@ public ExecutorService getOpChainExecutorService() {
return _opChainExecutor;
}

private OpChain compileIntermediateStage(long requestId, DistributedStagePlan distributedStagePlan,
Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult, long deadlineMs,
boolean isTraceEnabled) {
PlanNode stageRoot = distributedStagePlan.getStageRoot();
OpChainExecutionContext opChainContext = new OpChainExecutionContext(_mailboxService, requestId,
stageRoot.getPlanFragmentId(), distributedStagePlan.getServer(), deadlineMs,
distributedStagePlan.getStageMetadata(), pipelineBreakerResult, isTraceEnabled);
return PhysicalPlanVisitor.walkPlanNode(stageRoot,
new PhysicalPlanContext(opChainContext, pipelineBreakerResult));
}

private OpChain compileLeafStage(long requestId, DistributedStagePlan distributedStagePlan,
Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult, long deadlineMs,
boolean isTraceEnabled) {
OpChainExecutionContext opChainContext = new OpChainExecutionContext(_mailboxService, requestId,
distributedStagePlan.getStageId(), distributedStagePlan.getServer(), deadlineMs,
distributedStagePlan.getStageMetadata(), pipelineBreakerResult, isTraceEnabled);
PhysicalPlanContext planContext = new PhysicalPlanContext(opChainContext, pipelineBreakerResult);
List<ServerPlanRequestContext> serverPlanRequestContexts = ServerPlanRequestUtils.constructServerQueryRequests(
planContext, distributedStagePlan, requestMetadataMap, _helixPropertyStore);
private OpChain compileLeafStage(OpChainExecutionContext executionContext,
DistributedStagePlan distributedStagePlan) {
List<ServerPlanRequestContext> serverPlanRequestContexts =
ServerPlanRequestUtils.constructServerQueryRequests(executionContext, distributedStagePlan,
_helixManager.getHelixPropertyStore());
List<ServerQueryRequest> serverQueryRequests = new ArrayList<>(serverPlanRequestContexts.size());
long queryArrivalTimeMs = System.currentTimeMillis();
for (ServerPlanRequestContext requestContext : serverPlanRequestContexts) {
serverQueryRequests.add(new ServerQueryRequest(requestContext.getInstanceRequest(),
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), System.currentTimeMillis()));
serverQueryRequests.add(
new ServerQueryRequest(requestContext.getInstanceRequest(), _serverMetrics, queryArrivalTimeMs));
}
MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot();
OpChainExecutionContext opChainExecutionContext = new OpChainExecutionContext(planContext);
MultiStageOperator leafStageOperator =
new LeafStageTransferableBlockOperator(opChainExecutionContext, this::processServerQueryRequest,
serverQueryRequests, sendNode.getDataSchema());
new LeafStageTransferableBlockOperator(executionContext, this::processServerQueryRequest, serverQueryRequests,
sendNode.getDataSchema());
MailboxSendOperator mailboxSendOperator =
new MailboxSendOperator(opChainExecutionContext, leafStageOperator, sendNode.getDistributionType(),
new MailboxSendOperator(executionContext, leafStageOperator, sendNode.getDistributionType(),
sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(), sendNode.getCollationDirections(),
sendNode.isSortOnSender(), sendNode.getReceiverStageId());
return new OpChain(opChainExecutionContext, mailboxSendOperator, Collections.emptyList());
return new OpChain(executionContext, mailboxSendOperator);
}

private InstanceResponseBlock processServerQueryRequest(ServerQueryRequest request) {
InstanceResponseBlock result;
try {
result = _serverExecutor.execute(request, getOpChainExecutorService());
result = _leafQueryExecutor.execute(request, getOpChainExecutorService());
} catch (Exception e) {
InstanceResponseBlock errorResponse = new InstanceResponseBlock();
errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE,
e.getMessage() + QueryException.getTruncatedStackTrace(e));
errorResponse.getExceptions()
.put(QueryException.QUERY_EXECUTION_ERROR_CODE, e.getMessage() + QueryException.getTruncatedStackTrace(e));
result = errorResponse;
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pinot.query.runtime.operator;

import java.util.List;
import java.util.function.Consumer;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
Expand All @@ -34,31 +33,21 @@
public class OpChain implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(OpChain.class);

private final MultiStageOperator _root;
private final List<String> _receivingMailboxIds;
private final OpChainId _id;
private final OpChainStats _stats;
private final Consumer<OpChainId> _opChainFinishCallback;
private final MultiStageOperator _root;
private final Consumer<OpChainId> _finishCallback;

public OpChain(OpChainExecutionContext context, MultiStageOperator root, List<String> receivingMailboxIds) {
this(context, root, receivingMailboxIds, (id) -> { });
public OpChain(OpChainExecutionContext context, MultiStageOperator root) {
this(context, root, (id) -> {
});
}

public OpChain(OpChainExecutionContext context, MultiStageOperator root, List<String> receivingMailboxIds,
Consumer<OpChainId> opChainFinishCallback) {
_root = root;
_receivingMailboxIds = receivingMailboxIds;
public OpChain(OpChainExecutionContext context, MultiStageOperator root, Consumer<OpChainId> finishCallback) {
_id = context.getId();
_stats = context.getStats();
_opChainFinishCallback = opChainFinishCallback;
}

public Operator<TransferableBlock> getRoot() {
return _root;
}

public List<String> getReceivingMailboxIds() {
return _receivingMailboxIds;
_root = root;
_finishCallback = finishCallback;
}

public OpChainId getId() {
Expand All @@ -70,6 +59,10 @@ public OpChainStats getStats() {
return _stats;
}

public Operator<TransferableBlock> getRoot() {
return _root;
}

@Override
public String toString() {
return "OpChain{" + _id + "}";
Expand All @@ -86,7 +79,7 @@ public void close() {
try {
_root.close();
} finally {
_opChainFinishCallback.accept(getId());
_finishCallback.accept(getId());
LOGGER.trace("OpChain callback called");
}
}
Expand All @@ -102,7 +95,7 @@ public void cancel(Throwable e) {
try {
_root.cancel(e);
} finally {
_opChainFinishCallback.accept(getId());
_finishCallback.accept(getId());
LOGGER.trace("OpChain callback called");
}
}
Expand Down
Loading