Core: Fix possible deadlock in ParallelIterable #11781
Conversation
force-pushed from ec139c4 to da723f0
force-pushed from b072cc9 to cab3521
force-pushed from b41baf9 to c94513b
rebased
force-pushed from c94513b to 66ed87f
From a discussion I had with @sopel39 today: I think we can go forward with this solution, but I think it will basically re-introduce the memory usage issue that we saw previously (for some use cases). From our discussion I believe we have been working at this from the wrong direction.

Recap

The original implementation basically assumed that we could read as many iterators as parallelism allowed. This led to essentially unbounded memory usage, which became an issue for systems using a lightweight coordinator: the coordinator would be required to have a huge memory footprint (essentially all the metadata entries for a table held in memory per query).

Next, to solve the memory footprint issue we added what is essentially a buffered read-ahead. A queue is filled with elements from the files we are reading in parallel, and we check the queue depth every time we add elements in order to bound its size. Because the max queue size is checked on every add, we can never go more than "parallelism" items over the max queue size. Unfortunately, this leads to the current deadlock issue: we can potentially yield an iterator in the middle of a file and be left only with iterators for files which cannot yet be opened, because all file handles are owned by the yielded iterators.

The current proposed solution is to switch from checking the queue size per element to checking it only before opening a new file for the first time. This means that any file that is opened is read completely into the read-ahead queue. This fixes the deadlock issue, since we will never yield in the middle of a file, but possibly reintroduces the memory issue: in the worst case we would open up to "parallelism" files simultaneously and load them all into the queue before having a chance to check our queue size.

Where do we go from here

The current implementation is basically trying to solve the general problem of bounding a read-ahead queue over arbitrary iterators, but that is over-general for what we are actually trying to do. Our actual problem is reading a known set of files, and the key difference is that we know exactly how long each file is before we open it. Instead of simply opening files up to the parallelism limit, I think what we should do is something like the following (I haven't thought this part through too much; the synchronization primitives are just for representation, and I realize it won't work exactly like this).
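A rough illustration of that byte-budgeted direction, assuming a budget sized by file length (class and method names here are hypothetical, not Iceberg APIs, and the synchronization is only representative):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;

/**
 * Hypothetical sketch only: bound the read-ahead by bytes instead of element count.
 * FileSlice, Batch, and the field names are illustrative, not Iceberg APIs.
 */
class ByteBudgetedReadAhead<T> {

  /** Stand-in for "a file whose length is known before it is opened". */
  interface FileSlice<T> {
    long lengthInBytes();

    Iterator<T> open();
  }

  /** A fully read file together with the byte budget it is holding. */
  private static final class Batch<T> {
    private final List<T> rows;
    private final int bytes;

    Batch(List<T> rows, int bytes) {
      this.rows = rows;
      this.bytes = bytes;
    }
  }

  private final Semaphore byteBudget; // permits == bytes we are allowed to buffer
  private final BlockingQueue<Batch<T>> queue = new ArrayBlockingQueue<>(1024);

  ByteBudgetedReadAhead(int maxBufferedBytes) {
    this.byteBudget = new Semaphore(maxBufferedBytes);
  }

  /** Producer: reserve the whole file before opening it, then drain it completely. */
  void readFile(FileSlice<T> file) throws InterruptedException {
    int bytes = Math.toIntExact(file.lengthInBytes()); // sketch assumes files < 2 GB
    byteBudget.acquire(bytes); // block until the entire file fits in the budget
    List<T> rows = new ArrayList<>();
    Iterator<T> reader = file.open(); // a file handle/connection is held only from here on
    while (reader.hasNext()) {
      rows.add(reader.next()); // drain fully; never yield mid-file
    }
    queue.put(new Batch<>(rows, bytes));
  }

  /** Consumer: return the byte budget once a whole batch has been taken. */
  Iterator<T> nextBatch() throws InterruptedException {
    Batch<T> batch = queue.take();
    byteBudget.release(batch.bytes); // frees budget so the next file can be opened
    return batch.rows.iterator();
  }
}
```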
It was observed that in a high-concurrency/high-workload scenario the cluster deadlocks due to manifest readers waiting for a connection from the S3 pool.

Specifically, ManifestGroup#plan will create a ManifestReader per ParallelIterable.Task. These readers effectively hold onto an S3 connection from the pool. When the ParallelIterable queue is full, the Task is tabled for later use.

Consider the scenario:
S3 connection pool size = 1
approximateMaxQueueSize = 1
workerPoolSize = 1

ParallelIterable1: starts TaskP1
ParallelIterable1: TaskP1 produces a result, the queue becomes full, TaskP1 is put on hold (holding an S3 connection)
ParallelIterable2: starts TaskP2; TaskP2 is scheduled on the worker pool but is blocked on the S3 connection pool
ParallelIterable1: the result gets consumed, TaskP1 is scheduled again
ParallelIterable1: TaskP1 waits for the worker pool to be free, but TaskP2 is waiting for TaskP1 to release its connection

The fix makes sure a Task is finished once it's started. This way limited resources like connection pools are not put on hold. The queue size might exceed the strict limit, but it should still be bounded.

Fixes apache#11768
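A condensed, hypothetical sketch of that behavior change (not the actual ParallelIterable patch; the class name and the spin-wait are only illustrative):

```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Hypothetical simplification of the fixed behavior, not the actual patch:
// the queue bound is consulted only before a reader is opened; once a reader
// is open (and holding a connection from the pool), it is drained completely.
final class DrainFileFullyTask<T> implements Runnable {
  private final Iterable<Supplier<Iterator<T>>> fileOpeners; // each supplier opens one manifest reader
  private final ConcurrentLinkedQueue<T> queue;
  private final int approximateMaxQueueSize;

  DrainFileFullyTask(
      Iterable<Supplier<Iterator<T>>> fileOpeners,
      ConcurrentLinkedQueue<T> queue,
      int approximateMaxQueueSize) {
    this.fileOpeners = fileOpeners;
    this.queue = queue;
    this.approximateMaxQueueSize = approximateMaxQueueSize;
  }

  @Override
  public void run() {
    for (Supplier<Iterator<T>> opener : fileOpeners) {
      while (queue.size() >= approximateMaxQueueSize) {
        // Back-pressure applies only while no reader (and no pooled connection) is held.
        // The real Task is put on hold and rescheduled here instead of spinning.
        Thread.onSpinWait();
      }
      Iterator<T> reader = opener.get(); // the connection is acquired only now
      while (reader.hasNext()) {
        queue.add(reader.next()); // drain fully once started; never pause mid-file
      }
    }
  }
}
```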
force-pushed from 66ed87f to 3436e7f
@findepi I'm on board with this, but I want to make sure you are also happy with it. I'm unsure whether having the ability to yield before files will really help memory pressure, so I'm slightly inclined to just revert the yielding capabilities altogether and go back to the implementation from last year. I'm willing to go forward with either direction for now, though. In the future I think we really need a fake-filesystem benchmarking test that we can use to simulate how the algorithms we write here will behave, since we are mostly working blind.
As noted in my comment to Piotr, I think this is a fix for the deadlock, but it may be better to just remove the yielding behavior altogether until we have a better replacement. If folks have experience where file-level yielding would appropriately limit memory usage, I think we can go forward with this as an interim solution.
thanks @RussellSpitzer for the great summary.
This change looks fine.
I also feel OK with reverting the yield change completely, as I feel this deadlock-avoidance change might make the yield solution ineffective anyway. The OOM scenario was a large thread pool (like 184 threads) and large manifest files (hundreds of MBs or GBs).
@sopel39 I assume you tried this change and it avoided deadlock problem. Has this been tested with the OOM scenario for large manifest files?
BTW, I like the new direction that @RussellSpitzer outlined. Using byte size (instead of number of elements) is more intuitive and makes it easier to calculate a good default to cap the memory footprint. E.g., the max read bytes could be set to 1/10 of the JVM heap size (1/10 is just an example; we can estimate a proper default based on a typical file size/memory footprint ratio). There could be an edge condition where a single manifest file is larger than the default value; in that case, if the buffer is empty or less than half full, the task should still be allowed.
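A minimal sketch of that admission rule, with hypothetical names (not Iceberg code):

```java
// Hypothetical admission check for a byte-based cap, as suggested above.
final class ByteBudgetAdmission {
  private ByteBudgetAdmission() {}

  /**
   * Admit a file if it fits in the remaining byte budget, or if the buffer is
   * empty or less than half full, so a single oversized manifest can still proceed.
   */
  static boolean admit(long bufferedBytes, long fileBytes, long maxBufferedBytes) {
    boolean fits = bufferedBytes + fileBytes <= maxBufferedBytes;
    boolean bufferMostlyDrained = bufferedBytes == 0 || bufferedBytes < maxBufferedBytes / 2;
    return fits || bufferMostlyDrained;
  }
}
```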
We tested in a staging environment. So far the coordinator hasn't shown any excessive memory usage.
Would this restore the OOM problem?
Potentially, this fix would also restore the OOM problem.
IIRC the OOM problem was a real production problem (cc @raunaqmorarka @dekimir @losipiuk), so I am not convinced it's OK to restore it.
@findepi The deadlock issue is probably the worse of the two evils; it probably needs to be addressed urgently.
It seems we're in agreement. The deadlock must be resolved. This PR supposedly avoids any deadlocks at the cost of increased memory usage, which is fair. I think we should not remove memory pressure protections.
Given the many approvals, it looks ready to go.
Thank you @sopel39 for the PR and @osscm @RussellSpitzer @stevenzwu for the reviews!
* Fix ParallelIterable deadlock
* Do not submit a task when there is no space in queue