Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transport #3549

Merged
merged 57 commits into from
Sep 28, 2020
Merged

Transport #3549

merged 57 commits into from
Sep 28, 2020

Conversation

chengtbf
Copy link
Contributor

@chengtbf chengtbf commented Sep 10, 2020

Transport: 支持动态网络传输,提供Send/Recv接口。已通过验证。

进展

  • 支持动态Send/Recv (Send size <= Recv size)
  • 支持本机传输
  • 正确性测试通过 (包括: 1) 两机之间乱序动态SendRecv并验证数据合法性 2)本机数据乱序SendRecv验证)
  • 压力测试通过
  • 用户接口文档

TODO 设计文档

输出

chengcheng@oneflow-15:~/oneflow/build/bin $ ./transport_test_main_exe 
WARNING: Logging before InitGoogleLogging() is written to STDERR
I0924 18:55:58.996147 22756 global.h:32] NewGlobal N7oneflow7EnvDescE
I0924 18:55:58.996220 22756 global.h:32] NewGlobal N7oneflow10CtrlServerE
I0924 18:55:58.997524 22756 ctrl_server.cpp:53] CtrlServer listening on 0.0.0.0:12143
I0924 18:55:58.997624 22756 global.h:32] NewGlobal N7oneflow10CtrlClientE
I0924 18:55:58.999680 22756 ctrl_client.cpp:231] LoadServer 192.168.1.15 Successful at 0 times
I0924 18:55:59.001565 22756 ctrl_client.cpp:231] LoadServer 192.168.1.16 Successful at 0 times
I0924 18:55:59.001775 22756 global.h:32] NewGlobal N7oneflow10MachineCtxE
I0924 18:55:59.001796 22756 machine_context.cpp:33] this machine id: 0
I0924 18:55:59.001809 22756 global.h:32] NewGlobal N7oneflow12ResourceDescE
I0924 18:55:59.001818 22756 global.h:32] NewGlobal N7oneflow12ResourceDescE
New All Global
I0924 18:55:59.001881 22756 global.h:32] NewGlobal N7oneflow12EpollCommNetE
I0924 18:55:59.002081 22756 epoll_comm_network.cpp:44] CommNet:Epoll listening on 0.0.0.0:1024
I0924 18:56:05.534235 22756 epoll_comm_network.cpp:194] machine 0 sockfd -1
I0924 18:56:05.534268 22756 epoll_comm_network.cpp:194] machine 1 sockfd 23
I0924 18:56:05.534382 22756 global.h:32] NewGlobal N7oneflow9TransportE
Test for correctness. Start. 
Each machine will send and receive 100 messages (50 send and 50 recv) alternately. The first address and the last address of each transport are written with data for correctness verification.
I'm first machine!
the latency is : 0.735017 s, the throughput is : 114.128 MiB/s 
Test for correctness. Done.

Test for local send/recv transport correctness. Start. 
Machine will send and receive 100 messages alternately. The first address and the last address of each transport are written with data for correctness verification.
the latency is : 0.090513 s, the throughput is : 926.784 MiB/s 
Test for local send/recv transport correctness. Done.

Test for throughput. Start.
-------------------------------------------------------------------------------
#bytes                   #iterations              #throughput peek[MiB/s]  #throughput average[MiB/s]
2                        1000                     0.0110497                0.00655074               
4                        1000                     0.0231214                0.0141732                
8                        1000                     0.0473373                0.026643                 
16                       1000                     0.0935673                0.0558287                
32                       1000                     0.205128                 0.122117                 
64                       1000                     0.426667                 0.252637                 
128                      1000                     0.790123                 0.481984                 
256                      1000                     1.56098                  0.881585                 
512                      1000                     3.06587                  1.8084                   
1024                     1000                     6.20606                  2.84344                  
2048                     1000                     10.9519                  5.69644                  
4096                     1000                     10.24                    8.38262                  
8192                     1000                     23.6081                  16.2622                  
16384                    1000                     41.7959                  28.3646                  
32768                    1000                     66.7373                  43.9132                  
65536                    1000                     79.7275                  64.2819                  
131072                   1000                     96.0938                  80.8765                  
262144                   1000                     103.984                  96.7126                  
524288                   1000                     110.913                  103.72                   
1048576                  1000                     114.212                  110.356                  
2097152                  1000                     115.66                   113.219                  
4194304                  1000                     116.512                  115.681                  
8388608                  1000                     117.204                  116.633                  
-------------------------------------------------------------------------------
Test for throughput. Done.

Deleting all global...
I0924 18:58:38.559795 22756 global.h:37] DeleteGlobal N7oneflow9TransportE
I0924 18:58:38.574929 22756 global.h:37] DeleteGlobal N7oneflow12EpollCommNetE
I0924 18:58:38.574968 22756 epoll_comm_network.cpp:78] CommNet Thread 0 finish
I0924 18:58:38.580687 22756 global.h:37] DeleteGlobal N7oneflow12ResourceDescE
I0924 18:58:38.581069 22756 global.h:37] DeleteGlobal N7oneflow12ResourceDescE
I0924 18:58:38.581094 22756 global.h:37] DeleteGlobal N7oneflow10MachineCtxE
I0924 18:58:38.581183 22756 global.h:37] DeleteGlobal N7oneflow10CtrlClientE
I0924 18:58:40.020783 22756 global.h:37] DeleteGlobal N7oneflow10CtrlServerE
I0924 18:58:40.022223 22756 global.h:37] DeleteGlobal N7oneflow7EnvDescE
All Done!

@chengtbf chengtbf added feature bottleneck blocking another feature/PR WIP work in progress labels Sep 10, 2020
@chengtbf chengtbf self-assigned this Sep 10, 2020
@jackalcooper jackalcooper added this to the 0.1.12 milestone Sep 14, 2020
@chengtbf

This comment has been minimized.

@chengtbf chengtbf requested a review from hsj0429 September 14, 2020 15:20
@chengtbf chengtbf marked this pull request as ready for review September 27, 2020 11:52
@lixinqi lixinqi merged commit 3a54beb into master Sep 28, 2020
@lixinqi lixinqi deleted the dev_cc_networker branch September 28, 2020 08:20
} else {
recv_before_send = true;
stat = &(it->second);
CHECK_GE(stat->size, msg.size); // NOTE(chengcheng): Recv size may larger than Send size.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may

may <be>

<> 里面的内容会被解释为 html tag,就显示不出来了,可以在<>前面加上\

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我的英文注释是先想的中文,再用百度翻译翻译的。。。。没想到语法还不通顺。。。

}

if (recv_before_send) {
// it means the local machine has call Transport::Receive() before this handler
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

has

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

has

这里的<>也没显示,has <called>

uint64_t token = first_token + i;
if (i % 2 == 0) {
// Recv
Global<Transport>::Get()->Receive(token, 0, ptr, size + 66, [ptr, i, size, &bc]() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

magic number: 66

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里是为了测试新奇的需求,即: Receive的size >= Send的size,66是随意设置的值,只要>= 1即可

std::function<void()> callback);

// TODO(chengcheng)
// Global<Transport> has a dependency on Global<CommNet> which should be initialized first.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果注释不能一步到位写好,可暂时不写的。

Transport::Transport() {
comm_net_ = Global<EpollCommNet>::Get();
this_machine_id_ = Global<MachineCtx>::Get()->this_machine_id();
CHECK(comm_net_ != nullptr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

检查comm_net_ 是否为空应该直接放在获得comm_net_值之后,而且直接CHECK(comm_net_) 即可吧。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以,这里写代码的风格可以改进一下。

}
switch (msg.type) {
case TransportMsgType::kSend: {
HandlerAchievedTransportSendMsgFromSrcMachine(msg);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

函数名里的achieved很怪异。TransportSendMsgHandler() 是不是就够了

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SendMsg不太明确。这里希望在函数名上解释处理SendMsg的是接收端收到了来自发送端的SendMsg消息。

TransportStatus* stat = nullptr;
{
std::unique_lock<std::mutex> lock(status_mutex_);
auto it = token2status_.find(token);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果传进来stat参数,至少这里不需要再做加锁的事情

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯嗯 这里可以优化一下。

std::unique_lock<std::mutex> lock(status_mutex_);
auto it = token2status_.find(stat->token);
CHECK(it != token2status_.end());
token2status_.erase(it);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

从token2status_ 中删除status有可能放在233行,即在这里把status从字典中取出来,并在后续执行DoRead时传进来即可。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个好像不行吧。如果在DoRead之前就把status从token2status_ 里取出来,那么status里的callback要等真正的Read操作结束在ReadCallback里调用CallBack。您的意思是把这个status保存在ReadCallback的传参列表里吗?好像这样也可以。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的,如果发现token2status里面已经有了,就可以在锁保护的范围取出并删除了,然后拷贝传给DoRead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好的~学到了

stat->callback = callback;
stat->is_send_ready = true;
stat->is_recv_ready = false;
stat->src_mem_token = comm_net_->RegisterMemory(mut_ptr, size);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RegisterMemory和 UnRegisterMemory 的逻辑割裂了。(当然也许以后不需要在外部调用RegisterMemory了。) 一种办法是让stat->callback = {一个新的包含了callback 和UnregisterMemory的回调}

bool send_before_recv = false;
{
std::unique_lock<std::mutex> lock(status_mutex_);
auto it = token2status_.find(token);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如新奇建议,token2status_ 的value部分弄一个unique_ptr,把这个对象在外部构造好,然后在锁保护的区间move一下,也就不需要把status内部成员的赋值分散到各处了,分散到各处正确性保证也比较难证明。

if (need_do_callback) {
callback();
receive_callback();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need_do_callback和need_do_copy 逻辑混乱。逻辑上331行应该放到334 之前吧

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

您的意思是把

if (need_do_copy) { memcpy(dst_ptr, ptr, size); }

这行放在

if (need_do_callback) {
}

这个if条件语句block里面吗?

类似这样:

  if (need_do_callback) {
    if (need_do_copy) { memcpy(dst_ptr, ptr, size); }
    callback();
    receive_callback();
  }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的

if (need_do_callback) {
callback();
send_callback();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

368 应该放在371之前

CopyStatusOnLocalMachine(token, ptr, max_size, callback));
} else {
need_do_callback = true;
send_callback = std::move(it->second.callback);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里可以用std::unique_ptr优化
回调类型不是直接的std::function<void()>,而是std::unique_ptr<std::function<void()>>

liujuncheng pushed a commit that referenced this pull request Jun 3, 2021
* Networker interface

* global epoll comm net

* half implement of Networker

* implement of Networke::Send

* Implement of Networker::Recieve

* Implement of Networker::HandlerRecieveSendMsgFromSrcMachine

* Implement of Networker::HandlerRecieveAckMsgFromDstMachine

* refine iterator in Networker

* add networker test exe

* add log and blocking count

* OF_BARRIER for networker test

* add log for debug

* add more log

* fix bug and add check

* fix bug of wrong delete global in callback

* fix bug of double free

* exchange Netwoker deconstructor

* moving BlockingCount in networker_test_main

* send_before_recv & recv_before_send ; fix lock bug of status access

* Networker -> Transport

* add TODO(chengcheng) and rename interface of Transport handler

* add more test

* fix bug and refine test code

* fix compile err for new change

* Test for correctness

* Test throughput like ibverbs read bandwith

* Fix BUG: All stat change need be set in the block protected by lock

* test 23 data up to 8388608

* note for debug

* OF_BARRIER_ALL

* ctrl_client clear

* not commnet

* test global

* CtrlServer: Clear cq before shutdown

* Fix cq shutdown on loop thread

* add log

* try shutdown grpc server before cq shutdown

* move grpc server to loop thread

* fix rpc loop return condition

* revert change in ctrl server

* fix bug of delete global in runtime

* transport support 1. Receive size > Send size; 2. Local Send/Recv

* add test for Send size < Recv size and local transport

* fix bug when Send size < Recv size

* refine code for review

* Fix bug of Transport UnRegisterMemory

* add Transport user doc

* refine code for review. move memcopy from block protected by mutex

Former-commit-id: 3a54beb
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
automerge bottleneck blocking another feature/PR feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants