Closed
Description
What are you really trying to do?
I have a sync
activity that triggers a utility method that needs to make sure it's not being called inside a workflow.
At a high level, the code is structured like this:
# activities.py
@activity.defn
def my_activity(arg: MyArg):
utility.do_stuff()
# utility.py
def do_stuff():
if workflow.in_workflow():
raise RuntimeError("Can not call from inside a workflow")
Describe the bug
The activity triggers the following error:
RuntimeError
no running event loop
which can be traced to the maybe_current()
method:
Minimal Reproduction
The example above should be enough to reproduce.
The worker is configured with a ThreadPoolExecutor
to execute the sync
activities.
Environment/Versions
- Temporal Version: Python 1.11.0
- Running inside Docker