Skip to content

Commit 81f95b9

Browse files
authored
Introduce compute listener (#110400)
Currently, if a child request fails, we automatically trigger cancellation for ES|QL requests. This can result in TaskCancelledException being collected by the RefCountingListener first, which then returns that exception to the caller. For example, if we encounter a CircuitBreakingException (429), we might incorrectly return a TaskCancelledException (400) instead. This change introduces the ComputeListener, a variant of RefCountingListener, which selects the most appropriate exception to return to the caller. I also integrated the following features into ComputeListener to simplify ComputeService: - Automatic cancellation of sub-tasks on failure. - Collection of profiles from sub-tasks. - Collection of response headers from sub-tasks.
1 parent 27e6b37 commit 81f95b9

File tree

9 files changed

+657
-230
lines changed

9 files changed

+657
-230
lines changed

docs/changelog/110400.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 110400
2+
summary: Introduce compute listener
3+
area: ES|QL
4+
type: bug
5+
issues: []

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java

Lines changed: 5 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,11 @@
2121
import org.elasticsearch.core.TimeValue;
2222
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
2323
import org.elasticsearch.index.seqno.SequenceNumbers;
24-
import org.elasticsearch.tasks.TaskCancelledException;
2524
import org.elasticsearch.xcontent.XContentBuilder;
2625

2726
import java.io.IOException;
2827
import java.util.Map;
2928
import java.util.Objects;
30-
import java.util.concurrent.atomic.AtomicReference;
3129
import java.util.concurrent.atomic.LongAdder;
3230

3331
/**
@@ -40,7 +38,7 @@ public abstract class AsyncOperator implements Operator {
4038
private volatile SubscribableListener<Void> blockedFuture;
4139

4240
private final Map<Long, Page> buffers = ConcurrentCollections.newConcurrentMap();
43-
private final AtomicReference<Exception> failure = new AtomicReference<>();
41+
private final FailureCollector failureCollector = new FailureCollector();
4442
private final DriverContext driverContext;
4543

4644
private final int maxOutstandingRequests;
@@ -77,7 +75,7 @@ public boolean needsInput() {
7775

7876
@Override
7977
public void addInput(Page input) {
80-
if (failure.get() != null) {
78+
if (failureCollector.hasFailure()) {
8179
input.releaseBlocks();
8280
return;
8381
}
@@ -90,7 +88,7 @@ public void addInput(Page input) {
9088
onSeqNoCompleted(seqNo);
9189
}, e -> {
9290
releasePageOnAnyThread(input);
93-
onFailure(e);
91+
failureCollector.unwrapAndCollect(e);
9492
onSeqNoCompleted(seqNo);
9593
});
9694
final long startNanos = System.nanoTime();
@@ -121,31 +119,12 @@ private void releasePageOnAnyThread(Page page) {
121119

122120
protected abstract void doClose();
123121

124-
private void onFailure(Exception e) {
125-
failure.getAndUpdate(first -> {
126-
if (first == null) {
127-
return e;
128-
}
129-
// ignore subsequent TaskCancelledException exceptions as they don't provide useful info.
130-
if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
131-
return first;
132-
}
133-
if (ExceptionsHelper.unwrap(first, TaskCancelledException.class) != null) {
134-
return e;
135-
}
136-
if (ExceptionsHelper.unwrapCause(first) != ExceptionsHelper.unwrapCause(e)) {
137-
first.addSuppressed(e);
138-
}
139-
return first;
140-
});
141-
}
142-
143122
private void onSeqNoCompleted(long seqNo) {
144123
checkpoint.markSeqNoAsProcessed(seqNo);
145124
if (checkpoint.getPersistedCheckpoint() < checkpoint.getProcessedCheckpoint()) {
146125
notifyIfBlocked();
147126
}
148-
if (closed || failure.get() != null) {
127+
if (closed || failureCollector.hasFailure()) {
149128
discardPages();
150129
}
151130
}
@@ -164,7 +143,7 @@ private void notifyIfBlocked() {
164143
}
165144

166145
private void checkFailure() {
167-
Exception e = failure.get();
146+
Exception e = failureCollector.getFailure();
168147
if (e != null) {
169148
discardPages();
170149
throw ExceptionsHelper.convertToElastic(e);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverRunner.java

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,11 @@
77

88
package org.elasticsearch.compute.operator;
99

10-
import org.elasticsearch.ExceptionsHelper;
1110
import org.elasticsearch.action.ActionListener;
1211
import org.elasticsearch.common.util.concurrent.CountDown;
1312
import org.elasticsearch.common.util.concurrent.ThreadContext;
14-
import org.elasticsearch.tasks.TaskCancelledException;
1513

1614
import java.util.List;
17-
import java.util.concurrent.atomic.AtomicReference;
1815

1916
/**
2017
* Run a set of drivers to completion.
@@ -35,8 +32,8 @@ public DriverRunner(ThreadContext threadContext) {
3532
* Run all drivers to completion asynchronously.
3633
*/
3734
public void runToCompletion(List<Driver> drivers, ActionListener<Void> listener) {
38-
AtomicReference<Exception> failure = new AtomicReference<>();
3935
var responseHeadersCollector = new ResponseHeadersCollector(threadContext);
36+
var failure = new FailureCollector();
4037
CountDown counter = new CountDown(drivers.size());
4138
for (int i = 0; i < drivers.size(); i++) {
4239
Driver driver = drivers.get(i);
@@ -48,23 +45,7 @@ public void onResponse(Void unused) {
4845

4946
@Override
5047
public void onFailure(Exception e) {
51-
failure.getAndUpdate(first -> {
52-
if (first == null) {
53-
return e;
54-
}
55-
if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
56-
return first;
57-
} else {
58-
if (ExceptionsHelper.unwrap(first, TaskCancelledException.class) != null) {
59-
return e;
60-
} else {
61-
if (first != e) {
62-
first.addSuppressed(e);
63-
}
64-
return first;
65-
}
66-
}
67-
});
48+
failure.unwrapAndCollect(e);
6849
for (Driver d : drivers) {
6950
if (driver != d) {
7051
d.cancel("Driver [" + driver.sessionId() + "] was cancelled or failed");
@@ -77,7 +58,7 @@ private void done() {
7758
responseHeadersCollector.collect();
7859
if (counter.countDown()) {
7960
responseHeadersCollector.finish();
80-
Exception error = failure.get();
61+
Exception error = failure.getFailure();
8162
if (error != null) {
8263
listener.onFailure(error);
8364
} else {
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator;
9+
10+
import org.elasticsearch.ElasticsearchException;
11+
import org.elasticsearch.ExceptionsHelper;
12+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
13+
import org.elasticsearch.tasks.TaskCancelledException;
14+
import org.elasticsearch.transport.TransportException;
15+
16+
import java.util.List;
17+
import java.util.Queue;
18+
import java.util.concurrent.atomic.AtomicInteger;
19+
20+
/**
21+
* {@code FailureCollector} is responsible for collecting exceptions that occur in the compute engine.
22+
* The collected exceptions are categorized into task-cancelled and non-task-cancelled exceptions.
23+
* To limit memory usage, this class collects only the first 10 exceptions in each category by default.
24+
* When returning the accumulated failure to the caller, this class prefers non-task-cancelled exceptions
25+
* over task-cancelled ones as they are more useful for diagnosing issues.
26+
*/
27+
public final class FailureCollector {
28+
private final Queue<Exception> cancelledExceptions = ConcurrentCollections.newQueue();
29+
private final AtomicInteger cancelledExceptionsCount = new AtomicInteger();
30+
31+
private final Queue<Exception> nonCancelledExceptions = ConcurrentCollections.newQueue();
32+
private final AtomicInteger nonCancelledExceptionsCount = new AtomicInteger();
33+
34+
private final int maxExceptions;
35+
private volatile boolean hasFailure = false;
36+
private Exception finalFailure = null;
37+
38+
public FailureCollector() {
39+
this(10);
40+
}
41+
42+
public FailureCollector(int maxExceptions) {
43+
if (maxExceptions <= 0) {
44+
throw new IllegalArgumentException("maxExceptions must be at least one");
45+
}
46+
this.maxExceptions = maxExceptions;
47+
}
48+
49+
public void unwrapAndCollect(Exception originEx) {
50+
final Exception e = originEx instanceof TransportException
51+
? (originEx.getCause() instanceof Exception cause ? cause : new ElasticsearchException(originEx.getCause()))
52+
: originEx;
53+
if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
54+
if (cancelledExceptionsCount.incrementAndGet() <= maxExceptions) {
55+
cancelledExceptions.add(e);
56+
}
57+
} else {
58+
if (nonCancelledExceptionsCount.incrementAndGet() <= maxExceptions) {
59+
nonCancelledExceptions.add(e);
60+
}
61+
}
62+
hasFailure = true;
63+
}
64+
65+
/**
66+
* @return {@code true} if any failure has been collected, {@code false} otherwise
67+
*/
68+
public boolean hasFailure() {
69+
return hasFailure;
70+
}
71+
72+
/**
73+
* Returns the accumulated failure, preferring non-task-cancelled exceptions over task-cancelled ones.
74+
* Once this method builds the failure, incoming failures are discarded.
75+
*
76+
* @return the accumulated failure, or {@code null} if no failure has been collected
77+
*/
78+
public Exception getFailure() {
79+
if (hasFailure == false) {
80+
return null;
81+
}
82+
synchronized (this) {
83+
if (finalFailure == null) {
84+
finalFailure = buildFailure();
85+
}
86+
return finalFailure;
87+
}
88+
}
89+
90+
private Exception buildFailure() {
91+
assert hasFailure;
92+
assert Thread.holdsLock(this);
93+
int total = 0;
94+
Exception first = null;
95+
for (var exceptions : List.of(nonCancelledExceptions, cancelledExceptions)) {
96+
for (Exception e : exceptions) {
97+
if (first == null) {
98+
first = e;
99+
total++;
100+
} else if (first != e) {
101+
first.addSuppressed(e);
102+
total++;
103+
}
104+
if (total >= maxExceptions) {
105+
return first;
106+
}
107+
}
108+
}
109+
assert first != null;
110+
return first;
111+
}
112+
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,18 @@
77

88
package org.elasticsearch.compute.operator.exchange;
99

10-
import org.elasticsearch.ElasticsearchException;
1110
import org.elasticsearch.ExceptionsHelper;
1211
import org.elasticsearch.action.ActionListener;
1312
import org.elasticsearch.action.support.RefCountingListener;
1413
import org.elasticsearch.action.support.SubscribableListener;
1514
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
1615
import org.elasticsearch.compute.data.Page;
16+
import org.elasticsearch.compute.operator.FailureCollector;
1717
import org.elasticsearch.core.Releasable;
18-
import org.elasticsearch.tasks.TaskCancelledException;
19-
import org.elasticsearch.transport.TransportException;
2018

2119
import java.util.List;
2220
import java.util.concurrent.Executor;
2321
import java.util.concurrent.atomic.AtomicInteger;
24-
import java.util.concurrent.atomic.AtomicReference;
2522

2623
/**
2724
* An {@link ExchangeSourceHandler} asynchronously fetches pages and status from multiple {@link RemoteSink}s
@@ -37,7 +34,7 @@ public final class ExchangeSourceHandler {
3734

3835
private final PendingInstances outstandingSinks;
3936
private final PendingInstances outstandingSources;
40-
private final AtomicReference<Exception> failure = new AtomicReference<>();
37+
private final FailureCollector failure = new FailureCollector();
4138

4239
public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor) {
4340
this.buffer = new ExchangeBuffer(maxBufferSize);
@@ -54,7 +51,7 @@ private class ExchangeSourceImpl implements ExchangeSource {
5451
}
5552

5653
private void checkFailure() {
57-
Exception e = failure.get();
54+
Exception e = failure.getFailure();
5855
if (e != null) {
5956
throw ExceptionsHelper.convertToElastic(e);
6057
}
@@ -172,7 +169,7 @@ void fetchPage() {
172169
while (loopControl.isRunning()) {
173170
loopControl.exiting();
174171
// finish other sinks if one of them failed or source no longer need pages.
175-
boolean toFinishSinks = buffer.noMoreInputs() || failure.get() != null;
172+
boolean toFinishSinks = buffer.noMoreInputs() || failure.hasFailure();
176173
remoteSink.fetchPageAsync(toFinishSinks, ActionListener.wrap(resp -> {
177174
Page page = resp.takePage();
178175
if (page != null) {
@@ -199,26 +196,8 @@ void fetchPage() {
199196
loopControl.exited();
200197
}
201198

202-
void onSinkFailed(Exception originEx) {
203-
final Exception e = originEx instanceof TransportException
204-
? (originEx.getCause() instanceof Exception cause ? cause : new ElasticsearchException(originEx.getCause()))
205-
: originEx;
206-
failure.getAndUpdate(first -> {
207-
if (first == null) {
208-
return e;
209-
}
210-
// ignore subsequent TaskCancelledException exceptions as they don't provide useful info.
211-
if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
212-
return first;
213-
}
214-
if (ExceptionsHelper.unwrap(first, TaskCancelledException.class) != null) {
215-
return e;
216-
}
217-
if (ExceptionsHelper.unwrapCause(first) != ExceptionsHelper.unwrapCause(e)) {
218-
first.addSuppressed(e);
219-
}
220-
return first;
221-
});
199+
void onSinkFailed(Exception e) {
200+
failure.unwrapAndCollect(e);
222201
buffer.waitForReading().onResponse(null); // resume the Driver if it is being blocked on reading
223202
onSinkComplete();
224203
}

0 commit comments

Comments
 (0)