Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Unity] Disco: A Framework-Agnostic SPMD Runtime for Distributed Inference/Training #15622

Merged
merged 5 commits into from
Aug 28, 2023

Conversation

junrushao
Copy link
Member

@junrushao junrushao commented Aug 25, 2023

Disco is a distributed runtime that consists of a controler and a cluster of workers. The
controler is responsible for managing the workers by broadcasting commands to all the workers
together, and the workers are responsible for executing the commands and. The controler and
workers communicate with each other through a bi-directional channel.

Different from a generic system, Disco is designed to as "single-program-multiple-data" (SPMD)
runtime, which means that all the workers execute the same instruction at the same time, but the
data they are working on may be different. For example, in data parallelism, each worker may
work on a different batches of the data, but they all execute the same set of instructions.
Therefore, imagine there is a virtual machine that executes the program, the structures of
workers' register files could be considered as "identical" (single program) although the values
may differ (multiple data).

DRef. Following the design above, consider the program in SPMD in a virtual ISA, then each
worker is a virtual machine instance to execute the ISA maintaining its own register file.
The controler denotes each of their register files with a unique integer "register id",
and the workers use this id to refer to the register file that resides on itself.
DRef is a control-side object backed by such a register id. The data it contains is not assumed
to be directly accessible by the controler, with an exception for worker-0, which is a special
worker that is always co-located with the controler.

Worker-0. Worker-0 is a special worker that is always co-located with the controler.
It is assumed that the controler can synchronize with and access the registers of worker-0.
The Disco session provides multiple APIs to interact specifically with the worker-0.
To shared data with other workers, a common paradigm in Disco is to copy data from the
controler-side NDArray to the worker-0, and then copy it to other workers using primitives on
the data plane, for example, broadcast and send.

Control plane. The controler broadcasts commands to all the workers as control signals.
For example, the control may ask all workers to load a library or call a function respectively.
Common control signals include: shutdown, retrievel a global PackedFunc, call packed function,
etc. The controler is assumed to keep a message channel to each worker to implement the broadcast
behavior, and the message channel may vary depends on usecases.

Data plane. The data channel is usually used to exchange data between workers, especially for
tensor data which is usually large. For example, performing an allreduce operator for sharded
matrix multiplication, or broadcasting for an input tensor. For efficiency, the data channel is
usually backed by NCCL on NVIDIA GPUs, RCCL on AMD GPUs, or MPI on CPUs.

Session. A Disco session is a primary interface to interact with the Disco runtime, serving
as a global context that manages the control and workers. It could be implemented as a
multi-threaded with a pool of workers for single-node multi-gpu scenarios, or TCP sockets for
workloads that span over a cluster of nodes.

Channel. Disco channel is a bi-directional communication channel between the controler and
workers for exchanging control signals. It is no different from a generic RPC channel, but
adopts TVM's PackedFunc calling convention to support polymorphic and variadic arguments.

Joined work with @LeshengJin

@tvm-bot
Copy link
Collaborator

tvm-bot commented Aug 25, 2023

Thanks for contributing to TVM! Please refer to the contributing guidelines https://tvm.apache.org/docs/contribute/ for useful information and tips. Please request code reviews from Reviewers by @-ing them in a comment.

Generated by tvm-bot

@junrushao junrushao marked this pull request as ready for review August 25, 2023 23:47
@junrushao junrushao changed the title [Unity] Disco: A framework-agnostic Runtime for Distributed Inference/Training [Unity] Disco: A Framework-Agnostic SPMD Runtime for Distributed Inference/Training Aug 25, 2023
@junrushao junrushao force-pushed the feature/2023-08-02/disco branch 4 times, most recently from 2a13c68 to eeff583 Compare August 26, 2023 15:20
python/tvm/runtime/disco/session.py Show resolved Hide resolved
python/tvm/runtime/disco/session.py Outdated Show resolved Hide resolved
python/tvm/runtime/disco/session.py Show resolved Hide resolved
python/tvm/runtime/disco/session.py Outdated Show resolved Hide resolved
python/tvm/runtime/disco/session.py Outdated Show resolved Hide resolved
src/runtime/disco/nccl/nccl.cc Show resolved Hide resolved
src/runtime/disco/worker.h Outdated Show resolved Hide resolved
src/runtime/disco/threaded_session.cc Show resolved Hide resolved
@junrushao junrushao force-pushed the feature/2023-08-02/disco branch 8 times, most recently from c66f5ab to d0cb92a Compare August 27, 2023 18:07
This PR introduces NCCL in the cmake system.
NCCL is NVIDIA's library for distributed communication.
This PR exposes `Module.GetFunction` as a global PackedFunc.
Previously, the only way to access this method is via TVM's
C API, but the C++ PackedFunc API is missing. This PR patches
this issue.
There exist some basic functionality to convert Device and DLDeviceType
to std::string, but they are not following the common naming convention
in TVM, and thus less discoverable. This commit makes changes
accordingly:
- `runtime::DeviceName` to `runtime::DLDeviceType2Str`
- move declaration of `operator << (std::ostream&, Device)` from
  `runtime/device_api.h` to `runtime/packed_func.h`
This PR introduces object support in TVM RPC protocol by introducing three
new interfaces in `rpc_reference.h`:
- `uint64_t GetObjectBytes(Object* obj)`, which is a required
  implementation that returns the length of the object during serialization;
- `void WriteObject(Object* obj)` used to serialize an object to a
  writable channel;
- `void ReadObject(int* type_code, TVMValue* value)`, which deserializes
  a TVM Object from a channel.

To serialize an object, a recommended paradigm is to write its
`type_index` first, and then its content. For example, `ShapeTuple` can
be serialized as:

```C++
// pseudocode
void WriteObject(Object* obj) {
  if (obj is ShapeTuple) {
    this->Write<uint32_t>(type_index of ShapeTuple);
    this->Write<int32_t>(obj->ndim);
    this->WriteArray<int64_t>(obj->shape);
  } else {
    throw Unsupported;
  }
}

uint64_t GetObjectBytes(Object* obj) {
  uint64_t result = 0;
  if (obj is ShapeTuple) {
    result += sizeof(uint32_t); # for `type_index`
    result += sizeof(int32_t);  # for `ndim`
    result += sizeof(int64_t) * obj->ndim; # for content of the shape
  } else {
    throw Unsupported;
  }
  return result;
}
```

To deserialize an object, similar to serialization, the recommended
approach paradigm is to read `type_index` and disptch based on it.

Caveat on deserialization: RPC Reference itself does not own or allocate
any memory to store objects, meaning extra logic is usually required in
`ReadObject` to keep their liveness.
@junrushao junrushao force-pushed the feature/2023-08-02/disco branch 2 times, most recently from 417f9f1 to ccae41c Compare August 27, 2023 19:07
Copy link
Contributor

@LeshengJin LeshengJin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work! Would bring more followup prs after the merge.

Copy link
Contributor

@jinhongyii jinhongyii left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

include/tvm/runtime/disco/session.h Show resolved Hide resolved
…rence/Training

Disco is a distributed runtime that consists of a controler and a cluster of workers. The
controler is responsible for managing the workers by broadcasting commands to all the workers
together, and the workers are responsible for executing the commands and. The controler and
workers communicate with each other through a bi-directional channel.

Different from a generic system, Disco is designed to as "single-program-multiple-data" (SPMD)
runtime, which means that all the workers execute the same instruction at the same time, but the
data they are working on may be different. For example, in data parallelism, each worker may
work on a different batches of the data, but they all execute the same set of instructions.
Therefore, imagine there is a virtual machine that executes the program, the structures of
workers' register files could be considered as "identical" (single program) although the values
may differ (multiple data).

**DRef.** Following the design above, consider the program in SPMD in a virtual ISA, then each
worker is a virtual machine instance to execute the ISA maintaining its own register file.
The controler denotes each of their register files with a unique integer "register id",
and the workers use this id to refer to the register file that resides on itself.
DRef is a control-side object backed by such a register id. The data it contains is not assumed
to be directly accessible by the controler, with an exception for worker-0, which is a special
worker that is always co-located with the controler.

**Worker-0.** Worker-0 is a special worker that is always co-located with the controler.
It is assumed that the controler can synchronize with and access the registers of worker-0.
The Disco session provides multiple APIs to interact specifically with the worker-0.
To shared data with other workers, a common paradigm in Disco is to copy data from the
controler-side NDArray to the worker-0, and then copy it to other workers using primitives on
the data plane, for example, `broadcast` and `send`.

**Control plane.** The controler broadcasts commands to all the workers as control signals.
For example, the control may ask all workers to load a library or call a function respectively.
Common control signals include: shutdown, retrievel a global PackedFunc, call packed function,
etc. The controler is assumed to keep a message channel to each worker to implement the broadcast
behavior, and the message channel may vary depends on usecases.

**Data plane.** The data channel is usually used to exchange data between workers, especially for
tensor data which is usually large. For example, performing an allreduce operator for sharded
matrix multiplication, or broadcasting for an input tensor. For efficiency, the data channel is
usually backed by NCCL on NVIDIA GPUs, RCCL on AMD GPUs, or MPI on CPUs.

**Session.** A Disco session is a primary interface to interact with the Disco runtime, serving
as a global context that manages the control and workers. It could be implemented as a
multi-threaded with a pool of workers for single-node multi-gpu scenarios, or TCP sockets for
workloads that span over a cluster of nodes.

**Channel.** Disco channel is a bi-directional communication channel between the controler and
workers for exchanging control signals. It is no different from a generic RPC channel, but
adopts TVM's PackedFunc calling convention to support polymorphic and variadic arguments.

Co-Authored-by: Lesheng Jin <34279105+LeshengJin@users.noreply.github.com>
@junrushao junrushao merged commit d3856d3 into apache:unity Aug 28, 2023
1 check passed
@dtrealm
Copy link

dtrealm commented Nov 9, 2023

Great work, do you have any plans to implement MPI communication for multiple nodes?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants