use all_gather to gather results from all gpus #383
Conversation
What's the advantage of using one over the other?
return data_list


def reduce_dict(input_dict, average=True):
nit: I believe this is not used anywhere (it probably is due to a refactor in the engine/trainer?)
for _ in size_list:
    tensor_list.append(torch.ByteTensor(size=(max_size,)).to("cuda"))
if local_size != max_size:
    padding = torch.ByteTensor(size=(max_size - local_size,)).to("cuda")
nit: this could generate NaN because the data is uninitialized. This per se doesn't affect the overall results, because we remove the padded values, but I'm not sure whether it could cause problems with dist.all_gather.
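For what it's worth, a minimal sketch of zero-initializing the padding instead, using the variable names from the diff above (untested against this PR):

```python
# a zero-filled pad avoids sending uninitialized memory through dist.all_gather
pad_len = max_size - int(local_size)  # local_size is the 1-element IntTensor above
if pad_len > 0:
    padding = torch.zeros((pad_len,), dtype=torch.uint8, device="cuda")
    tensor = torch.cat((tensor, padding), dim=0)
```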
# gathering tensors of different shapes
tensor_list = []
for _ in size_list:
    tensor_list.append(torch.ByteTensor(size=(max_size,)).to("cuda"))
nit: it would be good to use the new API for this:
device = torch.device("cuda")
...
torch.empty((max_size,), dtype=torch.uint8, device=device)
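Put together, the loop from the diff could look roughly like this with the newer factory function (a sketch, not tested against this PR):

```python
device = torch.device("cuda")
# one receive buffer per rank, allocated with torch.empty instead of torch.ByteTensor
tensor_list = [
    torch.empty((max_size,), dtype=torch.uint8, device=device)
    for _ in size_list
]
```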
tensor = torch.ByteTensor(storage).to("cuda")

# obtain Tensor size of each rank
local_size = torch.IntTensor([tensor.numel()]).to("cuda")
nit: It would be good to replace the usages of IntTensor/ByteTensor with their new constructs. In this case (because it's initialized):
device = torch.device("cuda")
...
local_size = torch.tensor([tensor.numel()], dtype=torch.int32, device=device)
# or, if 0d tensors work with dist.all_gather
local_size = torch.tensor(tensor.numel(), dtype=torch.int32, device=device)
@qianyizhang the advantage is that it makes it possible to do multi-machine testing, which was not possible before.
@fmassa I got the rendezvous complaining "RuntimeError: Address already in use"; how do I make it work?
@qianyizhang yes, it's possible, but you need to change the
# serialized to a Tensor
buffer = pickle.dumps(data)
storage = torch.ByteStorage.from_buffer(buffer)
tensor = torch.ByteTensor(storage).to("cuda")
Actually, @ppwwyyxx this will probably be problematic for large datasets, as we will run out of memory on the GPU when trying to perform this communication.
The idea I had was to use shared memory on the CPU and communicate the address of the shared memory, but this doesn't work in the multi-machine case.
I agree. But is there any way to do all-gather on CPUs (given that the dist backend was initialized with "nccl")?
With c10d, it is now possible to have more than one dist backend at a time. So one could potentially have one nccl backend and one mpi backend?
Yes, I am using this communication code in another task. Because of the large size of each data item, I get an OOM error.
@yelantingfeng I'd recommend either:
- reverting this change locally for now, or
- trying to create a new process group with c10d that lives on the CPU, and communicating this data on the CPU instead (a sketch follows below).
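For the second option, here is a minimal sketch of what a CPU-side gather could look like, assuming a PyTorch version where dist.new_group accepts a backend argument and that every process calls it; the helper name all_gather_cpu is made up for illustration and is not part of this PR:

```python
import pickle

import torch
import torch.distributed as dist

# A gloo-backed group lets CPU tensors be all-gathered even when the default
# process group was initialized with "nccl". Must be created on every process.
cpu_group = dist.new_group(backend="gloo")

def all_gather_cpu(data, group=cpu_group):
    """Gather arbitrary picklable data via CPU tensors (illustrative sketch)."""
    world_size = dist.get_world_size(group=group)

    # serialize to a CPU uint8 tensor
    buffer = pickle.dumps(data)
    tensor = torch.tensor(bytearray(buffer), dtype=torch.uint8)

    # exchange sizes, pad to the max, then all_gather on the CPU group
    local_size = torch.tensor([tensor.numel()], dtype=torch.int64)
    size_list = [torch.zeros(1, dtype=torch.int64) for _ in range(world_size)]
    dist.all_gather(size_list, local_size, group=group)
    sizes = [int(s.item()) for s in size_list]
    max_size = max(sizes)

    if tensor.numel() < max_size:
        padding = torch.zeros(max_size - tensor.numel(), dtype=torch.uint8)
        tensor = torch.cat((tensor, padding))
    tensor_list = [torch.empty(max_size, dtype=torch.uint8) for _ in range(world_size)]
    dist.all_gather(tensor_list, tensor, group=group)

    # trim the padding back off before unpickling
    return [pickle.loads(t[:s].numpy().tobytes()) for t, s in zip(tensor_list, sizes)]
```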
> Yes, I am using this communication code in another task. Because of the large size of each data item, I get an OOM error.

I got the OOM error. Have you implemented the second method recommended by @fmassa?
> @yelantingfeng I'd recommend either:
> - reverting this change locally for now
> - trying to create a new process group with c10d that lives on the CPU, and communicating this data on the CPU instead

I think we could just set a memory limit for this all_gather function. I implemented this by splitting those ByteTensors into chunks. After some tests, I found my implementation can keep the total memory usage at the MiB level. The limit is not 100% precise, but I think it should be useful enough. I would be glad to send a PR if you think this is a good improvement for this repository.
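A rough sketch of the chunking idea described above (this is not the commenter's actual implementation; the chunk_size value and the choice to stage results on the CPU are assumptions made here for illustration, and the input tensor is assumed to be padded to the same length on every rank):

```python
import torch
import torch.distributed as dist

def chunked_all_gather(tensor, chunk_size=64 * 1024 * 1024):
    """All-gather a padded uint8 CUDA tensor chunk by chunk, staging the results on
    the CPU so that only about world_size * chunk_size bytes live on the GPU at once."""
    world_size = dist.get_world_size()
    max_size = tensor.numel()
    gathered_cpu = [torch.empty(max_size, dtype=torch.uint8) for _ in range(world_size)]
    for start in range(0, max_size, chunk_size):
        end = min(start + chunk_size, max_size)
        chunk = tensor[start:end].contiguous()
        recv = [torch.empty(end - start, dtype=torch.uint8, device=tensor.device)
                for _ in range(world_size)]
        dist.all_gather(recv, chunk)
        for rank, piece in enumerate(recv):
            gathered_cpu[rank][start:end] = piece.cpu()
    return gathered_cpu
```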
size_list = [torch.IntTensor([0]).to("cuda") for _ in range(world_size)]
dist.all_gather(size_list, local_size)
size_list = [int(size.item()) for size in size_list]
max_size = max(size_list)
@wat3rBro You can use dist.all_reduce(local_size, op=dist.ReduceOp.MAX) here for a little less code.
The sizes of all ranks are needed later.
Ah, I see: pickle requires the exact size and doesn't tolerate additional NULs. Thanks for clarifying.
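In other words, the per-rank sizes let each padded buffer be trimmed back before unpickling, roughly like this (a sketch using the names from the diffs above, not necessarily the PR's exact code):

```python
# strip each gathered buffer back to its original length before unpickling,
# since pickle does not tolerate the trailing padding bytes
data_list = []
for size, tensor in zip(size_list, tensor_list):
    buffer = tensor.cpu().numpy().tobytes()[:size]
    data_list.append(pickle.loads(buffer))
```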
Hi, can I use all_gather to gather the weights from all GPUs? I want each GPU to output different weights in each batch, and the loss to be computed using all of the weights. When I use all_gather, I found that the gathered weights lose their grad_fn.
from @ppwwyyxx
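Regarding the grad_fn question above: one common workaround (not from this PR; the function name here is made up) is to splice the local tensor, which still carries its grad_fn, back into the gathered list so that gradients flow for the current rank's contribution:

```python
import torch
import torch.distributed as dist

def all_gather_keep_local_grad(t):
    """dist.all_gather returns tensors detached from autograd; re-insert the local
    tensor so the current rank's entry keeps its grad_fn (illustrative sketch)."""
    world_size = dist.get_world_size()
    gathered = [torch.zeros_like(t) for _ in range(world_size)]
    dist.all_gather(gathered, t)
    gathered[dist.get_rank()] = t  # the autograd-tracked local tensor
    return torch.cat(gathered, dim=0)
```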