Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use query timeout for planning phase #5990

Merged
merged 1 commit into from
Sep 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public class AggregationOnlyCombineOperator extends BaseCombineOperator {
private static final String OPERATOR_NAME = "AggregationOnlyCombineOperator";

public AggregationOnlyCombineOperator(List<Operator> operators, QueryContext queryContext,
ExecutorService executorService, long timeOutMs) {
super(operators, queryContext, executorService, timeOutMs);
ExecutorService executorService, long endTimeMs) {
super(operators, queryContext, executorService, endTimeMs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,18 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
protected final List<Operator> _operators;
protected final QueryContext _queryContext;
protected final ExecutorService _executorService;
protected final long _timeOutMs;
protected final long _endTimeMs;

public BaseCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
long timeOutMs) {
long endTimeMs) {
_operators = operators;
_queryContext = queryContext;
_executorService = executorService;
_timeOutMs = timeOutMs;
_endTimeMs = endTimeMs;
}

@Override
protected IntermediateResultsBlock getNextBlock() {
long startTimeMs = System.currentTimeMillis();
long endTimeMs = startTimeMs + _timeOutMs;
int numOperators = _operators.size();
int numThreads = CombineOperatorUtils.getNumThreadsForQuery(numOperators);

Expand Down Expand Up @@ -124,7 +122,7 @@ public void runJob() {
int numBlocksMerged = 0;
while (numBlocksMerged < numOperators) {
IntermediateResultsBlock blockToMerge =
blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (blockToMerge == null) {
// Query times out, skip merging the remaining results blocks
LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,17 @@ public class GroupByCombineOperator extends BaseOperator<IntermediateResultsBloc
private final List<Operator> _operators;
private final QueryContext _queryContext;
private final ExecutorService _executorService;
private final long _timeOutMs;
private final long _endTimeMs;
// Limit on number of groups stored, beyond which no new group will be created
private final int _innerSegmentNumGroupsLimit;
private final int _interSegmentNumGroupsLimit;

public GroupByCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
long timeOutMs, int innerSegmentNumGroupsLimit) {
long endTimeMs, int innerSegmentNumGroupsLimit) {
_operators = operators;
_queryContext = queryContext;
_executorService = executorService;
_timeOutMs = timeOutMs;
_endTimeMs = endTimeMs;
_innerSegmentNumGroupsLimit = innerSegmentNumGroupsLimit;
_interSegmentNumGroupsLimit =
(int) Math.min((long) innerSegmentNumGroupsLimit * INTER_SEGMENT_NUM_GROUPS_LIMIT_FACTOR, Integer.MAX_VALUE);
Expand Down Expand Up @@ -189,11 +189,12 @@ public void runJob() {
}

try {
boolean opCompleted = operatorLatch.await(_timeOutMs, TimeUnit.MILLISECONDS);
long timeoutMs = _endTimeMs - System.currentTimeMillis();
boolean opCompleted = operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
if (!opCompleted) {
// If this happens, the broker side should already timed out, just log the error and return
String errorMessage = String
.format("Timed out while combining group-by results after %dms, queryContext = %s", _timeOutMs,
.format("Timed out while combining group-by results after %dms, queryContext = %s", timeoutMs,
_queryContext);
LOGGER.error(errorMessage);
return new IntermediateResultsBlock(new TimeoutException(errorMessage));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,18 @@ public class GroupByOrderByCombineOperator extends BaseOperator<IntermediateResu
private final List<Operator> _operators;
private final QueryContext _queryContext;
private final ExecutorService _executorService;
private final long _timeOutMs;
private final long _endTimeMs;
private final int _indexedTableCapacity;
private final Lock _initLock;
private DataSchema _dataSchema;
private ConcurrentIndexedTable _indexedTable;

public GroupByOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
ExecutorService executorService, long timeOutMs) {
ExecutorService executorService, long endTimeMs) {
_operators = operators;
_queryContext = queryContext;
_executorService = executorService;
_timeOutMs = timeOutMs;
_endTimeMs = endTimeMs;
_initLock = new ReentrantLock();
_indexedTableCapacity = GroupByUtils.getTableCapacity(_queryContext);
}
Expand Down Expand Up @@ -220,11 +220,12 @@ public void runJob() {
}

try {
boolean opCompleted = operatorLatch.await(_timeOutMs, TimeUnit.MILLISECONDS);
long timeoutMs = _endTimeMs - System.currentTimeMillis();
boolean opCompleted = operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
if (!opCompleted) {
// If this happens, the broker side should already timed out, just log the error and return
String errorMessage = String
.format("Timed out while combining group-by order-by results after %dms, queryContext = %s", _timeOutMs,
.format("Timed out while combining group-by order-by results after %dms, queryContext = %s", timeoutMs,
_queryContext);
LOGGER.error(errorMessage);
return new IntermediateResultsBlock(new TimeoutException(errorMessage));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public class SelectionOnlyCombineOperator extends BaseCombineOperator {
private final int _numRowsToKeep;

public SelectionOnlyCombineOperator(List<Operator> operators, QueryContext queryContext,
ExecutorService executorService, long timeOutMs) {
super(operators, queryContext, executorService, timeOutMs);
ExecutorService executorService, long endTimeMs) {
super(operators, queryContext, executorService, endTimeMs);
_numRowsToKeep = queryContext.getLimit();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public class SelectionOrderByCombineOperator extends BaseCombineOperator {
private final int _numRowsToKeep;

public SelectionOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
ExecutorService executorService, long timeOutMs) {
super(operators, queryContext, executorService, timeOutMs);
ExecutorService executorService, long endTimeMs) {
super(operators, queryContext, executorService, endTimeMs);
_numRowsToKeep = queryContext.getLimit() + queryContext.getOffset();
}

Expand All @@ -91,9 +91,6 @@ protected IntermediateResultsBlock getNextBlock() {
}

private IntermediateResultsBlock minMaxValueBasedCombine() {
long startTimeMs = System.currentTimeMillis();
long endTimeMs = startTimeMs + _timeOutMs;

List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
assert orderByExpressions != null;
int numOrderByExpressions = orderByExpressions.size();
Expand Down Expand Up @@ -270,7 +267,7 @@ public void runJob() {
int numBlocksMerged = 0;
while (numBlocksMerged + numOperatorsSkipped.get() < numOperators) {
IntermediateResultsBlock blockToMerge =
blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
blockingQueue.poll(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (blockToMerge == null) {
// Query times out, skip merging the remaining results blocks
LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,10 @@ public class CombinePlanNode implements PlanNode {
// Try to schedule 10 plans for each thread, or evenly distribute plans to all MAX_NUM_THREADS_PER_QUERY threads
private static final int TARGET_NUM_PLANS_PER_THREAD = 10;

private static final int TIME_OUT_IN_MILLISECONDS_FOR_PARALLEL_RUN = 10_000;

private final List<PlanNode> _planNodes;
private final QueryContext _queryContext;
private final ExecutorService _executorService;
private final long _timeOutMs;
private final long _endTimeMs;
private final int _numGroupsLimit;

/**
Expand All @@ -65,15 +63,15 @@ public class CombinePlanNode implements PlanNode {
* @param planNodes List of underlying plan nodes
* @param queryContext Query context
* @param executorService Executor service
* @param timeOutMs Time out in milliseconds for query execution (not for planning phase)
* @param endTimeMs End time in milliseconds for the query
* @param numGroupsLimit Limit of number of groups stored in each segment
*/
public CombinePlanNode(List<PlanNode> planNodes, QueryContext queryContext, ExecutorService executorService,
long timeOutMs, int numGroupsLimit) {
long endTimeMs, int numGroupsLimit) {
_planNodes = planNodes;
_queryContext = queryContext;
_executorService = executorService;
_timeOutMs = timeOutMs;
_endTimeMs = endTimeMs;
_numGroupsLimit = numGroupsLimit;
}

Expand All @@ -91,9 +89,6 @@ public Operator<IntermediateResultsBlock> run() {
} else {
// Large number of plan nodes, run them in parallel

// Calculate the time out timestamp
long endTimeMs = System.currentTimeMillis() + TIME_OUT_IN_MILLISECONDS_FOR_PARALLEL_RUN;

int numThreads = Math.min((numPlanNodes + TARGET_NUM_PLANS_PER_THREAD - 1) / TARGET_NUM_PLANS_PER_THREAD,
MAX_NUM_THREADS_PER_QUERY);

Expand Down Expand Up @@ -136,7 +131,7 @@ public List<Operator> callJob() {
try {
for (Future future : futures) {
List<Operator> ops =
(List<Operator>) future.get(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
(List<Operator>) future.get(_endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
operators.addAll(ops);
}
} catch (Exception e) {
Expand All @@ -163,22 +158,22 @@ public List<Operator> callJob() {
if (QueryContextUtils.isAggregationQuery(_queryContext)) {
if (_queryContext.getGroupByExpressions() == null) {
// Aggregation only
return new AggregationOnlyCombineOperator(operators, _queryContext, _executorService, _timeOutMs);
return new AggregationOnlyCombineOperator(operators, _queryContext, _executorService, _endTimeMs);
} else {
// Aggregation group-by
QueryOptions queryOptions = new QueryOptions(_queryContext.getQueryOptions());
if (queryOptions.isGroupByModeSQL()) {
return new GroupByOrderByCombineOperator(operators, _queryContext, _executorService, _timeOutMs);
return new GroupByOrderByCombineOperator(operators, _queryContext, _executorService, _endTimeMs);
}
return new GroupByCombineOperator(operators, _queryContext, _executorService, _timeOutMs, _numGroupsLimit);
return new GroupByCombineOperator(operators, _queryContext, _executorService, _endTimeMs, _numGroupsLimit);
}
} else {
if (_queryContext.getLimit() == 0 || _queryContext.getOrderByExpressions() == null) {
// Selection only
return new SelectionOnlyCombineOperator(operators, _queryContext, _executorService, _timeOutMs);
return new SelectionOnlyCombineOperator(operators, _queryContext, _executorService, _endTimeMs);
} else {
// Selection order-by
return new SelectionOrderByCombineOperator(operators, _queryContext, _executorService, _timeOutMs);
return new SelectionOrderByCombineOperator(operators, _queryContext, _executorService, _endTimeMs);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,13 @@ public InstancePlanMakerImplV2(QueryExecutorConfig queryExecutorConfig) {

@Override
public Plan makeInstancePlan(List<IndexSegment> indexSegments, QueryContext queryContext,
ExecutorService executorService, long timeOutMs) {
ExecutorService executorService, long endTimeMs) {
List<PlanNode> planNodes = new ArrayList<>(indexSegments.size());
for (IndexSegment indexSegment : indexSegments) {
planNodes.add(makeSegmentPlanNode(indexSegment, queryContext));
}
CombinePlanNode combinePlanNode =
new CombinePlanNode(planNodes, queryContext, executorService, timeOutMs, _numGroupsLimit);
new CombinePlanNode(planNodes, queryContext, executorService, endTimeMs, _numGroupsLimit);
return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public interface PlanMaker {
* Returns an instance level {@link Plan} which contains the logical execution plan for multiple segments.
*/
Plan makeInstancePlan(List<IndexSegment> indexSegments, QueryContext queryContext, ExecutorService executorService,
long timeoutMs);
long endTimeMs);

/**
* Returns a segment level {@link PlanNode} which contains the logical execution plan for one segment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService e
if (schedulerWaitTimer != null) {
schedulerWaitTimer.stopAndRecord();
}
long querySchedulingTimeMs = System.currentTimeMillis() - timerContext.getQueryArrivalTimeMs();
long queryArrivalTimeMs = timerContext.getQueryArrivalTimeMs();
long querySchedulingTimeMs = System.currentTimeMillis() - queryArrivalTimeMs;
TimerContext.Timer queryProcessingTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PROCESSING);

long requestId = queryRequest.getRequestId();
Expand All @@ -116,10 +117,9 @@ public DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService e
queryTimeoutMs = timeoutFromQueryOptions;
}
}
long remainingTimeMs = queryTimeoutMs - querySchedulingTimeMs;

// Query scheduler wait time already exceeds query timeout, directly return
if (remainingTimeMs <= 0) {
if (querySchedulingTimeMs >= queryTimeoutMs) {
_serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.SCHEDULING_TIMEOUT_EXCEPTIONS, 1);
String errorMessage = String
.format("Query scheduling took %dms (longer than query timeout of %dms)", querySchedulingTimeMs,
Expand Down Expand Up @@ -213,8 +213,8 @@ public DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService e
for (SegmentDataManager segmentDataManager : segmentDataManagers) {
indexSegments.add(segmentDataManager.getSegment());
}
Plan globalQueryPlan =
_planMaker.makeInstancePlan(indexSegments, queryContext, executorService, remainingTimeMs);
long endTimeMs = queryArrivalTimeMs + queryTimeoutMs;
Plan globalQueryPlan = _planMaker.makeInstancePlan(indexSegments, queryContext, executorService, endTimeMs);
planBuildTimer.stopAndRecord();

TimerContext.Timer planExecTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.Executors;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.segment.ReadMode;
import org.apache.pinot.common.utils.CommonConstants.Server;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.data.readers.GenericRowRecordReader;
import org.apache.pinot.core.indexsegment.IndexSegment;
Expand Down Expand Up @@ -227,8 +228,9 @@ private IntermediateResultsBlock getCombineResult(String query) {
for (IndexSegment indexSegment : _indexSegments) {
planNodes.add(PLAN_MAKER.makeSegmentPlanNode(indexSegment, queryContext));
}
CombinePlanNode combinePlanNode =
new CombinePlanNode(planNodes, queryContext, EXECUTOR, 1000, InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, queryContext, EXECUTOR,
System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS,
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
return combinePlanNode.run().nextBlock();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.apache.pinot.common.utils.CommonConstants.Server;
import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
Expand Down Expand Up @@ -55,7 +56,8 @@ public void testParallelExecution() {
return null;
});
}
CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService, 1000,
CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService,
System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS,
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
combinePlanNode.run();
Assert.assertEquals(numPlans, count.get());
Expand All @@ -64,15 +66,13 @@ public void testParallelExecution() {

@Test
public void testSlowPlanNode() {
// Warning: this test is slow (take 10 seconds).

AtomicBoolean notInterrupted = new AtomicBoolean();

List<PlanNode> planNodes = new ArrayList<>();
for (int i = 0; i < 20; i++) {
planNodes.add(() -> {
try {
Thread.sleep(20000);
Thread.sleep(10000);
} catch (InterruptedException e) {
// Thread should be interrupted
throw new RuntimeException(e);
Expand All @@ -81,8 +81,9 @@ public void testSlowPlanNode() {
return null;
});
}
CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService, 0,
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
CombinePlanNode combinePlanNode =
new CombinePlanNode(planNodes, _queryContext, _executorService, System.currentTimeMillis() + 100,
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
try {
combinePlanNode.run();
} catch (RuntimeException e) {
Expand All @@ -102,7 +103,8 @@ public void testPlanNodeThrowException() {
throw new RuntimeException("Inner exception message.");
});
}
CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService, 0,
CombinePlanNode combinePlanNode = new CombinePlanNode(planNodes, _queryContext, _executorService,
System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS,
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
try {
combinePlanNode.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ protected BrokerResponseNative getBrokerResponseForSqlQuery(String sqlQuery, Pla
*/
private BrokerResponseNative getBrokerResponse(QueryContext queryContext, PlanMaker planMaker) {
// Server side.
Plan plan = planMaker
.makeInstancePlan(getIndexSegments(), queryContext, EXECUTOR_SERVICE, Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
Plan plan = planMaker.makeInstancePlan(getIndexSegments(), queryContext, EXECUTOR_SERVICE,
System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
DataTable instanceResponse = plan.execute();

// Broker side.
Expand Down
Loading