Mixed precision DDP hang fix and fine-grained option for DDP perf (pytorch#13496)

Summary:
When switching to mixed precision fp16 training, DDP randomly hangs. Initially, I thought this smelled like a similar NCCL bug I filed a while ago; it turns out it's not. Once again, I was seeing different rank processes end up with differently sized buckets. How could that even happen?

It turns out that take_tensors generates the list of bucketed tensors in a non-deterministic order, because the key of the map it uses is a pointer. An interesting bug to dig into and fix.
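
To illustrate the failure mode, here is a minimal standalone C++ sketch (not the PyTorch code; FakeType is a made-up stand-in for at::Type): an unordered_map keyed by a raw pointer iterates in an order that depends on the pointer values, which differ from process to process, while an ordered map keyed by a stable integer ID iterates identically on every rank, which is what the fix switches to.

// Minimal sketch: pointer-keyed buckets vs. stable-ID-keyed buckets.
#include <iostream>
#include <map>
#include <string>
#include <unordered_map>
#include <vector>

struct FakeType { std::string name; };  // illustrative stand-in for at::Type

int main() {
  // Heap addresses vary per process (ASLR, allocation history), so any
  // ordering derived from them is effectively per-process.
  std::vector<FakeType*> types = {new FakeType{"Half"},
                                  new FakeType{"Float"},
                                  new FakeType{"Double"}};

  std::unordered_map<FakeType*, int> by_pointer;  // old scheme: pointer key
  std::map<int, int> by_stable_id;                // fixed scheme: stable ID key
  for (int id = 0; id < static_cast<int>(types.size()); ++id) {
    by_pointer[types[id]] = id;
    by_stable_id[id] = id;
  }

  std::cout << "pointer-keyed order (depends on addresses, may differ per process):";
  for (const auto& kv : by_pointer) std::cout << ' ' << kv.first->name;
  std::cout << "\nstable-ID order (identical on every rank):";
  for (const auto& kv : by_stable_id) std::cout << ' ' << types[kv.first]->name;
  std::cout << std::endl;

  for (auto* t : types) delete t;
  return 0;
}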

fp16 DDP training should now be fully working.

Also added a fine-grained take_tensors helper that aims to improve DDP performance; replacing the take_tensors call in DDP with it is left as a TODO.

Fixed: pytorch#12150
Pull Request resolved: pytorch#13496

Differential Revision: D12920985

Pulled By: teng-li

fbshipit-source-id: 26f3edae7be45a80fa7b2410a2e5a1baab212d9c
teng-li authored and facebook-github-bot committed Nov 6, 2018
1 parent 84cfc28 commit 7481908
Showing 7 changed files with 138 additions and 34 deletions.
80 changes: 64 additions & 16 deletions test/test_c10d.py
@@ -1175,30 +1175,78 @@ def test_nccl_backend(self):
self._test_ddp_with_process_group(process_group, list(map(lambda i: torch.device('cuda:' + str(i)), gpus)))

@skip_if_not_multigpu
def test_dist_broadcast_coalesced(self):
@skip_if_not_nccl
def test_dist_broadcast_coalesced_nccl(self):
store = c10d.FileStore(self.file.name)
process_group = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)

device = torch.device('cuda')

for fine_grained in [False, True]:
target = torch.arange(60, dtype=torch.float16, device=device).chunk(5)
target += torch.arange(60, dtype=torch.float32, device=device).chunk(5)
target += torch.arange(60, dtype=torch.float16, device=device).chunk(5)
target += torch.arange(60, dtype=torch.float64, device=device).chunk(5)
target += torch.arange(60, dtype=torch.float16, device=device).chunk(5)
target += torch.arange(60, dtype=torch.float32, device=device).chunk(5)

if self.is_master:
# All processes should have these tensors in the end.
tensors = target
else:
# Non-master processes start with empty tensors and should be
# filled with the tensors from the master.
tensors = torch.zeros(60, dtype=torch.float16, device=device).chunk(5)
tensors += torch.zeros(60, dtype=torch.float32, device=device).chunk(5)
tensors += torch.zeros(60, dtype=torch.float16, device=device).chunk(5)
tensors += torch.zeros(60, dtype=torch.float64, device=device).chunk(5)
tensors += torch.zeros(60, dtype=torch.float16, device=device).chunk(5)
tensors += torch.zeros(60, dtype=torch.float32, device=device).chunk(5)

c10d._dist_broadcast_coalesced(
process_group,
tensors,
buffer_size=256,
fine_grained=fine_grained)

self.assertEqual(tensors, target)

@skip_if_not_multigpu
def test_dist_broadcast_coalesced_gloo(self):
store = c10d.FileStore(self.file.name)
options = c10d.ProcessGroupGloo.Options()
options.devices = [c10d.ProcessGroupGloo.create_tcp_device(interface="lo")]
process_group = c10d.ProcessGroupGloo(store, self.rank, self.world_size, options)

device = torch.device('cuda')

target = torch.arange(10, dtype=torch.float64, device=device).chunk(5)

if self.is_master:
# All processes should have these tensors in the end.
tensors = target
else:
# Non-master processes start with empty tensors and should be
# filled with the tensors from the master.
tensors = torch.zeros(10, device=device).chunk(5)

c10d._dist_broadcast_coalesced(
process_group,
tensors,
buffer_size=10)
for fine_grained in [False, True]:
target = torch.arange(60, dtype=torch.float16, device=device).chunk(5)
target += torch.arange(60, dtype=torch.float32, device=device).chunk(5)
target += torch.arange(60, dtype=torch.float16, device=device).chunk(5)
target += torch.arange(60, dtype=torch.float64, device=device).chunk(5)
target += torch.arange(60, dtype=torch.float16, device=device).chunk(5)
target += torch.arange(60, dtype=torch.float32, device=device).chunk(5)

if self.is_master:
# All processes should have these tensors in the end.
tensors = target
else:
# Non-master processes start with empty tensors and should be
# filled with the tensors from the master.
tensors = torch.zeros(60, dtype=torch.float16, device=device).chunk(5)
tensors += torch.zeros(60, dtype=torch.float32, device=device).chunk(5)
tensors += torch.zeros(60, dtype=torch.float16, device=device).chunk(5)
tensors += torch.zeros(60, dtype=torch.float64, device=device).chunk(5)
tensors += torch.zeros(60, dtype=torch.float16, device=device).chunk(5)
tensors += torch.zeros(60, dtype=torch.float32, device=device).chunk(5)

c10d._dist_broadcast_coalesced(
process_group,
tensors,
buffer_size=128,
fine_grained=fine_grained)

if not self.is_master:
self.assertEqual(tensors, target)

@skip_if_not_multigpu
6 changes: 4 additions & 2 deletions torch/csrc/distributed/c10d/ddp.cpp
@@ -38,8 +38,10 @@ void copyBroadcastTensorsToReplicas(
void distBroadcastCoalesced(
ProcessGroup& processGroup,
std::vector<at::Tensor>& tensors,
int64_t bufferSize) {
auto tensorGroups = torch::utils::take_tensors(tensors, bufferSize);
int64_t bufferSize,
bool fineGrained) {
auto tensorGroups = torch::utils::take_tensors(
tensors, bufferSize, fineGrained);
// We store single-element vectors in `flatTensors` because
// `ProcessGroup::broadcast` takes a reference to a vector, which must be
// alive until the `wait()` call on the returned `Work` completes.
3 changes: 2 additions & 1 deletion torch/csrc/distributed/c10d/ddp.h
@@ -15,7 +15,8 @@ namespace c10d {
void distBroadcastCoalesced(
ProcessGroup& processGroup,
std::vector<at::Tensor>& tensors,
int64_t bufferSize);
int64_t bufferSize,
bool fineGrained = false);

void syncParams(
ProcessGroup& processGroup,
1 change: 1 addition & 0 deletions torch/csrc/distributed/c10d/init.cpp
@@ -403,6 +403,7 @@ PyObject* c10d_init(PyObject* _unused) {
py::arg("process_group"),
py::arg("tensors"),
py::arg("buffer_size"),
py::arg("fine_grained"),
py::call_guard<py::gil_scoped_release>());

module.def(
50 changes: 37 additions & 13 deletions torch/csrc/utils/tensor_flatten.cpp
@@ -1,17 +1,24 @@
#include "torch/csrc/utils/tensor_flatten.h"

#include <map>
#include <unordered_map>

namespace torch { namespace utils {

using namespace at;

std::vector<TensorGroup> take_tensors(TensorList tensors, size_t size_limit) {
std::vector<TensorGroup> take_tensors(
TensorList tensors,
size_t size_limit,
bool fine_grained) {
std::vector<TensorGroup> results;
results.reserve(tensors.size()); // an overapproximation, but at least we won't have to copy stuff around
std::unordered_map<at::Type*, TensorGroup> groups;
// an overapproximation, but at least we won't have to copy stuff around
results.reserve(tensors.size());
std::map<TypeID, TensorGroup> groups;
size_t cur_group_size = 0;

for (const auto & tensor : tensors) {
auto & type = tensor.type();
auto& type = tensor.type();
size_t tensor_size;
if (type.is_sparse()) {
const auto& indices = tensor._indices();
@@ -21,20 +28,37 @@ std::vector<TensorGroup> take_tensors(TensorList tensors, size_t size_limit) {
} else {
tensor_size = tensor.numel() * type.elementSizeInBytes();
}
auto & type_group = groups[&type];

auto& type_group = groups[type.ID()];
type_group.tensors.push_back(tensor);
type_group.size += tensor_size;
if (type_group.size + tensor_size >= size_limit) {
results.emplace_back();
std::swap(results.back(), type_group);

if (fine_grained) {
cur_group_size += tensor_size;
// Regardless of the type, check whether the running total size exceeds the limit
if (cur_group_size >= size_limit) {
// Spill all types to separate groups in results
for (auto& entry : groups) {
auto& group = entry.second;
results.emplace_back(std::move(group));
}
cur_group_size = 0;
groups.clear();
}
} else {
type_group.size += tensor_size;
if (type_group.size >= size_limit) {
results.emplace_back();
std::swap(results.back(), type_group);
}
}
}
// End case. Look for any remaining groups and return them.
for (auto & entry : groups) {
auto & group = entry.second;
if (group.size > 0) {
results.emplace_back(std::move(group));
for (auto& entry : groups) {
auto& group = entry.second;
if (!fine_grained && group.size == 0) {
continue;
}
results.emplace_back(std::move(group));
}
return results;
}
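For reference, here is a simplified standalone sketch of the fine-grained spill logic above, using plain structs instead of at::Tensor (FakeTensor, Group and take_tensors_fine_grained are illustrative names, not the real API):

// Simplified sketch: accumulate a running total over *all* types; once it
// crosses the limit, flush every per-type group collected so far.
#include <cstddef>
#include <map>
#include <vector>

struct FakeTensor { int type_id; size_t bytes; };
struct Group { std::vector<FakeTensor> tensors; size_t size = 0; };

std::vector<Group> take_tensors_fine_grained(
    const std::vector<FakeTensor>& tensors, size_t size_limit) {
  std::vector<Group> results;
  std::map<int, Group> groups;  // stable type-id key -> deterministic order
  size_t cur_group_size = 0;

  for (const auto& t : tensors) {
    auto& g = groups[t.type_id];
    g.tensors.push_back(t);
    g.size += t.bytes;          // per-type bookkeeping (kept here for simplicity)
    cur_group_size += t.bytes;  // running total across all types
    if (cur_group_size >= size_limit) {
      // Spill every per-type group collected so far into the output.
      for (auto& entry : groups) results.push_back(std::move(entry.second));
      groups.clear();
      cur_group_size = 0;
    }
  }
  // Flush whatever remains at the end.
  for (auto& entry : groups) results.push_back(std::move(entry.second));
  return results;
}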
28 changes: 27 additions & 1 deletion torch/csrc/utils/tensor_flatten.h
@@ -37,7 +37,33 @@ struct TensorGroup {
}
};

std::vector<TensorGroup> take_tensors(at::TensorList tensors, size_t size_limit);
// Helper function that takes a list of tensors and splits them into tensor
// groups by the size limit and outputs these tensor groups. If the input
// tensors are of different tensor types, they will be split into different
// groups as well.
//
// Two splitting options are provided to the user:
//
// Imagine the size_limit is 256 and the list of input tensors are:
// tensor_a(fp16 - 128 bytes),
// tensor_b(fp32 - 256 bytes),
// tensor_c(fp16 - 128 bytes),
//
// when fine_grained == false:
// The function will read the list of tensors sequentially and accumulate
// enough tensors for each data type until the size_limit, therefore:
// it will output: {{tensor_a, tensor_c}, {tensor_b}}
//
// when fine_grained == true:
// The function will read the list of tensors sequentially and accumulate
// enough tensors for all data types until the size_limit, and then split
// the accumulated tensors into different groups by data types, therefore:
// it will output: {{tensor_a}, {tensor_b}, {tensor_c}}
std::vector<TensorGroup> take_tensors(
at::TensorList tensors,
size_t size_limit,
bool fine_grained = false);

void reorder_tensors_like(std::vector<at::Tensor>& tensors, at::TensorList order);

std::pair<at::Tensor, at::Tensor> flatten_sparse_tensors(at::TensorList tensors);
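A hedged usage sketch of the helper documented above, assuming it is compiled inside the PyTorch tree where the internal header is available. With a 256-byte limit, the three tensors from the comment's example should land in two groups ({tensor_a, tensor_c} together, {tensor_b} alone) in the default mode, and in three single-tensor groups in fine-grained mode:

#include <iostream>
#include <vector>

#include <ATen/ATen.h>
#include "torch/csrc/utils/tensor_flatten.h"

int main() {
  auto tensor_a = at::ones({64}, at::kHalf);   // 64 * 2 bytes = 128 bytes
  auto tensor_b = at::ones({64}, at::kFloat);  // 64 * 4 bytes = 256 bytes
  auto tensor_c = at::ones({64}, at::kHalf);   // 64 * 2 bytes = 128 bytes
  std::vector<at::Tensor> tensors = {tensor_a, tensor_b, tensor_c};

  for (bool fine_grained : {false, true}) {
    auto groups =
        torch::utils::take_tensors(tensors, /*size_limit=*/256, fine_grained);
    std::cout << (fine_grained ? "fine-grained: " : "default: ")
              << groups.size() << " groups with tensor counts";
    for (const auto& g : groups) std::cout << ' ' << g.tensors.size();
    std::cout << std::endl;
  }
  return 0;
}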
4 changes: 3 additions & 1 deletion torch/nn/parallel/distributed.py
@@ -175,6 +175,8 @@ def __init__(self, module, device_ids=None,
# This is a triply-nested list where the "dimensions" are: devices, buckets, bucket_elems
param_buckets = []
# Split the parameters into buckets and by types as well
# TODO: use take_tensor finegrained to provide better overlapping for
# mixed precision training
param_buckets = [list(_take_tensors(m.parameters(), bucket_bytes_cap)) for m in self._module_copies]

self.bucket_sizes = []
@@ -261,7 +263,7 @@ def train(self, mode=True):
module.train(mode)

def _dist_broadcast_coalesced(self, tensors, buffer_size):
dist._dist_broadcast_coalesced(self.process_group, tensors, buffer_size)
dist._dist_broadcast_coalesced(self.process_group, tensors, buffer_size, False)

def _sync_params(self):
if len(self.device_ids) > 1:
