Skip to content

Commit a032d2f

Browse files
williamrandolph authored and jasontedor committed
Allow unknown task time in QueueResizingEsTPE (#41810)
* Allow unknown task time in QueueResizingEsTPE

  The afterExecute method previously asserted that a TimedRunnable task must have a positive execution time. However, the code in TimedRunnable returns a value of -1 when a task time is unknown. Here, we expand the logic in the assertion to allow for that possibility, and we don't update our task time average if the value is negative.

  Fixes #41448

* Add a failure flag to TimedRunnable

  In order to be sure that a task has an execution time of -1 because of a failure, I'm adding a failure flag boolean to the TimedRunnable class. If execution time is negative for some other reason, an assertion will fail.
1 parent 091d43c commit a032d2f

File tree

4 files changed

+60
-10
lines changed

4 files changed

+60
-10
lines changed

server/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -148,11 +148,17 @@ protected void afterExecute(Runnable r, Throwable t) {
148148
assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue";
149149
final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r);
150150
final long taskNanos = timedRunnable.getTotalNanos();
151+
final boolean failedOrRejected = timedRunnable.getFailedOrRejected();
151152
final long totalNanos = totalTaskNanos.addAndGet(taskNanos);
152153

153154
final long taskExecutionNanos = timedRunnable.getTotalExecutionNanos();
154-
assert taskExecutionNanos >= 0 : "expected task to always take longer than 0 nanoseconds, got: " + taskExecutionNanos;
155-
executionEWMA.addValue(taskExecutionNanos);
155+
assert taskExecutionNanos >= 0 || (failedOrRejected && taskExecutionNanos == -1) :
156+
"expected task to always take longer than 0 nanoseconds or have '-1' failure code, got: " + taskExecutionNanos +
157+
", failedOrRejected: " + failedOrRejected;
158+
if (taskExecutionNanos != -1) {
159+
// taskExecutionNanos may be -1 if the task threw an exception
160+
executionEWMA.addValue(taskExecutionNanos);
161+
}
156162

157163
if (taskCount.incrementAndGet() == this.tasksPerFrame) {
158164
final long endTimeNs = System.nanoTime();

server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@ class TimedRunnable extends AbstractRunnable implements WrappedRunnable {
3030
private final long creationTimeNanos;
3131
private long startTimeNanos;
3232
private long finishTimeNanos = -1;
33+
private boolean failedOrRejected = false;
3334

3435
TimedRunnable(final Runnable original) {
3536
this.original = original;
@@ -48,6 +49,7 @@ public void doRun() {
4849

4950
@Override
5051
public void onRejection(final Exception e) {
52+
this.failedOrRejected = true;
5153
if (original instanceof AbstractRunnable) {
5254
((AbstractRunnable) original).onRejection(e);
5355
} else {
@@ -64,6 +66,7 @@ public void onAfter() {
6466

6567
@Override
6668
public void onFailure(final Exception e) {
69+
this.failedOrRejected = true;
6770
if (original instanceof AbstractRunnable) {
6871
((AbstractRunnable) original).onFailure(e);
6972
} else {
@@ -100,6 +103,14 @@ long getTotalExecutionNanos() {
100103
return Math.max(finishTimeNanos - startTimeNanos, 1);
101104
}
102105

106+
/**
107+
* If the task was failed or rejected, return true.
108+
* Otherwise, false.
109+
*/
110+
boolean getFailedOrRejected() {
111+
return this.failedOrRejected;
112+
}
113+
103114
@Override
104115
public Runnable unwrap() {
105116
return original;

server/src/test/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutorTests.java

Lines changed: 41 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -226,16 +226,43 @@ public void testExecutionEWMACalculation() throws Exception {
226226
context.close();
227227
}
228228

229+
/** Use a runnable wrapper that simulates a task with unknown failures. */
230+
public void testExceptionThrowingTask() throws Exception {
231+
ThreadContext context = new ThreadContext(Settings.EMPTY);
232+
ResizableBlockingQueue<Runnable> queue =
233+
new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(),
234+
100);
235+
236+
QueueResizingEsThreadPoolExecutor executor =
237+
new QueueResizingEsThreadPoolExecutor(
238+
"test-threadpool", 1, 1, 1000,
239+
TimeUnit.MILLISECONDS, queue, 10, 200, exceptionalWrapper(), 10, TimeValue.timeValueMillis(1),
240+
EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context);
241+
executor.prestartAllCoreThreads();
242+
logger.info("--> executor: {}", executor);
243+
244+
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(0L));
245+
executeTask(executor, 1);
246+
executor.shutdown();
247+
executor.awaitTermination(10, TimeUnit.SECONDS);
248+
context.close();
249+
}
250+
229251
private Function<Runnable, WrappedRunnable> fastWrapper() {
230-
return (runnable) -> {
231-
return new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(100));
232-
};
252+
return (runnable) -> new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(100), false);
233253
}
234254

235255
private Function<Runnable, WrappedRunnable> slowWrapper() {
236-
return (runnable) -> {
237-
return new SettableTimedRunnable(TimeUnit.MINUTES.toNanos(2));
238-
};
256+
return (runnable) -> new SettableTimedRunnable(TimeUnit.MINUTES.toNanos(2), false);
257+
}
258+
259+
/**
260+
* The returned function outputs a WrappedRunnable that simulates the case
261+
* where {@link TimedRunnable#getTotalExecutionNanos()} returns -1 because
262+
* the job failed or was rejected before it finished.
263+
*/
264+
private Function<Runnable, WrappedRunnable> exceptionalWrapper() {
265+
return (runnable) -> new SettableTimedRunnable(TimeUnit.NANOSECONDS.toNanos(-1), true);
239266
}
240267

241268
/** Execute a blank task {@code times} times for the executor */
@@ -248,10 +275,12 @@ private void executeTask(QueueResizingEsThreadPoolExecutor executor, int times)
248275

249276
public class SettableTimedRunnable extends TimedRunnable {
250277
private final long timeTaken;
278+
private final boolean testFailedOrRejected;
251279

252-
public SettableTimedRunnable(long timeTaken) {
280+
public SettableTimedRunnable(long timeTaken, boolean failedOrRejected) {
253281
super(() -> {});
254282
this.timeTaken = timeTaken;
283+
this.testFailedOrRejected = failedOrRejected;
255284
}
256285

257286
@Override
@@ -263,5 +292,10 @@ public long getTotalNanos() {
263292
public long getTotalExecutionNanos() {
264293
return timeTaken;
265294
}
295+
296+
@Override
297+
public boolean getFailedOrRejected() {
298+
return testFailedOrRejected;
299+
}
266300
}
267301
}

server/src/test/java/org/elasticsearch/node/NodeTests.java

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -174,7 +174,6 @@ public void testAwaitCloseTimeoutsOnNonInterruptibleTask() throws Exception {
174174
shouldRun.set(false);
175175
}
176176

177-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/41448")
178177
public void testCloseOnInterruptibleTask() throws Exception {
179178
Node node = new MockNode(baseSettings().build(), basePlugins());
180179
node.start();

0 commit comments

Comments
 (0)