9 changes: 9 additions & 0 deletions paddle/fluid/distributed/collective/ProcessGroup.h
@@ -137,6 +137,15 @@ class ProcessGroup {
         "ProcessGroup%s does not support AllGather", GetBackendName()));
   }
 
+  virtual std::shared_ptr<ProcessGroup::Task> AllGather_Partial(
+      std::vector<phi::DenseTensor>& in_tensors,   // NOLINT
+      std::vector<phi::DenseTensor>& out_tensors,  // NOLINT
+      int offset,
+      int length) {  // NOLINT
+    PADDLE_THROW(platform::errors::InvalidArgument(
+        "ProcessGroup%s does not support AllGather_Partial", GetBackendName()));
+  }
+
   virtual std::shared_ptr<ProcessGroup::Task> AllToAll(
       std::vector<phi::DenseTensor>&,    // NOLINT
       std::vector<phi::DenseTensor>&) {  // NOLINT
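The base-class hook above only throws, so the call is usable solely on backends that override it (ProcessGroupNCCL below). A minimal sketch of the intended semantics on plain host vectors — `AllGatherPartialReference` and its arguments are hypothetical, for illustration only:

```cpp
#include <vector>

// Each rank contributes `length` elements starting at its own offset; rank r's
// slice lands at [r * length, (r + 1) * length) of the gathered output.
// partial_allgather passes offset = rank * length, so the output is the
// row-sharded tensor reassembled in rank order.
std::vector<float> AllGatherPartialReference(
    const std::vector<std::vector<float>>& per_rank_in,  // one buffer per rank
    const std::vector<int>& per_rank_offset,  // offset chosen by each rank
    int length) {
  std::vector<float> out;
  out.reserve(per_rank_in.size() * length);
  for (size_t r = 0; r < per_rank_in.size(); ++r) {
    const auto& local = per_rank_in[r];
    const int off = per_rank_offset[r];
    out.insert(out.end(), local.begin() + off, local.begin() + off + length);
  }
  return out;
}
```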
48 changes: 40 additions & 8 deletions paddle/fluid/distributed/collective/ProcessGroupNCCL.cc
@@ -85,26 +85,27 @@ bool ProcessGroupNCCL::NCCLTask::IsCompleted() {
   return true;
 }
 
-void ProcessGroupNCCL::CheckSplitSizes(std::vector<int64_t>& split_sizes,
+void ProcessGroupNCCL::CheckSplitSizes(std::vector<int64_t>* split_sizes,
                                        std::vector<int64_t> tensor_shape) {
-  int64_t len_size = split_sizes.size();
+  int64_t len_size = (*split_sizes).size();
   if (len_size == 0) {
     PADDLE_ENFORCE_EQ(tensor_shape[0] % size_ == 0,
                       true,
                       platform::errors::InvalidArgument(
                           "Tensor's dim[0] must be divisible by group size "
                           "when split_sizes not given."));
-    split_sizes.insert(split_sizes.end(),
-                       size_,
-                       static_cast<int64_t>(tensor_shape[0] / size_));
+    (*split_sizes)
+        .insert((*split_sizes).end(),
+                size_,
+                static_cast<int64_t>(tensor_shape[0] / size_));
   } else {
     PADDLE_ENFORCE_EQ(
         len_size == size_,
         true,
         platform::errors::InvalidArgument(
             "The length of split_sizes must be equal to group size."));
     auto sum_size = std::accumulate(
-        split_sizes.begin(), split_sizes.end(), static_cast<int64_t>(0));
+        (*split_sizes).begin(), (*split_sizes).end(), static_cast<int64_t>(0));
     PADDLE_ENFORCE_EQ(
         sum_size == tensor_shape[0],
         true,
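The `CheckSplitSizes` change is purely mechanical: the mutable out-parameter moves from a reference to a pointer (the Google C++ style convention), so call sites now spell `&in_sizes` and the mutation is visible. The helper's behavior is unchanged; a condensed sketch with error handling reduced to asserts and the member `size_` replaced by an explicit `group_size` (both substitutions are mine, for illustration):

```cpp
#include <cassert>
#include <cstdint>
#include <numeric>
#include <vector>

// Empty split_sizes -> fill in a uniform split; otherwise validate that the
// given sizes match the group size and sum to the tensor's dim[0].
void CheckSplitSizesSketch(std::vector<int64_t>* split_sizes,
                           const std::vector<int64_t>& tensor_shape,
                           int64_t group_size) {
  if (split_sizes->empty()) {
    assert(tensor_shape[0] % group_size == 0);
    split_sizes->insert(split_sizes->end(), group_size,
                        tensor_shape[0] / group_size);
  } else {
    assert(static_cast<int64_t>(split_sizes->size()) == group_size);
    const int64_t sum = std::accumulate(split_sizes->begin(),
                                        split_sizes->end(),
                                        static_cast<int64_t>(0));
    assert(sum == tensor_shape[0]);
  }
}
// E.g. tensor_shape = {8, 16}, group_size = 4: an empty split_sizes
// comes back as {2, 2, 2, 2}.
```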
@@ -626,6 +627,37 @@ void* GetPointerByOffset(void* raw_pointer,
   return nullptr;
 }
 
+std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllGather_Partial(
+    std::vector<phi::DenseTensor>& in_tensors,
+    std::vector<phi::DenseTensor>& out_tensors,
+    int offset,
+    int length) {
+  PADDLE_ENFORCE_EQ(
+      CheckTensorsInCudaPlace(in_tensors),
+      true,
+      platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
+  PADDLE_ENFORCE_EQ(
+      CheckTensorsInCudaPlace(out_tensors),
+      true,
+      platform::errors::InvalidArgument("All outputs should be in CudaPlace."));
+  return Collective(
+      in_tensors,
+      out_tensors,
+      [&](phi::DenseTensor& input,
+          phi::DenseTensor& output,
+          ncclComm_t comm,
+          const gpuStream_t& stream) {
+        return platform::dynload::ncclAllGather(
+            GetPointerByOffset(input.data(), offset, input.dtype()),
+            output.data(),
+            length,
+            platform::ToNCCLDataType(input.dtype()),
+            comm,
+            stream);
+      },
+      CommType::ALLGATHER);
+}
+
 std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllToAll(
     std::vector<phi::DenseTensor>& in_tensors,
     std::vector<phi::DenseTensor>& out_tensors) {
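`AllGather_Partial` reuses the plain `ncclAllGather` primitive: `GetPointerByOffset` advances the send pointer by `offset` elements (scaled by dtype size), each rank contributes `length` elements, and NCCL deposits rank r's contribution at element `r * length` of the full output, which is why `output.data()` is passed unoffset. A hedged usage sketch mirroring the op kernel further down (it assumes the `GetRank()`/`GetSize()` accessors on `ProcessGroup` and ready-made `pg`, `in`, `out`):

```cpp
// Gather a row-sharded tensor: each rank owns slice [rank*n, (rank+1)*n) of
// the logical tensor, and all ranks end up with the concatenation in `out`.
int64_t send_numel = in.numel() / pg->GetSize();  // n: slice owned per rank
int offset = send_numel * pg->GetRank();          // element offset of our slice
std::vector<phi::DenseTensor> in_tensors{in}, out_tensors{out};
auto task = pg->AllGather_Partial(in_tensors, out_tensors, offset, send_numel);
task->Wait();  // out now holds all ranks' slices in rank order
```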
@@ -695,8 +727,8 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllToAll_Single(
 
   std::vector<int64_t> in_dims = phi::vectorize(input.dims());
   std::vector<int64_t> out_dims = phi::vectorize(output.dims());
-  CheckSplitSizes(in_sizes, in_dims);
-  CheckSplitSizes(out_sizes, out_dims);
+  CheckSplitSizes(&in_sizes, in_dims);
+  CheckSplitSizes(&out_sizes, out_dims);
 
   size_t in_offset = 0, out_offset = 0;
   size_t in_length = 0, out_length = 0;
8 changes: 7 additions & 1 deletion paddle/fluid/distributed/collective/ProcessGroupNCCL.h
@@ -125,6 +125,12 @@ class ProcessGroupNCCL : public ProcessGroup {
       std::vector<phi::DenseTensor>& in_tensors,
       std::vector<phi::DenseTensor>& out_tensors) override;
 
+  std::shared_ptr<ProcessGroup::Task> AllGather_Partial(
+      std::vector<phi::DenseTensor>& in_tensors,
+      std::vector<phi::DenseTensor>& out_tensors,
+      int offset,
+      int length) override;
+
   std::shared_ptr<ProcessGroup::Task> AllToAll(
       std::vector<phi::DenseTensor>& in,
       std::vector<phi::DenseTensor>& out) override;
@@ -206,7 +212,7 @@ class ProcessGroupNCCL : public ProcessGroup {
   void CreateNCCLManagerCache(const std::string& places_key,
                               const std::vector<Place>& places);
 
-  void CheckSplitSizes(std::vector<int64_t>& split_sizes,
+  void CheckSplitSizes(std::vector<int64_t>* split_sizes,
                        std::vector<int64_t> tensor_shape);
 };
 
45 changes: 30 additions & 15 deletions paddle/fluid/operators/collective/partial_allgather_op.cu.cc
@@ -15,6 +15,7 @@ limitations under the License. */
 #include "paddle/fluid/operators/collective/partial_allgather_op.h"
 
 #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
+#include "paddle/fluid/distributed/collective/ProcessGroup.h"
 #include "paddle/fluid/platform/collective_helper.h"
 #include "paddle/fluid/platform/device/gpu/nccl_helper.h"
 #endif
@@ -61,24 +62,38 @@ class PartialAllGatherOpCUDAKernel : public framework::OpKernel<T> {
 
     int64_t send_numel = numel / nranks;
     int offset = send_numel * rank;
-    const T* send_buff = in->data<T>() + offset;
-    T* recv_buff = out->data<T>();
 
-    gpuStream_t stream = nullptr;
-    if (ctx.Attr<bool>("use_calc_stream")) {
-      auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
-      stream = static_cast<platform::CUDADeviceContext*>(dev_ctx)->stream();
+    auto map = distributed::ProcessGroupMapFromGid::getInstance();
+    if (map->has(rid)) {
+      // Use ProcessGroup
+      distributed::ProcessGroup* pg = map->get(rid);
+      std::vector<phi::DenseTensor> in_tensors;
+      std::vector<phi::DenseTensor> out_tensors;
+      in_tensors.push_back(*in);
+      out_tensors.push_back(*out);
+      auto task =
+          pg->AllGather_Partial(in_tensors, out_tensors, offset, send_numel);
+      task->Wait();
     } else {
-      stream = comm->stream();
-    }
+      const T* send_buff = in->data<T>() + offset;
+      T* recv_buff = out->data<T>();
 
-    PADDLE_ENFORCE_GPU_SUCCESS(
-        platform::dynload::ncclAllGather(send_buff,
-                                         recv_buff,
-                                         send_numel,
-                                         static_cast<ncclDataType_t>(dtype),
-                                         comm->comm(),
-                                         stream));
+      gpuStream_t stream = nullptr;
+      if (ctx.Attr<bool>("use_calc_stream")) {
+        auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
+        stream = static_cast<platform::CUDADeviceContext*>(dev_ctx)->stream();
+      } else {
+        stream = comm->stream();
+      }
+
+      PADDLE_ENFORCE_GPU_SUCCESS(
+          platform::dynload::ncclAllGather(send_buff,
+                                           recv_buff,
+                                           send_numel,
+                                           static_cast<ncclDataType_t>(dtype),
+                                           comm->comm(),
+                                           stream));
+    }
 #else
     PADDLE_THROW(platform::errors::PreconditionNotMet(
         "PaddlePaddle should compile with GPU."));
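All three kernels in this PR adopt the same dispatch shape: if a ProcessGroup has been registered for the ring id, route through the new API and let `task->Wait()` handle synchronization; otherwise fall back to the legacy `NCCLCommContext` path with explicit stream selection. Condensed from the diff above (not compilable on its own — the elided parts are in the kernels themselves):

```cpp
auto map = distributed::ProcessGroupMapFromGid::getInstance();
if (map->has(rid)) {
  // New path: the registered ProcessGroup owns communicator and stream
  // management on behalf of the op.
  distributed::ProcessGroup* pg = map->get(rid);
  // ... call pg->AllGather_Partial / Send_Partial / Recv_Partial, task->Wait()
} else {
  // Legacy path: look up the raw NCCL communicator for this ring id.
  auto comm = platform::NCCLCommContext::Instance().Get(rid, place);
  // ... pick the calc stream or comm->stream(), then the raw NCCL primitive
}
```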
60 changes: 34 additions & 26 deletions paddle/fluid/operators/collective/partial_recv_op.cu.cc
@@ -15,6 +15,7 @@ limitations under the License. */
 #include "paddle/fluid/operators/collective/partial_recv_op.h"
 
 #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
+#include "paddle/fluid/distributed/collective/ProcessGroup.h"
 #include "paddle/fluid/platform/collective_helper.h"
 #include "paddle/fluid/platform/device/gpu/nccl_helper.h"
 #endif
@@ -65,37 +66,44 @@ class PartialRecvOpCUDAKernel : public framework::OpKernel<T> {
         platform::errors::InvalidArgument(
             "The input numel (%d) must be divisible by num(%d)", numel, num));
 
-    gpuStream_t stream = nullptr;
     auto place = ctx.GetPlace();
-    auto comm = platform::NCCLCommContext::Instance().Get(rid, place);
-    if (ctx.Attr<bool>("use_calc_stream")) {
-      auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
-      stream = static_cast<platform::CUDADeviceContext *>(dev_ctx)->stream();
-    } else {
-      stream = comm->stream();
-    }
-    PADDLE_ENFORCE_LT(
-        peer,
-        comm->nranks(),
-        platform::errors::InvalidArgument("The value of peer (%d) you set must "
-                                          "be less than comm->nranks (%d).",
-                                          peer,
-                                          comm->nranks()));
-
     out->mutable_data<T>(out_dims, place);
-    ncclDataType_t dtype = platform::ToNCCLDataType(type);
     int recv_numel = numel / num;
     int offset = recv_numel * id;
 
-    PADDLE_ENFORCE_GPU_SUCCESS(
-        platform::dynload::ncclRecv(out->data<T>() + offset,
-                                    recv_numel,
-                                    dtype,
-                                    peer,
-                                    comm->comm(),
-                                    stream));
-    VLOG(3) << "rank " << comm->rank() << " recv " << recv_numel
-            << " from offset[" << offset << "] from " << peer;
+    auto map = distributed::ProcessGroupMapFromGid::getInstance();
+    if (map->has(rid)) {
+      // Use ProcessGroup
+      distributed::ProcessGroup *pg = map->get(rid);
+      auto task = pg->Recv_Partial(*out, peer, offset, recv_numel);
+      task->Wait();
+    } else {
+      gpuStream_t stream = nullptr;
+      auto comm = platform::NCCLCommContext::Instance().Get(rid, place);
+      if (ctx.Attr<bool>("use_calc_stream")) {
+        auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
+        stream = static_cast<platform::CUDADeviceContext *>(dev_ctx)->stream();
+      } else {
+        stream = comm->stream();
+      }
+      PADDLE_ENFORCE_LT(peer,
+                        comm->nranks(),
+                        platform::errors::InvalidArgument(
+                            "The value of peer (%d) you set must "
+                            "be less than comm->nranks (%d).",
+                            peer,
+                            comm->nranks()));
+      ncclDataType_t dtype = platform::ToNCCLDataType(type);
+      PADDLE_ENFORCE_GPU_SUCCESS(
+          platform::dynload::ncclRecv(out->data<T>() + offset,
+                                      recv_numel,
+                                      dtype,
+                                      peer,
+                                      comm->comm(),
+                                      stream));
+      VLOG(3) << "rank " << comm->rank() << " recv " << recv_numel
+              << " from offset[" << offset << "] from " << peer;
+    }
 #else
     PADDLE_THROW(platform::errors::Unavailable(
         "PaddlePaddle should be compiled with NCCL and "
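On the ProcessGroup path, `Recv_Partial` takes the whole destination tensor plus an element offset and count, which mirrors the `out->data<T>() + offset` pointer math of the legacy branch: the message lands in `out[offset, offset + recv_numel)`. A small reference helper for the slice arithmetic (hypothetical, named for illustration):

```cpp
#include <cstdint>

struct Slice {
  int64_t offset;  // first element written
  int64_t count;   // number of elements written
};

// Where chunk `id` of `num` equal chunks lands in the receiver's output.
// The kernel enforces numel % num == 0 before this point.
Slice PartialRecvSlice(int64_t numel, int64_t num, int64_t id) {
  const int64_t recv_numel = numel / num;
  return {recv_numel * id, recv_numel};
}
// E.g. numel = 12, num = 3, id = 1 -> elements [4, 8) of out.
```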
62 changes: 39 additions & 23 deletions paddle/fluid/operators/collective/partial_send_op.cu.cc
@@ -15,6 +15,7 @@ limitations under the License. */
 #include "paddle/fluid/operators/collective/partial_send_op.h"
 
 #if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
+#include "paddle/fluid/distributed/collective/ProcessGroup.h"
 #include "paddle/fluid/platform/collective_helper.h"
 #include "paddle/fluid/platform/device/gpu/nccl_helper.h"
 #endif
@@ -61,32 +62,47 @@ class PartialSendCUDAKernel : public framework::OpKernel<T> {
         platform::errors::InvalidArgument(
             "The input numel (%d) must be divisible by num(%d)", numel, num));
 
-    gpuStream_t stream = nullptr;
-    auto place = ctx.GetPlace();
-    auto comm = platform::NCCLCommContext::Instance().Get(rid, place);
-    if (ctx.Attr<bool>("use_calc_stream")) {
-      auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
-      stream = static_cast<platform::CUDADeviceContext*>(dev_ctx)->stream();
-    } else {
-      stream = comm->stream();
-    }
-    PADDLE_ENFORCE_LT(
-        peer,
-        comm->nranks(),
-        platform::errors::InvalidArgument("The value of peer (%d) you set must "
-                                          "be less than comm->nranks (%d).",
-                                          peer,
-                                          comm->nranks()));
-
-    ncclDataType_t dtype =
-        platform::ToNCCLDataType(framework::TransToProtoVarType(x->dtype()));
     int send_numel = numel / num;
     int offset = send_numel * id;
 
-    PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclSend(
-        x->data<T>() + offset, send_numel, dtype, peer, comm->comm(), stream));
-    VLOG(3) << "rank " << comm->rank() << " send " << send_numel
-            << " from offset[" << offset << "] to " << peer;
+    auto map = distributed::ProcessGroupMapFromGid::getInstance();
+    if (map->has(rid)) {
+      // Use ProcessGroup
+      distributed::ProcessGroup* pg = map->get(rid);
+      phi::DenseTensor tmp = *x;
+      auto task = pg->Send_Partial(tmp, peer, offset, send_numel);
+      task->Wait();
+    } else {
+      gpuStream_t stream = nullptr;
+      auto place = ctx.GetPlace();
+      auto comm = platform::NCCLCommContext::Instance().Get(rid, place);
+      if (ctx.Attr<bool>("use_calc_stream")) {
+        auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
+        stream = static_cast<platform::CUDADeviceContext*>(dev_ctx)->stream();
+      } else {
+        stream = comm->stream();
+      }
+      PADDLE_ENFORCE_LT(peer,
+                        comm->nranks(),
+                        platform::errors::InvalidArgument(
+                            "The value of peer (%d) you set must "
+                            "be less than comm->nranks (%d).",
+                            peer,
+                            comm->nranks()));
 
+      ncclDataType_t dtype =
+          platform::ToNCCLDataType(framework::TransToProtoVarType(x->dtype()));
+
+      PADDLE_ENFORCE_GPU_SUCCESS(
+          platform::dynload::ncclSend(x->data<T>() + offset,
+                                      send_numel,
+                                      dtype,
+                                      peer,
+                                      comm->comm(),
+                                      stream));
+      VLOG(3) << "rank " << comm->rank() << " send " << send_numel
+              << " from offset[" << offset << "] to " << peer;
+    }
 #else
     PADDLE_THROW(platform::errors::Unavailable(
         "PaddlePaddle should be compiled with NCCL "
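`partial_send` and `partial_recv` are built to pair up: with the same `num` and `id` attributes on both ends, the sender's source slice and the receiver's destination slice cover the same logical range, so one chunk of `x` travels point-to-point into the matching chunk of `out` with no staging buffer. The `phi::DenseTensor tmp = *x;` copy exists because `Send_Partial` wants a mutable reference; copying a DenseTensor copies metadata and shares the underlying allocation (my reading of phi, stated as an assumption). Reference semantics as a single-process sketch, with a hypothetical helper and the actual transport ignored:

```cpp
#include <cstdint>
#include <vector>

// What a matched partial_send/partial_recv pair computes, modulo transport:
// chunk `id` of `num` equal chunks is copied from the sender's x into the
// same position of the receiver's out. x.size() % num == 0 is assumed, as
// both kernels enforce.
void PartialPairReference(const std::vector<float>& x,  // sender input
                          std::vector<float>* out,      // receiver output
                          int64_t num,
                          int64_t id) {
  const int64_t n = static_cast<int64_t>(x.size()) / num;  // chunk size
  const int64_t off = n * id;                              // chunk start
  for (int64_t i = 0; i < n; ++i) (*out)[off + i] = x[off + i];
}
```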