
Async context manager in flow causes RuntimeError: Event loop is closed #9412

Closed
TWeatherston opened this issue May 3, 2023 · 9 comments · Fixed by #11930
Labels
bug (Something isn't working), great writeup (This is a wonderful example of our standards)

Comments

@TWeatherston

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn't find it.
  • I searched the Prefect documentation for this issue.
  • I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

I have a flow that makes numerous requests to the same API. I use an httpx client to make these requests, and each request has its own task, with the client passed to the task as a parameter.

This worked fine on Prefect v2.8.4, but we recently upgraded to the latest version and the flow now throws a RuntimeError: Event loop is closed exception. I get this error on anything >= v2.8.7.

If I change the task to a regular function then the flow completes just fine.
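For reference, the working variant mentioned above is just the reproduction below with the @task decorator removed, so query_api runs as a plain coroutine on the flow's own event loop (a minimal sketch, not from the original report):

async def query_api(client: httpx.AsyncClient):  # no @task() decorator
    resp = await client.get("data")
    return resp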

Reproduction

from prefect import flow, task
import asyncio
import httpx


@task()
async def query_api(client: httpx.AsyncClient):
    resp = await client.get("data")
    return resp


@flow()
async def my_flow():
    async with httpx.AsyncClient(base_url="https://example.com") as client:
        print(await query_api(client))


if __name__ == "__main__":
    asyncio.run(my_flow())

Error

Traceback (most recent call last):
  File "/home/project/atheon_prefect/flows/collection/my_flow.py", line 19, in <module>
    asyncio.run(my_flow())
  File "/home/.pyenv/versions/3.10.3/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/.pyenv/versions/3.10.3/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/home/project/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 109, in wait_for_call_in_loop_thread
    return call.result()
  File "/home/project/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/home/.pyenv/versions/3.10.3/lib/python3.10/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/home/.pyenv/versions/3.10.3/lib/python3.10/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/home/project/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/home/project/.venv/lib/python3.10/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/project/.venv/lib/python3.10/site-packages/prefect/engine.py", line 259, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "/home/project/.venv/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/home/project/.venv/lib/python3.10/site-packages/prefect/engine.py", line 673, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/home/project/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/home/project/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/home/project/atheon_prefect/flows/collection/my_flow.py", line 14, in my_flow
    async with httpx.AsyncClient(base_url="https://example.com") as client:
  File "/home/project/.venv/lib/python3.10/site-packages/httpx/_client.py", line 2003, in __aexit__
    await self._transport.__aexit__(exc_type, exc_value, traceback)
  File "/home/project/.venv/lib/python3.10/site-packages/httpx/_transports/default.py", line 332, in __aexit__
    await self._pool.__aexit__(exc_type, exc_value, traceback)
  File "/home/project/.venv/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 318, in __aexit__
    await self.aclose()
  File "/home/project/.venv/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 305, in aclose
    await connection.aclose()
  File "/home/project/.venv/lib/python3.10/site-packages/httpcore/_async/connection.py", line 159, in aclose
    await self._connection.aclose()
  File "/home/project/.venv/lib/python3.10/site-packages/httpcore/_async/http11.py", line 232, in aclose
    await self._network_stream.aclose()
  File "/home/project/.venv/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 54, in aclose
    await self._stream.aclose()
  File "/home/project/.venv/lib/python3.10/site-packages/anyio/streams/tls.py", line 192, in aclose
    await self.transport_stream.aclose()
  File "/home/project/.venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 1323, in aclose
    self._transport.close()
  File "/home/.pyenv/versions/3.10.3/lib/python3.10/asyncio/selector_events.py", line 697, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/home/.pyenv/versions/3.10.3/lib/python3.10/asyncio/base_events.py", line 750, in call_soon
    self._check_closed()
  File "/home/.pyenv/versions/3.10.3/lib/python3.10/asyncio/base_events.py", line 515, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

Process finished with exit code 1

Versions

Version:             2.8.7
API version:         0.8.4
Python version:      3.10.3
Git commit:          a6d6c6fc
Built:               Thu, Mar 23, 2023 3:27 PM
OS/Arch:             linux/x86_64
Profile:             dev
Server type:         cloud

Additional context

No response

@TWeatherston TWeatherston added the bug and status:triage labels on May 3, 2023
@zanieb
Contributor

zanieb commented May 3, 2023

Hey @TWeatherston — Tasks and flows do not share an event loop at this time. It's on the roadmap to submit async tasks to the same event loop again.

I believe your client is binding to the temporary event loop we create for the task and then trying to use that same loop to handle context exit.
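A rough, Prefect-free sketch of that failure mode (illustrative only; assumes https://example.com is reachable): the connection opened by the client binds to whichever loop made the request, and closing the client from a different loop has to touch the first, already-closed loop.

import asyncio
import httpx

client = httpx.AsyncClient(base_url="https://example.com")


async def make_request():
    # The connection opened here is bound to this temporary event loop.
    await client.get("/")


async def close_client():
    # aclose() (or __aexit__) now runs on a different loop, but must tear
    # down the connection created on the first, already-closed loop.
    await client.aclose()


asyncio.run(make_request())
asyncio.run(close_client())  # typically raises RuntimeError: Event loop is closed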

@zanieb zanieb added the status:roadmap and great writeup labels and removed status:triage on May 3, 2023
@TWeatherston
Author

@madkinsz Ah that's unfortunate but thanks for the explanation!

@waydegg

waydegg commented May 11, 2023

What's the current solution for using a client (such as a database client) across flows and tasks?

@jiaulislam

I'm also facing the same issue with the code below. Is there any workaround for now?

from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import httpx
import asyncio
from devtools import debug


@task(
    name="EBL_CHECK_ORDER_STATUS_API_TASK",
    description="Check the order status of an Order ID at EBL end",
)
async def call_ebl_order_status_api(order_id: str, client: httpx.AsyncClient):
    r = await client.get(
        f"http://localhost:8080/api/v1/ebl-pg/fetch-current-status/{order_id}"
    )
    if r.status_code != 200:
        return None
    return r.json()


@task(name="PLIL_PENDING_ORDERS")
def get_pending_orders_from_db(bank_code: str):
    results = [
        "plil00334055",
        "plil00333726",
        "plil00332425",
        "plil00332419",
        "plil00331457",
    ]
    return results


@flow(
    name="PaymentStatus Orchestration Flow",
    description="All PG/MFS/AGENT-BANKING payment status check",
    task_runner=ConcurrentTaskRunner()
)
async def run_flows():
    pending_orders = get_pending_orders_from_db('004')
    api_coros = []
    async with httpx.AsyncClient() as client:
        for order in pending_orders:
            api_coros.append(call_ebl_order_status_api(order, client))

        results = await asyncio.gather(*api_coros)

        debug(results)


if __name__ == "__main__":
    asyncio.run(run_flows())

@zanieb
Contributor

zanieb commented Jun 7, 2023

Will be closed by #9855

@maitlandmarshall
Contributor

@zanieb unfortunately #9855 has gone stale and wasn't merged.

As a workaround, use the async context manager within the task rather than passing the client from the flow to the task:

from datetime import timedelta

from httpx import AsyncClient
from prefect import task
from prefect.tasks import task_input_hash

# API_URL and API_ACCESS_TOKEN are placeholders for your own settings


def get_session():
    return AsyncClient(
        base_url=API_URL,
        headers={"Authorization": f"Bearer {API_ACCESS_TOKEN}"},
    )


@task(cache_expiration=timedelta(hours=1), cache_key_fn=task_input_hash)
async def get(endpoint: str, params=None) -> dict[str] | list[dict[str]]:
    async with get_session() as session:
        res = await session.get(endpoint, params=params)
        res.raise_for_status()

        return res.json()

@TWeatherston
Author

Yeah, this is basically what we ended up having to do. Unfortunately, using a client this way negates most of the benefits of a shared client (connection reuse in particular), especially if you have a large number of requests spread across lots of separate tasks.
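One hedged alternative sketch (not from this thread): keep the client and all of the requests inside a single async task so the connection pool is actually reused, at the cost of per-request task granularity. fetch_all and the endpoint list are illustrative names.

from prefect import flow, task
import asyncio
import httpx


@task()
async def fetch_all(endpoints: list[str]) -> list:
    # One task owns the client, so the pool lives and dies on one event loop.
    async with httpx.AsyncClient(base_url="https://example.com") as client:
        responses = await asyncio.gather(*(client.get(e) for e in endpoints))
        return [r.json() for r in responses]


@flow()
async def my_flow():
    print(await fetch_all(["/a", "/b", "/c"]))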

@NikiTsv

NikiTsv commented Jan 11, 2024

Hello, I am having a similar issue with SQLAlchemy/Postgres sessions.
I have to create a brand new session with a brand new engine for each Prefect task; otherwise, if I commit a session in one task I get this error in the next flow (at least during tests).
If I create only a new session per task (without a new engine/pool), then when testing two flows sequentially it only works if I don't do any session commits in the first test.

RuntimeError: Task <Task pending name='Task-107' coro=<Call._run_async() running at /usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:345> cb=[_run_until_complete_cb() at /usr/local/lib/python3.11/asyncio/base_events.py:180]> got Future <Future pending cb=[Protocol._on_waiter_completed()]> attached to a different loop

asyncpg/protocol/protocol.pyx

More stack trace here:

prefect_app/flows/my_flow/test_flow.py:26: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py:182: in wait_for_call_in_loop_thread
    return call.result()
/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:282: in result
    return self.future.result(timeout=timeout)
/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:168: in result
    return self.__get_result()
/usr/local/lib/python3.11/concurrent/futures/_base.py:401: in __get_result
    raise self._exception
/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:345: in _run_async
    result = await coro
/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py:51: in with_injected_client
    return await fn(*args, **kwargs)
/usr/local/lib/python3.11/site-packages/prefect/engine.py:386: in create_then_begin_flow_run
    return await state.result(fetch=True)
/usr/local/lib/python3.11/site-packages/prefect/states.py:91: in _get_state_result
    raise await get_state_exception(state)
/usr/local/lib/python3.11/site-packages/prefect/engine.py:841: in orchestrate_flow_run
    result = await flow_call.aresult()
/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:291: in aresult
    return await asyncio.wrap_future(self.future)
/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:345: in _run_async
    result = await coro
prefect_app/flows/my_flow.py:18: in my_flow
    await handle_work(id)
/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py:182: in wait_for_call_in_loop_thread
    return call.result()
/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:282: in result
    return self.future.result(timeout=timeout)
/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:168: in result
    return self.__get_result()
/usr/local/lib/python3.11/concurrent/futures/_base.py:401: in __get_result
    raise self._exception
/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:345: in _run_async
    result = await coro
/usr/local/lib/python3.11/site-packages/prefect/engine.py:1324: in get_task_call_return_value
    return await future._result()
/usr/local/lib/python3.11/site-packages/prefect/futures.py:237: in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
/usr/local/lib/python3.11/site-packages/prefect/states.py:91: in _get_state_result
    raise await get_state_exception(state)
/usr/local/lib/python3.11/site-packages/prefect/engine.py:1760: in orchestrate_task_run
    result = await call.aresult()
/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:291: in aresult
    return await asyncio.wrap_future(self.future)
/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:345: in _run_async
    result = await coro
prefect_app/flows/utils.py:64: in wrapper
    return await func(*args, **kwargs)
prefect_app/flows/my_flow/flow.py:11: in handle_work
    await Service().do_work(
cz_core/app/service_layer/services/some_service/do_work.py:80: in handle_work
    await self.some_repo.get_existing_by_id(
cz_core/app/db/repositories/base_repository.py:263: in get_existing_by_id
    entity_db = await self.get_by_id(id=id)
cz_core/app/db/repositories/base_repository.py:260: in get_by_id
    return (await self.db.execute(query)).unique().scalar_one_or_none()
/usr/local/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py:454: in execute
    result = await greenlet_spawn(
/usr/local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py:190: in greenlet_spawn
    result = context.throw(*sys.exc_info())
/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py:2262: in execute
    return self._execute_internal(
/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py:2144: in _execute_internal
    result: Result[Any] = compile_state_cls.orm_execute_statement(
/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/context.py:293: in orm_execute_statement
    result = conn.execute(
/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py:1412: in execute
    return meth(
/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/elements.py:516: in _execute_on_connection
    return connection._execute_clauseelement(
/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py:1635: in _execute_clauseelement
    ret = self._execute_context(
/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py:1844: in _execute_context
    return self._exec_single_context(
/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py:1984: in _exec_single_context
    self._handle_dbapi_exception(
/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py:2342: in _handle_dbapi_exception
    raise exc_info[1].with_traceback(exc_info[2])
/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py:1965: in _exec_single_context
    self.dialect.do_execute(
/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py:921: in do_execute
    cursor.execute(statement, parameters)
/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:585: in execute
    self._adapt_connection.await_(
/usr/local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py:125: in await_only
    return current.driver.switch(awaitable)  # type: ignore[no-any-return]
/usr/local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py:185: in greenlet_spawn
    value = await result
/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:522: in _prepare_and_execute
    await adapt_connection._start_transaction()
/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:855: in _start_transaction
    self._handle_exception(error)
/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:804: in _handle_exception
    raise error
/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:853: in _start_transaction
    await self._transaction.start()
/usr/local/lib/python3.11/site-packages/asyncpg/transaction.py:138: in start
    await self._connection.execute(query)
/usr/local/lib/python3.11/site-packages/asyncpg/connection.py:317: in execute
    return await self._protocol.query(query, timeout)

This is how I start a session for every Prefect Task, using my own decorator:

def my_app_task(*task_args, **task_kwargs):
    def decorator(func):
        @task(*task_args, **task_kwargs)
        async def wrapper(*args, **kwargs):
            # old code, where the engine was created outside/before the task:
            # session = get_new_session_from_default_for_process()

            # this is the new code that works:
            session = get_session_with_new_engine(pool_size=1, max_overflow=3)
            with SqlAlchemyUnitOfWork(session=session):
                return await func(*args, **kwargs)
        return wrapper
    return decorator

I am a bit worried that the new approach might end up creating too many connections if many Prefect jobs/tasks are executed concurrently (one possible variation is sketched below).
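A possible variation on the decorator above, sketched under the assumption that SQLAlchemy 2.x async is in use: create the per-task engine with NullPool and dispose it when the task ends, so no pooled connection outlives the task's event loop and the open-connection count stays roughly bounded by task concurrency. DATABASE_URL and the session-as-kwarg convention are placeholders here, not part of the original code.

from prefect import task
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.pool import NullPool

DATABASE_URL = "postgresql+asyncpg://user:pass@host/db"  # placeholder


def my_app_task(*task_args, **task_kwargs):
    def decorator(func):
        @task(*task_args, **task_kwargs)
        async def wrapper(*args, **kwargs):
            # Engine, connection, and session are all created on the task's
            # own event loop and torn down before that loop goes away.
            engine = create_async_engine(DATABASE_URL, poolclass=NullPool)
            try:
                async with async_sessionmaker(engine)() as session:
                    return await func(*args, session=session, **kwargs)
            finally:
                await engine.dispose()

        return wrapper

    return decorator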

Is anyone else hitting this, or does anyone have an idea for a better approach to handling sessions/connections in Prefect and in Prefect tests?

@TWeatherston
Author

@NikiTsv I haven't tried it myself, but prefect-sqlalchemy might make your life a bit easier? It looks like it's still a connection per task (as recommended in the docs), but it handles all of the connection/session setup for you.
