diff --git a/torch/csrc/distributed/c10d/Ops.cpp b/torch/csrc/distributed/c10d/Ops.cpp
index 425c3c25f8544..dc978e958acdb 100644
--- a/torch/csrc/distributed/c10d/Ops.cpp
+++ b/torch/csrc/distributed/c10d/Ops.cpp
@@ -62,785 +62,359 @@ namespace ops {
 // Below are ProcessGroup's corresponding ops for each backend. Ops are
 // routed through the dispatcher to be dispatched to the appropriate backend.
 // Currently a no-op as the process group does not have a list of backends.
-c10::intrusive_ptr<Work> send_cpu(
-    at::TensorList tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t dstRank,
-    int64_t tag) {
-  auto tensor_vec = tensors.vec();
-  return process_group->getBackend(c10::DeviceType::CPU)
-      ->send(tensor_vec, static_cast<int>(dstRank), static_cast<int>(tag));
-}
-
-c10::intrusive_ptr<Work> send_cuda(
-    at::TensorList tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t dstRank,
-    int64_t tag) {
-  auto tensor_vec = tensors.vec();
-  return process_group->getBackend(c10::DeviceType::CUDA)
-      ->send(tensor_vec, static_cast<int>(dstRank), static_cast<int>(tag));
-}
-
-c10::intrusive_ptr<Work> send_privateuse1(
-    at::TensorList tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t dstRank,
-    int64_t tag) {
-  auto tensor_vec = tensors.vec();
-  return process_group->getBackend(c10::DeviceType::PrivateUse1)
-      ->send(tensor_vec, static_cast<int>(dstRank), static_cast<int>(tag));
-}
-
-c10::intrusive_ptr<Work> recv_cpu_(
-    at::TensorList tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t srcRank,
-    int64_t tag) {
-  auto tensor_vec = tensors.vec();
-  return process_group->getBackend(c10::DeviceType::CPU)
-      ->recv(tensor_vec, static_cast<int>(srcRank), static_cast<int>(tag));
-}
-
-c10::intrusive_ptr<Work> recv_cuda_(
-    at::TensorList tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t srcRank,
-    int64_t tag) {
-  auto tensor_vec = tensors.vec();
-  return process_group->getBackend(c10::DeviceType::CUDA)
-      ->recv(tensor_vec, static_cast<int>(srcRank), static_cast<int>(tag));
-}
-
-c10::intrusive_ptr<Work> recv_privateuse1_(
-    at::TensorList tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t srcRank,
-    int64_t tag) {
-  auto tensor_vec = tensors.vec();
-  return process_group->getBackend(c10::DeviceType::PrivateUse1)
-      ->recv(tensor_vec, static_cast<int>(srcRank), static_cast<int>(tag));
-}
-
-c10::intrusive_ptr<Work> recv_any_source_cpu_(
-    at::TensorList tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t tag) {
-  auto tensor_vec = tensors.vec();
-  return process_group->getBackend(c10::DeviceType::CPU)
-      ->recvAnysource(tensor_vec, static_cast<int>(tag));
-}
-
-c10::intrusive_ptr<Work> recv_any_source_cuda_(
-    at::TensorList tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t tag) {
-  auto tensor_vec = tensors.vec();
-  return process_group->getBackend(c10::DeviceType::CUDA)
-      ->recvAnysource(tensor_vec, static_cast<int>(tag));
-}
-
-c10::intrusive_ptr<Work> recv_any_source_privateuse1_(
-    at::TensorList tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t tag) {
-  auto tensor_vec = tensors.vec();
-  return process_group->getBackend(c10::DeviceType::PrivateUse1)
-      ->recvAnysource(tensor_vec, static_cast<int>(tag));
-}
-
-c10::intrusive_ptr<Work> reduce_cpu_(
-    at::TensorList tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    const c10::intrusive_ptr<ReduceOp>& reduce_op,
-    int64_t root_rank,
-    int64_t root_tensor,
-    int64_t timeout) {
-  auto tensor_vec = tensors.vec();
-  return process_group->getBackend(c10::DeviceType::CPU)
-      ->reduce(
-          tensor_vec,
-          ReduceOptions{
-              *reduce_op.get(),
-              root_rank,
-              root_tensor,
-              std::chrono::milliseconds(timeout)});
-}
-
-c10::intrusive_ptr<Work> reduce_cuda_(
-    at::TensorList tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    const c10::intrusive_ptr<ReduceOp>& reduce_op,
-    int64_t root_rank,
-    int64_t root_tensor,
-    int64_t timeout) {
-  auto tensor_vec = tensors.vec();
-  return process_group->getBackend(c10::DeviceType::CUDA)
-      ->reduce(
-          tensor_vec,
-          ReduceOptions{
-              *reduce_op.get(),
-              root_rank,
-              root_tensor,
-              std::chrono::milliseconds(timeout)});
-}
-
-c10::intrusive_ptr<Work> reduce_privateuse1_(
-    at::TensorList tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    const c10::intrusive_ptr<ReduceOp>& reduce_op,
-    int64_t root_rank,
-    int64_t root_tensor,
-    int64_t timeout) {
-  auto tensor_vec = tensors.vec();
-  return process_group->getBackend(c10::DeviceType::PrivateUse1)
-      ->reduce(
-          tensor_vec,
-          c10d::ReduceOptions{
-              *reduce_op.get(),
-              root_rank,
-              root_tensor,
-              std::chrono::milliseconds(timeout)});
-}
-
-std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>> broadcast_cpu_(
-    at::TensorList tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t root_rank,
-    int64_t root_tensor,
-    int64_t timeout) {
-  auto tensor_vec = tensors.vec();
-  auto work =
-      process_group->getBackend(c10::DeviceType::CPU)
-          ->broadcast(
-              tensor_vec,
-              BroadcastOptions{
-                  root_rank, root_tensor, std::chrono::milliseconds(timeout)});
-
-  return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
-      std::move(tensor_vec), work);
-}
-
-std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>> broadcast_cuda_(
-    at::TensorList tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t root_rank,
-    int64_t root_tensor,
-    int64_t timeout) {
-  auto tensor_vec = tensors.vec();
-  auto work =
-      process_group->getBackend(c10::DeviceType::CUDA)
-          ->broadcast(
-              tensor_vec,
-              BroadcastOptions{
-                  root_rank, root_tensor, std::chrono::milliseconds(timeout)});
-
-  return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
-      std::move(tensor_vec), work);
-}
-
-std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>
-broadcast_privateuse1_(
-    at::TensorList tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t root_rank,
-    int64_t root_tensor,
-    int64_t timeout) {
-  auto tensor_vec = tensors.vec();
-  auto work =
-      process_group->getBackend(c10::DeviceType::PrivateUse1)
-          ->broadcast(
-              tensor_vec,
-              c10d::BroadcastOptions{
-                  root_rank, root_tensor, std::chrono::milliseconds(timeout)});
-
-  return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
-      std::move(tensor_vec), work);
-}
-
-std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>> allreduce_cpu_(
-    at::TensorList tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    const c10::intrusive_ptr<ReduceOp>& reduce_op,
-    int64_t timeout) {
-  auto tensor_vec = tensors.vec();
-  auto work =
-      process_group->getBackend(c10::DeviceType::CPU)
-          ->allreduce(
-              tensor_vec,
-              AllreduceOptions{
-                  *reduce_op.get(), std::chrono::milliseconds(timeout)});
-
-  // Return input tensors as output tensors to make inplace allreduce look like
-  // a functional API, so that make_fx can correctly build the dependencies in
-  // the graph later.
-  return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
-      std::move(tensor_vec), work);
-}
-
-std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>> allreduce_cuda_(
-    at::TensorList tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    const c10::intrusive_ptr<ReduceOp>& reduce_op,
-    int64_t timeout) {
-  auto tensor_vec = tensors.vec();
-  auto work =
-      process_group->getBackend(c10::DeviceType::CUDA)
-          ->allreduce(
-              tensor_vec,
-              AllreduceOptions{
-                  *reduce_op.get(), std::chrono::milliseconds(timeout)});
-
-  // Return input tensors as output tensors to make inplace allreduce look like
-  // a functional API, so that make_fx can correctly build the dependencies in
-  // the graph later.
-  return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
-      std::move(tensor_vec), work);
-}
-
-std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>
-allreduce_privateuse1_(
-    at::TensorList tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    const c10::intrusive_ptr<ReduceOp>& reduce_op,
-    int64_t timeout) {
-  auto tensor_vec = tensors.vec();
-  auto work =
-      process_group->getBackend(c10::DeviceType::PrivateUse1)
-          ->allreduce(
-              tensor_vec,
-              c10d::AllreduceOptions{
-                  *reduce_op.get(), std::chrono::milliseconds(timeout)});
-
-  // Return input tensors as output tensors to make inplace allreduce look like
-  // a functional API, so that make_fx can correctly build the dependencies in
-  // the graph later.
-  return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
-      std::move(tensor_vec), work);
-}
-
-c10::intrusive_ptr<Work> allreduce_coalesced_cpu_(
-    at::TensorList tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    const c10::intrusive_ptr<ReduceOp>& reduce_op,
-    int64_t timeout) {
-  auto tensor_vec = tensors.vec();
-  AllreduceCoalescedOptions opts = AllreduceCoalescedOptions{};
-  opts.reduceOp = *reduce_op.get();
-  opts.timeout = std::chrono::milliseconds(timeout);
-
-  return process_group->getBackend(c10::DeviceType::CPU)
-      ->allreduce_coalesced(tensor_vec, opts);
-}
-
-c10::intrusive_ptr<Work> allreduce_coalesced_cuda_(
-    at::TensorList tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    const c10::intrusive_ptr<ReduceOp>& reduce_op,
-    int64_t timeout) {
-  auto tensor_vec = tensors.vec();
-  AllreduceCoalescedOptions opts = AllreduceCoalescedOptions{};
-  opts.reduceOp = *reduce_op.get();
-  opts.timeout = std::chrono::milliseconds(timeout);
-
-  return process_group->getBackend(c10::DeviceType::CUDA)
-      ->allreduce_coalesced(tensor_vec, opts);
-}
-
-c10::intrusive_ptr<Work> allreduce_coalesced_privateuse1_(
-    at::TensorList tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    const c10::intrusive_ptr<ReduceOp>& reduce_op,
-    int64_t timeout) {
-  auto tensor_vec = tensors.vec();
-  c10d::AllreduceCoalescedOptions opts = c10d::AllreduceCoalescedOptions{};
-  opts.reduceOp = *reduce_op.get();
-  opts.timeout = std::chrono::milliseconds(timeout);
-
-  return process_group->getBackend(c10::DeviceType::PrivateUse1)
-      ->allreduce_coalesced(tensor_vec, opts);
-}
-
-std::tuple<std::vector<std::vector<at::Tensor>>, c10::intrusive_ptr<Work>>
-allgather_cpu_(
-    const std::vector<std::vector<at::Tensor>>& output_tensors,
-    at::TensorList input_tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t timeout) {
-  auto input_tensors_vec = input_tensors.vec();
-  auto work =
-      process_group->getBackend(c10::DeviceType::CPU)
-          ->allgather(
-              const_cast<std::vector<std::vector<at::Tensor>>&>(output_tensors),
-              input_tensors_vec,
-              AllgatherOptions{std::chrono::milliseconds(timeout)});
-
-  // Copy output tensors (not storage) so that this can be used in a functional
-  // manner
-  return std::
-      tuple<std::vector<std::vector<at::Tensor>>, c10::intrusive_ptr<Work>>(
-          output_tensors, work);
-}
-
-std::tuple<std::vector<std::vector<at::Tensor>>, c10::intrusive_ptr<Work>>
-allgather_cuda_(
-    const std::vector<std::vector<at::Tensor>>& output_tensors,
-    at::TensorList input_tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t timeout) {
-  auto input_tensors_vec = input_tensors.vec();
-  auto work =
-      process_group->getBackend(c10::DeviceType::CUDA)
-          ->allgather(
-              const_cast<std::vector<std::vector<at::Tensor>>&>(output_tensors),
-              input_tensors_vec,
-              AllgatherOptions{std::chrono::milliseconds(timeout)});
-
-  // Copy output tensors (not storage) so that this can be used in a functional
-  // manner
-  return std::
-      tuple<std::vector<std::vector<at::Tensor>>, c10::intrusive_ptr<Work>>(
-          output_tensors, work);
-}
-
-std::tuple<std::vector<std::vector<at::Tensor>>, c10::intrusive_ptr<Work>>
-allgather_privateuse1_(
-    const std::vector<std::vector<at::Tensor>>& output_tensors,
-    at::TensorList input_tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t timeout) {
-  auto input_tensors_vec = input_tensors.vec();
-  auto work =
-      process_group->getBackend(c10::DeviceType::PrivateUse1)
-          ->allgather(
-              const_cast<std::vector<std::vector<at::Tensor>>&>(output_tensors),
-              input_tensors_vec,
-              c10d::AllgatherOptions{std::chrono::milliseconds(timeout)});
-
-  // Copy output tensors (not storage) so that this can be used in a functional
-  // manner
-  return std::tuple<
-      std::vector<std::vector<at::Tensor>>,
-      c10::intrusive_ptr<Work>>(output_tensors, work);
-}
+#define IMPL_SEND(DEV)                                                        \
+  c10::intrusive_ptr<Work> send##DEV(                                         \
+      at::TensorList tensors,                                                 \
+      const c10::intrusive_ptr<ProcessGroup>& process_group,                  \
+      int64_t dstRank,                                                        \
+      int64_t tag) {                                                          \
+    auto tensor_vec = tensors.vec();                                          \
+    return process_group->getBackend(c10::DeviceType::DEV)                    \
+        ->send(tensor_vec, static_cast<int>(dstRank), static_cast<int>(tag)); \
+  }
 
-std::tuple<at::Tensor, c10::intrusive_ptr<Work>> _allgather_base_cpu_(
-    at::Tensor& output_tensor,
-    at::Tensor& input_tensor,
-    const c10::intrusive_ptr<ProcessGroup>& process_group) {
-  auto work = process_group->getBackend(c10::DeviceType::CPU)
-                  ->_allgather_base(output_tensor, input_tensor);
+IMPL_SEND(CPU)
+IMPL_SEND(CUDA)
+IMPL_SEND(PrivateUse1)
+
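+// Expansion sketch (editorial illustration, not generated code): the DEV
+// token is pasted into both the wrapper's name and its dispatch key, so
+// IMPL_SEND(CPU) above defines, roughly:
+//
+//   c10::intrusive_ptr<Work> sendCPU(
+//       at::TensorList tensors,
+//       const c10::intrusive_ptr<ProcessGroup>& process_group,
+//       int64_t dstRank,
+//       int64_t tag) {
+//     auto tensor_vec = tensors.vec();
+//     return process_group->getBackend(c10::DeviceType::CPU)
+//         ->send(tensor_vec, static_cast<int>(dstRank), static_cast<int>(tag));
+//   }
+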
+#define IMPL_RECV(DEV)                                                        \
+  c10::intrusive_ptr<Work> recv_##DEV(                                        \
+      at::TensorList tensors,                                                 \
+      const c10::intrusive_ptr<ProcessGroup>& process_group,                  \
+      int64_t srcRank,                                                        \
+      int64_t tag) {                                                          \
+    auto tensor_vec = tensors.vec();                                          \
+    return process_group->getBackend(c10::DeviceType::DEV)                    \
+        ->recv(tensor_vec, static_cast<int>(srcRank), static_cast<int>(tag)); \
+  }
 
-  return std::tuple<at::Tensor, c10::intrusive_ptr<Work>>(output_tensor, work);
-}
+IMPL_RECV(CPU)
+IMPL_RECV(CUDA)
+IMPL_RECV(PrivateUse1)
+
+#define IMPL_RECV_ANY_SOURCE(DEV)                             \
+  c10::intrusive_ptr<Work> recv_any_source_##DEV(             \
+      at::TensorList tensors,                                  \
+      const c10::intrusive_ptr<ProcessGroup>& process_group,   \
+      int64_t tag) {                                           \
+    auto tensor_vec = tensors.vec();                           \
+    return process_group->getBackend(c10::DeviceType::DEV)     \
+        ->recvAnysource(tensor_vec, static_cast<int>(tag));    \
+  }
 
-std::tuple<at::Tensor, c10::intrusive_ptr<Work>> _allgather_base_cuda_(
-    at::Tensor& output_tensor,
-    at::Tensor& input_tensor,
-    const c10::intrusive_ptr<ProcessGroup>& process_group) {
-  auto work = process_group->getBackend(c10::DeviceType::CUDA)
-                  ->_allgather_base(output_tensor, input_tensor);
+IMPL_RECV_ANY_SOURCE(CPU)
+IMPL_RECV_ANY_SOURCE(CUDA)
+IMPL_RECV_ANY_SOURCE(PrivateUse1)
+
+#define IMPL_REDUCE(DEV)                                      \
+  c10::intrusive_ptr<Work> reduce_##DEV(                      \
+      at::TensorList tensors,                                  \
+      const c10::intrusive_ptr<ProcessGroup>& process_group,   \
+      const c10::intrusive_ptr<ReduceOp>& reduce_op,           \
+      int64_t root_rank,                                       \
+      int64_t root_tensor,                                     \
+      int64_t timeout) {                                       \
+    auto tensor_vec = tensors.vec();                           \
+    return process_group->getBackend(c10::DeviceType::DEV)     \
+        ->reduce(                                              \
+            tensor_vec,                                        \
+            ReduceOptions{                                     \
+                *reduce_op.get(),                              \
+                root_rank,                                     \
+                root_tensor,                                   \
+                std::chrono::milliseconds(timeout)});          \
+  }
 
-  return std::tuple<at::Tensor, c10::intrusive_ptr<Work>>(output_tensor, work);
-}
+IMPL_REDUCE(CPU)
+IMPL_REDUCE(CUDA)
+IMPL_REDUCE(PrivateUse1)
+
+#define IMPL_BROADCAST(DEV)                                               \
+  std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>           \
+      broadcast_##DEV(                                                    \
+          at::TensorList tensors,                                         \
+          const c10::intrusive_ptr<ProcessGroup>& process_group,          \
+          int64_t root_rank,                                              \
+          int64_t root_tensor,                                            \
+          int64_t timeout) {                                              \
+    auto tensor_vec = tensors.vec();                                      \
+    auto work = process_group->getBackend(c10::DeviceType::DEV)           \
+                    ->broadcast(                                          \
+                        tensor_vec,                                       \
+                        BroadcastOptions{                                 \
+                            root_rank,                                    \
+                            root_tensor,                                  \
+                            std::chrono::milliseconds(timeout)});         \
+    return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>( \
+        std::move(tensor_vec), work);                                     \
+  }
 
-std::tuple<at::Tensor, c10::intrusive_ptr<Work>>
-_allgather_base_privateuse1_(
-    at::Tensor& output_tensor,
-    at::Tensor& input_tensor,
-    const c10::intrusive_ptr<ProcessGroup>& process_group) {
-  auto work = process_group->getBackend(c10::DeviceType::PrivateUse1)
-                  ->_allgather_base(output_tensor, input_tensor);
+IMPL_BROADCAST(CPU)
+IMPL_BROADCAST(CUDA)
+IMPL_BROADCAST(PrivateUse1)
+
+// Return input tensors as output tensors to make inplace allreduce look like
+// a functional API, so that make_fx can correctly build the dependencies in
+// the graph later.
+#define IMPL_ALLREDUCE(DEV)                                                 \
+  std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>             \
+      allreduce_##DEV(                                                      \
+          at::TensorList tensors,                                           \
+          const c10::intrusive_ptr<ProcessGroup>& process_group,            \
+          const c10::intrusive_ptr<ReduceOp>& reduce_op,                    \
+          int64_t timeout) {                                                \
+    auto tensor_vec = tensors.vec();                                        \
+    auto work =                                                             \
+        process_group->getBackend(c10::DeviceType::DEV)                     \
+            ->allreduce(                                                    \
+                tensor_vec,                                                 \
+                AllreduceOptions{                                           \
+                    *reduce_op.get(), std::chrono::milliseconds(timeout)}); \
+    return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(   \
+        std::move(tensor_vec), work);                                       \
+  }
 
-  return std::tuple<at::Tensor, c10::intrusive_ptr<Work>>(
-      output_tensor, work);
-}
+IMPL_ALLREDUCE(CPU)
+IMPL_ALLREDUCE(CUDA)
+IMPL_ALLREDUCE(PrivateUse1)
+
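+// Note: these instantiations produce allreduce_CPU, allreduce_CUDA and
+// allreduce_PrivateUse1; the first two symbols are reused further below to
+// register the SparseCPU / SparseCUDA keys for sparse all_reduce.
+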
+#define IMPL_ALLREDUCE_COALESCED(DEV)                             \
+  c10::intrusive_ptr<Work> allreduce_coalesced_##DEV(             \
+      at::TensorList tensors,                                      \
+      const c10::intrusive_ptr<ProcessGroup>& process_group,       \
+      const c10::intrusive_ptr<ReduceOp>& reduce_op,               \
+      int64_t timeout) {                                           \
+    auto tensor_vec = tensors.vec();                               \
+    AllreduceCoalescedOptions opts = AllreduceCoalescedOptions{};  \
+    opts.reduceOp = *reduce_op.get();                              \
+    opts.timeout = std::chrono::milliseconds(timeout);             \
+    return process_group->getBackend(c10::DeviceType::DEV)         \
+        ->allreduce_coalesced(tensor_vec, opts);                   \
+  }
 
-c10::intrusive_ptr<Work> allgather_coalesced_cpu_(
-    const std::vector<std::vector<at::Tensor>>& output_lists,
-    const at::TensorList& input_list,
-    const c10::intrusive_ptr<ProcessGroup>& process_group) {
-  auto input_list_vec = input_list.vec();
-  return process_group->getBackend(c10::DeviceType::CPU)
-      ->allgather_coalesced(
-          const_cast<std::vector<std::vector<at::Tensor>>&>(output_lists),
-          input_list_vec);
-}
+IMPL_ALLREDUCE_COALESCED(CPU)
+IMPL_ALLREDUCE_COALESCED(CUDA)
+IMPL_ALLREDUCE_COALESCED(PrivateUse1)
+
+// Copy output tensors (not storage) so that this can be used in a functional
+// manner
+#define IMPL_ALLGATHER(DEV)                                                    \
+  std::tuple<std::vector<std::vector<at::Tensor>>, c10::intrusive_ptr<Work>>   \
+      allgather_##DEV(                                                         \
+          const std::vector<std::vector<at::Tensor>>& output_tensors,          \
+          at::TensorList input_tensors,                                        \
+          const c10::intrusive_ptr<ProcessGroup>& process_group,               \
+          int64_t timeout) {                                                   \
+    auto input_tensors_vec = input_tensors.vec();                              \
+    auto work = process_group->getBackend(c10::DeviceType::DEV)                \
+                    ->allgather(                                               \
+                        const_cast<std::vector<std::vector<at::Tensor>>&>(     \
+                            output_tensors),                                   \
+                        input_tensors_vec,                                     \
+                        AllgatherOptions{std::chrono::milliseconds(timeout)}); \
+    return std::                                                               \
+        tuple<std::vector<std::vector<at::Tensor>>, c10::intrusive_ptr<Work>>( \
+            output_tensors, work);                                             \
+  }
 
-c10::intrusive_ptr<Work> allgather_coalesced_cuda_(
-    const std::vector<std::vector<at::Tensor>>& output_lists,
-    const at::TensorList& input_list,
-    const c10::intrusive_ptr<ProcessGroup>& process_group) {
-  auto input_list_vec = input_list.vec();
-  return process_group->getBackend(c10::DeviceType::CUDA)
-      ->allgather_coalesced(
-          const_cast<std::vector<std::vector<at::Tensor>>&>(output_lists),
-          input_list_vec);
-}
+IMPL_ALLGATHER(CPU)
+IMPL_ALLGATHER(CUDA)
+IMPL_ALLGATHER(PrivateUse1)
+
+#define IMPL__ALLGATHER_BASE(DEV)                                          \
+  std::tuple<at::Tensor, c10::intrusive_ptr<Work>> _allgather_base_##DEV(  \
+      at::Tensor& output_tensor,                                           \
+      at::Tensor& input_tensor,                                            \
+      const c10::intrusive_ptr<ProcessGroup>& process_group) {             \
+    auto work = process_group->getBackend(c10::DeviceType::DEV)            \
+                    ->_allgather_base(output_tensor, input_tensor);        \
+    return std::tuple<at::Tensor, c10::intrusive_ptr<Work>>(               \
+        output_tensor, work);                                              \
+  }
 
-c10::intrusive_ptr<Work> allgather_coalesced_privateuse1_(
-    const std::vector<std::vector<at::Tensor>>& output_lists,
-    const at::TensorList& input_list,
-    const c10::intrusive_ptr<ProcessGroup>& process_group) {
-  auto input_list_vec = input_list.vec();
-  return process_group->getBackend(c10::DeviceType::PrivateUse1)
-      ->allgather_coalesced(
-          const_cast<std::vector<std::vector<at::Tensor>>&>(output_lists),
-          input_list_vec);
-}
+IMPL__ALLGATHER_BASE(CPU)
+IMPL__ALLGATHER_BASE(CUDA)
+IMPL__ALLGATHER_BASE(PrivateUse1)
+
+#define IMPL_ALLGATHER_COALESCED(DEV)                                         \
+  c10::intrusive_ptr<Work> allgather_coalesced_##DEV(                         \
+      const std::vector<std::vector<at::Tensor>>& output_lists,               \
+      const at::TensorList& input_list,                                       \
+      const c10::intrusive_ptr<ProcessGroup>& process_group) {                \
+    auto input_list_vec = input_list.vec();                                   \
+    return process_group->getBackend(c10::DeviceType::DEV)                    \
+        ->allgather_coalesced(                                                \
+            const_cast<std::vector<std::vector<at::Tensor>>&>(output_lists),  \
+            input_list_vec);                                                  \
+  }
 
-#define IMPL_ALLGATHER_INTO_TENSOR_COALESCED(DEV)                    \
-  c10::intrusive_ptr<Work> allgather_into_tensor_coalesced_##DEV##_( \
-      at::TensorList outputs,                                        \
-      at::TensorList inputs,                                         \
-      const c10::intrusive_ptr<ProcessGroup>& process_group) {       \
-    auto output_vec = outputs.vec();                                 \
-    auto input_vec = inputs.vec();                                   \
-    return process_group->getBackend(c10::DeviceType::DEV)           \
-        ->allgather_into_tensor_coalesced(output_vec, input_vec);    \
+IMPL_ALLGATHER_COALESCED(CPU)
+IMPL_ALLGATHER_COALESCED(CUDA)
+IMPL_ALLGATHER_COALESCED(PrivateUse1)
+
+#define IMPL_ALLGATHER_INTO_TENSOR_COALESCED(DEV)                  \
+  c10::intrusive_ptr<Work> allgather_into_tensor_coalesced_##DEV(  \
+      at::TensorList outputs,                                      \
+      at::TensorList inputs,                                       \
+      const c10::intrusive_ptr<ProcessGroup>& process_group) {     \
+    auto output_vec = outputs.vec();                               \
+    auto input_vec = inputs.vec();                                 \
+    return process_group->getBackend(c10::DeviceType::DEV)         \
+        ->allgather_into_tensor_coalesced(output_vec, input_vec);  \
   }
 
 IMPL_ALLGATHER_INTO_TENSOR_COALESCED(CPU)
 IMPL_ALLGATHER_INTO_TENSOR_COALESCED(CUDA)
 IMPL_ALLGATHER_INTO_TENSOR_COALESCED(PrivateUse1)
 
-std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>
-reduce_scatter_cpu_(
-    const at::TensorList& output_tensors,
-    const std::vector<std::vector<at::Tensor>>& input_tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    const c10::intrusive_ptr<ReduceOp>& reduce_op,
-    int64_t timeout) {
-  auto output_tensors_vec = output_tensors.vec();
-  auto work =
-      process_group->getBackend(c10::DeviceType::CPU)
-          ->reduce_scatter(
-              output_tensors_vec,
-              const_cast<std::vector<std::vector<at::Tensor>>&>(input_tensors),
-              ReduceScatterOptions{
-                  *reduce_op.get(), std::chrono::milliseconds(timeout)});
-
-  return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
-      output_tensors_vec, work);
-}
-
-std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>
-reduce_scatter_cuda_(
-    const at::TensorList& output_tensors,
-    const std::vector<std::vector<at::Tensor>>& input_tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    const c10::intrusive_ptr<ReduceOp>& reduce_op,
-    int64_t timeout) {
-  auto output_tensors_vec = output_tensors.vec();
-  auto work =
-      process_group->getBackend(c10::DeviceType::CUDA)
-          ->reduce_scatter(
-              output_tensors_vec,
-              const_cast<std::vector<std::vector<at::Tensor>>&>(input_tensors),
-              ReduceScatterOptions{
-                  *reduce_op.get(), std::chrono::milliseconds(timeout)});
-
-  return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
-      output_tensors_vec, work);
-}
-
-std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>
-reduce_scatter_privateuse1_(
-    const at::TensorList& output_tensors,
-    const std::vector<std::vector<at::Tensor>>& input_tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    const c10::intrusive_ptr<ReduceOp>& reduce_op,
-    int64_t timeout) {
-  auto output_tensors_vec = output_tensors.vec();
-  auto work =
-      process_group->getBackend(c10::DeviceType::PrivateUse1)
-          ->reduce_scatter(
-              output_tensors_vec,
-              const_cast<std::vector<std::vector<at::Tensor>>&>(input_tensors),
-              c10d::ReduceScatterOptions{
-                  *reduce_op.get(), std::chrono::milliseconds(timeout)});
-
-  return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
-      output_tensors_vec, work);
-}
-
-std::tuple<at::Tensor, c10::intrusive_ptr<Work>> _reduce_scatter_base_cpu_(
-    at::Tensor& output_tensor,
-    at::Tensor& input_tensor,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    const c10::intrusive_ptr<ReduceOp>& reduce_op,
-    int64_t timeout) {
-  auto work =
-      process_group->getBackend(c10::DeviceType::CPU)
-          ->_reduce_scatter_base(
-              output_tensor,
-              input_tensor,
-              ReduceScatterOptions{
-                  *reduce_op.get(), std::chrono::milliseconds(timeout)});
-
-  return std::tuple<at::Tensor, c10::intrusive_ptr<Work>>(output_tensor, work);
-}
-
-std::tuple<at::Tensor, c10::intrusive_ptr<Work>> _reduce_scatter_base_cuda_(
-    at::Tensor& output_tensor,
-    at::Tensor& input_tensor,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    const c10::intrusive_ptr<ReduceOp>& reduce_op,
-    int64_t timeout) {
-  auto work =
-      process_group->getBackend(c10::DeviceType::CUDA)
-          ->_reduce_scatter_base(
-              output_tensor,
-              input_tensor,
-              ReduceScatterOptions{
-                  *reduce_op.get(), std::chrono::milliseconds(timeout)});
-
-  return std::tuple<at::Tensor, c10::intrusive_ptr<Work>>(output_tensor, work);
-}
-
-std::tuple<at::Tensor, c10::intrusive_ptr<Work>>
-_reduce_scatter_base_privateuse1_(
-    at::Tensor& output_tensor,
-    at::Tensor& input_tensor,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    const c10::intrusive_ptr<ReduceOp>& reduce_op,
-    int64_t timeout) {
-  auto work =
-      process_group->getBackend(c10::DeviceType::PrivateUse1)
-          ->_reduce_scatter_base(
-              output_tensor,
-              input_tensor,
-              c10d::ReduceScatterOptions{
-                  *reduce_op.get(), std::chrono::milliseconds(timeout)});
-
-  return std::tuple<at::Tensor, c10::intrusive_ptr<Work>>(
-      output_tensor, work);
-}
-
-c10::intrusive_ptr<Work> gather_cpu_(
-    const std::vector<std::vector<at::Tensor>>& output_tensors,
-    const at::TensorList& input_tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t root_rank,
-    int64_t timeout) {
-  auto input_tensors_vec = input_tensors.vec();
-  return process_group->getBackend(c10::DeviceType::CPU)
-      ->gather(
-          const_cast<std::vector<std::vector<at::Tensor>>&>(output_tensors),
-          input_tensors_vec,
-          GatherOptions{root_rank, std::chrono::milliseconds(timeout)});
-}
-c10::intrusive_ptr<Work> gather_cuda_(
-    const std::vector<std::vector<at::Tensor>>& output_tensors,
-    const at::TensorList& input_tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t root_rank,
-    int64_t timeout) {
-  auto input_tensors_vec = input_tensors.vec();
-  return process_group->getBackend(c10::DeviceType::CUDA)
-      ->gather(
-          const_cast<std::vector<std::vector<at::Tensor>>&>(output_tensors),
-          input_tensors_vec,
-          GatherOptions{root_rank, std::chrono::milliseconds(timeout)});
-}
-
-c10::intrusive_ptr<Work> gather_privateuse1_(
-    const std::vector<std::vector<at::Tensor>>& output_tensors,
-    const at::TensorList& input_tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t root_rank,
-    int64_t timeout) {
-  auto input_tensors_vec = input_tensors.vec();
-  return process_group->getBackend(c10::DeviceType::PrivateUse1)
-      ->gather(
-          const_cast<std::vector<std::vector<at::Tensor>>&>(output_tensors),
-          input_tensors_vec,
-          c10d::GatherOptions{root_rank, std::chrono::milliseconds(timeout)});
-}
-
-std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>> scatter_cpu_(
-    const at::TensorList& output_tensors,
-    const std::vector<std::vector<at::Tensor>>& input_tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t root_rank,
-    int64_t timeout) {
-  auto output_tensors_vec = output_tensors.vec();
-  auto work =
-      process_group->getBackend(c10::DeviceType::CPU)
-          ->scatter(
-              output_tensors_vec,
-              const_cast<std::vector<std::vector<at::Tensor>>&>(input_tensors),
-              ScatterOptions{root_rank, std::chrono::milliseconds(timeout)});
-
-  return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
-      std::move(output_tensors_vec), work);
-}
-
-std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>> scatter_cuda_(
-    const at::TensorList& output_tensors,
-    const std::vector<std::vector<at::Tensor>>& input_tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t root_rank,
-    int64_t timeout) {
-  auto output_tensors_vec = output_tensors.vec();
-  auto work =
-      process_group->getBackend(c10::DeviceType::CUDA)
-          ->scatter(
-              output_tensors_vec,
-              const_cast<std::vector<std::vector<at::Tensor>>&>(input_tensors),
-              ScatterOptions{root_rank, std::chrono::milliseconds(timeout)});
-
-  return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
-      std::move(output_tensors_vec), work);
-}
-
-std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>
-scatter_privateuse1_(
-    const at::TensorList& output_tensors,
-    const std::vector<std::vector<at::Tensor>>& input_tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t root_rank,
-    int64_t timeout) {
-  auto output_tensors_vec = output_tensors.vec();
-  auto work =
-      process_group->getBackend(c10::DeviceType::PrivateUse1)
-          ->scatter(
-              output_tensors_vec,
-              const_cast<std::vector<std::vector<at::Tensor>>&>(input_tensors),
-              c10d::ScatterOptions{
-                  root_rank, std::chrono::milliseconds(timeout)});
-
-  return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
-      std::move(output_tensors_vec), work);
-}
-
-std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>> alltoall_cpu_(
-    const at::TensorList& output_tensors,
-    const at::TensorList& input_tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t timeout) {
-  auto output_tensors_vec = output_tensors.vec();
-  auto input_tensors_vec = input_tensors.vec();
-  auto work = process_group->getBackend(c10::DeviceType::CPU)
-                  ->alltoall(
-                      output_tensors_vec,
-                      input_tensors_vec,
-                      AllToAllOptions{std::chrono::milliseconds(timeout)});
-  return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
-      std::move(output_tensors_vec), work);
-}
-
-std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>> alltoall_cuda_(
-    const at::TensorList& output_tensors,
-    const at::TensorList& input_tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t timeout) {
-  auto output_tensors_vec = output_tensors.vec();
-  auto input_tensors_vec = input_tensors.vec();
-  auto work = process_group->getBackend(c10::DeviceType::CUDA)
-                  ->alltoall(
-                      output_tensors_vec,
-                      input_tensors_vec,
-                      AllToAllOptions{std::chrono::milliseconds(timeout)});
-  return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
-      std::move(output_tensors_vec), work);
-}
+#define IMPL_REDUCE_SCATTER(DEV)                                            \
+  std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>             \
+      reduce_scatter_##DEV(                                                 \
+          const at::TensorList& output_tensors,                             \
+          const std::vector<std::vector<at::Tensor>>& input_tensors,        \
+          const c10::intrusive_ptr<ProcessGroup>& process_group,            \
+          const c10::intrusive_ptr<ReduceOp>& reduce_op,                    \
+          int64_t timeout) {                                                \
+    auto output_tensors_vec = output_tensors.vec();                         \
+    auto work =                                                             \
+        process_group->getBackend(c10::DeviceType::DEV)                     \
+            ->reduce_scatter(                                               \
+                output_tensors_vec,                                         \
+                const_cast<std::vector<std::vector<at::Tensor>>&>(          \
+                    input_tensors),                                         \
+                ReduceScatterOptions{                                       \
+                    *reduce_op.get(), std::chrono::milliseconds(timeout)}); \
+    return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(   \
+        output_tensors_vec, work);                                          \
+  }
 
-std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>
-alltoall_privateuse1_(
-    const at::TensorList& output_tensors,
-    const at::TensorList& input_tensors,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    int64_t timeout) {
-  auto output_tensors_vec = output_tensors.vec();
-  auto input_tensors_vec = input_tensors.vec();
-  auto work =
-      process_group->getBackend(c10::DeviceType::PrivateUse1)
-          ->alltoall(
-              output_tensors_vec,
-              input_tensors_vec,
-              c10d::AllToAllOptions{std::chrono::milliseconds(timeout)});
-  return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(
-      std::move(output_tensors_vec), work);
-}
+IMPL_REDUCE_SCATTER(CPU)
+IMPL_REDUCE_SCATTER(CUDA)
+IMPL_REDUCE_SCATTER(PrivateUse1)
+
+#define IMPL__REDUCE_SCATTER_BASE(DEV)                                         \
+  std::tuple<at::Tensor, c10::intrusive_ptr<Work>> _reduce_scatter_base_##DEV( \
+      at::Tensor& output_tensor,                                               \
+      at::Tensor& input_tensor,                                                \
+      const c10::intrusive_ptr<ProcessGroup>& process_group,                   \
+      const c10::intrusive_ptr<ReduceOp>& reduce_op,                           \
+      int64_t timeout) {                                                       \
+    auto work =                                                                \
+        process_group->getBackend(c10::DeviceType::DEV)                        \
+            ->_reduce_scatter_base(                                            \
+                output_tensor,                                                 \
+                input_tensor,                                                  \
+                ReduceScatterOptions{                                          \
+                    *reduce_op.get(), std::chrono::milliseconds(timeout)});    \
+    return std::tuple<at::Tensor, c10::intrusive_ptr<Work>>(                   \
+        output_tensor, work);                                                  \
+  }
 
-c10::intrusive_ptr<Work> alltoall_base_cpu_(
-    at::Tensor& output,
-    at::Tensor& input,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    std::vector<int64_t> output_split_sizes,
-    std::vector<int64_t> input_split_sizes,
-    int64_t timeout) {
-  return process_group->getBackend(c10::DeviceType::CPU)
-      ->alltoall_base(
-          output,
-          input,
-          output_split_sizes,
-          input_split_sizes,
-          AllToAllOptions{std::chrono::milliseconds(timeout)});
-}
+IMPL__REDUCE_SCATTER_BASE(CPU)
+IMPL__REDUCE_SCATTER_BASE(CUDA)
+IMPL__REDUCE_SCATTER_BASE(PrivateUse1)
+
+#define IMPL_GATHER(DEV)                                                       \
+  c10::intrusive_ptr<Work> gather_##DEV(                                       \
+      const std::vector<std::vector<at::Tensor>>& output_tensors,              \
+      const at::TensorList& input_tensors,                                     \
+      const c10::intrusive_ptr<ProcessGroup>& process_group,                   \
+      int64_t root_rank,                                                       \
+      int64_t timeout) {                                                       \
+    auto input_tensors_vec = input_tensors.vec();                              \
+    return process_group->getBackend(c10::DeviceType::DEV)                     \
+        ->gather(                                                              \
+            const_cast<std::vector<std::vector<at::Tensor>>&>(output_tensors), \
+            input_tensors_vec,                                                 \
+            GatherOptions{root_rank, std::chrono::milliseconds(timeout)});     \
+  }
 
-c10::intrusive_ptr<Work> alltoall_base_cuda_(
-    at::Tensor& output,
-    at::Tensor& input,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    std::vector<int64_t> output_split_sizes,
-    std::vector<int64_t> input_split_sizes,
-    int64_t timeout) {
-  return process_group->getBackend(c10::DeviceType::CUDA)
-      ->alltoall_base(
-          output,
-          input,
-          output_split_sizes,
-          input_split_sizes,
-          AllToAllOptions{std::chrono::milliseconds(timeout)});
-}
+IMPL_GATHER(CPU)
+IMPL_GATHER(CUDA)
+IMPL_GATHER(PrivateUse1)
+
+#define IMPL_SCATTER(DEV)                                                      \
+  std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>> scatter_##DEV( \
+      const at::TensorList& output_tensors,                                    \
+      const std::vector<std::vector<at::Tensor>>& input_tensors,               \
+      const c10::intrusive_ptr<ProcessGroup>& process_group,                   \
+      int64_t root_rank,                                                       \
+      int64_t timeout) {                                                       \
+    auto output_tensors_vec = output_tensors.vec();                            \
+    auto work = process_group->getBackend(c10::DeviceType::DEV)                \
+                    ->scatter(                                                 \
+                        output_tensors_vec,                                    \
+                        const_cast<std::vector<std::vector<at::Tensor>>&>(     \
+                            input_tensors),                                    \
+                        ScatterOptions{                                        \
+                            root_rank, std::chrono::milliseconds(timeout)});   \
+    return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(      \
+        std::move(output_tensors_vec), work);                                  \
+  }
 
-c10::intrusive_ptr<Work> alltoall_base_privateuse1_(
-    at::Tensor& output,
-    at::Tensor& input,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    std::vector<int64_t> output_split_sizes,
-    std::vector<int64_t> input_split_sizes,
-    int64_t timeout) {
-  return process_group->getBackend(c10::DeviceType::PrivateUse1)
-      ->alltoall_base(
-          output,
-          input,
-          output_split_sizes,
-          input_split_sizes,
-          c10d::AllToAllOptions{std::chrono::milliseconds(timeout)});
-}
+IMPL_SCATTER(CPU)
+IMPL_SCATTER(CUDA)
+IMPL_SCATTER(PrivateUse1)
+
+#define IMPL_ALLTOALL(DEV)                                        \
+  std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>   \
+      alltoall_##DEV(                                                         \
+          const at::TensorList& output_tensors,                               \
+          const at::TensorList& input_tensors,                                \
+          const c10::intrusive_ptr<ProcessGroup>& process_group,              \
+          int64_t timeout) {                                                  \
+    auto output_tensors_vec = output_tensors.vec();                           \
+    auto input_tensors_vec = input_tensors.vec();                             \
+    auto work = process_group->getBackend(c10::DeviceType::DEV)               \
+                    ->alltoall(                                               \
+                        output_tensors_vec,                                   \
+                        input_tensors_vec,                                    \
+                        AllToAllOptions{std::chrono::milliseconds(timeout)}); \
+    return std::tuple<std::vector<at::Tensor>, c10::intrusive_ptr<Work>>(     \
+        std::move(output_tensors_vec), work);                                 \
+  }
 
-c10::intrusive_ptr<Work> barrier_cpu(
-    at::Tensor /* unused */,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    const std::vector<int64_t>& device_ids,
-    int64_t timeout) {
-  return process_group->getBackend(c10::DeviceType::CPU)
-      ->barrier(BarrierOptions{device_ids, std::chrono::milliseconds(timeout)});
-}
+IMPL_ALLTOALL(CPU)
+IMPL_ALLTOALL(CUDA)
+IMPL_ALLTOALL(PrivateUse1)
+
+#define IMPL_ALLTOALL_BASE(DEV)                                     \
+  c10::intrusive_ptr<Work> alltoall_base_##DEV(                     \
+      at::Tensor& output,                                           \
+      at::Tensor& input,                                            \
+      const c10::intrusive_ptr<ProcessGroup>& process_group,        \
+      std::vector<int64_t> output_split_sizes,                      \
+      std::vector<int64_t> input_split_sizes,                       \
+      int64_t timeout) {                                            \
+    return process_group->getBackend(c10::DeviceType::DEV)          \
+        ->alltoall_base(                                            \
+            output,                                                 \
+            input,                                                  \
+            output_split_sizes,                                     \
+            input_split_sizes,                                      \
+            AllToAllOptions{std::chrono::milliseconds(timeout)});   \
+  }
 
-c10::intrusive_ptr<Work> barrier_cuda(
-    at::Tensor /* unused */,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    const std::vector<int64_t>& device_ids,
-    int64_t timeout) {
-  return process_group->getBackend(c10::DeviceType::CUDA)
-      ->barrier(BarrierOptions{device_ids, std::chrono::milliseconds(timeout)});
-}
+IMPL_ALLTOALL_BASE(CPU)
+IMPL_ALLTOALL_BASE(CUDA)
+IMPL_ALLTOALL_BASE(PrivateUse1)
+
+#define IMPL_BARRIER(DEV)                                                      \
+  c10::intrusive_ptr<Work> barrier##DEV(                                       \
+      at::Tensor /* unused */,                                                 \
+      const c10::intrusive_ptr<ProcessGroup>& process_group,                   \
+      const std::vector<int64_t>& device_ids,                                  \
+      int64_t timeout) {                                                       \
+    return process_group->getBackend(c10::DeviceType::DEV)                     \
+        ->barrier(                                                             \
+            BarrierOptions{device_ids, std::chrono::milliseconds(timeout)});   \
+  }
 
-c10::intrusive_ptr<Work> barrier_privateuse1(
-    at::Tensor /* unused */,
-    const c10::intrusive_ptr<ProcessGroup>& process_group,
-    const std::vector<int64_t>& device_ids,
-    int64_t timeout) {
-  return process_group->getBackend(c10::DeviceType::PrivateUse1)
-      ->barrier(
-          c10d::BarrierOptions{device_ids, std::chrono::milliseconds(timeout)});
-}
+IMPL_BARRIER(CPU)
+IMPL_BARRIER(CUDA)
+IMPL_BARRIER(PrivateUse1)
 
-void monitored_barrier_cpu_(
+void monitored_barrier_CPU(
     at::Tensor /* unused */,
     const c10::intrusive_ptr<::c10d::ProcessGroup>& process_group,
    const std::vector<int64_t>& device_ids,
@@ -855,231 +429,55 @@ void monitored_barrier_cpu_(
 
 // register functions to dispatcher
 namespace {
-#define REGISTER_C10D_OP(FUNC, DEV)  \
+// 2nd level expansion
+// FUNC: op name
+// DEV: device
+#define REGISTER_C10D_OP1(FUNC, DEV) \
   TORCH_LIBRARY_IMPL(c10d, DEV, m) { \
-    m.impl(#FUNC, FUNC##DEV##_);     \
+    m.impl(#FUNC, FUNC##DEV);        \
   }
 
-TORCH_LIBRARY_IMPL(c10d, CPU, m) {
-  m.impl("send", send_cpu);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
-  m.impl("send", send_cuda);
-}
-
-TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
-  m.impl("send", send_privateuse1);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CPU, m) {
-  m.impl("recv_", recv_cpu_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
-  m.impl("recv_", recv_cuda_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
-  m.impl("recv_", recv_privateuse1_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CPU, m) {
-  m.impl("recv_any_source_", recv_any_source_cpu_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
-  m.impl("recv_any_source_", recv_any_source_cuda_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
-  m.impl("recv_any_source_", recv_any_source_privateuse1_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CPU, m) {
-  m.impl("reduce_", reduce_cpu_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
-  m.impl("reduce_", reduce_cuda_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
-  m.impl("reduce_", reduce_privateuse1_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CPU, m) {
-  m.impl("broadcast_", broadcast_cpu_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
-  m.impl("broadcast_", broadcast_cuda_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
-  m.impl("broadcast_", broadcast_privateuse1_);
-}
+// 1st level expansion
+#define REGISTER_C10D_OP(FUNC)  \
+  REGISTER_C10D_OP1(FUNC, CPU)  \
+  REGISTER_C10D_OP1(FUNC, CUDA) \
+  REGISTER_C10D_OP1(FUNC, PrivateUse1)
+
+// Now we start to register ops with the three device keys
+
+REGISTER_C10D_OP(send)
+REGISTER_C10D_OP(recv_)
+REGISTER_C10D_OP(recv_any_source_)
+REGISTER_C10D_OP(reduce_)
+REGISTER_C10D_OP(broadcast_)
+REGISTER_C10D_OP(allreduce_)
+REGISTER_C10D_OP(allreduce_coalesced_)
+REGISTER_C10D_OP(allgather_)
+REGISTER_C10D_OP(_allgather_base_)
+REGISTER_C10D_OP(allgather_coalesced_)
+REGISTER_C10D_OP(allgather_into_tensor_coalesced_)
+REGISTER_C10D_OP(reduce_scatter_)
+REGISTER_C10D_OP(_reduce_scatter_base_)
+REGISTER_C10D_OP(gather_)
+REGISTER_C10D_OP(scatter_)
+REGISTER_C10D_OP(alltoall_)
+REGISTER_C10D_OP(alltoall_base_)
+REGISTER_C10D_OP(barrier)
+
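+// Expansion sketch (editorial illustration): REGISTER_C10D_OP(send) runs
+// through REGISTER_C10D_OP1 once per device key and emits, roughly:
+//
+//   TORCH_LIBRARY_IMPL(c10d, CPU, m) { m.impl("send", sendCPU); }
+//   TORCH_LIBRARY_IMPL(c10d, CUDA, m) { m.impl("send", sendCUDA); }
+//   TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) { m.impl("send", sendPrivateUse1); }
+//
+// #FUNC supplies the schema name string, FUNC##DEV the wrapper defined above.
+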
+// The following ops are specialized, register them separately
+
 TORCH_LIBRARY_IMPL(c10d, CPU, m) {
-  m.impl("allreduce_", allreduce_cpu_);
+  m.impl("monitored_barrier_", monitored_barrier_CPU);
 }
 
 // TODO: The SparseCPU/SparseCUDA dispatched methods are only used to support
 // sparse all_reduce in the Gloo backend
 TORCH_LIBRARY_IMPL(c10d, SparseCPU, m) {
-  m.impl("allreduce_", allreduce_cpu_);
+  m.impl("allreduce_", allreduce_CPU);
 }
 
 TORCH_LIBRARY_IMPL(c10d, SparseCUDA, m) {
-  m.impl("allreduce_", allreduce_cuda_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
-  m.impl("allreduce_", allreduce_cuda_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
-  m.impl("allreduce_", allreduce_privateuse1_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CPU, m) {
-  m.impl("allreduce_coalesced_", allreduce_coalesced_cpu_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
-  m.impl("allreduce_coalesced_", allreduce_coalesced_cuda_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
-  m.impl("allreduce_coalesced_", allreduce_coalesced_privateuse1_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CPU, m) {
-  m.impl("allgather_", allgather_cpu_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
-  m.impl("allgather_", allgather_cuda_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
-  m.impl("allgather_", allgather_privateuse1_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CPU, m) {
-  m.impl("_allgather_base_", _allgather_base_cpu_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
-  m.impl("_allgather_base_", _allgather_base_cuda_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
-  m.impl("_allgather_base_", _allgather_base_privateuse1_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CPU, m) {
-  m.impl("allgather_coalesced_", allgather_coalesced_cpu_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
-  m.impl("allgather_coalesced_", allgather_coalesced_cuda_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
-  m.impl("allgather_coalesced_", allgather_coalesced_privateuse1_);
-}
-
-REGISTER_C10D_OP(allgather_into_tensor_coalesced_, CPU)
-REGISTER_C10D_OP(allgather_into_tensor_coalesced_, CUDA)
-REGISTER_C10D_OP(allgather_into_tensor_coalesced_, PrivateUse1)
-
-TORCH_LIBRARY_IMPL(c10d, CPU, m) {
-  m.impl("reduce_scatter_", reduce_scatter_cpu_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
-  m.impl("reduce_scatter_", reduce_scatter_cuda_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
-  m.impl("reduce_scatter_", reduce_scatter_privateuse1_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CPU, m) {
-  m.impl("_reduce_scatter_base_", _reduce_scatter_base_cpu_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
-  m.impl("_reduce_scatter_base_", _reduce_scatter_base_cuda_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
-  m.impl("_reduce_scatter_base_", _reduce_scatter_base_privateuse1_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CPU, m) {
-  m.impl("gather_", gather_cpu_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
-  m.impl("gather_", gather_cuda_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
-  m.impl("gather_", gather_privateuse1_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CPU, m) {
-  m.impl("scatter_", scatter_cpu_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
-  m.impl("scatter_", scatter_cuda_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
-  m.impl("scatter_", scatter_privateuse1_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CPU, m) {
-  m.impl("alltoall_", alltoall_cpu_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
-  m.impl("alltoall_", alltoall_cuda_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
-  m.impl("alltoall_", alltoall_privateuse1_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CPU, m) {
-  m.impl("alltoall_base_", alltoall_base_cpu_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
-  m.impl("alltoall_base_", alltoall_base_cuda_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
-  m.impl("alltoall_base_", alltoall_base_privateuse1_);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CPU, m) {
-  m.impl("barrier", barrier_cpu);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CUDA, m) {
-  m.impl("barrier", barrier_cuda);
-}
-
-TORCH_LIBRARY_IMPL(c10d, PrivateUse1, m) {
-  m.impl("barrier", barrier_privateuse1);
-}
-
-TORCH_LIBRARY_IMPL(c10d, CPU, m) {
-  m.impl("monitored_barrier_", monitored_barrier_cpu_);
+  m.impl("allreduce_", allreduce_CUDA);
 }
 
 } // namespace
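For context: the wrappers registered above are reached through the dispatcher
from the ProcessGroup front end (see ProcessGroup.hpp). A minimal sketch of
that caller side, with the exact signature abridged, looks like:

    // Resolve the "c10d::send" schema once; the dispatcher then routes to
    // sendCPU / sendCUDA / sendPrivateUse1 based on the tensors' device key.
    static auto op = c10::Dispatcher::singleton()
                         .findSchemaOrThrow("c10d::send", "")
                         .typed<c10::intrusive_ptr<Work>(
                             at::TensorList,
                             const c10::intrusive_ptr<ProcessGroup>&,
                             int64_t,
                             int64_t)>();
    return op.call(tensors, process_group, dstRank, tag);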