-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
import of module '__prefect_loader__' failed when 'multiprocessing.Pool.apply_async' is used within the flow via remote deployments #9329
Comments
Thanks for the report! I'm not sure how best to resolve this; it's going to take some investigation — contributions welcome! |
Hey @madkinsz - the main reason it is failing is because - Since the entry Lines 2203 to 2210 in 70ca8f1
prefect/src/prefect/utilities/importtools.py Lines 166 to 168 in 70ca8f1
The simple solve will be to just remove This will not mess with other flows as it is invoked within a sub-process. |
From following function usages - it is either invoked as an app-command or as a subprocess ( via |
Let me know if you have any concerns regarding this @madkinsz? |
@rsampaths16 I'd be curious to see what happens when that line is removed. I don't remember the implications of it and it seems like there may be difficulty retrieving the pickled objects in some cases still. |
@madkinsz - removing that line keeps the loaded module within sys.modules ( i.e., the function is available to be pickled ); if we are worried about overriding then we can use a mangled name instead |
Hi @rsampaths16 @zanieb I've encountered the same issue and removing that line solves it. Any chance you can do this small patch and save the day? update - my bad, it just raises a different issue. Bottom line - we need a fix for the above issue as we can't run starmap in a flow. |
fyi this is also affecting torch.save when run from within a prefect flow that remotely executes the torch code via dask |
Another example that reproduces this (I believe): import multiprocessing as mp
from prefect import flow
def _task(i):
pass
@flow
def main():
with mp.Pool(5) as pool:
pool.map(_task, range(5))
if __name__ == "__main__":
main.serve() Running a deployment results in:
Version:
|
First check
Bug summary
Invoked directly
If the flow is invoked directly via python
python example.py
then it runs successfully without any issuesInvoked via agent through a deployment
If a deployment is created and it is invoked by an agent then the issues occurs that
__prefect_loader__
fails pickling of the functionReproduction
Error
Versions
Additional context
The text was updated successfully, but these errors were encountered: