Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.core.common.Operator;
Expand Down Expand Up @@ -60,7 +61,9 @@ public abstract class BaseCombineOperator<T extends BaseResultsBlock> extends Ba
protected final ExecutorService _executorService;
protected final int _numTasks;
protected final Future[] _futures;
// Use a _blockingQueue to store the intermediate results blocks
// Use an AtomicInteger to track the next operator to execute
protected final AtomicInteger _nextOperatorId = new AtomicInteger();
// Use a BlockingQueue to store the intermediate results blocks
protected final BlockingQueue<BaseResultsBlock> _blockingQueue = new LinkedBlockingQueue<>();
protected final AtomicLong _totalWorkerThreadCpuTimeNs = new AtomicLong(0);

Expand All @@ -86,7 +89,6 @@ protected BaseResultsBlock getNextBlock() {
Phaser phaser = new Phaser(1);
Tracing.activeRecording().setNumTasks(_numTasks);
for (int i = 0; i < _numTasks; i++) {
int taskIndex = i;
_futures[i] = _executorService.submit(new TraceRunnable() {
@Override
public void runJob() {
Expand All @@ -100,7 +102,7 @@ public void runJob() {
return;
}
try {
processSegments(taskIndex);
processSegments();
} catch (EarlyTerminationException e) {
// Early-terminated by interruption (canceled by the main thread)
} catch (Exception e) {
Expand Down Expand Up @@ -151,9 +153,10 @@ public void runJob() {
/**
* Executes query on one or more segments in a worker thread.
*/
protected void processSegments(int taskIndex) {
for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
Operator operator = _operators.get(operatorIndex);
protected void processSegments() {
int operatorId;
while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
Operator operator = _operators.get(operatorId);
T resultsBlock;
try {
if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
Expand All @@ -176,14 +179,14 @@ protected void processSegments(int taskIndex) {
}

/**
* Invoked when {@link #processSegments(int)} throws exception.
* Invoked when {@link #processSegments()} throws exception.
*/
protected void onException(Exception e) {
_blockingQueue.offer(new ExceptionResultsBlock(e));
}

/**
* Invoked when {@link #processSegments(int)} is finished (called in the finally block).
* Invoked when {@link #processSegments()} is finished (called in the finally block).
*/
protected void onFinish() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,10 @@ public String toExplainString() {
* Executes query on one segment in a worker thread and merges the results into the indexed table.
*/
@Override
protected void processSegments(int taskIndex) {
for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
Operator operator = _operators.get(operatorIndex);
protected void processSegments() {
int operatorId;
while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
Operator operator = _operators.get(operatorId);
try {
if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
((AcquireReleaseColumnsSegmentOperator) operator).acquire();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public String toExplainString() {
* documents to fulfill the LIMIT and OFFSET requirement.
*/
@Override
protected void processSegments(int taskIndex) {
protected void processSegments() {
List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
assert orderByExpressions != null;
int numOrderByExpressions = orderByExpressions.size();
Expand All @@ -144,7 +144,8 @@ protected void processSegments(int taskIndex) {
// segment result is merged.
Comparable threadBoundaryValue = null;

for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
int operatorId;
while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
// Calculate the boundary value from global boundary and thread boundary
Comparable boundaryValue = _globalBoundaryValue.get();
if (boundaryValue == null) {
Expand All @@ -164,15 +165,15 @@ protected void processSegments(int taskIndex) {
}

// Check if the segment can be skipped
MinMaxValueContext minMaxValueContext = _minMaxValueContexts.get(operatorIndex);
MinMaxValueContext minMaxValueContext = _minMaxValueContexts.get(operatorId);
if (boundaryValue != null) {
if (asc) {
// For ascending order, no need to process more segments if the column min value is larger than the
// boundary value, or is equal to the boundary value and the there is only one order-by expression
if (minMaxValueContext._minValue != null) {
int result = minMaxValueContext._minValue.compareTo(boundaryValue);
if (result > 0 || (result == 0 && numOrderByExpressions == 1)) {
_numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numTasks);
_numOperatorsSkipped.getAndAdd((_numOperators - operatorId - 1) / _numTasks);
_blockingQueue.offer(LAST_RESULTS_BLOCK);
return;
}
Expand All @@ -183,7 +184,7 @@ protected void processSegments(int taskIndex) {
if (minMaxValueContext._maxValue != null) {
int result = minMaxValueContext._maxValue.compareTo(boundaryValue);
if (result < 0 || (result == 0 && numOrderByExpressions == 1)) {
_numOperatorsSkipped.getAndAdd((_numOperators - operatorIndex - 1) / _numTasks);
_numOperatorsSkipped.getAndAdd((_numOperators - operatorId - 1) / _numTasks);
_blockingQueue.offer(LAST_RESULTS_BLOCK);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,10 @@ public String toExplainString() {
}

@Override
protected void processSegments(int threadIndex) {
for (int operatorIndex = threadIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
Operator operator = _operators.get(operatorIndex);
protected void processSegments() {
int operatorId;
while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
Operator operator = _operators.get(operatorId);
SelectionResultsBlock resultsBlock;
try {
if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
Expand Down