-
Notifications
You must be signed in to change notification settings - Fork 826
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bugfix split config proto and session job set #3637
Changes from all commits
3a418de
e087e21
cf499fb
3bb26a3
5a928df
ee0fc17
9555207
d3f02af
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -66,7 +66,8 @@ void IBVerbsCommNet::RegisterMemoryDone() { | |
.second); | ||
} | ||
} | ||
OF_BARRIER(); | ||
// TODO(chengcheng): change to OF_ENV_BARRIER | ||
OF_SESSION_BARRIER(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 这里也是 There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. CommNetwork未来要重构成ENV级别的，跟CtrlServer和CtrlClient同一级别，在它们初始化之后就可以初始化。 |
||
Global<CtrlClient>::Get()->ClearKV(GenTokensMsgKey(this_machine_id)); | ||
} | ||
|
||
|
@@ -111,14 +112,17 @@ IBVerbsCommNet::IBVerbsCommNet(const Plan& plan) | |
Global<CtrlClient>::Get()->PullKV(GenConnInfoKey(peer_id, this_machine_id), &conn_info); | ||
qp_vec_.at(peer_id)->Connect(conn_info); | ||
} | ||
OF_BARRIER(); | ||
// TODO(chengcheng): change to OF_ENV_BARRIER | ||
OF_SESSION_BARRIER(); | ||
for (int64_t peer_id : peer_machine_id()) { | ||
qp_vec_.at(peer_id)->PostAllRecvRequest(); | ||
Global<CtrlClient>::Get()->ClearKV(GenConnInfoKey(this_machine_id, peer_id)); | ||
} | ||
OF_BARRIER(); | ||
// TODO(chengcheng): change to OF_ENV_BARRIER | ||
OF_SESSION_BARRIER(); | ||
poll_thread_ = std::thread(&IBVerbsCommNet::PollCQ, this); | ||
OF_BARRIER(); | ||
// TODO(chengcheng): change to OF_ENV_BARRIER | ||
OF_SESSION_BARRIER(); | ||
} | ||
|
||
void IBVerbsCommNet::DoRead(void* read_id, int64_t src_machine_id, void* src_token, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -86,8 +86,8 @@ class CtrlClient final { | |
|
||
#define FILE_LINE_STR __FILE__ ":" OF_PP_STRINGIZE(__LINE__) | ||
|
||
#define OF_BARRIER_ALL() Global<CtrlClient>::Get()->Barrier(FILE_LINE_STR) | ||
#define OF_BARRIER() \ | ||
Comment on lines -89 to -90
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 重命名使得语义更佳 |
||
#define OF_ENV_BARRIER() Global<CtrlClient>::Get()->Barrier(FILE_LINE_STR) | ||
#define OF_SESSION_BARRIER() \ | ||
Global<CtrlClient>::Get()->Barrier(FILE_LINE_STR, \ | ||
Global<ResourceDesc, ForSession>::Get()->TotalMachineNum()) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,7 +34,6 @@ class JobBuildAndInferCtxMgr { | |
Maybe<JobBuildAndInferCtx*> FindJobBuildAndInferCtx(const std::string& job_name); | ||
Maybe<std::string> GetCurrentJobName() const; | ||
Maybe<void> CloseCurrentJobBuildAndInferCtx(); | ||
Maybe<void> AddLbiAndDiffWatcherUuidPair(const LbiAndDiffWatcherUuidPair& lbi_uuid_pair) const; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 这个函数放在JobBuildAndInferCtxMgr是不合适的。应该属于JobBuildAndInferCtx的接口 |
||
|
||
const JobSet& job_set() const { return job_set_; } | ||
std::string structure_graph() const; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,7 +30,6 @@ limitations under the License. | |
#include "oneflow/core/job/job_desc.h" | ||
#include "oneflow/core/job/critical_section_desc.h" | ||
#include "oneflow/core/job/job_build_and_infer_ctx_mgr.h" | ||
#include "oneflow/core/job/lbi_diff_watcher_info.pb.h" | ||
#include "oneflow/core/job/job_set_compile_ctx.h" | ||
#include "oneflow/core/job/runtime_buffer_managers_scope.h" | ||
#include "oneflow/core/framework/load_library.h" | ||
|
@@ -89,7 +88,6 @@ Maybe<void> SessionGlobalObjectsScope::Init(const ConfigProto& config_proto) { | |
Global<CriticalSectionDesc>::New(); | ||
Global<InterUserJobInfo>::New(); | ||
Global<LazyJobBuildAndInferCtxMgr>::New(); | ||
Global<LbiDiffWatcherInfo>::New(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 这个Global也是非必须的。 |
||
Global<JobSetCompileCtx>::New(); | ||
Global<RuntimeBufferManagersScope>::New(); | ||
} | ||
|
@@ -101,7 +99,6 @@ SessionGlobalObjectsScope::~SessionGlobalObjectsScope() { | |
if (Global<MachineCtx>::Get()->IsThisMachineMaster()) { | ||
Global<RuntimeBufferManagersScope>::Delete(); | ||
Global<JobSetCompileCtx>::Delete(); | ||
Global<LbiDiffWatcherInfo>::Delete(); | ||
Global<LazyJobBuildAndInferCtxMgr>::Delete(); | ||
Global<InterUserJobInfo>::Delete(); | ||
Global<CriticalSectionDesc>::Delete(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -114,31 +114,31 @@ def IsSessionInited(): | |
return oneflow_internal.IsSessionInited() | ||
|
||
|
||
def InitGlobalSession(config_proto): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 重命名相关api，使之不滥用 |
||
def InitLazyGlobalSession(config_proto): | ||
assert type(config_proto) is job_set_pb.ConfigProto | ||
config_proto_str = text_format.MessageToString(config_proto) | ||
error_str = oneflow_internal.InitGlobalSession(config_proto_str) | ||
error_str = oneflow_internal.InitLazyGlobalSession(config_proto_str) | ||
error = text_format.Parse(error_str, error_util.ErrorProto()) | ||
if error.HasField("error_type"): | ||
raise JobBuildAndInferError(error) | ||
|
||
|
||
def DestroyGlobalSession(): | ||
error_str = oneflow_internal.DestroyGlobalSession() | ||
def DestroyLazyGlobalSession(): | ||
error_str = oneflow_internal.DestroyLazyGlobalSession() | ||
error = text_format.Parse(error_str, error_util.ErrorProto()) | ||
if error.HasField("error_type"): | ||
raise JobBuildAndInferError(error) | ||
|
||
|
||
def StartGlobalSession(): | ||
error_str = oneflow_internal.StartGlobalSession() | ||
def StartLazyGlobalSession(): | ||
error_str = oneflow_internal.StartLazyGlobalSession() | ||
error = text_format.Parse(error_str, error_util.ErrorProto()) | ||
if error.HasField("error_type"): | ||
raise JobBuildAndInferError(error) | ||
|
||
|
||
def StopGlobalSession(): | ||
error_str = oneflow_internal.StopGlobalSession() | ||
def StopLazyGlobalSession(): | ||
error_str = oneflow_internal.StopLazyGlobalSession() | ||
error = text_format.Parse(error_str, error_util.ErrorProto()) | ||
if error.HasField("error_type"): | ||
raise JobBuildAndInferError(error) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,9 +58,7 @@ def InterpretScope(session, function_desc, config_proto): | |
job_conf.job_name = function_desc.job_func.__name__ | ||
placement_scope = function_desc.function_attribute.default_placement_scope | ||
if placement_scope is None: | ||
tag_and_dev_ids = placement_util.GetDefaultMachineDeviceIds( | ||
oneflow.env.current_resource() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 原来的代码肯定是不对的。没有出错的原因是lazy的情形下env.current_resource就是返回Global<Resource, ForSession>，但在eager下，Global<Resource, ForSession>并没有得到特别的维护。 |
||
) | ||
tag_and_dev_ids = placement_util.GetDefaultMachineDeviceIds(session.resource) | ||
placement_scope = placement_util.GetPlacementScope(*tag_and_dev_ids) | ||
distribute_strategy = function_desc.function_attribute.default_distribute_strategy | ||
if distribute_strategy is None: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
epoll_comm_network 在重构,让它和job, plan 等无关,也应该是和session无关的
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个宏的实现用到了 Global<ResourceDesc, ForSession>，它目前就是和Session相关。当然它有问题，重构的时候必须把它改过来
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里加个注释解释一下,再加个TODO()