-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Run tasks on the main thread #9855
Conversation
9591d53
to
d1a9571
Compare
✅ Deploy Preview for prefect-docs-preview ready!
To edit notification comments on pull requests, go to your Netlify site settings. |
return call.result() | ||
return await call.aresult() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No change in behavior here, just clearer to call aresult
even though result
is safe since we just finished waiting.
# Check for a running event loop to prevent blocking | ||
if get_running_loop(): | ||
if not greenback.has_portal(): | ||
raise RuntimeError( | ||
"Detected unsafe call to `from_sync` from thread with event loop. " | ||
"Call `await greenback.ensure_portal()` to allow functionality." | ||
) | ||
|
||
# Use greenback to avoid blocking the event loop while waiting | ||
return greenback.await_( | ||
from_async.wait_for_call_in_loop_thread( | ||
__call, | ||
timeout=timeout, | ||
done_callbacks=done_callbacks, | ||
contexts=contexts, | ||
) | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wow isn't this magical!
Here, we avoid blocking the event loop in a from_sync
call from a thread with an event loop by using greenback
to schedule the call as a from_async
waiter instead!
This error message is not intended to be user-facing.
async def _run_async(self, coro): | ||
from prefect._internal.concurrency.threads import in_global_loop | ||
|
||
# Ensure the greenback portal is shimmed for this task so we can safely call | ||
# back into async code from sync code if the user does something silly; avoid | ||
# this overhead for our internal event loop | ||
if not in_global_loop(): | ||
await greenback.ensure_portal() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had a tough time figuring out just where to inject ensure_portal
.
This is the only place that works without having the user do it themselves.
If we call this in the global event loop we actually can cause deadlocks.
@@ -1387,6 +1402,8 @@ async def submit_task_run( | |||
), | |||
log_prints=should_log_prints(task), | |||
settings=prefect.context.SettingsContext.get().copy(), | |||
user_thread=user_thread, | |||
concurrency_type=task_runner.concurrency_type, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need the concurrency type so we can check if the task run is sequential for scheduling on the main thread. We can't use flow_run_context.task_runner.concurrency_type
because we use a different task runner for sequential task calls.
db0805e
to
cee77d1
Compare
Unfortunately using There are a few options here:
@jakekaplan has the most context on these problems Additionally, we could defer handling of tasks on the main thread for asynchronous flows and immediately release support for this feature in synchronous flows. |
That great. When can we merge the feature. |
I would also absolutely love to see this get merged. It's blocking adoption of using Prefect for us because the current implementation creates incompatibilities with other, existing async libraries that we are using. To be more specific, to be able to use Prefect to orchestrate Dagger is made possible with this pull request. To see it work in action, check out this project and run it on the released version of prefect vs this branch: https://github.com/airbytehq/aircmd/blob/6d9ff612a8db37b5988382e427fc33fa090ab9c0/pyproject.toml#L12 The problem is that any other async client that needs to share the data prefect is passing across flows and tasks is going to have a difficult time because Prefect creates creates a new thread pool with each task. With this pr, it "just works" since they are all in the same thread pool It also helps make the execution model easier to reason about and develop against |
Looking into the best path forward to get this merged |
Thank you jake! looking forward to seeing it get in! |
Any update on this? This is a blocker for us. It prevents us from using aiohttp, aiokafka and any shared async package |
Closing this as I am not working on it and a new pull request will need to be opened if someone else takes it up. |
@jakekaplan any chance we could get a new pr going? Don't want this to fall though the cracks as a missed opportunity for a great improvement! |
When feasible, task runs are now executed on the main thread instead of a worker thread. Previously, all task runs were run in a new worker thread. This should significantly improve performance in asynchronous and sequential use cases. This allows objects to be passed to and from tasks without worrying about thread safety unless you have opted into concurrency. For example, an HTTP client or database connection can be shared between a flow and its tasks now (unless synchronous concurrency is being used).
Tasks are run in the main thread when:
In a kind of funny way, this causes deadlocks when asynchronous and synchronous tasks are mixed in an asynchronous flow because if a synchronous task depends on an asynchronous task it will block the event loop waiting for the upstream to finish but the upstream cannot run since it is now on the same event loop. For example:
To resolve this, I've added
greenback
🤮 as a dependency. We no longer block the event loop when synchronous tasks are called from asynchronous flows which resolves this issue and addresses general performance concerns when calling synchronous tasks from asynchronous flows.Closes #9412
Closes #10627
Example
Now possible, previously would fail with event loop errors