
Lazily submit tasks in ParallelIterable and add cancellation. #45

Merged: 2 commits into apache:master on Jan 16, 2019

Conversation

@rdblue rdblue (Contributor) commented Dec 11, 2018

This removes the planner pool from ParallelIterable, which was used to
submit all of the iterable's tasks in parallel. This was used to queue
up tasks to read every manifest in a snapshot. However, when a caller
stopped reading early, all tasks would still run and add results to the
queue.

Now, tasks are submitted from the thread consuming the iterator as it
runs hasNext. If the caller stops consuming the iterator, then no new
tasks are submitted. This also keeps track of the submitted tasks and
will cancel them when the iterator is closed.
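For orientation, here is a minimal sketch of the pattern described above, with hypothetical names; the PR's actual ParallelIterable code differs in its details. hasNext() lazily submits and tracks tasks, and close() cancels whatever is still running.

import java.io.Closeable;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

class LazyParallelIteratorSketch<T> implements Iterator<T>, Closeable {
  private final Iterator<? extends Iterable<T>> pendingTasks;
  private final ExecutorService workerPool;
  private final Future<?>[] taskFutures;              // one slot per in-flight task
  private final Queue<T> queue = new ConcurrentLinkedQueue<>();
  private volatile boolean closed = false;

  LazyParallelIteratorSketch(Iterable<? extends Iterable<T>> iterables,
                             ExecutorService workerPool, int maxTasks) {
    this.pendingTasks = iterables.iterator();
    this.workerPool = workerPool;
    this.taskFutures = new Future<?>[maxTasks];
  }

  @Override
  public void close() {
    closed = true;
    for (Future<?> future : taskFutures) {
      if (future != null) {
        future.cancel(true);                           // stop work the consumer no longer needs
      }
    }
  }

  // Reaps finished tasks, submits new ones, and reports whether more results may still arrive.
  private boolean checkTasks() {
    boolean hasRunningTask = false;
    for (int i = 0; i < taskFutures.length; i += 1) {
      if (taskFutures[i] == null || taskFutures[i].isDone()) {
        if (pendingTasks.hasNext()) {
          Iterable<T> nextIterable = pendingTasks.next();          // taken on the consumer thread
          taskFutures[i] = workerPool.submit(() -> nextIterable.forEach(queue::add));
        } else {
          taskFutures[i] = null;
        }
      }
      if (taskFutures[i] != null) {
        hasRunningTask = true;
      }
    }
    return hasRunningTask;
  }

  @Override
  public boolean hasNext() {
    while (!closed && queue.isEmpty()) {
      boolean moreMayArrive = checkTasks();
      if (!moreMayArrive && queue.isEmpty()) {
        return false;                                  // all tasks ran and produced nothing
      }
      try {
        Thread.sleep(10);                              // simple wait; alternatives are discussed below
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return !closed;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return queue.poll();
  }
}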
@rdblue rdblue (Contributor, Author) commented Dec 11, 2018

@mccheah, it would be great to get a review from you on this one.

@mccheah mccheah (Contributor) commented Dec 11, 2018

Still looking over this, but separately I wonder if we've looked into Java's built-in parallel iterators, or spliterators.

@rdblue rdblue (Contributor, Author) commented Dec 11, 2018

I'm not that impressed with the Spliterator API, and it wouldn't really help here, where we want to control when background tasks are submitted and how they run.

We could supply a spliterable implementation of the tasks and use that as a fixed set of tasks, but that doesn't allow us to balance tasks across threads. It also doesn't let us avoid submitting tasks that read manifest contents into memory, which is the purpose of this commit.
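For contrast, a rough sketch (illustrative only) of the spliterator/parallel-stream style being set aside here: the work is split into a fixed set of chunks up front, with no control over when tasks are submitted or how they are balanced, and every manifest is read even if the caller stops consuming early.

import java.util.List;
import java.util.stream.Collectors;

class SpliteratorSketch {
  static List<String> readAllEagerly(List<List<String>> manifests) {
    return manifests.parallelStream()        // splits the source via its spliterator
        .flatMap(List::stream)               // reads every manifest's entries
        .collect(Collectors.toList());       // the caller only sees results once the stream completes
  }
}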

  }

- private static class ParallelIterator<T> implements Iterator<T> {
+ private static class ParallelIterator<T> implements Iterator<T>, Closeable {
Contributor:

I'm wondering if there's a more idiomatic way to do this, particularly one that doesn't require both:

  1. Busy waiting. Generally it's a red flag in concurrent programming when busy waiting is used instead of alternative primitives like locks, conditions, queues, monitors, etc.
  2. Manual tracking of tasks by index.

I came up with the following alternative. Apologies that it has to be in pseudo-code form; due to the nature of the problem it's pretty hard to explain without the code. Let's see how this works out:

// (Assumes java.util, java.util.concurrent, and Guava's ImmutableList are imported.)
class ParallelIterator<T> implements Iterator<T>, Closeable {

  private static final int TASK_COUNT = 4;  // lookahead: how many tasks to keep in flight

  private LinkedList<T> availableValues = new LinkedList<>();
  private LinkedList<Future<List<T>>> runningTasks = new LinkedList<>();
  private ExecutorService threadPool;
  private Iterator<Iterable<T>> pendingValues;

  // Constructor etc.

  public boolean hasNext() {
    return !runningTasks.isEmpty() || !availableValues.isEmpty() || pendingValues.hasNext();
  }

  public T next() {
    if (!availableValues.isEmpty()) {
      return availableValues.poll();
    }
    if (!runningTasks.isEmpty()) {
      try {
        availableValues.addAll(runningTasks.poll().get());
      } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
      }
      return next(); // Or availableValues.poll() if we don't like recursion
    }
    if (pendingValues.hasNext()) {
      // Buffer / eagerly submit some set of tasks, i.e. lookahead.
      for (int i = 0; i < TASK_COUNT && pendingValues.hasNext(); i++) {
        Iterable<T> nextPendingValues = pendingValues.next();
        Future<List<T>> nextRunningTask = threadPool.submit(() -> ImmutableList.copyOf(nextPendingValues));
        runningTasks.add(nextRunningTask);
      }
      return next(); // Recursive call that will now check based on running tasks
    }
    throw new NoSuchElementException(); // No values remaining
  }
}

The general idea is to keep a running iterator over the backing iterable. When calling next(), submit tasks that are buffered into a worker queue of futures; each future represents computing the next group of values. Then on next():

  • Get an available value from a completed task, if possible
  • Else check the work queue and see if a new batch of values is ready
  • Otherwise submit more work and wait

What do you think about this approach? The advantages are:

  • No busy waiting
  • No need to maintain indices manually. Everything is done via collection primitives (poll, iterator, etc.)

There are a few ways this framework can be adjusted. For example, on next, if we determine that only some minimum number of running tasks remain, we can choose to eagerly submit work ahead of the user actually requesting those values - thereby pipelining the main thread's work on the values with the worker threads' work that produces them.

Contributor:

I believe manual task tracking is required for cancelling futures when the iterator is closed. Cancellation doesn't seem possible in this approach though, is it?

Contributor:

Sure it is - iterate through runningTasks and call cancel on everything.

Contributor:

Thanks, I missed that!

Contributor Author:

I like the idea of removing the busy wait. I always feel bad about those. But after thinking through your suggestion, I think it makes sense to go with what's in this PR.

There are a few issues with your suggestion:

  1. availableValues doesn't have data until a complete task has been processed. The primary use of this is to get files for a query, and engines like Presto start processing tasks as soon as they are returned. So we don't want to delay waiting for a manifest scan to complete before returning matches. (Side note: this is why we store newer changes first in the manifest list.)
  2. runningTasks imposes a FIFO order on tasks. They are consumed in the order they are submitted, so a small task might be blocked from returning results by a large task.
  3. hasNext will return true if there are more tasks, but those tasks might not have results. For example, a scan that filters out all data files should not result in any calls to next; by the Iterator contract, next would have to throw NoSuchElementException in that case.

To take care of the first problem, tasks should produce values directly to availableValues, which should be a concurrent queue.
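As a sketch of that first fix (hypothetical names, not the PR's code): each task streams its results into a shared concurrent queue as it reads them, so the consumer can see matches before a whole manifest scan finishes.

import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class ProducerTaskSketch {
  static <T> Runnable producerTask(Iterable<T> manifestEntries, Queue<T> availableValues) {
    return () -> {
      for (T entry : manifestEntries) {
        availableValues.add(entry);  // visible to the consumer immediately, not at task completion
      }
    };
  }

  public static void main(String[] args) {
    Queue<Integer> availableValues = new ConcurrentLinkedQueue<>();
    // normally this Runnable would be submitted to the worker pool
    producerTask(Arrays.asList(1, 2, 3), availableValues).run();
    System.out.println(availableValues);  // prints [1, 2, 3]
  }
}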

The second problem could be fixed by looping through runningTasks every time to see which futures are finished, removing just the finished ones, and submitting new tasks to maintain the right number of running tasks. That means removing elements from the list while iterating over it, so I think the simpler approach is to use an array of futures, like this PR already does. We could make it work with the Collections API, but I think it is cleaner to use the manual index.

For the last problem, consider the case where all of the results are filtered out. In this implementation, the first call to next is going to submit tasks, then wait on runningTasks.poll() for each one, submit another set of tasks, then wait on each one, etc. What's effectively happening is a busy loop over the method body, but one that is implemented by recursive calls. I would rather have an explicit busy loop because it is easier to reason about.

Contributor Author:

Another option that might be better is to replace Thread.sleep with a poll-like operation with a timeout: wake when a new record is available or some timeout has elapsed (in case there are no more records).

That still requires a busy loop though, since there may not be more records and hasNext can't return until either the next record is available or there can't be another record. That would just make this a bit faster by not sleeping so long and probably isn't worth the added complexity.
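A rough sketch of that poll-with-timeout variant (hypothetical names, not the PR's code); checkTasks stands in for the method that reaps finished futures and submits new ones, returning true while more results may still arrive.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

class PollWithTimeoutSketch<T> {
  private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
  private T nextValue = null;

  boolean hasNext(BooleanSupplier checkTasks) throws InterruptedException {
    while (nextValue == null) {
      boolean moreMayArrive = checkTasks.getAsBoolean();    // submit/track tasks as in the PR
      nextValue = queue.poll(100, TimeUnit.MILLISECONDS);   // timed wait instead of a fixed sleep
      if (nextValue == null && !moreMayArrive) {
        return false;  // every task finished and produced nothing more
      }
    }
    return true;
  }
}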

Contributor:

Yeah I see the problems with my proposed alternative. I strongly believe however that there should exist some solution that doesn't require busy waiting or the manual tracking of Futures. Don't block the merge on an alternative design, but we should think very hard about other ideas that will likely require some creativity and clever use of concurrency primitives.

  // this cannot conclude that there are no more records until tasks have finished. while some
  // are running, return true when there is at least one item to return.
- while (!taskFuture.isDone()) {
+ while (checkTasks()) {
@rdsr rdsr (Contributor) commented Dec 12, 2018:

Not sure I completely follow the logic here

When checkTasks() returns true, we know for sure that some datafiles will be added to the queue (since either we submitted new tasks or there are already tasks running).

I wonder if a slightly different approach [which could be a bit lazier] would help here. Can we block on the queue until its size > 0 instead of the while loop on checkTasks()? That way, we do not keep scheduling more tasks until our queue gets some datafiles. This seems more in line with the general idea of the patch [lazily parsing manifests and computing datafiles], as it would only submit the necessary number of tasks.

Contributor Author:

The problem is that there may be no more results. This can't return until either a result is found or all tasks have completed and produced no results. So we have to continuously check whether new tasks should be submitted to get to the second case, where all tasks ran and produced no items.

Contributor:

Thanks for the explanation.

@@ -27,42 +27,21 @@
  import java.util.concurrent.ThreadPoolExecutor;

  public class ThreadPools {
-   public static final String PLANNER_THREAD_POOL_SIZE_PROP =
-       SystemProperties.PLANNER_THREAD_POOL_SIZE_PROP;
@rdsr rdsr (Contributor) commented Dec 12, 2018:

nit: Seems like this property is no longer required. Would it make sense to remove it?

Contributor Author:

What do you mean? I thought I did.

Contributor:

Oh, I meant from SystemProperties class

Contributor Author:

Good catch. I'll remove it.


  private static ExecutorService WORKER_POOL = MoreExecutors.getExitingExecutorService(
      (ThreadPoolExecutor) Executors.newFixedThreadPool(
-         getPoolSize(WORKER_THREAD_POOL_SIZE_PROP, Runtime.getRuntime().availableProcessors()),
+         WORKER_THREAD_POOL_SIZE,
          new ThreadFactoryBuilder()
              .setDaemon(true)
@rdsr rdsr (Contributor) commented Dec 12, 2018:

nit: I think MoreExecutors.getExitingExecutorService already creates daemon threads

Contributor Author:

This passes a ThreadFactoryBuilder to set the name format. You're right that getExitingExecutorService should use daemon threads, but I don't think that's a reason to omit setDaemon(true) here, since daemon threads are a requirement of this code.
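For reference, a sketch of the kind of setup being discussed, using Guava's ThreadFactoryBuilder; the name format shown is hypothetical. The factory is supplied mainly for the thread name format, and setDaemon(true) keeps the daemon requirement explicit even though MoreExecutors.getExitingExecutorService also converts the pool's threads to daemons.

import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class WorkerPoolSketch {
  static final ExecutorService WORKER_POOL = MoreExecutors.getExitingExecutorService(
      (ThreadPoolExecutor) Executors.newFixedThreadPool(
          Runtime.getRuntime().availableProcessors(),
          new ThreadFactoryBuilder()
              .setDaemon(true)                           // explicit, even if redundant here
              .setNameFormat("iceberg-worker-pool-%d")   // hypothetical name format
              .build()));
}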

Contributor:

That makes sense!

@rdsr rdsr (Contributor) left a comment:

+1

@danielcweeks danielcweeks (Contributor):

+1

@danielcweeks danielcweeks merged commit 0dc3592 into apache:master Jan 16, 2019
@rdblue rdblue (Contributor, Author) commented Jan 16, 2019

Thanks for the reviews, everyone!

renato2099 pushed a commit to renato2099/incubator-iceberg that referenced this pull request on Jan 30, 2019:

* Lazily submit tasks in ParallelIterable and add cancellation.
* Remove SystemProperties.PLANNER_THREAD_POOL_SIZE_PROP.
bkahloon pushed a commit to bkahloon/iceberg that referenced this pull request on Feb 27, 2021.
puchengy added three commits to puchengy/iceberg that referenced this pull request on Jun 7, 2023 (each cherry picked from commit 07cc6e4).