Cpu mpi broadcast #5726
Conversation
oneflow/core/ccl/ccl.cpp
Outdated
DeviceCtx* ctx) {
  CHECK_EQ_OR_RETURN(parallel_desc->device_type(), DeviceType::kCPU);
  static thread_local std::vector<int64_t> rank_heap{};
  InitBroadcastRankHeap(&rank_heap, *parallel_desc, root);
Treat the group of ranks as a binary heap. The root rank is the root node; each rank copies data from its parent and then copies it on to its two children.
This can be refactored into a faster scheme later.
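As a rough illustration of the heap layout described above (a sketch with made-up names, not the PR's actual code): the root rank sits at heap index 0, and node `i` receives from parent `(i-1)/2` and forwards to children `2i+1` and `2i+2`.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical sketch: lay out all ranks as a binary heap with the broadcast
// root at index 0, keeping the remaining ranks in ascending order.
std::vector<int64_t> InitBroadcastRankHeapSketch(int64_t world_size, int64_t root) {
  std::vector<int64_t> heap;
  heap.reserve(world_size);
  heap.push_back(root);  // the root rank always occupies heap index 0
  for (int64_t r = 0; r < world_size; ++r) {
    if (r != root) { heap.push_back(r); }
  }
  return heap;
}

// Standard binary-heap index arithmetic: parent and left child of node i.
int64_t ParentIndex(int64_t i) { return (i - 1) / 2; }
int64_t LeftChildIndex(int64_t i) { return 2 * i + 1; }
```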
static thread_local const auto& nccl_device = Device::New("nccl");
return nccl_device;
} else if (input_device->type() == "cpu") {
return input_device;
The cpu stream in the virtual machine is asynchronous to begin with.
oneflow/user/ops/eager_nccl_ops.cpp
Outdated
.Broadcast(user_op::OpArg("out", 0))
.Build();
return Maybe<void>::Ok();
UNIMPLEMENTED_THEN_RETURN() << "consistent tensor are not supported";
Eager boxing (p2b) here will infer the sbp, so it probably can't be written this way?
I'll restore it.
oneflow/core/framework/rpc_util.cpp
Outdated
const auto& ForEachRank = [&](const std::function<Maybe<void>(int64_t)>& DoEach) -> Maybe<void> {
  return rank_group->ForEachRank(DoEach);
};
return AccessToOtherRanks<SendOrRecv, Prepare>(ForEachRank, token, ctx);
What ForEachRank really does here is produce the list of ranks that DoEach is applied to, right? So wouldn't it be clearer to pass a list directly instead of ForEachRank?
ForEachRank is already equivalent to a list. Not using a list 1) avoids constructing an object; 2) avoids choosing a container type; 3) avoids the iteration pass.
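To make the trade-off being discussed concrete, here is an illustrative sketch of the two styles (hypothetical types, not OneFlow's actual RankGroup API): the callback form visits each rank without ever materializing a container, while the list form allocates and fills a vector on every call.

```cpp
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical stand-in for a rank group holding ranks 0..size-1.
struct RankGroupSketch {
  int64_t size;

  // Callback style: no container is constructed; DoEach is applied directly.
  void ForEachRank(const std::function<void(int64_t)>& DoEach) const {
    for (int64_t r = 0; r < size; ++r) { DoEach(r); }
  }

  // List style: allocates a vector and fills it before the caller can iterate.
  std::vector<int64_t> RankList() const {
    std::vector<int64_t> ranks;
    ranks.reserve(size);
    for (int64_t r = 0; r < size; ++r) { ranks.push_back(r); }
    return ranks;
  }
};
```

Both expose the same set of ranks; the callback version simply fuses production and consumption into one pass.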
}  // namespace

class EagerCclBroadcastKernel final : public user_op::OpKernel {
The only difference between this name and EagerNcclBroadcastKernel is a missing N, which makes them easy to confuse 😂 Is it worth picking a different name?
Their contents are different anyway. Later I also plan to replace the nccl in these names directly with ccl.
> Later I also plan to replace the nccl in these names directly with ccl.

Sounds good, that way they won't be confused anymore.
oneflow/core/ccl/ccl.cpp
Outdated
*Cb = [] {};
return Maybe<void>::Ok();
});
JUST(RpcUtil::ReceiveDataFromParentInHeap(rank_heap, rpc_token, &rpc_ctx));
Broadcast is currently implemented with rpc; later we could consider supporting it via commnet.
This already calls into the commnet module.
Apparently the word Rpc is what caused the confusion; I'll rename it to Transport.
Sounds good.
void Init(user_op::KernelInitContext* ctx) {
  const std::string& parallel_conf_txt = ctx->Attr<std::string>("parallel_conf");
  ParallelConf parallel_conf;
  std::set<std::pair<int64_t, int64_t>> device_set;
This variable is unused.
oneflow/user/ops/eager_nccl_ops.cpp
Outdated
.Broadcast(user_op::OpArg("out", 0))
.Build();
return Maybe<void>::Ok();
UNIMPLEMENTED_THEN_RETURN() << "consistent tensor are not supported";
This needs to be restored; the consistent code path also uses this op.
oneflow/core/ccl/ccl.cpp
Outdated
CHECK_EQ_OR_RETURN(parallel_desc->device_type(), DeviceType::kCPU);
static thread_local std::vector<int64_t> rank_heap{};
JUST(InitBroadcastRankHeap(&rank_heap, *parallel_desc, root));
TransportToken rpc_token = TransportToken::NewDataTransportToken();
Rename rpc_token to transport_token, and likewise rpc_ctx below.
Optional<int64_t> current_rank_index{};
for (int i = 0; i < rank_heap.size(); ++i) {
  if (rank_heap.at(i) == GlobalProcessCtx::Rank()) {
    current_rank_index = i;
How about returning i directly here, and returning an error after the for loop?
Sure.
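The suggested refactor might look like the following sketch, with an exception standing in for OneFlow's Maybe<> error return (names here are illustrative, not the PR's code): return `i` as soon as the current rank is found, and report an error after the loop instead of tracking an Optional.

```cpp
#include <cstdint>
#include <stdexcept>
#include <vector>

// Return the heap index of current_rank, or fail after the loop if absent.
int64_t CurrentRankIndexSketch(const std::vector<int64_t>& rank_heap,
                               int64_t current_rank) {
  for (size_t i = 0; i < rank_heap.size(); ++i) {
    if (rank_heap.at(i) == current_rank) { return static_cast<int64_t>(i); }
  }
  throw std::runtime_error("current rank not found in rank heap");
}
```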
template<>
Maybe<void> Broadcast<DeviceType::kCPU>(const void* in, void* out, size_t elem_cnt, DataType dtype,
                                        int64_t root, Symbol<ParallelDesc> parallel_desc,
                                        DeviceCtx* ctx) {
This parameter is unused.
Right, it's unused. I kept it to align the interface: later this will take a template parameter so that cpu, nccl, and other backends all go through the same function call.
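The interface alignment described above can be sketched as follows (hypothetical names, not OneFlow's actual declarations): every backend specializes one Broadcast signature, so call sites stay uniform even when a backend ignores the device-context argument.

```cpp
#include <cstddef>
#include <cstring>

enum class DeviceTypeSketch { kCPU, kCUDA };

// One shared signature for all backends, selected by a template parameter.
template<DeviceTypeSketch device>
void BroadcastSketch(const void* in, void* out, size_t bytes, void* device_ctx);

// The CPU specialization ignores device_ctx but keeps it in the signature
// so callers need not know which backend they are dispatching to.
template<>
void BroadcastSketch<DeviceTypeSketch::kCPU>(const void* in, void* out,
                                             size_t bytes,
                                             void* /*device_ctx: unused*/) {
  // Single-process stand-in for the real CPU broadcast path.
  std::memcpy(out, in, bytes);
}
```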
@@ -83,7 +91,10 @@ REGISTER_USER_OP("eager_nccl_reduce")
  *ctx->OutputShape("out", 0) = ctx->InputShape("in", 0);
  return Maybe<void>::Ok();
})
.SetGetSbpFn(user_op::GetSbpFnUtil::DefaultBroadcastToBroadcast)
Does this need to be restored?
Not this one.
I'll leave this unrestored for now, since the original logic here was wrong.
return Maybe<void>::Ok();
});
JUST(TransportUtil::ReceiveDataFromParentInHeap(rank_heap, transport_token, &transport_ctx));
JUST(TransportUtil::WaitUntilDoneOrTimeout(transport_ctx, TransportUtil::TimeoutSeconds()));
😂 While implementing flow.load on top of this branch, I broadcast the shape from the src rank and noticed this may be wrong: by the time execution reaches here there has only been a recv and no send, right? Shouldn't we wait only at line 72? Indeed, I get blocked when this line is kept, and removing it fixes the hang.
to_consistent supports cpu. Main technical points: