Cpu mpi broadcast #5726

Merged
merged 20 commits into master from cpu_mpi_broadcast on Aug 6, 2021
Conversation

lixinqi
Contributor

@lixinqi lixinqi commented Aug 4, 2021

Add CPU support for to_consistent. The main technical changes:

  1. Implement the low-level eager_nccl_broadcast op; it will be renamed ccl_broadcast later.
  2. The eager_nccl_broadcast kernel calls the newly implemented low-level ccl::Broadcast operation, whose interface mimics nccl.
  3. Refactor the DeviceInferFn of the eager_nccl_broadcast op.

DeviceCtx* ctx) {
CHECK_EQ_OR_RETURN(parallel_desc->device_type(), DeviceType::kCPU);
static thread_local std::vector<int64_t> rank_heap{};
InitBroadcastRankHeap(&rank_heap, *parallel_desc, root);
Contributor Author

Treat the group of ranks as a binary heap: root is the root node, and each rank copies data from its parent node and then forwards it to its two child nodes.
This can be refactored into a faster scheme later.
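
A minimal sketch of this heap layout (hypothetical helper name, not the PR's actual InitBroadcastRankHeap):

#include <cstdint>
#include <vector>

// Sketch: put the broadcast root at index 0, followed by the remaining ranks.
// For index i in the resulting heap:
//   parent index  = (i - 1) / 2           (the rank data is received from)
//   child indices = 2 * i + 1, 2 * i + 2  (the ranks data is forwarded to)
void SketchBroadcastRankHeap(std::vector<int64_t>* rank_heap,
                             const std::vector<int64_t>& ranks, int64_t root) {
  rank_heap->clear();
  rank_heap->reserve(ranks.size());
  rank_heap->push_back(root);
  for (int64_t rank : ranks) {
    if (rank != root) { rank_heap->push_back(rank); }
  }
}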

static thread_local const auto& nccl_device = Device::New("nccl");
return nccl_device;
} else if (input_device->type() == "cpu") {
return input_device;
Contributor Author

The CPU stream in the virtual machine is asynchronous to begin with.
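
A minimal sketch of the branch shown above (the function name and the Maybe<Symbol<Device>> return type are assumptions about the surrounding DeviceInferFn, not the PR's exact code):

// Sketch: non-CPU inputs are routed to a shared "nccl" device, while CPU
// inputs reuse the input device directly, since the VM's CPU stream is
// already asynchronous.
Maybe<Symbol<Device>> SketchBroadcastDeviceInfer(Symbol<Device> input_device) {
  if (input_device->type() != "cpu") {
    static thread_local const auto& nccl_device = Device::New("nccl");
    return nccl_device;
  } else {
    return input_device;
  }
}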

.Broadcast(user_op::OpArg("out", 0))
.Build();
return Maybe<void>::Ok();
UNIMPLEMENTED_THEN_RETURN() << "consistent tensor are not supported";
Contributor
@clackhan clackhan Aug 4, 2021

Eager boxing (p2b) infers the SBP here, so it probably can't be written like this.

Contributor Author

I'll revert it.

const auto& ForEachRank = [&](const std::function<Maybe<void>(int64_t)>& DoEach) -> Maybe<void> {
return rank_group->ForEachRank(DoEach);
};
return AccessToOtherRanks<SendOrRecv, Prepare>(ForEachRank, token, ctx);
Contributor

What ForEachRank really wants here is the list of ranks that DoEach is applied to, right? So wouldn't it be clearer to pass a list directly instead of ForEachRank?

Contributor Author

ForEachRank is already equivalent to a list. I avoid an explicit list because it 1) saves constructing an object, 2) saves choosing a container type, and 3) saves the iteration over it.
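
A self-contained sketch (plain C++, hypothetical names) of the callback-style iteration being discussed: the caller supplies the per-rank action and the group drives the loop, so no rank container is ever materialized.

#include <cstdint>
#include <functional>
#include <iostream>

// Hypothetical stand-in for a rank group that exposes iteration via a
// callback instead of returning a list of ranks.
struct RankGroupSketch {
  int64_t size = 0;  // ranks 0 .. size-1 in this sketch
  void ForEachRank(const std::function<void(int64_t)>& DoEach) const {
    for (int64_t rank = 0; rank < size; ++rank) { DoEach(rank); }
  }
};

int main() {
  RankGroupSketch group{4};
  group.ForEachRank([](int64_t rank) { std::cout << "visit rank " << rank << "\n"; });
  return 0;
}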


} // namespace

class EagerCclBroadcastKernel final : public user_op::OpKernel {
Contributor

The only difference from EagerNcclBroadcastKernel is the missing N, which seems easy to mix up 😂 Is it worth picking a different name?

Contributor Author

Their contents are different anyway. Later I also want to replace the nccl in these names with ccl.

Contributor

"Later I also want to replace the nccl in these names with ccl."
That works, then they won't get mixed up.

*Cb = [] {};
return Maybe<void>::Ok();
});
JUST(RpcUtil::ReceiveDataFromParentInHeap(rank_heap, rpc_token, &rpc_ctx));
Contributor

Broadcast is currently implemented with rpc; later we could consider supporting it via commnet.

Contributor Author

This is already calling the commnet module.
It seems the word Rpc caused the confusion; I'll rename it to Transport.

Contributor

OK.

void Init(user_op::KernelInitContext* ctx) {
const std::string& parallel_conf_txt = ctx->Attr<std::string>("parallel_conf");
ParallelConf parallel_conf;
std::set<std::pair<int64_t, int64_t>> device_set;
Contributor

This variable is unused.

.Broadcast(user_op::OpArg("out", 0))
.Build();
return Maybe<void>::Ok();
UNIMPLEMENTED_THEN_RETURN() << "consistent tensor are not supported";
Contributor

This needs to be reverted; the consistent code also uses this op.

CHECK_EQ_OR_RETURN(parallel_desc->device_type(), DeviceType::kCPU);
static thread_local std::vector<int64_t> rank_heap{};
JUST(InitBroadcastRankHeap(&rank_heap, *parallel_desc, root));
TransportToken rpc_token = TransportToken::NewDataTransportToken();
Contributor

Rename rpc_token to transport_token, and the rpc_ctx below as well.

Optional<int64_t> current_rank_index{};
for (int i = 0; i < rank_heap.size(); ++i) {
if (rank_heap.at(i) == GlobalProcessCtx::Rank()) {
current_rank_index = i;
Contributor

Just return i directly here, and return an error after the for loop?

Contributor Author

Sounds good.
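
A minimal sketch of the suggested refactor (hypothetical helper, using std::optional instead of the PR's Maybe): return the index as soon as the current rank is found, and leave the error case to after the loop.

#include <cstdint>
#include <optional>
#include <vector>

std::optional<int64_t> FindRankIndexInHeap(const std::vector<int64_t>& rank_heap,
                                           int64_t current_rank) {
  for (int64_t i = 0; i < static_cast<int64_t>(rank_heap.size()); ++i) {
    if (rank_heap.at(i) == current_rank) { return i; }  // found: return immediately
  }
  return std::nullopt;  // not found: the caller turns this into an error
}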

template<>
Maybe<void> Broadcast<DeviceType::kCPU>(const void* in, void* out, size_t elem_cnt, DataType dtype,
int64_t root, Symbol<ParallelDesc> parallel_desc,
DeviceCtx* ctx) {
Contributor

This parameter isn't used.

Contributor Author

Right, it's unused. I kept it to align the interfaces, because later this will take a template parameter so that cpu, nccl, and other backends all go through the same function call.
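
A sketch of the interface alignment described here, assuming a device-type-templated ccl::Broadcast declaration (only the kCPU specialization appears in this diff; the generic declaration and the dispatch example are assumptions):

// Sketch: one templated ccl::Broadcast entry point, specialized per device,
// so CPU and NCCL backends share the same call signature, including the
// DeviceCtx* that the CPU specialization does not use yet.
namespace ccl {

template<DeviceType device_type>
Maybe<void> Broadcast(const void* in, void* out, size_t elem_cnt, DataType dtype, int64_t root,
                      Symbol<ParallelDesc> parallel_desc, DeviceCtx* ctx);

}  // namespace ccl

// Callers would then dispatch by template argument, e.g.
//   JUST(ccl::Broadcast<DeviceType::kCPU>(in, out, elem_cnt, dtype, root, parallel_desc, ctx));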

@@ -83,7 +91,10 @@ REGISTER_USER_OP("eager_nccl_reduce")
*ctx->OutputShape("out", 0) = ctx->InputShape("in", 0);
return Maybe<void>::Ok();
})
.SetGetSbpFn(user_op::GetSbpFnUtil::DefaultBroadcastToBroadcast)
Contributor

Does this need to be reverted?

Contributor

No, this one doesn't.

Contributor Author
@lixinqi lixinqi Aug 5, 2021

I'm not reverting this for now, because the original logic was wrong.

return Maybe<void>::Ok();
});
JUST(TransportUtil::ReceiveDataFromParentInHeap(rank_heap, transport_token, &transport_ctx));
JUST(TransportUtil::WaitUntilDoneOrTimeout(transport_ctx, TransportUtil::TimeoutSeconds()));
Contributor

😂 While implementing flow.load on top of this branch, I broadcast the shape from the src rank, and I suspect this line is wrong: by the time execution reaches here there is only a recv and no send, so shouldn't we only wait at line 72? I do get blocked if I keep this line, and deleting it fixes it.

@oneflow-ci-bot oneflow-ci-bot self-requested a review August 6, 2021 03:32
@github-actions
Contributor

github-actions bot commented Aug 6, 2021

Speed stats:
GPU Name: GeForce GTX 1080 

PyTorch resnet50 time: 139.7ms (= 6985.7ms / 50, input_shape=[16, 3, 224, 224], backward is enabled)
OneFlow resnet50 time: 127.9ms (= 6394.1ms / 50, input_shape=[16, 3, 224, 224], backward is enabled)
Relative speed: 1.09 (= 139.7ms / 127.9ms)

PyTorch resnet50 time: 85.6ms (= 4280.7ms / 50, input_shape=[8, 3, 224, 224], backward is enabled)
OneFlow resnet50 time: 74.1ms (= 3705.3ms / 50, input_shape=[8, 3, 224, 224], backward is enabled)
Relative speed: 1.16 (= 85.6ms / 74.1ms)

PyTorch resnet50 time: 57.3ms (= 2866.4ms / 50, input_shape=[4, 3, 224, 224], backward is enabled)
OneFlow resnet50 time: 47.2ms (= 2359.0ms / 50, input_shape=[4, 3, 224, 224], backward is enabled)
Relative speed: 1.22 (= 57.3ms / 47.2ms)

PyTorch resnet50 time: 47.6ms (= 2382.2ms / 50, input_shape=[2, 3, 224, 224], backward is enabled)
OneFlow resnet50 time: 40.0ms (= 1999.2ms / 50, input_shape=[2, 3, 224, 224], backward is enabled)
Relative speed: 1.19 (= 47.6ms / 40.0ms)

PyTorch resnet50 time: 44.2ms (= 2210.2ms / 50, input_shape=[1, 3, 224, 224], backward is enabled)
OneFlow resnet50 time: 44.4ms (= 2221.6ms / 50, input_shape=[1, 3, 224, 224], backward is enabled)
Relative speed: 0.99 (= 44.2ms / 44.4ms)

@oneflow-ci-bot oneflow-ci-bot merged commit 9e07e2c into master Aug 6, 2021
@oneflow-ci-bot oneflow-ci-bot deleted the cpu_mpi_broadcast branch August 6, 2021 05:41