-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TaskExecutor to cancel all tasks on exception #12689
TaskExecutor to cancel all tasks on exception #12689
Conversation
When operations are parallelized, like query rewrite, or search, or createWeight, one of the tasks may throw an exception. In that case we wait for all tasks to be completed before re-throwing the exception that were caught. Tasks that were not started when the exception is captured though can be safely skipped. Ideally we would also cancel ongoing tasks but I left that for another time.
* | ||
* @param <T> the return type of all the callables | ||
*/ | ||
private static final class TaskGroup<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the diff is hard to read. The task is the same as before, with the addition of some FutureTask methods override:
- setException to handle the exception and cancel all tasks on exception. we can't simply catch the exception on run, because FutureTask#run does not throw it. We could have wrapped the callable, but I prefer leaving the original callable unchanged and overrideing FutureTask behavior instead
- cancel to deal with task cancellations: the original behaviour would be for future.get to throw a cancellation exception when called on a cancelled task, while still leaving the task running. We instead want to wait for all tasks to be completed before returning.
I introduced the private TaskGroup abstraction to group all tasks and primarily to make the cancelAll
method available against a final list. This is to address the dependency between the creation of the FutureTask, which needs to cancel all tasks on exception, yet the tasks list is populated only once all tasks have been created.
} | ||
} | ||
} | ||
assert assertAllFuturesCompleted() : "Some tasks are still running?"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is new: we have a test for it but I thought an assertion is also good, to verify that we don't leave running tasks behind before returning.
@@ -43,7 +47,8 @@ public class TestTaskExecutor extends LuceneTestCase { | |||
public static void createExecutor() { | |||
executorService = | |||
Executors.newFixedThreadPool( | |||
1, new NamedThreadFactory(TestTaskExecutor.class.getSimpleName())); | |||
random().nextBoolean() ? 1 : 2, | |||
new NamedThreadFactory(TestTaskExecutor.class.getSimpleName())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have a test that verifies the case for multiple exceptions, which becomes impossible to reproduce with a single threaded pool, because the first exception makes us cancel all other tasks, hence you can't have another exception thrown. That's why I randomized the pool size between 1 and 2.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks good to me in general, I'm just slightly uncomfortable with extending FutureTask
with so many changes in behavior, I'd rather like to have our own RunnableFuture
.
exc.addSuppressed(newException); | ||
private FutureTask<T> createTask(Callable<T> callable) { | ||
AtomicBoolean started = new AtomicBoolean(false); | ||
return new FutureTask<>(callable) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At this point, we're changing FutureTask
's behavior enough that I'm wondering if we should implement RunnableFuture
directly (possibly wrapping a FutureTask
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure. I guess that overriding run, setException and cancel makes it a lot. yet RunnableFuture has a lot of methods that I am not keen on implementing myself :)
I could go for wrapping the callable instead , that would remove the need to override run and setException. It would be the same logic but integrated into the callable itself that is provided to FutureTask. Would you be more comfortable with that approach?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Otherwise I can go for your suggested wrapping approach, that still relies on FutureTask internally, but I am not entirely sure how that differs from extending FutureTask directly. Could you help me understand that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my view it differs because it makes it more obvious what is handed over to FutureTask vs. what not. To be clear, I'm nit picking a bit here. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am happy to make you happy :) I take that you'd prefer to wrap FutureTask over wrapping the callable in combination with extending FutureTask#cancel (another alternative I mentioned above that requires less extending of FutureTask)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is correct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried wrapping FutureTask, but the methods that I was calling I could only call from a sub-class, as they are protected. I did update my PR to wrap the callable, which requires less customization of FutureTask (only cancel and isCancelled need to be overwritten). I am now wondering if it still makes sense to have our own RunnableFuture instead of just subclassing FutureTask. the former seems overkill to me at this point, what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Thanks for the review @jpountz ! |
When operations are parallelized, like query rewrite, or search, or createWeight, one of the tasks may throw an exception. In that case we wait for all tasks to be completed before re-throwing the exception that were caught. Tasks that were not started when the exception is captured though can be safely skipped. Ideally we would also cancel ongoing tasks but I left that for another time.
When operations are parallelized, like query rewrite, or search, or
createWeight, one of the tasks may throw an exception. In that case we
wait for all tasks to be completed before re-throwing the exception that
were caught. Tasks that were not started when the exception is captured
though can be safely skipped. Ideally we would also cancel ongoing tasks
but I left that for another time.