Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run tasks on the main thread #9855

Closed
wants to merge 7 commits into from
Closed

Run tasks on the main thread #9855

wants to merge 7 commits into from

Conversation

zanieb
Copy link
Contributor

@zanieb zanieb commented Jun 7, 2023

When feasible, task runs are now executed on the main thread instead of a worker thread. Previously, all task runs were run in a new worker thread. This should significantly improve performance in asynchronous and sequential use cases. This allows objects to be passed to and from tasks without worrying about thread safety unless you have opted into concurrency. For example, an HTTP client or database connection can be shared between a flow and its tasks now (unless synchronous concurrency is being used).

Tasks are run in the main thread when:

  • The task and flow are both asynchronous
  • The flow is synchronous and the task run is sequential

In a kind of funny way, this causes deadlocks when asynchronous and synchronous tasks are mixed in an asynchronous flow because if a synchronous task depends on an asynchronous task it will block the event loop waiting for the upstream to finish but the upstream cannot run since it is now on the same event loop. For example:

@flow
async def foo():
    upstream = await async_task.submit(42)  # Running in the background on the main thread event loop
    sync_task(upstream) # Blocks the event loop because there's no `await`
    # If the upstream is not done when `sync_task` is called, we deadlock

await foo()

To resolve this, I've added greenback 🤮 as a dependency. We no longer block the event loop when synchronous tasks are called from asynchronous flows which resolves this issue and addresses general performance concerns when calling synchronous tasks from asynchronous flows.

Closes #9412
Closes #10627

Example

import asyncio
import threading

from prefect import flow, task


@task
def sync_task():
    print(threading.current_thread())


@task
async def async_task():
    print(threading.current_thread())


@flow(log_prints=True)
def sync_flow():
    print(threading.current_thread())

    sync_task()  # Runs on main thread
    async_task()  # Runs on main thread
    sync_task.submit()  # Runs on worker thread
    async_task.submit()  # Runs on worker thread


@flow(log_prints=True)
async def async_flow():
    print(threading.current_thread())

    sync_task()  # Runs on worker thread
    sync_task.submit()  # Runs on worker thread
    await async_task()  # Runs on main thread
    await async_task.submit()  # Runs on main thread


sync_flow()
print("---")
asyncio.run(async_flow())
❯ python example.py
12:34:50.735 | INFO    | prefect.engine - Created flow run 'organic-wallaby' for flow 'sync-flow'
12:34:50.799 | INFO    | Flow run 'organic-wallaby' - <_MainThread(MainThread, started 8725699392)>
12:34:50.814 | INFO    | Flow run 'organic-wallaby' - Created task run 'sync_task-0' for task 'sync_task'
12:34:50.814 | INFO    | Flow run 'organic-wallaby' - Executing 'sync_task-0' immediately...
12:34:50.843 | INFO    | Task run 'sync_task-0' - <_MainThread(MainThread, started 8725699392)>
12:34:50.856 | INFO    | Task run 'sync_task-0' - Finished in state Completed()
12:34:50.870 | INFO    | Flow run 'organic-wallaby' - Created task run 'async_task-0' for task 'async_task'
12:34:50.871 | INFO    | Flow run 'organic-wallaby' - Executing 'async_task-0' immediately...
12:34:50.895 | INFO    | Task run 'async_task-0' - <_MainThread(MainThread, started 8725699392)>
12:34:50.911 | INFO    | Task run 'async_task-0' - Finished in state Completed()
12:34:50.924 | INFO    | Flow run 'organic-wallaby' - Created task run 'sync_task-1' for task 'sync_task'
12:34:50.925 | INFO    | Flow run 'organic-wallaby' - Submitted task run 'sync_task-1' for execution.
12:34:50.935 | INFO    | Flow run 'organic-wallaby' - Created task run 'async_task-1' for task 'async_task'
12:34:50.935 | INFO    | Flow run 'organic-wallaby' - Submitted task run 'async_task-1' for execution.
12:34:50.958 | INFO    | Task run 'sync_task-1' - <Thread(WorkerThread-0, started 6246838272)>
12:34:50.973 | INFO    | Task run 'async_task-1' - <Thread(WorkerThread-1, started 10804703232)>
12:34:50.977 | INFO    | Task run 'sync_task-1' - Finished in state Completed()
12:34:50.988 | INFO    | Task run 'async_task-1' - Finished in state Completed()
12:34:51.003 | INFO    | Flow run 'organic-wallaby' - Finished in state Completed('All states completed.')
---
12:34:51.071 | INFO    | prefect.engine - Created flow run 'melodic-bullfinch' for flow 'async-flow'
12:34:51.115 | INFO    | Flow run 'melodic-bullfinch' - <_MainThread(MainThread, started 8725699392)>
12:34:51.130 | INFO    | Flow run 'melodic-bullfinch' - Created task run 'sync_task-0' for task 'sync_task'
12:34:51.130 | INFO    | Flow run 'melodic-bullfinch' - Executing 'sync_task-0' immediately...
12:34:51.154 | INFO    | Task run 'sync_task-0' - <Thread(WorkerThread-2, started 10771050496)>
12:34:51.174 | INFO    | Task run 'sync_task-0' - Finished in state Completed()
12:34:51.186 | INFO    | Flow run 'melodic-bullfinch' - Created task run 'sync_task-1' for task 'sync_task'
12:34:51.187 | INFO    | Flow run 'melodic-bullfinch' - Submitted task run 'sync_task-1' for execution.
12:34:51.193 | INFO    | Flow run 'melodic-bullfinch' - Created task run 'async_task-0' for task 'async_task'
12:34:51.193 | INFO    | Flow run 'melodic-bullfinch' - Executing 'async_task-0' immediately...
12:34:51.219 | INFO    | Task run 'sync_task-1' - <Thread(WorkerThread-3, started 6263664640)>
12:34:51.231 | INFO    | Task run 'async_task-0' - <_MainThread(MainThread, started 8725699392)>
12:34:51.239 | INFO    | Task run 'sync_task-1' - Finished in state Completed()
12:34:51.248 | INFO    | Task run 'async_task-0' - Finished in state Completed()
12:34:51.259 | INFO    | Flow run 'melodic-bullfinch' - Created task run 'async_task-1' for task 'async_task'
12:34:51.259 | INFO    | Flow run 'melodic-bullfinch' - Submitted task run 'async_task-1' for execution.
12:34:51.282 | INFO    | Task run 'async_task-1' - <_MainThread(MainThread, started 8725699392)>
12:34:51.295 | INFO    | Task run 'async_task-1' - Finished in state Completed()
12:34:51.309 | INFO    | Flow run 'melodic-bullfinch' - Finished in state Completed('All states completed.')

Now possible, previously would fail with event loop errors

from prefect import flow, task
import asyncio
import httpx


@task()
async def query_api(client: httpx.AsyncClient):
    resp = await client.get("data")
    return resp


@flow()
async def my_flow():
    async with httpx.AsyncClient(base_url="https://example.com") as client:
        print(await query_api(client))


if __name__ == "__main__":
    asyncio.run(my_flow())

@zanieb zanieb requested a review from a team as a code owner June 7, 2023 17:31
Base automatically changed from concurrency-cleanup-3 to main June 7, 2023 18:35
@netlify
Copy link

netlify bot commented Jun 7, 2023

Deploy Preview for prefect-docs-preview ready!

Name Link
🔨 Latest commit cee77d1
🔍 Latest deploy log https://app.netlify.com/sites/prefect-docs-preview/deploys/6482294a4fbe7a000894742c
😎 Deploy Preview https://deploy-preview-9855--prefect-docs-preview.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify site settings.

Comment on lines -182 to +184
return call.result()
return await call.aresult()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No change in behavior here, just clearer to call aresult even though result is safe since we just finished waiting.

Comment on lines 236 to 252
# Check for a running event loop to prevent blocking
if get_running_loop():
if not greenback.has_portal():
raise RuntimeError(
"Detected unsafe call to `from_sync` from thread with event loop. "
"Call `await greenback.ensure_portal()` to allow functionality."
)

# Use greenback to avoid blocking the event loop while waiting
return greenback.await_(
from_async.wait_for_call_in_loop_thread(
__call,
timeout=timeout,
done_callbacks=done_callbacks,
contexts=contexts,
)
)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow isn't this magical!

Here, we avoid blocking the event loop in a from_sync call from a thread with an event loop by using greenback to schedule the call as a from_async waiter instead!

This error message is not intended to be user-facing.

Comment on lines 342 to +350
async def _run_async(self, coro):
from prefect._internal.concurrency.threads import in_global_loop

# Ensure the greenback portal is shimmed for this task so we can safely call
# back into async code from sync code if the user does something silly; avoid
# this overhead for our internal event loop
if not in_global_loop():
await greenback.ensure_portal()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a tough time figuring out just where to inject ensure_portal.

This is the only place that works without having the user do it themselves.

If we call this in the global event loop we actually can cause deadlocks.

@@ -1387,6 +1402,8 @@ async def submit_task_run(
),
log_prints=should_log_prints(task),
settings=prefect.context.SettingsContext.get().copy(),
user_thread=user_thread,
concurrency_type=task_runner.concurrency_type,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need the concurrency type so we can check if the task run is sequential for scheduling on the main thread. We can't use flow_run_context.task_runner.concurrency_type because we use a different task runner for sequential task calls.

@zanieb
Copy link
Contributor Author

zanieb commented Jun 12, 2023

Unfortunately using greenback alongside sqlalchemy is not possible at this time. If the ephemeral application is running in the same thread as one in which we need to use greenback, then greenback and sqlalchemy will accidentally "mix" messages and crash each other.

There are a few options here:

  1. Open an issue on greenback and/or sqlalchemy and determine the best path towards isolation of their use of greenlets
  2. Run the ephemeral application in a separate thread (i.e. Run clients with ephemeral applications on the global event loop thread #9870); greenlets are isolated by thread boundaries
  3. Avoid use of greenlet entirely and just throw an exception if a user passes an asynchronous upstream to a blocking synchronous downstream task
  4. Learn a bunch about greenlets and vendor the relevant chunk of greenback with support for SQLAlchemy messages
  5. Make users await calls to synchronous tasks in asynchronous flows to avoid blocking the event loop in the first place; this would remove the need for greenback

@jakekaplan has the most context on these problems

Additionally, we could defer handling of tasks on the main thread for asynchronous flows and immediately release support for this feature in synchronous flows.

@coolbreeze2
Copy link

That great. When can we merge the feature.

@c-p-b
Copy link

c-p-b commented Jul 30, 2023

I would also absolutely love to see this get merged. It's blocking adoption of using Prefect for us because the current implementation creates incompatibilities with other, existing async libraries that we are using. To be more specific, to be able to use Prefect to orchestrate Dagger is made possible with this pull request. To see it work in action, check out this project and run it on the released version of prefect vs this branch:

https://github.com/airbytehq/aircmd/blob/6d9ff612a8db37b5988382e427fc33fa090ab9c0/pyproject.toml#L12

The problem is that any other async client that needs to share the data prefect is passing across flows and tasks is going to have a difficult time because Prefect creates creates a new thread pool with each task. With this pr, it "just works" since they are all in the same thread pool

It also helps make the execution model easier to reason about and develop against

@jakekaplan
Copy link
Contributor

Looking into the best path forward to get this merged

@c-p-b
Copy link

c-p-b commented Aug 2, 2023

Looking into the best path forward to get this merged

Thank you jake! looking forward to seeing it get in!

@milonimrod
Copy link

Any update on this? This is a blocker for us. It prevents us from using aiohttp, aiokafka and any shared async package

@zanieb
Copy link
Contributor Author

zanieb commented Aug 22, 2023

Closing this as I am not working on it and a new pull request will need to be opened if someone else takes it up.

@zanieb zanieb closed this Aug 22, 2023
@c-p-b
Copy link

c-p-b commented Aug 22, 2023

@jakekaplan any chance we could get a new pr going? Don't want this to fall though the cracks as a missed opportunity for a great improvement!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
5 participants