Lazily submit tasks in ParallelIterable and add cancellation. #45
Conversation
This removes the planner pool from ParallelIterable, which was used to submit all of the iterable's tasks in parallel. This was used to queue up tasks to read every manifest in a snapshot. However, when a caller stopped reading early, all tasks would still run and add results to the queue. Now, tasks are submitted from the thread consuming the iterator as it runs hasNext. If the caller stops consuming the iterator, then no new tasks are submitted. This also keeps track of the submitted tasks and will cancel them when the iterator is closed.
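For orientation, here is a rough, runnable model of the lifecycle described above. All names here (`LazyParallelIterator`, `checkTasks`, `groups`) are illustrative, not the PR's actual code; the real implementation is discussed in the review below.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Illustrative model: hasNext() drives task submission, close() cancels.
class LazyParallelIterator<T> implements Iterator<T>, Closeable {
  private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
  private final Iterator<? extends Iterable<T>> groups; // e.g. one group per manifest
  private final ExecutorService workerPool;
  private final List<Future<?>> submitted = new ArrayList<>();

  LazyParallelIterator(Iterator<? extends Iterable<T>> groups, ExecutorService workerPool) {
    this.groups = groups;
    this.workerPool = workerPool;
  }

  // submit one more task if nothing is running; true while results may still appear
  private boolean checkTasks() {
    boolean anyRunning = submitted.stream().anyMatch(future -> !future.isDone());
    if (!anyRunning && groups.hasNext()) {
      Iterable<T> group = groups.next();
      submitted.add(workerPool.submit(() -> group.forEach(queue::add)));
      return true;
    }
    return anyRunning || groups.hasNext();
  }

  @Override
  public boolean hasNext() {
    // tasks are submitted from the consuming thread: stop consuming, stop submitting
    while (queue.isEmpty() && checkTasks()) {
      try {
        Thread.sleep(10); // simple busy wait; see the review discussion below
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return !queue.isEmpty();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return queue.poll();
  }

  @Override
  public void close() {
    // cancel outstanding work once the caller stops reading early
    for (Future<?> future : submitted) {
      future.cancel(true);
    }
  }
}
```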
@mccheah, it would be great to get a review from you on this one.
Still looking over this, but separately I wonder if we've looked into Java's built-in parallel iterators, or spliterators.
I'm not that impressed with the Spliterator API, and it wouldn't really help here, where we want to control when background tasks are submitted and how they run. We could supply some implementation of tasks that is spliterable and use that as a fixed number of tasks, but that doesn't allow us to balance tasks across threads. It also doesn't allow us to avoid submitting tasks to read manifest contents into memory, which is the purpose of this commit.
```diff
   }

-  private static class ParallelIterator<T> implements Iterator<T> {
+  private static class ParallelIterator<T> implements Iterator<T>, Closeable {
```
I'm wondering if there's a more idiomatic way to do this, particularly one that doesn't require both:
- Busy waiting. Generally it's a red flag in concurrent programming if busy waiting is used over alternative primitives like locks, conditions, queues, monitors, etc.
- Manual tracking of tasks by index.

I came up with the following alternative. Apologies that it has to be in pseudo-code form; due to the nature of the problem it's pretty hard to explain without the code. Let's see how this works out:
```java
import java.io.Closeable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import com.google.common.collect.ImmutableList;

class ParallelIterator<T> implements Iterator<T>, Closeable {
  private static final int TASK_COUNT = 8; // look-ahead batch size

  private final LinkedList<T> availableValues = new LinkedList<>();
  private final LinkedList<Future<List<T>>> runningTasks = new LinkedList<>();
  private final ExecutorService threadPool;
  private final Iterator<Iterable<T>> pendingValues;

  // Constructor, close(), etc.

  @Override
  public boolean hasNext() {
    return !runningTasks.isEmpty() || !availableValues.isEmpty() || pendingValues.hasNext();
  }

  @Override
  public T next() {
    if (!availableValues.isEmpty()) {
      return availableValues.poll();
    }
    if (!runningTasks.isEmpty()) {
      try {
        // block on the oldest running task and drain its results
        availableValues.addAll(runningTasks.poll().get());
      } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
      }
      return next(); // or availableValues.poll() if we don't like recursion
    }
    if (pendingValues.hasNext()) {
      // buffer / eagerly submit some set of tasks, i.e. look-ahead
      for (int i = 0; i < TASK_COUNT && pendingValues.hasNext(); i++) {
        Iterable<T> nextPendingValues = pendingValues.next();
        runningTasks.add(threadPool.submit(() -> ImmutableList.copyOf(nextPendingValues)));
      }
      return next(); // recursive call that will now check based on running tasks
    }
    throw new NoSuchElementException(); // no values remaining
  }
}
```
The general idea is to keep a running iterator over the backing iterable. When calling `next()`, submit tasks that are buffered into a worker queue of futures; each future represents computing the next group of values. Then on `next()`:
- Get an available value from a completed task, if possible
- Else check the work queue and see if a new batch of values is ready
- Otherwise submit more work and wait

What do you think about this approach? The advantages are:
- No busy waiting
- No need to maintain indices manually. Everything is done via collection primitives (`poll`, `iterator`, etc.)
There are a few ways this framework can be adjusted. For example, on `next`, if we determine that only some minimum number of running tasks remains, we can choose to eagerly submit work ahead of the user actually requesting those values; this pipelines the main thread's work on the values with the worker threads' work that produces them.
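For instance, a hypothetical helper over the sketch above (`MIN_RUNNING_TASKS` is an assumed constant) that tops up the worker queue ahead of demand:

```java
// hypothetical look-ahead: keep a minimum number of tasks in flight so worker
// threads keep producing values while the main thread consumes them
private void refillRunningTasks() {
  while (runningTasks.size() < MIN_RUNNING_TASKS && pendingValues.hasNext()) {
    Iterable<T> nextPendingValues = pendingValues.next();
    runningTasks.add(threadPool.submit(() -> ImmutableList.copyOf(nextPendingValues)));
  }
}
```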
I believe manual task tracking is required for cancellation of futures when the iterator is closed. Cancellation doesn't seem possible in this approach though, is it?
Sure it is: iterate through `runningTasks` and call `cancel` on everything.
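For example, sketched against the pseudo-code above:

```java
@Override
public void close() {
  // best-effort cancellation; true = interrupt tasks that already started
  for (Future<List<T>> task : runningTasks) {
    task.cancel(true);
  }
  runningTasks.clear();
}
```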
Thanks, I missed that!
I like the idea of removing the busy wait. I always feel bad about those. But after thinking through your suggestion, I think it makes sense to go with what's in this PR.
There are a few issues with your suggestion:
- `availableValues` doesn't have data until a complete task has been processed. The primary use of this is to get files for a query, and engines like Presto start processing tasks as soon as they are returned. So we don't want to delay waiting for a manifest scan to complete before returning matches. (Side note: this is why we store newer changes first in the manifest list.)
- `runningTasks` imposes a FIFO order on tasks. They are consumed in the order they are submitted, so a small task might be blocked from returning results by a large task.
- `hasNext` will return true if there are more tasks, but those tasks might not have results. For example, a scan that filters out all data files should not result in any calls to `next`, since the `Iterator` contract requires `next` to throw `NoSuchElementException` when there are no elements.
To take care of the first problem, tasks should produce values directly to `availableValues`, which should be a concurrent queue.

The second problem could be fixed by looping through `runningTasks` every time to see which futures are finished, removing just the finished ones, and submitting new tasks to maintain the right number of tasks. That means removing elements from the list while iterating, so I think the simpler approach is to use an array of futures like this PR already does. We could make it work with the Collections API, but I think it is cleaner to use the manual index.

For the last problem, consider the case where all of the results are filtered out. In this implementation, the first call to `next` is going to submit tasks, then wait on `runningTasks.poll()` for each one, submit another set of tasks, then wait on each one, etc. What's effectively happening is a busy loop of the method body, but one that is implemented by recursive calls. I would rather have an explicit busy loop because it is easier to reason about.
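A fragment sketching those three fixes together; the names (`queue`, `taskFutures`, `tasks`, `workerPool`, `POOL_SIZE`) are assumed for illustration rather than copied from the PR. Tasks append straight to a concurrent queue, and finished futures are replaced in place by index:

```java
// assumed fields: a concurrent results queue and a fixed-size slot array of futures
private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
private final Future<?>[] taskFutures = new Future<?>[POOL_SIZE];

// returns true while tasks are running or remain to be submitted
private boolean checkTasks() {
  boolean hasRunningTask = false;
  for (int i = 0; i < taskFutures.length; i += 1) {
    if (taskFutures[i] == null || taskFutures[i].isDone()) {
      // free slot: submit the next task, which writes its results to the queue
      if (tasks.hasNext()) {
        Iterable<T> group = tasks.next(); // consume the iterator on this thread
        taskFutures[i] = workerPool.submit(() -> group.forEach(queue::add));
      } else {
        taskFutures[i] = null;
      }
    }
    if (taskFutures[i] != null) {
      hasRunningTask = true;
    }
  }
  return hasRunningTask || tasks.hasNext();
}
```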
Another option that might be better is to replace `Thread.sleep` with a poll-like operation with a timeout: wake when a new record is available or when some timeout has elapsed (in case there are no more records).

That still requires a busy loop, though, since there may not be more records and `hasNext` can't return until either the next record is available or there can't be another record. That would just make this a bit faster by not sleeping so long, and probably isn't worth the added complexity.
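For illustration only, assuming the `checkTasks()` helper sketched above and a `LinkedBlockingQueue` for results, the sleep would become a timed poll:

```java
// a BlockingQueue lets the consumer wake as soon as a worker adds a record,
// or after a short timeout so it can re-check whether tasks are exhausted
private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();

private T pollNext() throws InterruptedException {
  while (checkTasks()) {
    T value = queue.poll(10, TimeUnit.MILLISECONDS); // wait, but not forever
    if (value != null) {
      return value;
    }
  }
  return queue.poll(); // tasks are done; return a leftover value or null
}
```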
Yeah, I see the problems with my proposed alternative. I strongly believe, however, that there should exist some solution that doesn't require busy waiting or manual tracking of futures. Don't block the merge on an alternative design, but we should think hard about other ideas; they will likely require some creativity and clever use of concurrency primitives.
```diff
       // this cannot conclude that there are no more records until tasks have finished. while some
       // are running, return true when there is at least one item to return.
-      while (!taskFuture.isDone()) {
+      while (checkTasks()) {
```
Not sure I completely follow the logic here. When `checkTasks()` returns true, we know for sure that some data files will be added to the queue (since either we submitted new tasks or there are already tasks running).

I wonder if a slightly different approach [which could be a bit lazier] would help here. Can we block on the queue until its size is > 0, instead of the while loop on `checkTasks()`? In this way, we do not keep scheduling more tasks until our queue gets some data files. This seems more in line with the general idea of the patch [lazily parsing manifests and computing data files], as it would only submit the necessary number of tasks.
The problem is that there may be no more results. This can't return until either a result is found or all tasks have completed and produced no results. So we have to continuously check whether new tasks should be submitted to get to the second case, where all tasks ran and produced no items.
Thanks for the explanation.
```diff
 @@ -27,42 +27,21 @@
 import java.util.concurrent.ThreadPoolExecutor;

 public class ThreadPools {
-  public static final String PLANNER_THREAD_POOL_SIZE_PROP =
-      SystemProperties.PLANNER_THREAD_POOL_SIZE_PROP;
```
nit: Seems like this property is no longer required. Would it make sense to remove it?
What do you mean? I thought I did.
Oh, I meant from the SystemProperties class.
Good catch. I'll remove it.
```diff
   private static ExecutorService WORKER_POOL = MoreExecutors.getExitingExecutorService(
       (ThreadPoolExecutor) Executors.newFixedThreadPool(
-          getPoolSize(WORKER_THREAD_POOL_SIZE_PROP, Runtime.getRuntime().availableProcessors()),
+          WORKER_THREAD_POOL_SIZE,
           new ThreadFactoryBuilder()
               .setDaemon(true)
```
nit: I think `MoreExecutors.getExitingExecutorService` already creates daemon threads.
This passes a `ThreadFactoryBuilder` to set the name format. You're right that `getExitingExecutorService` should use daemon threads, but I don't think that is a reason to omit it here, since that is a requirement of this code.
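For reference, a sketch of the pool construction with both settings; the name-format string here is illustrative:

```java
private static final ExecutorService WORKER_POOL = MoreExecutors.getExitingExecutorService(
    (ThreadPoolExecutor) Executors.newFixedThreadPool(
        WORKER_THREAD_POOL_SIZE,
        new ThreadFactoryBuilder()
            .setDaemon(true) // explicit, even though the exiting service also implies it
            .setNameFormat("iceberg-worker-%d") // the reason the builder is kept
            .build()));
```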
That makes sense!
+1
Thanks for the reviews, everyone!
Lazily submit tasks in ParallelIterable and add cancellation.

This removes the planner pool from ParallelIterable, which was used to
submit all of the iterable's tasks in parallel. This was used to queue
up tasks to read every manifest in a snapshot. However, when a caller
stopped reading early, all tasks would still run and add results to the
queue.

Now, tasks are submitted from the thread consuming the iterator as it
runs hasNext. If the caller stops consuming the iterator, then no new
tasks are submitted. This also keeps track of the submitted tasks and
will cancel them when the iterator is closed.

* Remove SystemProperties.PLANNER_THREAD_POOL_SIZE_PROP.