-
Notifications
You must be signed in to change notification settings - Fork 825
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Transport #3549
Transport #3549
Conversation
This comment has been minimized.
This comment has been minimized.
} else { | ||
recv_before_send = true; | ||
stat = &(it->second); | ||
CHECK_GE(stat->size, msg.size); // NOTE(chengcheng): Recv size may larger than Send size. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
may
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
may
may <be>
<>
里面的内容会被解释为 html tag,就显示不出来了,可以在<
和>
前面加上\
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我的英文注释是先想的中文,再用百度翻译翻译的。。。。没想到语法还不通顺。。。
} | ||
|
||
if (recv_before_send) { | ||
// it means the local machine has call Transport::Receive() before this handler |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
has
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
has
这里的<>
也没显示,has <called>
uint64_t token = first_token + i; | ||
if (i % 2 == 0) { | ||
// Recv | ||
Global<Transport>::Get()->Receive(token, 0, ptr, size + 66, [ptr, i, size, &bc]() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
magic number: 66
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里是为了测试新奇的需求,即: Receive的size >= Send的size,66是随意设置的值,只要>= 1即可
std::function<void()> callback); | ||
|
||
// TODO(chengcheng) | ||
// Global<Transport> has a dependency on Global<CommNet> which should be initialized first. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
如果注释不能一步到位写好,可暂时不写的。
Transport::Transport() { | ||
comm_net_ = Global<EpollCommNet>::Get(); | ||
this_machine_id_ = Global<MachineCtx>::Get()->this_machine_id(); | ||
CHECK(comm_net_ != nullptr); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
检查comm_net_ 是否为空应该直接放在获得comm_net_值之后,而且直接CHECK(comm_net_) 即可吧。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
可以,这里写代码的风格可以改进一下。
} | ||
switch (msg.type) { | ||
case TransportMsgType::kSend: { | ||
HandlerAchievedTransportSendMsgFromSrcMachine(msg); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
函数名里的achieved很怪异。TransportSendMsgHandler() 是不是就够了
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SendMsg不太明确。这里希望在函数名上解释处理SendMsg的是接收端收到了来自发送端的SendMsg消息。
TransportStatus* stat = nullptr; | ||
{ | ||
std::unique_lock<std::mutex> lock(status_mutex_); | ||
auto it = token2status_.find(token); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
如果传进来stat参数,至少这里不需要再做加锁的事情
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
嗯嗯 这里可以优化一下。
std::unique_lock<std::mutex> lock(status_mutex_); | ||
auto it = token2status_.find(stat->token); | ||
CHECK(it != token2status_.end()); | ||
token2status_.erase(it); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
从token2status_ 中删除status有可能放在233行,即在这里把status从字典中取出来,并在后续执行DoRead时传进来即可。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个好像不行吧。如果在DoRead之前就把status从token2status_ 里取出来,那么status里的callback要等真正的Read操作结束在ReadCallback里调用CallBack。您的意思是把这个status保存在ReadCallback的传参列表里吗?好像这样也可以。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
是的,如果发现token2status里面已经有了,就可以在锁保护的范围取出并删除了,然后拷贝传给DoRead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
好的~学到了
stat->callback = callback; | ||
stat->is_send_ready = true; | ||
stat->is_recv_ready = false; | ||
stat->src_mem_token = comm_net_->RegisterMemory(mut_ptr, size); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RegisterMemory和 UnRegisterMemory 的逻辑割裂了。(当然也许以后不需要在外部调用RegisterMemory了。) 一种办法是让stat->callback = {一个新的包含了callback 和UnregisterMemory的回调}
bool send_before_recv = false; | ||
{ | ||
std::unique_lock<std::mutex> lock(status_mutex_); | ||
auto it = token2status_.find(token); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
如新奇建议,token2status_ 的value部分弄一个unique_ptr,把这个对象在外部构造好,然后在锁保护的区间move一下,也就不需要把status内部成员的赋值分散到各处了,分散到各处正确性保证也比较难证明。
if (need_do_callback) { | ||
callback(); | ||
receive_callback(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need_do_callback和need_do_copy 逻辑混乱。逻辑上331行应该放到334 之前吧
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
您的意思是把
if (need_do_copy) { memcpy(dst_ptr, ptr, size); }
这行放在
if (need_do_callback) {
}
这个if条件语句block里面吗?
类似这样:
if (need_do_callback) {
if (need_do_copy) { memcpy(dst_ptr, ptr, size); }
callback();
receive_callback();
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
是的
if (need_do_callback) { | ||
callback(); | ||
send_callback(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
368 应该放在371之前
CopyStatusOnLocalMachine(token, ptr, max_size, callback)); | ||
} else { | ||
need_do_callback = true; | ||
send_callback = std::move(it->second.callback); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里可以用std::unique_ptr优化
回调类型不是直接的std::function<void()>
,而是std::unique_ptr<std::function<void()>>
* Networker interface * global epoll comm net * half implement of Networker * implement of Networke::Send * Implement of Networker::Recieve * Implement of Networker::HandlerRecieveSendMsgFromSrcMachine * Implement of Networker::HandlerRecieveAckMsgFromDstMachine * refine iterator in Networker * add networker test exe * add log and blocking count * OF_BARRIER for networker test * add log for debug * add more log * fix bug and add check * fix bug of wrong delete global in callback * fix bug of double free * exchange Netwoker deconstructor * moving BlockingCount in networker_test_main * send_before_recv & recv_before_send ; fix lock bug of status access * Networker -> Transport * add TODO(chengcheng) and rename interface of Transport handler * add more test * fix bug and refine test code * fix compile err for new change * Test for correctness * Test throughput like ibverbs read bandwith * Fix BUG: All stat change need be set in the block protected by lock * test 23 data up to 8388608 * note for debug * OF_BARRIER_ALL * ctrl_client clear * not commnet * test global * CtrlServer: Clear cq before shutdown * Fix cq shutdown on loop thread * add log * try shutdown grpc server before cq shutdown * move grpc server to loop thread * fix rpc loop return condition * revert change in ctrl server * fix bug of delete global in runtime * transport support 1. Receive size > Send size; 2. Local Send/Recv * add test for Send size < Recv size and local transport * fix bug when Send size < Recv size * refine code for review * Fix bug of Transport UnRegisterMemory * add Transport user doc * refine code for review. move memcopy from block protected by mutex Former-commit-id: 3a54beb
Transport: 支持动态网络传输,提供Send/Recv接口。已通过验证。
进展
TODO 设计文档
输出