Skip to content

[multiprocessing] Eager tensor support pickle #48179

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Nov 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion python/paddle/fluid/dataloader/dataloader_iter.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import itertools
import threading
import numpy as np
import multiprocessing
from collections import namedtuple
from paddle.fluid.framework import (
_set_expected_place,
Expand Down Expand Up @@ -422,6 +421,8 @@ def __init__(self, loader):
self._shutdown = False

def _init_workers(self):
import paddle.incubate.multiprocessing as multiprocessing

# multiprocess worker and indice queue list initial as empty
self._workers = []
self._worker_status = []
Expand Down
16 changes: 7 additions & 9 deletions python/paddle/fluid/dataloader/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,21 +373,19 @@ def _worker_loop(
out_queue.put((idx, batch, None))
batch, structure = _flatten_batch(batch)
if use_shared_memory:
# NOTE: In eager mode, Tensor._share_memory has no
# effect, fall back to _array_to_share_memory_tensor
def tensor_share_memory(tensor):
if _in_eager_without_dygraph_check():
return core._array_to_share_memory_tensor(tensor)
return tensor._share_memory()

def numpy2lodtensor(arr):
lodtensor = core.Tensor()
lodtensor.set(arr, core.CPUPlace())
return lodtensor

tensor_list = [
core._array_to_share_memory_tensor(b)
numpy2lodtensor(b)
if isinstance(b, np.ndarray)
else tensor_share_memory(b)
else b.value().get_tensor()
for b in batch
]
out_queue.put((idx, tensor_list, structure))
core._remove_tensor_list_mmap_fds(tensor_list)
else:
out_queue.put((idx, batch, structure))
except KeyboardInterrupt:
Expand Down
1 change: 1 addition & 0 deletions python/paddle/incubate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from . import autotune # noqa: F401
from . import nn # noqa: F401
from . import asp # noqa: F401
from . import multiprocessing # noqa: F401

from ..fluid.layers.loss import identity_loss

Expand Down