
import of module '__prefect_loader__' failed when 'multiprocessing.Pool.apply_async' is used within the flow via remote deployments #9329

Open
rsampaths16 opened this issue Apr 26, 2023 · 9 comments
Labels
bug Something isn't working needs:research Blocked by investigation into feasibility and cause

Comments

@rsampaths16
Contributor

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn't find it.
  • I searched the Prefect documentation for this issue.
  • I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

Invoked directly

If the flow is invoked directly via python example.py, it runs successfully without any issues.

Invoked via agent through a deployment

If a deployment is created and its run is executed by an agent, the flow fails: the function bar cannot be pickled because the import of module __prefect_loader__ fails.
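For context, pickle serializes a plain function by reference: it records the function's __module__ and __qualname__ and, while pickling, imports that module to confirm the name resolves to the same object. When the flow runs via python example.py, bar lives in __main__, which is importable; under the deployment it lives in __prefect_loader__, as the error message shows. The sketch below approximates that lookup. can_pickle_by_reference is a hypothetical helper written for illustration, not part of Prefect or the standard library, and setting bar.__module__ by hand is only a way to simulate the deployment case.

```python
import importlib
import os


def can_pickle_by_reference(fn) -> bool:
    """Approximate the lookup pickle performs for a module-level function.

    Hypothetical helper: returns False when fn.__module__ cannot be imported
    or when fn.__qualname__ does not resolve to the same object.
    """
    try:
        obj = importlib.import_module(fn.__module__)
    except ImportError:  # includes ModuleNotFoundError
        return False
    for part in fn.__qualname__.split("."):
        obj = getattr(obj, part, None)
        if obj is None:
            return False
    return obj is fn


def bar(b):
    return b


# Simulate the deployment case: bar claims to come from a module that is
# neither in sys.modules nor findable on sys.path.
bar.__module__ = "__prefect_loader__"

print(can_pickle_by_reference(bar))           # False
print(can_pickle_by_reference(os.path.join))  # True
```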

Reproduction

from prefect import flow
from multiprocessing import Pool
import time


@flow
def foo(a):
    p = Pool(4)
    for i in range(a):
        p.apply_async(bar, (i+1,), callback=ok, error_callback=error)
    p.close()
    p.join()


def ok(x):
    print(f"I am ok {x}")


def error(x):
    print(f"I am error {x}")


def bar(b):
    print('hi')
    time.sleep(b)
    print('bye')


if __name__ == '__main__':
    foo(5)

Error

I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed

Versions

###### prefect versions
2.8.5
2.10.5

###### python versions
3.7.9

Additional context

###
### A complete description of a Prefect Deployment for flow 'foo'
###
name: example
description: null
version: 6c8636a7bb4d72afb9b92c3025675f05
# The work queue that will handle this deployment's runs
work_queue_name: test
work_pool_name: default-agent-pool
tags: []
parameters: {}
schedule: null
is_schedule_active: true
infra_overrides: {}

###
### DO NOT EDIT BELOW THIS LINE
###
flow_name: foo
manifest_path: null
infrastructure:
  type: process
  env: {}
  labels: {}
  name: null
  command: null
  stream_output: true
  working_dir: null
  _block_document_id: 762ff9aa-b9e3-4e34-b047-7a26018db371
  _block_document_name: anonymous-617e9166-a2a4-48cc-94b5-4c36a7fe7276
  _is_anonymous: true
  block_type_slug: process
  _block_type_slug: process
storage: null
path: /home/ubuntu/test
entrypoint: example.py:foo
parameter_openapi_schema:
  title: Parameters
  type: object
  properties:
    a:
      title: a
      position: 0
  required:
  - a
  definitions: null
timestamp: '2023-04-20T07:01:21.085073+00:00'
rsampaths16 added the bug (Something isn't working) and status:triage labels Apr 26, 2023
zanieb added the status:accepted and needs:research (Blocked by investigation into feasibility and cause) labels and removed the status:triage label Apr 26, 2023
@zanieb
Contributor

zanieb commented Apr 26, 2023

Thanks for the report! I'm not sure how best to resolve this; it's going to take some investigation — contributions welcome!

@rsampaths16
Contributor Author

rsampaths16 commented Apr 26, 2023

Hey @madkinsz, the main reason it fails is that the __prefect_loader__ module isn't in sys.modules, so pickle can't find the function bar.

Since prefect.engine is invoked as the entry point from within a subprocess, I don't think we need to clean up the __prefect_loader__ entry in sys.modules after loading the script.

prefect/src/prefect/engine.py

Lines 2203 to 2210 in 70ca8f1

try:
    enter_flow_run_engine_from_subprocess(flow_run_id)
except Abort as exc:
    engine_logger.info(
        f"Engine execution of flow run '{flow_run_id}' aborted by orchestrator:"
        f" {exc}"
    )
    exit(0)

sys.modules.pop("__prefect_loader__")
sys.path.remove(parent_path)
sys.path.remove(working_directory)

The simple fix would be to remove sys.modules.pop("__prefect_loader__") and leave the module registered in sys.modules.

This won't interfere with other flows, since the script is loaded within a subprocess.
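The effect of the sys.modules.pop("__prefect_loader__") line can be reproduced without Prefect. The sketch below loads a throwaway script under that module name with importlib, checks whether its function pickles, removes the sys.modules entry the same way the cleanup line does, and checks again. load_script and pickles are hypothetical helpers for illustration; Prefect's real loader also does the sys.path handling shown in the snippet, which is omitted here.

```python
import importlib.util
import pickle
import sys
import tempfile
from pathlib import Path


def load_script(path, name="__prefect_loader__"):
    """Hypothetical helper: execute a file as a module registered under `name`."""
    spec = importlib.util.spec_from_file_location(name, path)
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module
    spec.loader.exec_module(module)
    return module


def pickles(fn):
    """Hypothetical helper: True if pickle.dumps(fn) succeeds."""
    try:
        pickle.dumps(fn)
    except Exception:
        return False
    return True


with tempfile.TemporaryDirectory() as tmp:
    script = Path(tmp) / "example.py"
    script.write_text("def bar(b):\n    return b * 2\n")
    module = load_script(str(script))

    # Registered: pickle resolves __prefect_loader__.bar through sys.modules.
    registered = pickles(module.bar)

    # Same cleanup as the snippet above: the module object survives,
    # but its name can no longer be imported.
    sys.modules.pop("__prefect_loader__")
    unregistered = pickles(module.bar)

print(registered, unregistered)  # True False
```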

@rsampaths16
Contributor Author

Following the function's usages, it is invoked either as an app command or as a subprocess (via prefect.engine).

@rsampaths16
Contributor Author

@madkinsz, let me know if you have any concerns about this.

@zanieb
Contributor

zanieb commented Apr 26, 2023

@rsampaths16 I'd be curious to see what happens when that line is removed. I don't remember its implications, and it seems like there may still be difficulty retrieving the pickled objects in some cases.

@rsampaths16
Copy link
Contributor Author

@madkinsz, removing that line keeps the loaded module in sys.modules (i.e., the function can be pickled). If we're worried about one script's module overriding another's, we can use a mangled name instead, such as __prefect_loader_uuid4__ or __prefect_loader_path__, without a . in the name.
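The mangled-name idea could look roughly like the sketch below, assuming the loader simply keeps each script registered under a unique, dot-free name of the form __prefect_loader_<uuid4 hex>__. load_script_keep_registered is a hypothetical helper, not Prefect's implementation. It only covers the flow process itself: with the fork start method, pool workers inherit sys.modules, but with spawn (the default on macOS and Windows) each worker starts a fresh interpreter that would not have the mangled name registered.

```python
import importlib.util
import pickle
import sys
import tempfile
import uuid
from pathlib import Path


def load_script_keep_registered(path):
    """Hypothetical loader: register the script under a unique, dot-free
    module name and leave it in sys.modules so its functions stay picklable."""
    name = f"__prefect_loader_{uuid.uuid4().hex}__"
    spec = importlib.util.spec_from_file_location(name, path)
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module
    spec.loader.exec_module(module)
    return module


with tempfile.TemporaryDirectory() as tmp:
    first = Path(tmp) / "first.py"
    second = Path(tmp) / "second.py"
    first.write_text("def bar(b):\n    return b + 1\n")
    second.write_text("def bar(b):\n    return b + 2\n")
    mod_a = load_script_keep_registered(str(first))
    mod_b = load_script_keep_registered(str(second))

# Two scripts that both define `bar` no longer collide on one module name.
print(mod_a.__name__ != mod_b.__name__)  # True

# Each function round-trips through pickle by reference.
bar_a = pickle.loads(pickle.dumps(mod_a.bar))
bar_b = pickle.loads(pickle.dumps(mod_b.bar))
print(bar_a(1), bar_b(1))  # 2 3
```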

@ori-scala

ori-scala commented Feb 19, 2024

Hi @rsampaths16 @zanieb I've encountered the same issue and removing that line solves it. Any chance you can do this small patch and save the day?

Update: my bad, removing it just raises a different issue.

Bottom line: we need a fix for this issue, as we can't run starmap in a flow.
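Until the loader changes, one workaround worth trying, assuming the code can be restructured, is to move the functions handed to the pool into a regular module that is importable by name. Pool.map, Pool.starmap and Pool.apply_async all pickle the function to send it to the workers, and a function from an importable module pickles by reference without depending on __prefect_loader__. The module name pool_workers and the temporary directory are hypothetical stand-ins for a real module shipped next to the flow file. This sketch only checks the pickling step; the module must also be importable inside the worker processes.

```python
import importlib
import pickle
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-in for a real pool_workers.py shipped alongside the flow.
workers_dir = tempfile.mkdtemp()
Path(workers_dir, "pool_workers.py").write_text(
    "def add(x, y):\n    return x + y\n"
)
sys.path.insert(0, workers_dir)
importlib.invalidate_caches()

pool_workers = importlib.import_module("pool_workers")

# pickle stores only the reference "pool_workers.add", which resolves in any
# process that can import pool_workers.
restored = pickle.loads(pickle.dumps(pool_workers.add))
print(restored is pool_workers.add, restored(2, 3))  # True 5
```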

@secrettoad

FYI, this also affects torch.save when it's run from within a Prefect flow that executes the torch code remotely via Dask.

@EmilRex
Contributor

EmilRex commented Oct 18, 2024

Another example that reproduces this (I believe):

import multiprocessing as mp
from prefect import flow

def _task(i):
    pass

@flow
def main():
    with mp.Pool(5) as pool:
        pool.map(_task, range(5))

if __name__ == "__main__":
    main.serve()

Running a deployment results in:

Your flow 'main' is being served and polling for scheduled runs!

To trigger a run for this flow, use the following command:

        $ prefect deployment run 'main/main'

You can also run your flow via the Prefect UI: http://127.0.0.1:4200/deployments/deployment/b8c1cba6-ccdc-494d-9b47-109ac2dd6fc5

14:29:50.799 | INFO    | prefect.flow_runs.runner - Runner 'main' submitting flow run 'e2b6cbac-c516-47fa-ae78-90638cf3af6a'
14:29:50.836 | INFO    | prefect.flow_runs.runner - Opening process...
14:29:50.846 | INFO    | prefect.flow_runs.runner - Completed submission of flow run 'e2b6cbac-c516-47fa-ae78-90638cf3af6a'
14:29:51.507 | INFO    | Flow run 'puzzling-aardwark' - Downloading flow code from storage at '.'
14:29:51.573 | ERROR   | Flow run 'puzzling-aardwark' - Encountered exception during execution: PicklingError("Can't pickle <function _task at 0x106dea980>: import of module '__prefect_loader__' failed")
Traceback (most recent call last):
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 655, in run_context
    yield self
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 699, in run_flow_sync
    engine.call_flow_fn()
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 678, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/utilities/callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/mre.py", line 11, in main
    pool.map(_task, range(5))
  File "/opt/homebrew/Cellar/python@3.12/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/pool.py", line 367, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/pool.py", line 774, in get
    raise self._value
  File "/opt/homebrew/Cellar/python@3.12/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/pool.py", line 540, in _handle_tasks
    put(task)
  File "/opt/homebrew/Cellar/python@3.12/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function _task at 0x106dea980>: import of module '__prefect_loader__' failed
14:29:51.594 | ERROR   | Flow run 'puzzling-aardwark' - Finished in state Failed("Flow run encountered an exception: PicklingError: Can't pickle <function _task at 0x106dea980>: import of module '__prefect_loader__' failed")
14:29:51.595 | ERROR   | prefect.engine - Engine execution of flow run 'e2b6cbac-c516-47fa-ae78-90638cf3af6a' exited with unexpected exception
Traceback (most recent call last):
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/engine.py", line 42, in <module>
    run_flow(flow, flow_run=flow_run)
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 821, in run_flow
    return run_flow_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 701, in run_flow_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 255, in result
    raise self._raised
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 655, in run_context
    yield self
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 699, in run_flow_sync
    engine.call_flow_fn()
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 678, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/.venv/lib/python3.12/site-packages/prefect/utilities/callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/emilchristensen/github/scratch/multiprocessing-imports/mre.py", line 11, in main
    pool.map(_task, range(5))
  File "/opt/homebrew/Cellar/python@3.12/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/pool.py", line 367, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/pool.py", line 774, in get
    raise self._value
  File "/opt/homebrew/Cellar/python@3.12/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/pool.py", line 540, in _handle_tasks
    put(task)
  File "/opt/homebrew/Cellar/python@3.12/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function _task at 0x106dea980>: import of module '__prefect_loader__' failed
14:29:51.715 | ERROR   | prefect.flow_runs.runner - Process for flow run 'puzzling-aardwark' exited with status code: 1

Version:

Version:             3.0.10
API version:         0.8.4
Python version:      3.12.7
Git commit:          3aa2d893
Built:               Tue, Oct 15, 2024 1:31 PM
OS/Arch:             darwin/arm64
Profile:             local
Server type:         server
Pydantic version:    2.9.2
