Skip to content

Commit d60ec47

Browse files
committed
Introduce compute listener
1 parent bc1b77f commit d60ec47

File tree

4 files changed

+205
-200
lines changed

4 files changed

+205
-200
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,11 @@
2121
import org.elasticsearch.core.TimeValue;
2222
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
2323
import org.elasticsearch.index.seqno.SequenceNumbers;
24-
import org.elasticsearch.tasks.TaskCancelledException;
2524
import org.elasticsearch.xcontent.XContentBuilder;
2625

2726
import java.io.IOException;
2827
import java.util.Map;
2928
import java.util.Objects;
30-
import java.util.concurrent.atomic.AtomicReference;
3129
import java.util.concurrent.atomic.LongAdder;
3230

3331
/**
@@ -40,7 +38,7 @@ public abstract class AsyncOperator implements Operator {
4038
private volatile SubscribableListener<Void> blockedFuture;
4139

4240
private final Map<Long, Page> buffers = ConcurrentCollections.newConcurrentMap();
43-
private final AtomicReference<Exception> failure = new AtomicReference<>();
41+
private final FailureCollector failure = new FailureCollector();
4442
private final DriverContext driverContext;
4543

4644
private final int maxOutstandingRequests;
@@ -90,7 +88,7 @@ public void addInput(Page input) {
9088
onSeqNoCompleted(seqNo);
9189
}, e -> {
9290
releasePageOnAnyThread(input);
93-
onFailure(e);
91+
failure.unwrapAndCollect(e);
9492
onSeqNoCompleted(seqNo);
9593
});
9694
final long startNanos = System.nanoTime();
@@ -121,25 +119,6 @@ private void releasePageOnAnyThread(Page page) {
121119

122120
protected abstract void doClose();
123121

124-
private void onFailure(Exception e) {
125-
failure.getAndUpdate(first -> {
126-
if (first == null) {
127-
return e;
128-
}
129-
// ignore subsequent TaskCancelledException exceptions as they don't provide useful info.
130-
if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
131-
return first;
132-
}
133-
if (ExceptionsHelper.unwrap(first, TaskCancelledException.class) != null) {
134-
return e;
135-
}
136-
if (ExceptionsHelper.unwrapCause(first) != ExceptionsHelper.unwrapCause(e)) {
137-
first.addSuppressed(e);
138-
}
139-
return first;
140-
});
141-
}
142-
143122
private void onSeqNoCompleted(long seqNo) {
144123
checkpoint.markSeqNoAsProcessed(seqNo);
145124
if (checkpoint.getPersistedCheckpoint() < checkpoint.getProcessedCheckpoint()) {
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator;
9+
10+
import org.elasticsearch.ElasticsearchException;
11+
import org.elasticsearch.ExceptionsHelper;
12+
import org.elasticsearch.tasks.TaskCancelledException;
13+
import org.elasticsearch.transport.TransportException;
14+
15+
import java.util.concurrent.Semaphore;
16+
import java.util.concurrent.atomic.AtomicReference;
17+
18+
/**
19+
* Collects exceptions occurred in compute, specifically ignores trivial exceptions such as {@link TaskCancelledException},
20+
* and selects the most appropriate exception to return to the caller.
21+
* To bound the memory usage, this class only collects the first 10 non-trivial exceptions.
22+
*/
23+
public final class FailureCollector {
24+
private final AtomicReference<Exception> exceptionRefs = new AtomicReference<>();
25+
private final Semaphore exceptionPermits = new Semaphore(10);
26+
27+
public void unwrapAndCollect(Exception originEx) {
28+
final Exception e = originEx instanceof TransportException
29+
? (originEx.getCause() instanceof Exception cause ? cause : new ElasticsearchException(originEx.getCause()))
30+
: originEx;
31+
exceptionRefs.getAndUpdate(first -> {
32+
if (first == null) {
33+
return e;
34+
}
35+
// ignore subsequent TaskCancelledException exceptions as they don't provide useful info.
36+
if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
37+
return first;
38+
}
39+
if (ExceptionsHelper.unwrap(first, TaskCancelledException.class) != null) {
40+
return e;
41+
}
42+
if (ExceptionsHelper.unwrapCause(first) != ExceptionsHelper.unwrapCause(e)) {
43+
if (exceptionPermits.tryAcquire()) {
44+
first.addSuppressed(e);
45+
}
46+
}
47+
return first;
48+
});
49+
}
50+
51+
public Exception get() {
52+
return exceptionRefs.get();
53+
}
54+
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,18 @@
77

88
package org.elasticsearch.compute.operator.exchange;
99

10-
import org.elasticsearch.ElasticsearchException;
1110
import org.elasticsearch.ExceptionsHelper;
1211
import org.elasticsearch.action.ActionListener;
1312
import org.elasticsearch.action.support.RefCountingListener;
1413
import org.elasticsearch.action.support.SubscribableListener;
1514
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
1615
import org.elasticsearch.compute.data.Page;
16+
import org.elasticsearch.compute.operator.FailureCollector;
1717
import org.elasticsearch.core.Releasable;
18-
import org.elasticsearch.tasks.TaskCancelledException;
19-
import org.elasticsearch.transport.TransportException;
2018

2119
import java.util.List;
2220
import java.util.concurrent.Executor;
2321
import java.util.concurrent.atomic.AtomicInteger;
24-
import java.util.concurrent.atomic.AtomicReference;
2522

2623
/**
2724
* An {@link ExchangeSourceHandler} asynchronously fetches pages and status from multiple {@link RemoteSink}s
@@ -37,7 +34,7 @@ public final class ExchangeSourceHandler {
3734

3835
private final PendingInstances outstandingSinks;
3936
private final PendingInstances outstandingSources;
40-
private final AtomicReference<Exception> failure = new AtomicReference<>();
37+
private final FailureCollector failure = new FailureCollector();
4138

4239
public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor) {
4340
this.buffer = new ExchangeBuffer(maxBufferSize);
@@ -199,26 +196,8 @@ void fetchPage() {
199196
loopControl.exited();
200197
}
201198

202-
void onSinkFailed(Exception originEx) {
203-
final Exception e = originEx instanceof TransportException
204-
? (originEx.getCause() instanceof Exception cause ? cause : new ElasticsearchException(originEx.getCause()))
205-
: originEx;
206-
failure.getAndUpdate(first -> {
207-
if (first == null) {
208-
return e;
209-
}
210-
// ignore subsequent TaskCancelledException exceptions as they don't provide useful info.
211-
if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
212-
return first;
213-
}
214-
if (ExceptionsHelper.unwrap(first, TaskCancelledException.class) != null) {
215-
return e;
216-
}
217-
if (ExceptionsHelper.unwrapCause(first) != ExceptionsHelper.unwrapCause(e)) {
218-
first.addSuppressed(e);
219-
}
220-
return first;
221-
});
199+
void onSinkFailed(Exception e) {
200+
failure.unwrapAndCollect(e);
222201
buffer.waitForReading().onResponse(null); // resume the Driver if it is being blocked on reading
223202
onSinkComplete();
224203
}

0 commit comments

Comments
 (0)