
Commit dc1bd6a

H-Huang authored and facebook-github-bot committed
Remove PROCESS GROUP rpc backend (pytorch#62411)
Summary: Pull Request resolved: pytorch#62411

Test Plan: Imported from OSS

Reviewed By: mrshenli

Differential Revision: D29990408

Pulled By: H-Huang

fbshipit-source-id: 183d3b316767b12993cebbe32b73c2850fd1cc42
1 parent 2ec4f69 commit dc1bd6a

12 files changed, with 23 additions and 325 deletions.


docs/source/rpc.rst

Lines changed: 0 additions & 46 deletions

@@ -191,52 +191,6 @@ Example::
     :inherited-members:


-Process Group Backend
-"""""""""""""""""""""
-
-.. warning ::
-    The Process Group Backend will be deprecated soon, we recommend using the
-    TensorPipe Backend instead.
-
-The Process Group agent instantiates a process group from
-the :mod:`~torch.distributed` module and utilizes its point-to-point
-communication capabilities to send RPC messages. Internally, the process
-group uses `the Gloo library <https://github.com/facebookincubator/gloo/>`_.
-
-Gloo has been hardened by years of extensive use in PyTorch and is thus very
-reliable. However, as it was designed to perform collective communication, it
-may not always be the best fit for RPC. For example, each networking operation
-is synchronous and blocking, which means that it cannot be run in parallel with
-others. Moreover, it opens a connection between all pairs of nodes, and brings
-down all of them when one fails, thus reducing the resiliency and the elasticity
-of the system.
-
-Example::
-
-    >>> import os
-    >>> from torch.distributed import rpc
-    >>> os.environ['MASTER_ADDR'] = 'localhost'
-    >>> os.environ['MASTER_PORT'] = '29500'
-    >>>
-    >>> rpc.init_rpc(
-    >>>     "worker1",
-    >>>     rank=0,
-    >>>     world_size=2,
-    >>>     backend=rpc.BackendType.PROCESS_GROUP,
-    >>>     rpc_backend_options=rpc.ProcessGroupRpcBackendOptions(
-    >>>         num_send_recv_threads=16,
-    >>>         rpc_timeout=20 # 20 second timeout
-    >>>     )
-    >>> )
-    >>>
-    >>> # omitting init_rpc invocation on worker2
-
-
-.. autoclass:: ProcessGroupRpcBackendOptions
-    :members:
-    :inherited-members:
-
-
 .. _rref:

 RRef
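
For migration, a minimal TensorPipe equivalent of the deleted example might look as follows. This is a sketch, not part of the commit; it assumes the documented `rpc.TensorPipeRpcBackendOptions` API and the same two-worker setup as the removed snippet:

    >>> import os
    >>> from torch.distributed import rpc
    >>> os.environ['MASTER_ADDR'] = 'localhost'
    >>> os.environ['MASTER_PORT'] = '29500'
    >>>
    >>> rpc.init_rpc(
    >>>     "worker1",
    >>>     rank=0,
    >>>     world_size=2,
    >>>     backend=rpc.BackendType.TENSORPIPE,  # the default; shown for clarity
    >>>     rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
    >>>         num_worker_threads=16,
    >>>         rpc_timeout=20  # 20 second timeout
    >>>     )
    >>> )
    >>>
    >>> # omitting init_rpc invocation on worker2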

test/distributed/test_store.py

Lines changed: 1 addition & 1 deletion

@@ -197,7 +197,7 @@ def test_init_pg_and_rpc_with_same_socket(self):
             world_size=1,
         )

-        backend_opts = rpc.ProcessGroupRpcBackendOptions(
+        backend_opts = rpc.TensorPipeRpcBackendOptions(
             init_method=f"tcp://{addr}:{port}"
         )
         rpc.init_rpc(
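
Standalone, the updated pattern in this test amounts to the following sketch (the address and port values here are hypothetical placeholders, not taken from the test):

    import torch.distributed.rpc as rpc

    addr, port = "localhost", 29500  # hypothetical values for illustration
    backend_opts = rpc.TensorPipeRpcBackendOptions(
        init_method=f"tcp://{addr}:{port}"
    )
    rpc.init_rpc(
        "worker0",
        rank=0,
        world_size=1,
        rpc_backend_options=backend_opts,
    )
    rpc.shutdown()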

torch/_C/_distributed_rpc.pyi

Lines changed: 0 additions & 31 deletions

@@ -8,7 +8,6 @@ from ._distributed_c10d import ProcessGroup, Store

 # This module is defined in torch/csrc/distributed/rpc/init.cpp

-_DEFAULT_NUM_SEND_RECV_THREADS: int
 _DEFAULT_INIT_METHOD: str
 _DEFAULT_NUM_WORKER_THREADS: int
 _UNSET_RPC_TIMEOUT: float

@@ -66,36 +65,6 @@ class PyRRef:
     def __repr__(self) -> str: ...
     ...

-class ProcessGroupRpcBackendOptions(RpcBackendOptions):
-    num_send_recv_threads: int
-    def __init__(
-        self,
-        num_send_recv_threads: int,
-        rpc_timeout: float,
-        init_method: str
-    ): ...
-
-class ProcessGroupAgent(RpcAgent):
-    def __init__(
-        self,
-        store: Store,
-        worker_name: str,
-        pg: ProcessGroup,
-        numSendRecvThreads: int,
-        rpcTimeout: timedelta
-    ): ...
-    @overload
-    def get_worker_info(self) -> WorkerInfo: ...
-    @overload
-    def get_worker_info(self, workerName: str) -> WorkerInfo: ...
-    @overload
-    def get_worker_info(self, id: int) -> WorkerInfo: ...
-    def get_worker_infos(self) -> List[WorkerInfo]: ...
-    def _get_device_map(self, dst: WorkerInfo) -> Dict[torch.device, torch.device]: ...
-    def join(self): ...
-    def shutdown(self): ...
-    def sync(self): ...
-
 class _TensorPipeRpcBackendOptionsBase(RpcBackendOptions):
     num_worker_threads: int
     device_maps: Dict[str, Dict[torch.device, torch.device]]
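
The retained `_TensorPipeRpcBackendOptionsBase` stub keeps `num_worker_threads` and `device_maps`; as a hedged sketch, those fields are typically populated from Python via the public `set_device_map` call rather than assigned directly:

    import torch.distributed.rpc as rpc

    opts = rpc.TensorPipeRpcBackendOptions(num_worker_threads=8)
    # Map local cuda:0 to cuda:1 on "worker1" so tensors can be sent GPU-to-GPU.
    opts.set_device_map("worker1", {0: 1})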

torch/_C/_distributed_rpc_testing.pyi

Lines changed: 0 additions & 39 deletions

@@ -1,8 +1,6 @@
 import torch
 from ._distributed_c10d import ProcessGroup, Store
 from ._distributed_rpc import (
-    ProcessGroupAgent,
-    ProcessGroupRpcBackendOptions,
     _TensorPipeRpcBackendOptionsBase,
     TensorPipeAgent,
     WorkerInfo,

@@ -12,21 +10,6 @@ from datetime import timedelta

 # This module is defined in torch/csrc/distributed/rpc/testing/init.cpp

-class FaultyProcessGroupRpcBackendOptions(ProcessGroupRpcBackendOptions):
-    def __init__(
-        self,
-        num_send_recv_threads: int,
-        rpc_timeout: float,
-        init_method: str,
-        messages_to_fail: List[str],
-        messages_to_delay: Dict[str, float],
-        num_fail_sends: int,
-    ): ...
-    num_send_recv_threads: int
-    messages_to_fail: List[str]
-    messages_to_delay: Dict[str, float]
-    num_fail_sends: int
-
 class FaultyTensorPipeRpcBackendOptions(_TensorPipeRpcBackendOptionsBase):
     def __init__(
         self,

@@ -42,28 +25,6 @@ class FaultyTensorPipeRpcBackendOptions(_TensorPipeRpcBackendOptionsBase):
     messages_to_delay: Dict[str, float]
     num_fail_sends: int

-class FaultyProcessGroupAgent(ProcessGroupAgent):
-    def __init__(
-        self,
-        store: Store,
-        name: str,
-        process_group: ProcessGroup,
-        num_send_recv_threads: int,
-        rpc_timeout: timedelta,
-        messages_to_fail: List[str],
-        messages_to_delay: Dict[str, float],
-        num_fail_sends: int,
-    ): ...
-    def join(self): ...
-    def shutdown(self): ...
-    @overload
-    def get_worker_info(self) -> WorkerInfo: ...
-    @overload
-    def get_worker_info(self, workerName: str) -> WorkerInfo: ...
-    @overload
-    def get_worker_info(self, id: int) -> WorkerInfo: ...
-    def get_worker_infos(self) -> List[WorkerInfo]: ...
-
 class FaultyTensorPipeAgent(TensorPipeAgent):
     def __init__(
         self,

torch/csrc/distributed/rpc/init.cpp

Lines changed: 0 additions & 92 deletions

@@ -1,6 +1,5 @@
 #include <torch/csrc/python_headers.h>

-#include <torch/csrc/distributed/rpc/process_group_agent.h>
 #include <torch/csrc/distributed/rpc/profiler/remote_profiler_manager.h>
 #include <torch/csrc/distributed/rpc/profiler/server_process_global_profiler.h>
 #include <torch/csrc/distributed/rpc/py_rref.h>

@@ -514,97 +513,6 @@ PyObject* rpc_init(PyObject* _unused, PyObject* noargs) {
       // not releasing GIL to avoid context switch
       .def("__repr__", &PyRRef::str);

-  shared_ptr_class_<ProcessGroupRpcBackendOptions>(
-      module,
-      "ProcessGroupRpcBackendOptions",
-      rpcBackendOptions,
-      R"(
-          The backend options class for ``ProcessGroupAgent``, which is derived
-          from ``RpcBackendOptions``.
-
-          Args:
-              num_send_recv_threads (int, optional): The number of threads in
-                  the thread-pool used by ``ProcessGroupAgent`` (default: 4).
-              rpc_timeout (float, optional): The default timeout, in seconds,
-                  for RPC requests (default: 60 seconds). If the
-                  RPC has not completed in this timeframe, an exception
-                  indicating so will be raised. Callers can override this
-                  timeout for individual RPCs in
-                  :meth:`~torch.distributed.rpc.rpc_sync` and
-                  :meth:`~torch.distributed.rpc.rpc_async` if necessary.
-              init_method (str, optional): The URL to initialize
-                  ``ProcessGroupGloo`` (default: ``env://``).
-      )")
-      .def(
-          py::init<int, float, std::string>(),
-          py::arg("num_send_recv_threads") = kDefaultNumSendRecvThreads,
-          py::arg("rpc_timeout") = kDefaultRpcTimeoutSeconds,
-          py::arg("init_method") = kDefaultInitMethod)
-      .def_readwrite(
-          "num_send_recv_threads",
-          &ProcessGroupRpcBackendOptions::numSendRecvThreads,
-          R"(
-              The number of threads in the thread-pool used by ProcessGroupAgent.
-          )");
-
-  module.attr("_DEFAULT_NUM_SEND_RECV_THREADS") =
-      py::cast(kDefaultNumSendRecvThreads);
-
-  shared_ptr_class_<ProcessGroupAgent>(module, "ProcessGroupAgent", rpcAgent)
-      .def(py::init([](const c10::intrusive_ptr<::c10d::Store>& store,
-                       std::string workerName,
-                       const c10::intrusive_ptr<::c10d::ProcessGroup>& pg,
-                       int numSendRecvThreads,
-                       std::chrono::milliseconds rpcTimeout) {
-        return std::shared_ptr<ProcessGroupAgent>(
-            new ProcessGroupAgent(
-                store,
-                std::move(workerName),
-                pg,
-                numSendRecvThreads,
-                rpcTimeout,
-                std::make_unique<RequestCallbackImpl>()),
-            impl::destroy_without_gil<ProcessGroupAgent>);
-      }))
-      .def(
-          "get_worker_info",
-          (const WorkerInfo& (ProcessGroupAgent::*)(void) const) &
-              RpcAgent::getWorkerInfo,
-          py::call_guard<py::gil_scoped_release>())
-      .def(
-          "get_worker_info",
-          (const WorkerInfo& (ProcessGroupAgent::*)(const std::string&) const) &
-              ProcessGroupAgent::getWorkerInfo,
-          py::call_guard<py::gil_scoped_release>())
-      .def(
-          "get_worker_info",
-          (const WorkerInfo& (ProcessGroupAgent::*)(worker_id_t id) const) &
-              ProcessGroupAgent::getWorkerInfo,
-          py::call_guard<py::gil_scoped_release>())
-      .def(
-          "get_worker_infos",
-          (std::vector<WorkerInfo>(ProcessGroupAgent::*)() const) &
-              ProcessGroupAgent::getWorkerInfos,
-          py::call_guard<py::gil_scoped_release>())
-      .def(
-          "_get_device_map",
-          (DeviceMap(ProcessGroupAgent::*)(const WorkerInfo& dst) const) &
-              ProcessGroupAgent::getDeviceMap,
-          py::call_guard<py::gil_scoped_release>())
-      .def(
-          "join",
-          &ProcessGroupAgent::join,
-          py::call_guard<py::gil_scoped_release>(),
-          py::arg("shutdown") = false)
-      .def(
-          "shutdown",
-          &ProcessGroupAgent::shutdown,
-          py::call_guard<py::gil_scoped_release>())
-      .def(
-          "sync",
-          &ProcessGroupAgent::sync,
-          py::call_guard<py::gil_scoped_release>());
-
 #ifdef USE_TENSORPIPE

 // Base class: torch.distributed.rpc.RpcBackendOptions.
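
For context, on releases that still shipped this binding (v1.9 and earlier), the removed options class was constructible from Python with the defaults the deleted docstring lists; a sketch that no longer runs on builds containing this commit:

    # PyTorch <= 1.9 only; this class no longer exists after this commit.
    import torch.distributed.rpc as rpc

    opts = rpc.ProcessGroupRpcBackendOptions(
        num_send_recv_threads=4,   # default per the removed docstring
        rpc_timeout=60.0,          # default: 60 seconds
        init_method="env://",      # default init URL for ProcessGroupGloo
    )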

torch/csrc/distributed/rpc/message.h

Lines changed: 1 addition & 1 deletion

@@ -12,7 +12,7 @@ enum RPCErrorType {
   UNKNOWN_ERROR = 0, /* Indicates that error type could not be parsed */
   TIMEOUT = 1, /* Indicates that the RPC has timed out */
   INTENTIONAL_FAILURE = 2 /* Deliberate failure, such as those injected by
-                             FaultyProcessGroupAgent for testing */
+                             FaultyAgent for testing */
 };

 // The enum values are bitwise ORed with MessageType

torch/csrc/distributed/rpc/testing/init.cpp

Lines changed: 1 addition & 2 deletions

@@ -31,8 +31,7 @@ PyObject* faulty_agent_init(PyObject* _unused, PyObject* noargs) {
       "_distributed_rpc_testing", "distributed rpc testing bindings");
   auto module = py::handle(m).cast<py::module>();

-  // Import the rpc_module so we can subclass ProcessGroupAgent and
-  // TensorPipeAgent
+  // Import the rpc_module so we can subclass TensorPipeAgent
   py::module rpc_module = py::module::import("torch.distributed.rpc");

   shared_ptr_class_<FaultyTensorPipeRpcBackendOptions>(

torch/distributed/rpc/__init__.py

Lines changed: 10 additions & 18 deletions

@@ -49,15 +49,12 @@ def is_available():
         enable_gil_profiling,
         RpcBackendOptions,
         _TensorPipeRpcBackendOptionsBase,
-        ProcessGroupRpcBackendOptions,
         RpcAgent,
         PyRRef,
-        ProcessGroupAgent,
         TensorPipeAgent,
         RemoteProfilerManager,
         WorkerInfo,
         _DEFAULT_INIT_METHOD,
-        _DEFAULT_NUM_SEND_RECV_THREADS,
         _DEFAULT_NUM_WORKER_THREADS,
         _UNSET_RPC_TIMEOUT,
         _DEFAULT_RPC_TIMEOUT_SEC,

@@ -95,10 +92,9 @@ def init_rpc(
             Name can only contain number, alphabet, underscore, colon,
             and/or dash, and must be shorter than 128 characters.
         backend (BackendType, optional): The type of RPC backend
-            implementation. Supported values include
-            ``BackendType.TENSORPIPE`` (the default) and
-            ``BackendType.PROCESS_GROUP``. See :ref:`rpc-backends` for more
-            information.
+            implementation. Supported values is
+            ``BackendType.TENSORPIPE`` (the default).
+            See :ref:`rpc-backends` for more information.
         rank (int): a globally unique id/rank of this node.
         world_size (int): The number of workers in the group.
         rpc_backend_options (RpcBackendOptions, optional): The options

@@ -126,10 +122,7 @@ def init_rpc(
            "Argument rpc_backend_options must be an instance of RpcBackendOptions"
        )

-    # To avoid breaking users that passed a ProcessGroupRpcBackendOptions
-    # without specifying the backend as PROCESS_GROUP when that was the
-    # default, we try to detect the backend from the options when only the
-    # latter is passed.
+    # Try to detect the backend from the options
     if backend is None and rpc_backend_options is not None:
         for candidate_backend in BackendType:
             if isinstance(

@@ -159,13 +152,12 @@ def init_rpc(
        backend = BackendType.TENSORPIPE  # type: ignore[attr-defined]

     if backend == BackendType.PROCESS_GROUP:  # type: ignore[attr-defined]
-        warnings.warn(
-            "RPC was initialized with the PROCESS_GROUP backend which is "
-            "deprecated and slated to be removed and superseded by the TENSORPIPE "
-            "backend. It is recommended to migrate to the TENSORPIPE backend. "
-            "PyTorch v1.9 will be the last release that carries PROCESS_GROUP "
-            "RPC backend. If you have concerns or suggestions please comment in "
-            "https://github.com/pytorch/pytorch/issues/55615"
+        raise RuntimeError(
+            "RPC was initialized with the PROCESS_GROUP backend which has "
+            "been removed and is superseded by the TENSORPIPE backend. "
+            "Please migrate to the TENSORPIPE backend. "
+            "PyTorch v1.9 was the last release that carries PROCESS_GROUP "
+            "RPC backend."
         )

     if rpc_backend_options is None:
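
The detection loop kept in this file matches a passed options instance against each backend's options class; a simplified, self-contained sketch of that idea (the `_OPTIONS_CLASS` mapping is a stand-in for the real backend registry, not the actual implementation):

    from enum import Enum

    class BackendType(Enum):  # simplified stand-in for rpc.BackendType
        TENSORPIPE = "tensorpipe"

    class TensorPipeRpcBackendOptions:  # stand-in for the real options class
        pass

    # Hypothetical mapping; the real code consults the backend registry instead.
    _OPTIONS_CLASS = {BackendType.TENSORPIPE: TensorPipeRpcBackendOptions}

    def infer_backend(rpc_backend_options):
        """Return the first backend whose options class matches, else None."""
        for candidate_backend in BackendType:
            if isinstance(rpc_backend_options, _OPTIONS_CLASS[candidate_backend]):
                return candidate_backend
        return None

    assert infer_backend(TensorPipeRpcBackendOptions()) is BackendType.TENSORPIPE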
