Async context manager in flow causes RuntimeError: Event loop is closed #9412
Comments
Hey @TWeatherston, tasks and flows do not share an event loop at this time. It's on the roadmap to submit async tasks to the same event loop again. I believe your client is binding to the temporary event loop we create for the task, then trying to use that same loop (which is closed by then) to handle the context exit.
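The failure mode described above can be reproduced without Prefect or httpx. The sketch below is only an illustration under stated assumptions: `LoopBoundClient` and `run_in_temporary_loop` are hypothetical stand-ins (not Prefect or httpx APIs) for a client that remembers the loop it first ran on and for a runner that executes a coroutine on a short-lived loop.

```python
import asyncio


class LoopBoundClient:
    """Hypothetical stand-in for a client that binds to the loop it first runs on."""

    def __init__(self):
        self._loop = None

    async def get(self, url):
        # Remember the loop of the first request, as a connection pool might.
        if self._loop is None:
            self._loop = asyncio.get_running_loop()
        return f"response from {url}"

    async def aclose(self):
        # Cleanup is scheduled on the loop the client was bound to.
        self._loop.call_soon(lambda: None)


def run_in_temporary_loop(coro):
    """Hypothetical task runner: run a coroutine on its own loop, then close it."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()


async def flow_body():
    client = LoopBoundClient()
    # The "task" runs in a worker thread on a temporary loop, binding the client to it.
    await asyncio.to_thread(run_in_temporary_loop, client.get("http://example.invalid"))
    try:
        # The "flow" then closes the client, which touches the already-closed loop.
        await client.aclose()
    except RuntimeError as exc:
        return str(exc)
    return None


error_message = asyncio.run(flow_body())
print(error_message)
```

In this sketch the cleanup call fails with the same message as in the issue title, because the loop the client bound to no longer exists when the flow exits the context.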
@madkinsz Ah, that's unfortunate, but thanks for the explanation!
What's the current recommended way to use a client (such as a database client) across flows and tasks?
I'm facing the same issue with the code below. Is there any workaround for now?
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import httpx
import asyncio
from devtools import debug


@task(
    name="EBL_CHECK_ORDER_STATUS_API_TASK",
    description="Check the order status of an Order ID at EBL end",
)
async def call_ebl_order_status_api(order_id: str, client: httpx.AsyncClient):
    r = await client.get(
        f"http://localhost:8080/api/v1/ebl-pg/fetch-current-status/{order_id}"
    )
    if r.status_code != 200:
        return None
    return r.json()


@task(
    name="PLIL_PENDING_ORDERS"
)
def get_pending_orders_from_db(bank_code: str):
    results = [
        "plil00334055",
        "plil00333726",
        "plil00332425",
        "plil00332419",
        "plil00331457",
    ]
    return results


@flow(
    name="PaymentStatus Orchestration Flow",
    description="All PG/MFS/AGENT-BANKING payment status check",
    task_runner=ConcurrentTaskRunner
)
async def run_flows():
    pending_orders = get_pending_orders_from_db('004')
    api_coros = []
    async with httpx.AsyncClient() as client:
        for order in pending_orders:
            api_coros.append(call_ebl_order_status_api(order, client))
        results = await asyncio.gather(*api_coros)
    debug(results)


if __name__ == "__main__":
    asyncio.run(run_flows())
Will be closed by #9855
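@zanieb Unfortunately #9855 has gone stale and wasn't merged. As a workaround, use the async context manager within the task, rather than passing the client from the flow to the task:
from datetime import timedelta

from httpx import AsyncClient
from prefect import task
from prefect.tasks import task_input_hash


# API_URL and API_ACCESS_TOKEN are assumed to be defined elsewhere.
def get_session():
    return AsyncClient(
        base_url=API_URL,
        headers={"Authorization": f"Bearer {API_ACCESS_TOKEN}"},
    )


@task(cache_expiration=timedelta(hours=1), cache_key_fn=task_input_hash)
async def get(endpoint: str, params=None) -> dict[str] | list[dict[str]]:
    # The client is opened and closed inside the task, so it only ever sees the task's event loop.
    async with get_session() as session:
        res = await session.get(endpoint, params=params)
        res.raise_for_status()
        return res.json()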
Yeah, this is basically what we ended up having to do. Unfortunately, creating a new client per task negates most of the benefits of using a client (such as connection reuse), especially if you have a large number of requests to make in lots of separate tasks.
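One possible middle ground, assuming the requests can be grouped, is to let a single unit of work open the client once and issue a whole batch of requests through it, so the client is created, used, and closed on the same event loop. The sketch below is not taken from the thread: `CountingClient` and `check_orders` are hypothetical names, the client is a stdlib-only stand-in for an async HTTP client, and the returned status value is a placeholder.

```python
import asyncio


class CountingClient:
    """Hypothetical stand-in for an async HTTP client; counts how often it is opened."""

    opened = 0

    async def __aenter__(self):
        CountingClient.opened += 1
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False

    async def get(self, order_id):
        # Placeholder for a real request; yields control like network I/O would.
        await asyncio.sleep(0)
        return {"order_id": order_id, "status": "PLACEHOLDER"}


async def check_orders(order_ids):
    # In the thread's setting, this body would live inside one async task,
    # so the client never crosses an event-loop boundary.
    async with CountingClient() as client:
        return await asyncio.gather(*(client.get(o) for o in order_ids))


statuses = asyncio.run(check_orders(["plil00334055", "plil00333726"]))
print(CountingClient.opened, len(statuses))
```

The trade-off is coarser granularity: retries, caching, and status reporting would then apply to the whole batch rather than to each request.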
Hello, I am having a similar issue with SQLAlchemy/Postgres sessions.
More stack trace here:
This is how I start a session for every Prefect task, using my own decorator:
I am a bit worried that this approach might end up creating too many connections if many Prefect jobs/tasks are executed concurrently. Has anyone had a similar issue, or does anyone have an idea for a better way of handling sessions/connections in Prefect and in Prefect tests?
@NikiTsv I haven't tried using it myself, but prefect-sqlalchemy might help make your life a bit easier. It looks like it's still a connection per task (as recommended in the docs), but it handles all the connection/session management for you.
Bug summary
I have a flow that makes numerous requests to the same API. I use an httpx client to make these requests, and each request has its own task, with the client passed as a parameter to the task.
This was working fine on Prefect v2.8.4, but we recently upgraded to the latest version and it now throws a RuntimeError: Event loop is closed exception. I seem to get this error on any version >= v2.8.7.
If I change the task to a regular function, the flow completes just fine.
Reproduction
Error
Versions