Allow for Tasks to run on the Main Thread #10627

Closed
3 tasks done
jakekaplan opened this issue Sep 5, 2023 · 10 comments · Fixed by #11930
Labels
enhancement An improvement of an existing feature

Comments

@jakekaplan
Contributor

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar request and didn't find it.
  • I searched the Prefect documentation for this feature.

Prefect Version

2.x

Describe the current behavior

Currently, all tasks run in a new worker thread. This limits what sort of objects can be passed into tasks as parameters. It is also not clear to users that anything they pass to a task needs to be thread-safe, since the task is executed on a different thread.
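One concrete case of this limitation can be reproduced without Prefect. The sketch below is a stdlib-only analogy, not Prefect's engine: `run_on_worker_thread` is a hypothetical helper that runs a coroutine on a fresh event loop in a new thread, roughly the situation described above. An asyncio future created on the flow's loop cannot be awaited from the worker's loop, while the same kind of object works when the task body runs on the flow's own loop and thread.

```python
import asyncio
import threading


def run_on_worker_thread(coro_fn, *args):
    """Hypothetical stand-in for running a task body in a new worker thread.

    The coroutine gets its own event loop via asyncio.run() in a fresh thread.
    """
    outcome = {}

    def target():
        try:
            outcome["value"] = asyncio.run(coro_fn(*args))
        except Exception as exc:  # capture the error so the caller can inspect it
            outcome["error"] = exc

    worker = threading.Thread(target=target)
    worker.start()
    worker.join()  # blocking the flow's loop briefly is acceptable for a demo
    return outcome


async def task_body(pending: asyncio.Future):
    # Awaiting a future only works on the event loop that created it
    return await pending


async def flow_body():
    loop = asyncio.get_running_loop()

    # Off-thread: the future belongs to the flow's loop, so the worker's await fails
    unresolved = loop.create_future()
    off_thread = run_on_worker_thread(task_body, unresolved)
    unresolved.cancel()  # tidy up the future the worker could not use

    # Same loop and thread: the same kind of object works fine
    resolved = loop.create_future()
    loop.call_soon(resolved.set_result, "shared object used safely")
    same_thread = await task_body(resolved)
    return off_thread, same_thread


off_thread, same_thread = asyncio.run(flow_body())
print(type(off_thread["error"]).__name__, off_thread["error"])
print(same_thread)
```

Loop-bound asyncio objects are only one category of non-thread-safe parameter, but they show why running tasks on the flow's thread removes a whole class of surprises.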

Describe the proposed behavior

Allow tasks to run on the main thread. This would:

  • Allow things like an HTTP client or database connection to be shared between a flow and its tasks
  • Spare users from thread-safety concerns unless they've opted into concurrency
  • Significantly improve performance in asynchronous and sequential use cases

Example Use

Currently, the example below is not possible because you can't pass an HTTP client to a different thread.

from prefect import flow, task
import asyncio
import httpx


@task()
async def query_api(client: httpx.AsyncClient):
    resp = await client.get("data")
    return resp


@flow()
async def my_flow():
    async with httpx.AsyncClient(base_url="https://example.com") as client:
        print(await query_api(client))


if __name__ == "__main__":
    asyncio.run(my_flow())

Additional context

A previous PR, #9855, was drafted that would enable this. However, it added greenback as a dependency, which is not entirely compatible with our existing setup because SQLAlchemy also uses greenlets.

From that PR:

Unfortunately using greenback alongside sqlalchemy is not possible at this time. If the ephemeral application is running in the same thread as one in which we need to use greenback, then greenback and sqlalchemy will accidentally "mix" messages and crash each other.

There are a few options here:

  1. Open an issue on greenback and/or sqlalchemy and determine the best path towards isolation of their use of greenlets
  2. Run the ephemeral application in a separate thread (i.e. Run clients with ephemeral applications on the global event loop thread #9870); greenlets are isolated by thread boundaries
  3. Avoid use of greenlet entirely and just throw an exception if a user passes an asynchronous upstream to a blocking synchronous downstream task
  4. Learn a bunch about greenlets and vendor the relevant chunk of greenback with support for SQLAlchemy messages
  5. Make users await calls to synchronous tasks in asynchronous flows to avoid blocking the event loop in the first place; this would remove the need for greenback
  6. We could defer handling of tasks on the main thread for asynchronous flows and immediately release support for this feature in synchronous flows.
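Option 3 can be sketched as a validation step that runs before a task is called. This is a minimal sketch, not Prefect's API: `AsyncUpstreamError` and `validate_task_parameters` are hypothetical names, and it assumes an unresolved asynchronous upstream reaches a task as an awaitable value.

```python
import inspect
from typing import Any, Callable, Mapping


class AsyncUpstreamError(TypeError):
    """Hypothetical error for a sync task that would have to block on async work."""


def validate_task_parameters(task_fn: Callable[..., Any], parameters: Mapping[str, Any]) -> None:
    """Fail fast instead of blocking the event loop (a sketch of option 3).

    Assumes an unresolved asynchronous upstream shows up as an awaitable value.
    """
    if inspect.iscoroutinefunction(task_fn):
        return  # async tasks can await their upstream values themselves
    for name, value in parameters.items():
        if inspect.isawaitable(value):
            raise AsyncUpstreamError(
                f"parameter {name!r} of synchronous task {task_fn.__name__!r} is awaitable; "
                "make the task async or resolve the value before calling it"
            )


def double(x: int) -> int:
    return x * 2


async def fetch_value() -> int:
    return 21


validate_task_parameters(double, {"x": 21})  # plain values pass through

upstream = fetch_value()  # an un-awaited coroutine standing in for async upstream work
try:
    validate_task_parameters(double, {"x": upstream})
except AsyncUpstreamError as exc:
    print(exc)
finally:
    upstream.close()  # suppress the "coroutine was never awaited" warning
```

The trade-off is the one noted in the list: no greenback is needed, but a synchronous task can no longer transparently consume asynchronous upstream results.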
@jakekaplan added the enhancement and needs:triage labels Sep 5, 2023
@c-p-b

c-p-b commented Sep 7, 2023

Would any new proposal need to not include greenback as a dependency? Or if introducing greenback, would greenlets need to be isolated as mentioned on the PR? Or would option 3 as @zanieb mentions be the most straightforward way, assuming that this kind of behavior is acceptable from Prefect's perspective?

I think for my use case basically any option but 6 would work; if we did 6, it would not solve my problem. 5 also seems acceptable from my perspective, but I know little about how it would affect other Prefect users. 4 sounds painful, with a lot of extra heavy lifting and maintenance, but I might be wrong on that; I know little about how that interaction would work. 3 seems like it would be the easiest, at the cost of removing some functionality, and 5 seems like the easiest to preserve functionality, at the cost of some ergonomics?

If we can define what exactly would be acceptable for the existing PR I can help drive what needs to be done :)

@jakekaplan
Contributor Author

@cpdeethree thanks for your interest in this!

Ideally I would like to avoid introducing greenback as a dependency, but it's not off the table. It does require some more complex maneuvering: either running the ephemeral API application on a separate thread (it is currently tied to the client; this is option 2) or finding another way to split up the client and server.
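The idea behind option 2 can be sketched with a dedicated event loop running in its own thread, with callers handing work to it in a thread-safe way. This is a minimal sketch under stated assumptions: `BackgroundLoop` and `handle_request` are hypothetical names standing in for the ephemeral API application, not Prefect's actual client or server code. Only the stdlib calls (`asyncio.run_coroutine_threadsafe`, `loop.call_soon_threadsafe`) are real APIs.

```python
import asyncio
import threading


class BackgroundLoop:
    """Hypothetical helper: an event loop running forever in its own daemon thread.

    Coroutines submitted here execute on that thread, so anything thread-bound
    inside them (for example, greenlets) stays isolated from the caller's thread.
    """

    def __init__(self, name: str = "ephemeral-api") -> None:
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self._run, name=name, daemon=True)
        self.thread.start()

    def _run(self) -> None:
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def submit(self, coro):
        # Thread-safe hand-off; returns a concurrent.futures.Future
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

    def stop(self) -> None:
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()
        self.loop.close()


async def handle_request(path: str) -> dict:
    # Stand-in for a request served by an in-process (ephemeral) API application
    await asyncio.sleep(0)
    return {"path": path, "thread": threading.current_thread().name}


api_loop = BackgroundLoop()
try:
    # A caller on the main thread blocks until the background thread responds
    response = api_loop.submit(handle_request("/flow_runs")).result(timeout=5)
    print(response)
finally:
    api_loop.stop()
```

Because greenlets are isolated by thread boundaries, keeping the API application on its own thread would let tasks on the main thread use greenback without the two interfering.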

I pretty much agree with your assessment of the following:

3 seems like it would be the easiest, at the cost of removing some functionality, and 5 seems like the easiest to preserve functionality, at the cost of some ergonomics?

I will investigate internally whether 3 or 5 makes the most sense and will post back here at the start of next week.

@jakekaplan
Contributor Author

jakekaplan commented Sep 12, 2023

Update on this: after some internal discussion, we've opted to investigate option 2. While that is the more complex option and we might not be able to prioritize it as soon as we'd like, 3 or 5 may be considered a regression in behavior. Option 2 is also likely the best long-term solution.

@c-p-b

c-p-b commented Sep 14, 2023

To see whether option 1 is straightforward, I opened an issue on greenback to get feedback from its maintainers as well.

@mikeoconnor0308

Any update on this?

@zanieb
Contributor

zanieb commented Feb 8, 2024

@jakekaplan oremanj/greenback#22 was just resolved! <3

@zzstoatzz
Collaborator

Reopening, as we needed to revert #11930 in #12054 due to #12053.

@zanieb
Contributor

zanieb commented Feb 25, 2024

I've annotated the reproduction from #12053 with some details about what's going on:

from prefect import flow, task


@task
async def async_task():
    pass

@flow
async def async_flow():
    # The body of this flow is running in a new event loop on the main thread
    # The "waiter" for `sync_flow` is managing execution of this call

    # This task is scheduled to run on the main thread; it's pushed to the main thread waiter's queue
    future = await async_task.submit()

    # Here's the deadlock: we wait for the task above to finish. However, execution of this task
    # needs to be performed by the waiter that is currently executing the body of this flow. 
    # Because this flow can't return until the task finishes, and the task cannot start until the
    # waiter is done executing this flow, we deadlock.
    await future.result()

    # The task _should_ be scheduled on the same thread as its parent flow, but it'd be nice to
    # avoid a deadlock. The root of the problem is that the task is being submitted to a
    # synchronous waiter which would execute the task in a new event loop. Instead, we want to 
    # submit it to the event loop that the flow is on.

@flow
def sync_flow():
    # This flow runs on the main thread, executed by the waiter created by `sync_flow`
    async_flow()


if __name__ == "__main__":
    # This flow runs on the main thread and creates a waiter for work to be sent back to the main thread
    sync_flow()
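The deadlock has the same shape as any single worker that blocks on work queued behind itself. Below is a stdlib-only sketch, assuming a hypothetical `Waiter` class that loosely models the main-thread waiter (it is not Prefect's implementation). A timeout is added so the demonstration terminates instead of hanging.

```python
import queue
from concurrent.futures import Future
from concurrent.futures import TimeoutError as FutureTimeout


class Waiter:
    """Hypothetical single-threaded executor: runs queued callables one at a time
    on the thread that calls drain(), loosely modeling the main-thread waiter."""

    def __init__(self):
        self._queue = queue.Queue()

    def submit(self, fn):
        future = Future()
        self._queue.put((fn, future))
        return future

    def drain(self):
        while not self._queue.empty():
            fn, future = self._queue.get()
            try:
                future.set_result(fn())
            except Exception as exc:
                future.set_exception(exc)


waiter = Waiter()


def task_body():
    return "task done"


def flow_body():
    # The "flow" is being executed by the waiter, and it schedules its "task"
    # onto that same waiter...
    task_future = waiter.submit(task_body)
    # ...then blocks on it. The waiter cannot pick up the task until this call
    # returns, so without a timeout this would wait forever.
    try:
        return task_future.result(timeout=0.5)
    except FutureTimeout:
        return "deadlock: task never started"


flow_future = waiter.submit(flow_body)
waiter.drain()
print(flow_future.result())  # deadlock: task never started
```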

And here's a commit that resolves the issue: zanieb@25f12bc

In summary:

  1. Restores the behavior of always running asynchronous flows in synchronous flows in a new thread
  2. Stores the execution event loop of asynchronous flows on the context
  3. Runs asynchronous tasks in asynchronous flows on the flow's event loop
  4. Waits for tasks to finish from within the flow's execution thread to ensure they are done before we shut down the thread
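The four steps above can be sketched with the stdlib alone. This is a minimal sketch, not the code from zanieb@25f12bc: `flow_loop` (a context variable standing in for "the context") and `submit_task` are hypothetical names, and the flows and tasks are plain functions and coroutines rather than Prefect objects.

```python
import asyncio
import contextvars
import threading

# Hypothetical context variable holding the event loop an async flow runs on
flow_loop = contextvars.ContextVar("flow_loop")


async def async_task(n: int) -> int:
    await asyncio.sleep(0)
    return n * 10


def submit_task(coro) -> asyncio.Task:
    # Step 3: schedule the task on the flow's own loop instead of a separate
    # waiter, so awaiting it from the flow cannot deadlock
    loop = flow_loop.get()
    return loop.create_task(coro)


async def async_flow() -> list:
    # Step 2: record the flow's execution loop on the context
    flow_loop.set(asyncio.get_running_loop())
    pending = [submit_task(async_task(i)) for i in range(3)]
    # Step 4: wait for every task before the flow (and its thread) shuts down
    return await asyncio.gather(*pending)


def sync_flow() -> list:
    # Step 1: the async flow gets a fresh event loop in a new thread
    result = {}
    worker = threading.Thread(target=lambda: result.update(value=asyncio.run(async_flow())))
    worker.start()
    worker.join()
    return result["value"]


print(sync_flow())  # [0, 10, 20]
```

Running the async tasks as ordinary `asyncio.Task`s on the flow's loop is what breaks the cycle from the reproduction: the flow yields control while awaiting, so the loop is free to run the tasks.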

I think it'd make sense to expand test coverage and talk a bit more about what's going on before trying to reintroduce the change. If it'd be helpful, I can put up a pull request for discussion. I haven't really polished the commit though.

@abrookins changed the title from "Allow for Tasks to run the on the Main Thread" to "Allow for Tasks to run on the Main Thread" Mar 11, 2024
@zhen0
Member

zhen0 commented Mar 22, 2024

Thanks @zanieb! If you have time to put one up for discussion, we would welcome a PR from you any day! ❤️

@WillRaphaelson
Contributor

This is fully supported in 3.x.

8 participants