Fix GIL-deadlocks when doing async iteration #6
Conversation
SemMulder left a comment
Awesome! This is great work!
The choices made here look like common sense to me, which then makes it confusing to me why upstream made different choices. 😕
This journey, and the solution in this PR, do make me wary of using async on the Rust side when interfacing with Python. Debugging with GDB was also not a fun experience, given how Rust compiles Futures into state machines; it is very hard to make sense of those at scale. Something to warn others about who need to write Python interop code in the future :).
How do you see the use of async Rust in this Python library? Would it make sense to evaluate whether its use is warranted, long-term?
Code looks good to me! I think we can improve the documentation a little bit in places though :).
```rust
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    let waker = cx.waker();
    Python::with_gil(|py| {
        py.allow_threads(|| pin!(&mut self.0).poll(&mut Context::from_waker(waker)))
```
> `pin!(&mut self.0)`

The pinning is needed because of the way Rust implements Futures, I guess?

> `&mut Context::from_waker(waker)`

Can't we just forward `cx` directly here?
```rust
/// An alternative runtime for `pyo3_async_runtimes`,
/// since `pyo3_async_runtimes::tokio` runs its _own_ Tokio runtime
/// rather than using whatever is in scope or allowing the user to pass the specific runtime.
///
/// How this runtime works is to use whatever Tokio runtime was entered using `runtime.enter()` beforehand
```
Oof, that `pyo3_async_runtimes::tokio::future_into_py` feels like it does more than what it says on the tin 😅.
Maybe add a link to this: https://docs.rs/pyo3-async-runtimes/latest/src/pyo3_async_runtimes/tokio.rs.html#188-198
```rust
impl pyo3_async_runtimes::generic::ContextExt for TokioRuntimeThatIsInScope {
    fn scope<F, R>(
        locals: TaskLocals,
        fut: F,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = R> + Send>>
    where
        F: std::future::Future<Output = R> + Send + 'static,
    {
        let cell = UnsyncOnceCell::new();
        cell.set(locals).unwrap();

        Box::pin(TASK_LOCALS.scope(cell, fut))
    }

    fn get_task_locals() -> Option<TaskLocals> {
        TASK_LOCALS
            .try_with(|c| {
                c.get()
                    .map(|locals| Python::with_gil(|py| locals.clone_ref(py)))
            })
            .unwrap_or_default()
    }
}
```
Maybe add something like:
`// Copied from https://docs.rs/pyo3-async-runtimes/latest/src/pyo3_async_runtimes/tokio.rs.html#100-119`

```rust
/// Note that we very intentionally use the multi-threaded scheduler,
/// but with a single thread.
///
/// We **cannot** use the current_thread scheduler,
/// since that would result in the Python task scheduler (e.g. `asyncio`)
/// and the Rust task scheduler (Tokio) running on the same thread.
/// This seems to work fine, until you end up with a Python future
/// that depends on a Rust future completing or vice-versa:
/// since the task schedulers each have their own task queues,
/// work co-operatively, and know nothing of each other,
/// they will not (nor can they) yield to the other.
/// The result: deadlocks!
///
/// Therefore, we run the Tokio scheduler on a separate thread.
/// Since switching between the 'Python scheduler thread' and the
/// 'Tokio scheduler thread' is preemptive,
/// the same problem no longer occurs:
/// both schedulers are able to make forward progress (even on a 1-CPU machine).
```
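The failure mode described in that doc comment can be sketched as a toy model in plain Python (all names here are illustrative, not from this codebase): two cooperative schedulers, each with its own task queue and no knowledge of the other, can only both make progress when they run on separate, preemptively scheduled threads.

```python
import threading

# Toy cooperative scheduler: drives its own queue of generator-based
# tasks and knows nothing of any other scheduler (like asyncio vs. Tokio).
def run_scheduler(tasks):
    tasks = list(tasks)
    while tasks:
        task = tasks.pop(0)
        try:
            next(task)          # run the task until its next `yield`
            tasks.append(task)  # cooperative: re-queue, no preemption
        except StopIteration:
            pass

done = {"rust_future": False}

def python_task():
    # A "Python future" that depends on a "Rust future" completing.
    while not done["rust_future"]:
        yield  # yields only to *its own* scheduler's queue

def rust_task():
    yield
    done["rust_future"] = True

# If both schedulers ran sequentially on ONE thread, python_task would
# spin forever: its scheduler never yields to the other queue.
# Running the "Tokio" scheduler on a separate thread (preemptive
# switching) lets both make progress, mirroring the fix in this PR.
t = threading.Thread(target=run_scheduler, args=([rust_task()],))
t.start()
run_scheduler([python_task()])
t.join()
assert done["rust_future"]
```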
Sanity check: is the fact that block_on executes the passed Future on the current thread problematic?
If block_on is called from a thread that also has a Python asyncio eventloop, that loop will not make progress while the thread is blocking on the Future. This might not matter for sync function calls, but it might for async calls? I think the only Python to Rust async calls we have are around PyChunksAsyncIter? Regardless, it might be worth mentioning/thinking about?
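To make that concern concrete, here is a pure-Python stand-in (not the actual bindings; the `time.sleep` plays the role of a Rust `block_on`): a blocking call on the event-loop thread starves every other asyncio task until it returns.

```python
import asyncio
import time

async def ticker(events):
    # A background asyncio task that should be making progress.
    for _ in range(3):
        await asyncio.sleep(0.01)
        events.append("tick")

async def main():
    events = []
    tick_task = asyncio.create_task(ticker(events))
    time.sleep(0.05)  # blocks the loop thread, as a block_on there would
    events.append("block_on done")
    await tick_task
    return events

events = asyncio.run(main())
# No tick could run while the loop thread was blocked:
assert events[0] == "block_on done"
```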
> Is the fact that `block_on` executes the passed Future on the current thread problematic?

It is not, as long as:
- We can be sure that the code does not try to wait for a Python future
- We won't attempt to lock a Tokio mutex on this thread

I agree that this should be added to the documentation of (our) `block_on`/`block_until_interrupted` function.
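A minimal plain-Python sketch of the safe side of the first caveat (names are illustrative): blocking on a future's result is fine only from a thread other than the one driving the loop that must complete it; doing the same `fut.result()` on the loop thread itself would deadlock.

```python
import asyncio
import threading

async def produce():
    # Only the event loop can complete this.
    await asyncio.sleep(0.01)
    return 42

def blocking_consumer(loop, out):
    # Blocking here is fine: this is NOT the loop's thread.
    fut = asyncio.run_coroutine_threadsafe(produce(), loop)
    out.append(fut.result(timeout=5))

loop = asyncio.new_event_loop()
out = []
runner = threading.Thread(target=loop.run_forever)
runner.start()
consumer = threading.Thread(target=blocking_consumer, args=(loop, out))
consumer.start()
consumer.join()
loop.call_soon_threadsafe(loop.stop)
runner.join()
loop.close()
assert out == [42]
```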
```rust
async_util::async_allow_threads(Box::pin(async move {
    match me.stream_completed_submission_chunks(submission_id).await {
        Ok(iter) => {
            let async_iter = PyChunksAsyncIter::from(iter);
            Ok(async_iter)
        }
        Err(e) => PyResult::Err(e.into()),
    }
})),
```
It's curious to me that upstream doesn't have an out-of-the box way to deal with this. Same with the GIL semantics around Iterator we implement ourselves, above.
For the sync iterator they now do, but for async iteration unfortunately not yet. There is an `experimental-async` feature flag that adds some more async support in pyo3 (without `pyo3_async_runtimes`), but `__anext__` is still missing there.
> For the sync iterator they now do

Is it possible for us to use that here instead of implementing it manually using `std::iter::from_fn`?
Or is upstream's thing for consuming a Rust iterator in Python, rather than consuming a Python iterator in Rust?
```rust
runtime.block_on(async {
    // We lock the stream in a separate Tokio task
    // that explicitly runs on the runtime thread rather than on the main Python thread.
    // This reduces the possibility for deadlocks even further.
```
> This reduces the possibility for deadlocks even further.

Maybe add how this reduces the possibility of a deadlock? It costs quite a lot of brainpower for a reader to come up with that, so just putting it here explicitly saves us a bunch of brain cells ;)
Same for `__anext__` below.
The plan: merge this PR as-is so the deadlock fixes are online early.

@OpsBotPrime merge
…dlocks (rather than 45min later)
…ures, and ensure we can unlock the GIL in coroutines as well
…t__ methods. They should now never use the GIL inside their bodies (except maybe for momentary logging somewhere deeply inside), and locking the main Tokio mutex explicitly happens on a separate on-runtime Tokio task.
…e documentation of its construction
Approved-by: Qqwy
Priority: Normal
Auto-deploy: false
Rebased as 963eaa8, waiting for CI …

CI job 🟡 started.

b873e3e to 963eaa8
In rare situations we saw deadlocks happening in production, specifically when doing async iteration of the submission-output iterator (returning that Rust stream back to Python element-by-element).
This PR resolves the issue by relinquishing the GIL more often. Specifically:

- In `__next__` and `__anext__` of the (sync/async) output result iterator.
- For `pyo3_async_runtimes`, we do not use its `tokio` module that secretly spawns its own runtime, but rather depend on the runtime that we've explicitly constructed ourselves.