[Eager] eager tensor support pickler #47025

Merged: 20 commits, Nov 7, 2022

24 changes: 0 additions & 24 deletions python/paddle/fluid/tests/unittests/test_paddle_multiprocessing.py
@@ -18,20 +18,12 @@
 import time
 import paddle
 import paddle.incubate.multiprocessing as mp
-from paddle.fluid.framework import (
-    _enable_legacy_dygraph,
-    _test_eager_guard,
-    in_dygraph_mode,
-)
 
 REPEAT = 20
 HAS_SHM_FILES = os.path.isdir('/dev/shm')
 
 
 def fill_tensor(queue, event):
-    # make sure run in legacy dygraph
-    if in_dygraph_mode():
-        _enable_legacy_dygraph()
     data = queue.get()
     with paddle.no_grad():
         data[0][:] = 5
@@ -182,36 +174,24 @@ def test_receive():

 class TestMultiprocessingCpu(TestMultiprocessingBase):
     def func_test_pass_tensor(self):
-        if in_dygraph_mode():
-            return
         paddle.set_device("cpu")
         self._test_sharing(repeat=REPEAT)
 
     def test_pass_tensor(self):
-        with _test_eager_guard():
-            self.func_test_pass_tensor()
         self.func_test_pass_tensor()
 
     def func_test_pass_parambase(self):
-        if in_dygraph_mode():
-            return
         paddle.set_device("cpu")
         self._test_sharing(repeat=1, param=True)
 
     def test_pass_parambase(self):
-        with _test_eager_guard():
-            self.func_test_pass_parambase()
         self.func_test_pass_parambase()
 
     def func_test_pass_empty(self):
-        if in_dygraph_mode():
-            return
         paddle.set_device("cpu")
         self._test_empty()
 
     def test_pass_empty(self):
-        with _test_eager_guard():
-            self.func_test_pass_empty()
         self.func_test_pass_empty()
 
 
@@ -221,14 +201,10 @@ class TestMultiprocessingGpu(TestMultiprocessingBase):
"core is not compiled with CUDA",
)
def func_test_pass_tensor(self):
if in_dygraph_mode():
return
paddle.set_device("gpu")
self._test_sharing(mp.get_context("spawn"), "gpu")

def test_pass_tensor(self):
with _test_eager_guard():
self.func_test_pass_tensor()
self.func_test_pass_tensor()


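The test changes above drop the legacy-dygraph guards, so tensor sharing now runs under eager mode directly. Below is a minimal sketch of the queue-based sharing round-trip these tests exercise; the worker function and tensor shapes are illustrative, not copied from the test file.

```python
# Hypothetical, minimal version of the sharing pattern the tests cover:
# a tensor is pickled through a queue and mutated in a child process.
import paddle
import paddle.incubate.multiprocessing as mp


def fill(queue):
    data = queue.get()   # rebuilt in this process from IPC metadata
    with paddle.no_grad():
        data[0][:] = 5   # the write lands in the shared buffer


if __name__ == "__main__":
    paddle.set_device("cpu")
    ctx = mp.get_context("spawn")
    queue = ctx.Queue()
    tensor = paddle.zeros([2, 2])
    queue.put([tensor])  # serialized via the reducers registered in reductions.py below
    proc = ctx.Process(target=fill, args=(queue,))
    proc.start()
    proc.join()
    # With the shared-filename strategy, row 0 of the parent's tensor now
    # holds 5s (assuming the CPU shared-memory path is available).
    print(tensor.numpy())
```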
48 changes: 25 additions & 23 deletions python/paddle/incubate/multiprocessing/reductions.py
@@ -45,11 +45,11 @@ def _supported_check():
     return True
 
 
-class LRUSharedCache(OrderedDict):
+class _LRUSharedCache(OrderedDict):
     def __init__(self):
         self.limit = 128
         self._after_fork()
-        register_after_fork(self, LRUSharedCache._after_fork)
+        register_after_fork(self, _LRUSharedCache._after_fork)
 
     def _after_fork(self):
         self.lock = threading.Lock()
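The renamed _LRUSharedCache is only partially visible in these hunks. For context, here is a sketch of the OrderedDict-based LRU pattern it follows; the lock handling and exact eviction details of the real class are simplified, so treat this as an approximation rather than the class itself.

```python
# Sketch of the LRU pattern behind _LRUSharedCache: an OrderedDict where
# get() refreshes recency and __setitem__ evicts the oldest entry once
# `limit` is reached. The real class also wraps each operation in a
# threading.Lock and re-creates that lock after fork; omitted for brevity.
from collections import OrderedDict


class LRUCacheSketch(OrderedDict):
    def __init__(self, limit=128):
        super().__init__()
        self.limit = limit

    def get(self, key):
        try:
            value = super().pop(key)
            super().__setitem__(key, value)  # move to most-recent position
            return value
        except KeyError:
            return None

    def __setitem__(self, key, value):
        if len(self) >= self.limit:
            self.popitem(last=False)  # evict least-recently used entry
        super().__setitem__(key, value)
```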
@@ -73,25 +73,25 @@ def __setitem__(self, key, value):
             super().__setitem__(key, value)
 
 
-shared_cache = LRUSharedCache()
+shared_cache = _LRUSharedCache()
 
 
-def cuda_from_cache(key):
+def _cuda_from_cache(key):
     lodtensor = shared_cache.get(key)
     if lodtensor is None:
         return None
     return lodtensor
 
 
-def rebuild_tensor(cls, lodtensor, metadata):
-    if cls == paddle.fluid.framework.ParamBase:
-        tensor = paddle.fluid.framework.ParamBase(
+def _rebuild_tensor(cls, lodtensor, metadata):
+    if cls == paddle.fluid.framework.EagerParamBase:
+        tensor = paddle.fluid.framework.EagerParamBase(
             lodtensor.shape(), lodtensor._dtype(), **metadata
         )
         tensor.value().get_tensor()._share_data_with(lodtensor)
     else:
         size, stop_gradient = metadata
-        tensor = paddle.fluid.core.VarBase()
+        tensor = paddle.fluid.core.eager.Tensor()
         if lodtensor._is_initialized():
             tensor.value().get_tensor()._share_data_with(lodtensor)
         else:
@@ -100,7 +100,7 @@ def rebuild_tensor(cls, lodtensor, metadata):
     return tensor
 
 
-def reduce_tensor(tensor):
+def _reduce_tensor(tensor):
     lodtensor = tensor.value().get_tensor()
 
     if not tensor.stop_gradient and not tensor.is_leaf:
@@ -113,29 +113,29 @@ def reduce_tensor(tensor):
         or tensor.place.is_gpu_place()
         or tensor.place.is_cuda_pinned_place()
     ):
-        if type(tensor) == paddle.fluid.framework.ParamBase:
+        if type(tensor) == paddle.fluid.framework.EagerParamBase:
             metadata = copy.deepcopy(tensor.__dict__)
         else:
             metadata = (tensor.size, tensor.stop_gradient)
 
-        return (rebuild_tensor, (type(tensor), lodtensor, metadata))
+        return (_rebuild_tensor, (type(tensor), lodtensor, metadata))
     else:
         raise ValueError(
             "Only support tensors of CPU/CUDA/CUDAPinned Place, Not support %s for now!"
             % tensor.place
         )
 
 
-def rebuild_lodtensor_filename(cls, ipc_name, size, type_idx, dims, lod):
+def _rebuild_lodtensor_filename(cls, ipc_name, size, type_idx, dims, lod):
     lodtensor = cls._new_shared_filename((ipc_name, size, type_idx, dims, lod))
     lodtensor._shared_decref()
     return lodtensor
 
 
-def rebuild_cuda_tensor(
+def _rebuild_cuda_tensor(
     cls, handle, offset_bytes, size, type_idx, dims, lod, device_idx
 ):
-    cache_tensor = cuda_from_cache((handle, offset_bytes))
+    cache_tensor = _cuda_from_cache((handle, offset_bytes))
     if cache_tensor is None:
         lodtensor = cls._new_shared_cuda(
             (handle, offset_bytes, size, type_idx, dims, lod, device_idx)
@@ -155,33 +155,33 @@ def rebuild_cuda_tensor(
     return lodtensor
 
 
-def rebuild_lodtensor_empty(cls):
+def _rebuild_lodtensor_empty(cls):
     # TODO: check if tensor initialized
     # TODO: handle the dtype of empty tensor
     return cls()
 
 
-def reduce_lodtensor(lodtensor):
+def _reduce_lodtensor(lodtensor):
     if (
         lodtensor._place().is_cpu_place()
         or lodtensor._place().is_cuda_pinned_place()
     ):
         for dim in lodtensor.shape():
             if dim == 0:
                 # Empty tensors have nothing to be mmapped.
-                return (rebuild_lodtensor_empty, (type(lodtensor),))
+                return (_rebuild_lodtensor_empty, (type(lodtensor),))
 
         # By default, use the shared-filename strategy.
         metadata = (
             lodtensor._share_filename()
         )  # ipc_name, size, type_idx, dims, lod
-        rebuild = rebuild_lodtensor_filename
+        rebuild = _rebuild_lodtensor_filename
         lodtensor._shared_incref()
         # TODO: maintain a reference for the lodtensor
         # TODO: support the file_descriptor strategy
     elif lodtensor._place().is_gpu_place():
         metadata = lodtensor._share_cuda()
-        rebuild = rebuild_cuda_tensor
+        rebuild = _rebuild_cuda_tensor
     else:
         raise RuntimeError("We only support pass cpu/gpu lodtensor for now!")
@@ -192,7 +192,9 @@ def init_reductions():
     if not _supported_check():
         return
 
-    ForkingPickler.register(paddle.Tensor, reduce_tensor)
-    ForkingPickler.register(paddle.fluid.core.VarBase, reduce_tensor)
-    ForkingPickler.register(paddle.fluid.framework.ParamBase, reduce_tensor)
-    ForkingPickler.register(paddle.fluid.core.LoDTensor, reduce_lodtensor)
+    ForkingPickler.register(paddle.Tensor, _reduce_tensor)
+    ForkingPickler.register(paddle.fluid.core.eager.Tensor, _reduce_tensor)
+    ForkingPickler.register(
+        paddle.fluid.framework.EagerParamBase, _reduce_tensor
+    )
+    ForkingPickler.register(paddle.fluid.core.LoDTensor, _reduce_lodtensor)
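For readers unfamiliar with the hook this diff relies on: multiprocessing.reduction.ForkingPickler.register installs a custom reduce function for a type, and that function returns a (rebuild_callable, args) pair that the receiving process invokes to reconstruct the object, exactly the shape that _reduce_tensor and _reduce_lodtensor return. A self-contained sketch with a made-up Handle type (not part of Paddle):

```python
# Generic illustration of the ForkingPickler mechanism used above:
# register() installs a reduce function so instances crossing process
# boundaries are rebuilt from lightweight metadata instead of deep-copied.
from multiprocessing.reduction import ForkingPickler


class Handle:
    """Hypothetical stand-in for a resource backed by shared state."""

    def __init__(self, ident):
        self.ident = ident


def _rebuild_handle(ident):
    # Runs in the receiving process; reattach to the shared resource here.
    return Handle(ident)


def _reduce_handle(handle):
    # Return (callable, args); pickle stores this pair, and unpickling
    # calls _rebuild_handle(*args) on the other side.
    return (_rebuild_handle, (handle.ident,))


ForkingPickler.register(Handle, _reduce_handle)
```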