Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
private final int _defaultHllLog2m;
private final boolean _enableQueryLimitOverride;
private final boolean _enableDistinctCountBitmapOverride;
private final Map<Long, QueryServers> _queriesById = new ConcurrentHashMap<>();
private final boolean _enableQueryCancellation;
private final Map<Long, QueryServers> _queriesById;

public BaseBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
Expand Down Expand Up @@ -164,13 +163,13 @@ public BaseBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager
Broker.DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND));
_numDroppedLog = new AtomicInteger(0);
_numDroppedLogRateLimiter = RateLimiter.create(1.0);
_enableQueryCancellation =
boolean enableQueryCancellation =
Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
_queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null;
LOGGER.info(
"Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps, "
+ "enabling query cancellation: {}",
_brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, _queryLogRateLimiter.getRate(),
_enableQueryCancellation);
+ "enabling query cancellation: {}", _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength,
_queryLogRateLimiter.getRate(), enableQueryCancellation);
}

private String getDefaultBrokerId() {
Expand All @@ -184,13 +183,13 @@ private String getDefaultBrokerId() {

@Override
public Map<Long, String> getRunningQueries() {
Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on broker");
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query));
}

@VisibleForTesting
Set<ServerInstance> getRunningServers(long requestId) {
Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on broker");
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
QueryServers queryServers = _queriesById.get(requestId);
return (queryServers == null) ? Collections.emptySet() : queryServers._servers;
}
Expand All @@ -199,7 +198,7 @@ Set<ServerInstance> getRunningServers(long requestId) {
public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpConnectionManager connMgr,
Map<String, Integer> serverResponses)
throws Exception {
Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on broker");
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
QueryServers queryServers = _queriesById.get(queryId);
if (queryServers == null) {
return false;
Expand Down Expand Up @@ -278,7 +277,7 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
requestContext.setQuery(query);
return handleRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext);
} finally {
if (_enableQueryCancellation) {
if (_queriesById != null) {
_queriesById.remove(requestId);
LOGGER.debug("Remove track of running query: {}", requestId);
}
Expand Down Expand Up @@ -665,7 +664,7 @@ private BrokerResponseNative handleRequest(long requestId, String query,
realtimeRoutingTable = null;
}
}
if (_enableQueryCancellation) {
if (_queriesById != null) {
// Start to track the running query for cancellation just before sending it out to servers to avoid any potential
// failures that could happen before sending it out, like failures to calculate the routing table etc.
// TODO: Even tracking the query as late as here, a potential race condition between calling cancel API and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public static void setMaxLinesOfStackTrace(int maxLinesOfStackTracePerFrame) {
public static final int ACCESS_DENIED_ERROR_CODE = 180;
public static final int TABLE_DOES_NOT_EXIST_ERROR_CODE = 190;
public static final int QUERY_EXECUTION_ERROR_CODE = 200;
public static final int QUERY_CANCELLATION_ERROR_CODE = 205;
// TODO: Handle these errors in broker
public static final int SERVER_SHUTTING_DOWN_ERROR_CODE = 210;
public static final int SERVER_OUT_OF_CAPACITY_ERROR_CODE = 211;
Expand Down Expand Up @@ -90,6 +91,8 @@ public static void setMaxLinesOfStackTrace(int maxLinesOfStackTracePerFrame) {
public static final ProcessingException TABLE_DOES_NOT_EXIST_ERROR =
new ProcessingException(TABLE_DOES_NOT_EXIST_ERROR_CODE);
public static final ProcessingException QUERY_EXECUTION_ERROR = new ProcessingException(QUERY_EXECUTION_ERROR_CODE);
public static final ProcessingException QUERY_CANCELLATION_ERROR =
new ProcessingException(QUERY_CANCELLATION_ERROR_CODE);
public static final ProcessingException SERVER_SCHEDULER_DOWN_ERROR =
new ProcessingException(SERVER_SHUTTING_DOWN_ERROR_CODE);
public static final ProcessingException SERVER_OUT_OF_CAPACITY_ERROR =
Expand Down Expand Up @@ -132,6 +135,7 @@ public static void setMaxLinesOfStackTrace(int maxLinesOfStackTracePerFrame) {
COMBINE_SEGMENT_PLAN_TIMEOUT_ERROR.setMessage("CombineSegmentPlanTimeoutError");
TABLE_DOES_NOT_EXIST_ERROR.setMessage("TableDoesNotExistError");
QUERY_EXECUTION_ERROR.setMessage("QueryExecutionError");
QUERY_CANCELLATION_ERROR.setMessage("QueryCancellationError");
SERVER_SCHEDULER_DOWN_ERROR.setMessage("ServerShuttingDown");
SERVER_OUT_OF_CAPACITY_ERROR.setMessage("ServerOutOfCapacity");
SERVER_TABLE_MISSING_ERROR.setMessage("ServerTableMissing");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.core.util.trace.TraceRunnable;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.apache.pinot.spi.exception.QueryCancelledException;
import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -118,6 +119,8 @@ public void runJob() {
IntermediateResultsBlock mergedBlock;
try {
mergedBlock = mergeResults();
} catch (InterruptedException e) {
throw new QueryCancelledException("Cancelled while merging results blocks", e);
} catch (Exception e) {
LOGGER.error("Caught exception while merging results blocks (query: {})", _queryContext, e);
mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR, e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.spi.exception.QueryCancelledException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -76,8 +77,10 @@ protected IntermediateResultsBlock getNextBlock() {
assert orderByExpressions != null;
if (orderByExpressions.get(0).getExpression().getType() == ExpressionContext.Type.IDENTIFIER) {
try {
return new MinMaxValueBasedSelectionOrderByCombineOperator(_operators, _queryContext,
_executorService).getNextBlock();
return new MinMaxValueBasedSelectionOrderByCombineOperator(_operators, _queryContext, _executorService)
.getNextBlock();
} catch (QueryCancelledException e) {
throw e;
} catch (Exception e) {
LOGGER.warn("Caught exception while using min/max value based combine, using the default combine", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
import org.apache.pinot.core.util.trace.TraceCallable;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.exception.QueryCancelledException;
import org.apache.pinot.spi.trace.InvocationRecording;
import org.apache.pinot.spi.trace.InvocationScope;
import org.apache.pinot.spi.trace.Tracing;
Expand Down Expand Up @@ -154,6 +155,8 @@ public List<Operator> callJob() {
Throwable cause = e.getCause();
if (cause instanceof BadQueryRequestException) {
throw (BadQueryRequestException) cause;
} else if (e instanceof InterruptedException) {
throw new QueryCancelledException("Cancelled while running CombinePlanNode", e);
} else {
throw new RuntimeException("Caught exception while running CombinePlanNode.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.exception.QueryCancelledException;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants;
import org.joda.time.Interval;
Expand Down Expand Up @@ -245,16 +246,26 @@ private DataTable processQueryInternal(ServerQueryRequest queryRequest, Executor
queryRequest.isEnableStreaming());
} catch (Exception e) {
_serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1);

// Do not log error for BadQueryRequestException because it's caused by bad query
dataTable = DataTableFactory.getEmptyDataTable();
// Do not log verbose error for BadQueryRequestException and QueryCancelledException.
if (e instanceof BadQueryRequestException) {
LOGGER.info("Caught BadQueryRequestException while processing requestId: {}, {}", requestId, e.getMessage());
dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
} else if (e instanceof QueryCancelledException) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Cancelled while processing requestId: {}", requestId, e);
} else {
LOGGER.info("Cancelled while processing requestId: {}, {}", requestId, e.getMessage());
}
// NOTE most likely the onFailure() callback registered on query future in InstanceRequestHandler would
// return the error table to broker sooner than here. But in case of race condition, we construct the error
// table here too.
dataTable.addException(QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR,
"Query cancelled on: " + _instanceDataManager.getInstanceId()));
} else {
LOGGER.error("Exception processing requestId {}", requestId, e);
dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
}

dataTable = DataTableFactory.getEmptyDataTable();
dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
} finally {
for (SegmentDataManager segmentDataManager : segmentDataManagers) {
tableDataManager.releaseSegment(segmentDataManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,12 @@
package org.apache.pinot.core.query.scheduler;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.RateLimiter;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAccumulator;
Expand Down Expand Up @@ -77,9 +71,7 @@ public abstract class QueryScheduler {
private final RateLimiter _queryLogRateLimiter;
private final RateLimiter _numDroppedLogRateLimiter;
private final AtomicInteger _numDroppedLogCounter;
private final boolean _enableQueryCancellation;
protected volatile boolean _isRunning = false;
private final Map<String, Future<byte[]>> _queryFuturesById = new ConcurrentHashMap<>();
/**
* Constructor to initialize QueryScheduler
* @param queryExecutor QueryExecutor engine to use
Expand All @@ -102,11 +94,6 @@ public QueryScheduler(PinotConfiguration config, QueryExecutor queryExecutor, Re
_numDroppedLogRateLimiter = RateLimiter.create(1.0d);
_numDroppedLogCounter = new AtomicInteger(0);
LOGGER.info("Query log max rate: {}", _queryLogRateLimiter.getRate());

_enableQueryCancellation = Boolean.parseBoolean(config.getProperty(ENABLE_QUERY_CANCELLATION_KEY));
if (_enableQueryCancellation) {
LOGGER.info("Enable query cancellation");
}
}

/**
Expand All @@ -117,76 +104,6 @@ public QueryScheduler(PinotConfiguration config, QueryExecutor queryExecutor, Re
*/
public abstract ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest);

/**
* Submit a query for execution and track runtime context about the query for things like cancellation.
* @param queryRequest query to schedule for execution
* @return Listenable future for query result representing serialized response. Custom callbacks can be added on
* the future to clean up the runtime context tracked during query execution.
*/
public ListenableFuture<byte[]> submitQuery(ServerQueryRequest queryRequest) {
ListenableFuture<byte[]> future = submit(queryRequest);
if (_enableQueryCancellation) {
String queryId = queryRequest.getQueryId();
// Track the running query for cancellation.
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Keep track of running query: {}", queryId);
}
_queryFuturesById.put(queryId, future);
// And remove the track when the query ends.
Futures.addCallback(future, new FutureCallback<byte[]>() {
@Override
public void onSuccess(@Nullable byte[] ignored) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Remove track of running query: {} on success", queryId);
}
_queryFuturesById.remove(queryId);
}

@Override
public void onFailure(Throwable ignored) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Remove track of running query: {} on failure", queryId);
}
_queryFuturesById.remove(queryId);
}
}, MoreExecutors.directExecutor());
}
return future;
}

/**
* Cancel a query as identified by the queryId. This method is non-blocking and the query may still run for a while
* after calling this method. This method can be called multiple times.
* TODO: refine the errmsg when query is cancelled, instead of bubbling up the executor's CancellationException.
*
* @param queryId a unique Id to find the query
* @return true if a running query exists for the given queryId.
*/
public boolean cancelQuery(String queryId) {
Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on server");
// Keep the future as it'll be cleaned up by the thread executing the query.
Future<byte[]> future = _queryFuturesById.get(queryId);
if (future == null) {
return false;
}
boolean done = future.isDone();
if (!done) {
future.cancel(true);
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Cancelled query: {} that's done: {}", queryId, done);
}
return true;
}

/**
* @return list of ids of the queries currently running on the server.
*/
public Set<String> getRunningQueryIds() {
Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on server");
return new HashSet<>(_queryFuturesById.keySet());
}

/**
* Query scheduler name for logging
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pinot.common.utils.TlsUtils;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.server.access.AccessControl;
import org.apache.pinot.spi.env.PinotConfiguration;


/**
Expand Down Expand Up @@ -82,8 +83,8 @@ public static ChannelHandler getDataTableHandler(QueryRouter queryRouter, Server
* The {@code getInstanceRequestHandler} return a {@code InstanceRequestHandler} Netty inbound handler on Pinot
* Server side to handle the serialized instance requests sent from Pinot Broker.
*/
public static ChannelHandler getInstanceRequestHandler(QueryScheduler queryScheduler, ServerMetrics serverMetrics,
AccessControl accessControl) {
return new InstanceRequestHandler(queryScheduler, serverMetrics, accessControl);
public static ChannelHandler getInstanceRequestHandler(String instanceName, PinotConfiguration config,
QueryScheduler queryScheduler, ServerMetrics serverMetrics, AccessControl accessControl) {
return new InstanceRequestHandler(instanceName, config, queryScheduler, serverMetrics, accessControl);
}
}
Loading