Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,22 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine

private static final String EXPLAIN_NAME = "COMBINE_SELECT_ORDERBY_MINMAX";

// For min/max value based combine, when a thread detects that no more segments need to be processed, it inserts this
// special IntermediateResultsBlock into the BlockingQueue to awake the main thread
private static final BaseResultsBlock LAST_RESULTS_BLOCK =
// For min/max value based combine, when a thread detects that no more segment needs to be processed, it inserts this
// special results block, which can be skipped during the merge phase
private static final BaseResultsBlock EMPTY_RESULTS_BLOCK =
new SelectionResultsBlock(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]),
Collections.emptyList());

// Use an AtomicInteger to track the number of operators skipped (no result inserted into the BlockingQueue)
private final AtomicInteger _numOperatorsSkipped = new AtomicInteger();
private final AtomicReference<Comparable> _globalBoundaryValue = new AtomicReference<>();
// Use an AtomicInteger to track the end operator id, beyond which no operator needs to be processed
private final AtomicInteger _endOperatorId;
private final int _numRowsToKeep;
private final List<MinMaxValueContext> _minMaxValueContexts;
private final AtomicReference<Comparable> _globalBoundaryValue = new AtomicReference<>();

MinMaxValueBasedSelectionOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
ExecutorService executorService) {
super(operators, queryContext, executorService);
_endOperatorId = new AtomicInteger(_numOperators);
_numRowsToKeep = queryContext.getLimit() + queryContext.getOffset();

List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
Expand Down Expand Up @@ -146,6 +147,11 @@ protected void processSegments() {

int operatorId;
while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
if (operatorId >= _endOperatorId.get()) {
_blockingQueue.offer(EMPTY_RESULTS_BLOCK);
continue;
}

// Calculate the boundary value from global boundary and thread boundary
Comparable boundaryValue = _globalBoundaryValue.get();
if (boundaryValue == null) {
Expand Down Expand Up @@ -173,9 +179,9 @@ protected void processSegments() {
if (minMaxValueContext._minValue != null) {
int result = minMaxValueContext._minValue.compareTo(boundaryValue);
if (result > 0 || (result == 0 && numOrderByExpressions == 1)) {
_numOperatorsSkipped.getAndAdd((_numOperators - operatorId - 1) / _numTasks);
_blockingQueue.offer(LAST_RESULTS_BLOCK);
return;
_endOperatorId.set(operatorId);
_blockingQueue.offer(EMPTY_RESULTS_BLOCK);
continue;
}
}
} else {
Expand All @@ -184,9 +190,9 @@ protected void processSegments() {
if (minMaxValueContext._maxValue != null) {
int result = minMaxValueContext._maxValue.compareTo(boundaryValue);
if (result < 0 || (result == 0 && numOrderByExpressions == 1)) {
_numOperatorsSkipped.getAndAdd((_numOperators - operatorId - 1) / _numTasks);
_blockingQueue.offer(LAST_RESULTS_BLOCK);
return;
_endOperatorId.set(operatorId);
_blockingQueue.offer(EMPTY_RESULTS_BLOCK);
continue;
}
}
}
Expand Down Expand Up @@ -248,7 +254,7 @@ protected BaseResultsBlock mergeResults()
SelectionResultsBlock mergedBlock = null;
int numBlocksMerged = 0;
long endTimeMs = _queryContext.getEndTimeMs();
while (numBlocksMerged + _numOperatorsSkipped.get() < _numOperators) {
while (numBlocksMerged < _numOperators) {
BaseResultsBlock blockToMerge =
_blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (blockToMerge == null) {
Expand All @@ -258,6 +264,10 @@ protected BaseResultsBlock mergeResults()
return new ExceptionResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
new TimeoutException("Timed out while polling results block")));
}
if (blockToMerge == EMPTY_RESULTS_BLOCK) {
numBlocksMerged++;
continue;
}
if (blockToMerge.getProcessingExceptions() != null) {
// Caught exception while processing segment, skip merging the remaining results blocks and directly return
// the exception
Expand All @@ -266,9 +276,7 @@ protected BaseResultsBlock mergeResults()
if (mergedBlock == null) {
mergedBlock = (SelectionResultsBlock) blockToMerge;
} else {
if (blockToMerge != LAST_RESULTS_BLOCK) {
mergeResultsBlocks(mergedBlock, (SelectionResultsBlock) blockToMerge);
}
mergeResultsBlocks(mergedBlock, (SelectionResultsBlock) blockToMerge);
}
numBlocksMerged++;

Expand Down