Merged
2 changes: 1 addition & 1 deletion paddle/fluid/distributed/collective/CMakeLists.txt
@@ -18,7 +18,7 @@ endif()
if(WITH_NCCL OR WITH_RCCL)
cc_library(
process_group_nccl
SRCS process_group_nccl.cc nccl_tools.cc common.cc
SRCS process_group_nccl.cc common.cc
DEPS process_group
phi
place
2 changes: 0 additions & 2 deletions paddle/fluid/distributed/collective/bkcl_tools.cc
@@ -14,8 +14,6 @@

#include "paddle/fluid/distributed/collective/bkcl_tools.h"

#include "paddle/fluid/distributed/collective/types.h"

namespace paddle {
namespace distributed {

3 changes: 2 additions & 1 deletion paddle/fluid/distributed/collective/bkcl_tools.h
@@ -14,14 +14,15 @@

#pragma once

#include "paddle/fluid/distributed/collective/types.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/phi/backends/xpu/enforce_xpu.h"
#include "paddle/phi/backends/xpu/xpu_context.h"
#include "paddle/phi/core/distributed/types.h"

namespace paddle {
namespace distributed {
using XPUContext = phi::XPUContext;
using phi::distributed::ReduceOp;

#define BKCLCHECK(cmd) \
do { \
1 change: 0 additions & 1 deletion paddle/fluid/distributed/collective/custom_ccl_tools.cc
@@ -13,7 +13,6 @@
// limitations under the License.

#include "paddle/fluid/distributed/collective/custom_ccl_tools.h"
#include "paddle/fluid/distributed/collective/types.h"

namespace paddle {
namespace distributed {
4 changes: 3 additions & 1 deletion paddle/fluid/distributed/collective/custom_ccl_tools.h
@@ -22,18 +22,20 @@

#include <string>

#include "paddle/fluid/distributed/collective/types.h"
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/collective_helper.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/phi/backends/device_guard.h"
#include "paddle/phi/backends/device_manager.h"
#include "paddle/phi/core/distributed/types.h"

namespace paddle {
namespace distributed {

using phi::distributed::ReduceOp;

phi::ccl::CCLReduceOp ToXCCLRedType(ReduceOp reduction);

} // namespace distributed
49 changes: 0 additions & 49 deletions paddle/fluid/distributed/collective/nccl_tools.h

This file was deleted.

128 changes: 59 additions & 69 deletions paddle/fluid/distributed/collective/process_group.h
@@ -20,10 +20,10 @@
#include <unordered_map>
#include <vector>

#include "paddle/fluid/distributed/collective/types.h"
#include "paddle/fluid/eager/utils.h"
#include "paddle/phi/core/dense_tensor.h"
#include "paddle/phi/core/device_context.h"
#include "paddle/phi/core/distributed/types.h"
#include "paddle/phi/core/distributed/utils.h"
#include "paddle/phi/core/enforce.h"
#include "paddle/phi/core/errors.h"

@@ -32,24 +32,18 @@ constexpr auto kWaitTimeout = std::chrono::milliseconds(0);
namespace paddle {
namespace distributed {

using phi::distributed::AllreduceOptions;
using phi::distributed::BarrierOptions;
using phi::distributed::BroadcastOptions;
using phi::distributed::CommType;
using phi::distributed::GatherOptions;
using phi::distributed::GetPartialTensor;
using phi::distributed::ReduceOp;
using phi::distributed::ReduceOptions;
using phi::distributed::ReduceScatterOptions;
using phi::distributed::ScatterOptions;
constexpr int kIgnoreId = -1;

enum class CommType : std::uint8_t {
BROADCAST = 0,
ALLREDUCE = 1,
ALLREDUCE_SPARSE = 2, // TODO(shenliang03): to support sparse in allreduce
REDUCE = 3,
ALLGATHER = 4,
GATHER = 5,
SCATTER = 6,
REDUCE_SCATTER = 7,
ALLTOALL = 8,
SEND = 9,
RECV = 10,
BARRIER = 11,
UNKNOWN = 100,
};

class ProcessGroup {
public:
class Task {
Expand Down Expand Up @@ -95,6 +89,15 @@ class ProcessGroup {

int GetSize() const { return size_; }

int GetGid() const { return gid_; }

std::string GetGroupMessage() const {
return std::string("rank_in_group: ") + std::to_string(rank_) +
std::string(", nranks: ") + std::to_string(size_) +
std::string(", gid: ") + std::to_string(gid_) +
std::string(", backend: ") + GetBackendName();
}

virtual std::string GetBackendName() const = 0;

virtual phi::DeviceContext* GetDeviceContext(
Expand Down Expand Up @@ -294,7 +297,7 @@ class ProcessGroup {
const phi::DenseTensor& in_tensor UNUSED,
const BroadcastOptions& opts UNUSED,
bool sync_op UNUSED,
bool use_calc_stream UNUSED) {
bool use_calc_stream) {
Contributor

Do the UNUSED attributes still need to be kept here? A later error message reads "ProcessGroup does not support broadcast with sync_op and use_calc_stream flag.", yet the Broadcast overloads further down do appear to support the sync_op parameter; should the UNUSED on sync_op be removed?

The sync_op parameters of the other communication functions seem to have the same issue.

Author

UNUSED is added on the base class; the error is raised only when a concrete instance cannot find an implementation. These UNUSED markers were added on the develop branch; we will double-check them later.
(A minimal sketch of this base-class UNUSED pattern appears after this file's diff.)

PADDLE_THROW(
phi::errors::Unimplemented("ProcessGroup%s does not support broadcast "
"with sync_op and use_calc_stream flag.",
@@ -412,68 +415,57 @@ class ProcessGroup {
// legacy APIs
// TODO(liyurui): This API will be moved later
virtual std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<phi::DenseTensor>& /* input tensors */, // NOLINT
std::vector<phi::DenseTensor>& /* output tensors */, // NOLINT
const AllreduceOptions& UNUSED = AllreduceOptions()) {
PADDLE_THROW(phi::errors::InvalidArgument(
"ProcessGroup%s does not support allreduce", GetBackendName()));
std::vector<phi::DenseTensor>& inputs, // NOLINT
std::vector<phi::DenseTensor>& outputs, // NOLINT
const AllreduceOptions& options = AllreduceOptions()) {
return AllReduce(outputs.data(), inputs.front(), options, false);
}

virtual std::shared_ptr<ProcessGroup::Task> AllReduce(
std::vector<phi::DenseTensor>& /* input tensors */, // NOLINT
std::vector<phi::DenseTensor>& /* output tensors */, // NOLINT
const AllreduceOptions& UNUSED,
bool) {
PADDLE_THROW(phi::errors::InvalidArgument(
"ProcessGroup%s does not support allreduce with sync_op flag",
GetBackendName()));
std::vector<phi::DenseTensor>& inputs, // NOLINT
std::vector<phi::DenseTensor>& outputs, // NOLINT
const AllreduceOptions& options,
bool sync_op) {
return AllReduce(outputs.data(), inputs.front(), options, sync_op);
}

// TODO(sunyilun): methods below will be removed later
virtual std::shared_ptr<ProcessGroup::Task> Broadcast(
std::vector<phi::DenseTensor>& /* input tensors */, // NOLINT
std::vector<phi::DenseTensor>& /* output tensors */, // NOLINT
const BroadcastOptions& UNUSED = BroadcastOptions()) {
PADDLE_THROW(phi::errors::InvalidArgument(
"ProcessGroup%s does not support broadcast", GetBackendName()));
std::vector<phi::DenseTensor>& inputs, // NOLINT
std::vector<phi::DenseTensor>& outputs, // NOLINT
const BroadcastOptions& options = BroadcastOptions()) {
return Broadcast(outputs.data(), inputs.front(), options, false);
}

virtual std::shared_ptr<ProcessGroup::Task> Broadcast(
std::vector<phi::DenseTensor>& /* input tensors */, // NOLINT
std::vector<phi::DenseTensor>& /* output tensors */, // NOLINT
const BroadcastOptions& UNUSED,
bool) {
PADDLE_THROW(phi::errors::InvalidArgument(
"ProcessGroup%s does not support broadcast with sync_op flag",
GetBackendName()));
std::vector<phi::DenseTensor>& inputs, // NOLINT
std::vector<phi::DenseTensor>& outputs, // NOLINT
const BroadcastOptions& options,
bool sync_op) {
return Broadcast(outputs.data(), inputs.front(), options, sync_op);
}

virtual std::shared_ptr<ProcessGroup::Task> Send(
std::vector<phi::DenseTensor>&, int) { // NOLINT
PADDLE_THROW(phi::errors::InvalidArgument(
"ProcessGroup%s does not support send", GetBackendName()));
std::vector<phi::DenseTensor>& tensors, int dst_rank) { // NOLINT
return Send(tensors.front(), dst_rank, false);
}

virtual std::shared_ptr<ProcessGroup::Task> Recv(
std::vector<phi::DenseTensor>&, int) { // NOLINT
PADDLE_THROW(phi::errors::InvalidArgument(
"ProcessGroup%s does not support recv", GetBackendName()));
std::vector<phi::DenseTensor>& tensors, int src_rank) { // NOLINT
return Recv(&tensors.front(), src_rank, false);
}

virtual std::shared_ptr<ProcessGroup::Task> AllGather(
std::vector<phi::DenseTensor>&, // NOLINT
std::vector<phi::DenseTensor>&) { // NOLINT
PADDLE_THROW(phi::errors::InvalidArgument(
"ProcessGroup%s does not support all_gather", GetBackendName()));
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors) { // NOLINT
return AllGather(out_tensors.data(), in_tensors.front(), false);
}

virtual std::shared_ptr<ProcessGroup::Task> AllGather(
std::vector<phi::DenseTensor>&, // NOLINT
std::vector<phi::DenseTensor>&, // NOLINT
bool) {
PADDLE_THROW(phi::errors::InvalidArgument(
"ProcessGroup%s does not support all_gather with sync_op flag",
GetBackendName()));
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
bool sync_op) {
return AllGather(out_tensors.data(), in_tensors.front(), sync_op);
}

virtual std::shared_ptr<ProcessGroup::Task> AllToAll(
@@ -484,19 +476,17 @@
}

virtual std::shared_ptr<ProcessGroup::Task> Reduce(
std::vector<phi::DenseTensor>&, // NOLINT
std::vector<phi::DenseTensor>&, // NOLINT
const ReduceOptions& opts UNUSED) {
PADDLE_THROW(phi::errors::InvalidArgument(
"ProcessGroup%s does not support reduce", GetBackendName()));
std::vector<phi::DenseTensor>& ins, // NOLINT
std::vector<phi::DenseTensor>& outs, // NOLINT
const ReduceOptions& opts) {
return Reduce(outs.data(), ins.front(), opts, false);
}

virtual std::shared_ptr<ProcessGroup::Task> Scatter(
std::vector<phi::DenseTensor>&, // NOLINT
std::vector<phi::DenseTensor>&, // NOLINT
const ScatterOptions&) {
PADDLE_THROW(phi::errors::InvalidArgument(
"ProcessGroup%s does not support scatter", GetBackendName()));
std::vector<phi::DenseTensor>& ins, // NOLINT
std::vector<phi::DenseTensor>& outs, // NOLINT
const ScatterOptions& opts) {
return Scatter(outs.data(), ins.front(), opts, false);
}

protected:
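To make the review exchange above concrete, here is a minimal, hypothetical C++ sketch (not Paddle code; the class names are invented for illustration, and UNUSED is assumed to expand to a compiler attribute such as __attribute__((unused))) of the pattern the author describes: the base-class fallback marks its parameters UNUSED and throws, while a concrete backend overrides the virtual and uses the parameters, so the error fires only when a backend provides no implementation.

// Hypothetical sketch (not Paddle code): how an UNUSED-style parameter attribute on a
// base-class virtual interacts with overrides. The macro and classes are invented;
// UNUSED is assumed to expand to a compiler attribute such as __attribute__((unused)).
#include <iostream>
#include <stdexcept>
#include <string>

#define UNUSED __attribute__((unused))  // assumption: only silences "unused parameter" warnings

class ProcessGroupBase {
 public:
  virtual ~ProcessGroupBase() = default;
  virtual std::string GetBackendName() const = 0;

  // Base-class fallback: the parameters are not used here, so they carry UNUSED.
  // The throw is reached only when a backend does not override this method.
  virtual void Broadcast(int root UNUSED, bool sync_op UNUSED, bool use_calc_stream UNUSED) {
    throw std::runtime_error("ProcessGroup" + GetBackendName() +
                             " does not support broadcast with sync_op and use_calc_stream flag.");
  }
};

class ProcessGroupFake : public ProcessGroupBase {
 public:
  std::string GetBackendName() const override { return "FAKE"; }

  // The override actually uses every parameter, so no UNUSED is needed here.
  void Broadcast(int root, bool sync_op, bool use_calc_stream) override {
    std::cout << "broadcast root=" << root << " sync_op=" << sync_op
              << " use_calc_stream=" << use_calc_stream << "\n";
  }
};

int main() {
  ProcessGroupFake pg;
  // Virtual dispatch selects the override, so the base-class throw is never hit.
  pg.Broadcast(/*root=*/0, /*sync_op=*/true, /*use_calc_stream=*/false);
  return 0;
}

Under this reading, removing UNUSED from the base declaration would only reintroduce unused-parameter warnings in the fallback body; it has no effect on which implementation a concrete backend dispatches to.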
@@ -16,7 +16,7 @@

#include "paddle/fluid/distributed/collective/bkcl_tools.h"
#include "paddle/fluid/distributed/collective/common.h"
#include "paddle/fluid/distributed/collective/utils.h"
#include "paddle/fluid/framework/convert_utils.h"
#include "paddle/fluid/platform/device/xpu/bkcl_helper.h"
#include "paddle/fluid/platform/device/xpu/xpu_info.h"
#include "paddle/phi/api/lib/utils/allocator.h"
@@ -16,7 +16,6 @@

#include "paddle/fluid/distributed/collective/common.h"
#include "paddle/fluid/distributed/collective/custom_ccl_tools.h"
#include "paddle/fluid/distributed/collective/utils.h"
#include "paddle/phi/api/lib/utils/allocator.h"
#include "paddle/phi/core/distributed/check/static_check.h"
#include "paddle/phi/core/enforce.h"
@@ -32,6 +31,8 @@ PD_DECLARE_bool(use_stream_safe_cuda_allocator);
namespace paddle {
namespace distributed {

using phi::distributed::CheckSizeOnEachRank;
using phi::distributed::GetPointerByOffset;
static std::mutex g_unfinished_xccl_task_events_mutex;
static std::list<std::unique_ptr<phi::event::Event>>
g_unfinished_xccl_task_events;
@@ -22,6 +22,7 @@

#include "paddle/fluid/distributed/collective/process_group.h"
#include "paddle/fluid/distributed/collective/process_group_with_stream.h"
#include "paddle/phi/backends/custom/custom_context.h"
#include "paddle/phi/backends/device_manager.h"
#include "paddle/phi/common/place.h"
#include "paddle/phi/core/device_context.h"