Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix split config proto and session job set #3637

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion oneflow/core/comm_network/epoll/epoll_comm_network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ EpollCommNet::~EpollCommNet() {
LOG(INFO) << "CommNet Thread " << i << " finish";
pollers_[i]->Stop();
}
OF_BARRIER();
// TODO(chengcheng): change to OF_ENV_BARRIER
OF_SESSION_BARRIER();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

epoll_comm_network 在重构,让它和job, plan 等无关,也应该是和session无关的

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个宏的实现用到了Global<ResourceDesc, ForSession>,它目前就是和Session相关。当然它有问题,重构的时候必须把它改过来

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里加个注释解释一下,再加个TODO()

for (IOEventPoller* poller : pollers_) { delete poller; }
for (auto& pair : sockfd2helper_) { delete pair.second; }
}
Expand Down
12 changes: 8 additions & 4 deletions oneflow/core/comm_network/ibverbs/ibverbs_comm_network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ void IBVerbsCommNet::RegisterMemoryDone() {
.second);
}
}
OF_BARRIER();
// TODO(chengcheng): change to OF_ENV_BARRIER
OF_SESSION_BARRIER();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里也是

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CommNetwork未来要重构成ENV级别的,跟CtrlServer和CtrlClient同一级别,在它们初始化之后就可以初始化。

Global<CtrlClient>::Get()->ClearKV(GenTokensMsgKey(this_machine_id));
}

Expand Down Expand Up @@ -111,14 +112,17 @@ IBVerbsCommNet::IBVerbsCommNet(const Plan& plan)
Global<CtrlClient>::Get()->PullKV(GenConnInfoKey(peer_id, this_machine_id), &conn_info);
qp_vec_.at(peer_id)->Connect(conn_info);
}
OF_BARRIER();
// TODO(chengcheng): change to OF_ENV_BARRIER
OF_SESSION_BARRIER();
for (int64_t peer_id : peer_machine_id()) {
qp_vec_.at(peer_id)->PostAllRecvRequest();
Global<CtrlClient>::Get()->ClearKV(GenConnInfoKey(this_machine_id, peer_id));
}
OF_BARRIER();
// TODO(chengcheng): change to OF_ENV_BARRIER
OF_SESSION_BARRIER();
poll_thread_ = std::thread(&IBVerbsCommNet::PollCQ, this);
OF_BARRIER();
// TODO(chengcheng): change to OF_ENV_BARRIER
OF_SESSION_BARRIER();
}

void IBVerbsCommNet::DoRead(void* read_id, int64_t src_machine_id, void* src_token,
Expand Down
4 changes: 2 additions & 2 deletions oneflow/core/control/ctrl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ class CtrlClient final {

#define FILE_LINE_STR __FILE__ ":" OF_PP_STRINGIZE(__LINE__)

#define OF_BARRIER_ALL() Global<CtrlClient>::Get()->Barrier(FILE_LINE_STR)
#define OF_BARRIER() \
Comment on lines -89 to -90
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

重命名使得语义更佳

#define OF_ENV_BARRIER() Global<CtrlClient>::Get()->Barrier(FILE_LINE_STR)
#define OF_SESSION_BARRIER() \
Global<CtrlClient>::Get()->Barrier(FILE_LINE_STR, \
Global<ResourceDesc, ForSession>::Get()->TotalMachineNum())

Expand Down
2 changes: 1 addition & 1 deletion oneflow/core/control/ctrl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ TEST(CtrlServer, new_delete) {
Global<ResourceDesc, ForSession>::New(GetResource());

// do test
// OF_BARRIER_ALL();
// OF_ENV_BARRIER();

Global<ResourceDesc, ForSession>::Delete();
Global<ResourceDesc, ForEnv>::Delete();
Expand Down
10 changes: 5 additions & 5 deletions oneflow/core/job/cluster_instruction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ void OccasionallyClearCtrlKV(const std::string& key) {
Global<ObsoleteCtrlKeys>::Get()->Add(key);
// 1 instead of 0 is better for avoid clearing no ctrl kv
if ((seq++) % interval == 1) {
OF_BARRIER_ALL();
OF_ENV_BARRIER();
if (Global<MachineCtx>::Get()->IsThisMachineMaster()) {
Global<ObsoleteCtrlKeys>::Get()->ForEach(
[](const std::string& k) { Global<CtrlClient>::Get()->ClearMasterKV(k); });
}
Global<ObsoleteCtrlKeys>::Get()->Clear();
OF_BARRIER_ALL();
OF_ENV_BARRIER();
}
}

Expand All @@ -94,10 +94,10 @@ void PullClusterInstruction(ClusterInstructionProto* cluster_instruction) {
} // namespace

void ClusterInstruction::NewSessionBarrier() {
OF_BARRIER_ALL();
OF_ENV_BARRIER();
Global<CtrlClient>::Get()->Clear();
Global<ObsoleteCtrlKeys>::Get()->Clear();
OF_BARRIER_ALL();
OF_ENV_BARRIER();
}

void ClusterInstruction::MasterSendSessionStart() {
Expand All @@ -124,6 +124,6 @@ void ClusterInstruction::WorkerReceiveInstruction(ClusterInstructionProto* clust
PullClusterInstruction(cluster_instruction);
}

void ClusterInstruction::HaltBarrier() { OF_BARRIER_ALL(); }
void ClusterInstruction::HaltBarrier() { OF_ENV_BARRIER(); }

} // namespace oneflow
2 changes: 2 additions & 0 deletions oneflow/core/job/job.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import "oneflow/core/register/blob_desc.proto";
import "oneflow/core/operator/op_conf.proto";
import "oneflow/core/common/shape.proto";
import "oneflow/core/job/sbp_parallel.proto";
import "oneflow/core/job/lbi_diff_watcher_info.proto";

message JobParallelViewConf {
map<string, SbpSignature> op_name2sbp_signature_conf = 1;
Expand All @@ -30,6 +31,7 @@ message JobHelperConf {
map<string, int64> lbn2logical_object_id = 5;
map<string, OptInt64> lbn2batch_axis = 6;
optional OpBlobArgPairs identical_sbp_oba_pairs = 7;
optional LbiDiffWatcherInfo lbi_diff_watcher_info = 8;
}

message Job {
Expand Down
19 changes: 19 additions & 0 deletions oneflow/core/job/job_build_and_infer_ctx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,25 @@ Maybe<void> EagerJobBuildAndInferCtx::CheckAllInputsWithSameParallelNum(
return Maybe<void>::Ok();
}

Maybe<void> JobBuildAndInferCtx::AddLbiAndDiffWatcherUuidPair(
const LbiAndDiffWatcherUuidPair& lbi_uuid_pair) {
const auto& job_name = job_->job_conf().job_name();
auto* job_helper = job_->mutable_helper();
auto* job_name2pairs =
job_helper->mutable_lbi_diff_watcher_info()->mutable_job_name2lbi_and_watcher_uuids();
LbiAndDiffWatcherUuidPairList* pairs = &(*job_name2pairs)[job_name];
auto PairFoundCond = [&](const LbiAndDiffWatcherUuidPair& x) {
return x.lbi() == lbi_uuid_pair.lbi() && x.watcher_uuid() == lbi_uuid_pair.watcher_uuid();
};
auto found_iter = std::find_if(pairs->lbi_and_uuid_pair().begin(),
pairs->lbi_and_uuid_pair().end(), PairFoundCond);
CHECK_OR_RETURN(found_iter == pairs->lbi_and_uuid_pair().end())
<< "diff blob has been watched. (logical_blob_name: "
<< GenLogicalBlobName(lbi_uuid_pair.lbi()) << ", job_name: " << job_name << ")";
*pairs->mutable_lbi_and_uuid_pair()->Add() = lbi_uuid_pair;
return Maybe<void>::Ok();
}

Maybe<OpAttribute> JobBuildAndInferCtx::AddAndInferMirroredOp(const OperatorConf& op_conf) {
CHECK_OR_RETURN(op_conf.has_scope_symbol_id());
const auto& scope = Global<vm::SymbolStorage<Scope>>::Get()->Get(op_conf.scope_symbol_id());
Expand Down
1 change: 1 addition & 0 deletions oneflow/core/job/job_build_and_infer_ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class JobBuildAndInferCtx {
virtual ~JobBuildAndInferCtx() = default;

Maybe<void> SetJobConf(const JobConfigProto& job_conf);
Maybe<void> AddLbiAndDiffWatcherUuidPair(const LbiAndDiffWatcherUuidPair& lbi_uuid_pair);
Maybe<OpAttribute> AddAndInferConsistentOp(const OperatorConf& op_conf);
Maybe<OpAttribute> AddAndInferMirroredOp(const OperatorConf& op_conf);
Maybe<void> AddLossLogicalBlobName(const std::string& lbn);
Expand Down
18 changes: 0 additions & 18 deletions oneflow/core/job/job_build_and_infer_ctx_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,6 @@ Maybe<std::string> JobBuildAndInferCtxMgr::GetCurrentJobName() const {
return cur_job_name_;
}

Maybe<void> JobBuildAndInferCtxMgr::AddLbiAndDiffWatcherUuidPair(
const LbiAndDiffWatcherUuidPair& lbi_uuid_pair) const {
auto* job_name2pairs =
Global<LbiDiffWatcherInfo>::Get()->mutable_job_name2lbi_and_watcher_uuids();
const auto& job_name = JUST(GetCurrentJobName());
LbiAndDiffWatcherUuidPairList* pairs = &(*job_name2pairs)[*job_name];
auto PairFoundCond = [&](const LbiAndDiffWatcherUuidPair& x) {
return x.lbi() == lbi_uuid_pair.lbi() && x.watcher_uuid() == lbi_uuid_pair.watcher_uuid();
};
auto found_iter = std::find_if(pairs->lbi_and_uuid_pair().begin(),
pairs->lbi_and_uuid_pair().end(), PairFoundCond);
CHECK_OR_RETURN(found_iter == pairs->lbi_and_uuid_pair().end())
<< "diff blob has been watched. (logical_blob_name: "
<< GenLogicalBlobName(lbi_uuid_pair.lbi()) << ", job_name: " << *job_name << ")";
*pairs->mutable_lbi_and_uuid_pair()->Add() = lbi_uuid_pair;
return Maybe<void>::Ok();
}

Maybe<void> JobBuildAndInferCtxMgr::CloseCurrentJobBuildAndInferCtx() {
VirtualCloseJob();
if (!has_cur_job_) { return Maybe<void>::Ok(); }
Expand Down
1 change: 0 additions & 1 deletion oneflow/core/job/job_build_and_infer_ctx_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ class JobBuildAndInferCtxMgr {
Maybe<JobBuildAndInferCtx*> FindJobBuildAndInferCtx(const std::string& job_name);
Maybe<std::string> GetCurrentJobName() const;
Maybe<void> CloseCurrentJobBuildAndInferCtx();
Maybe<void> AddLbiAndDiffWatcherUuidPair(const LbiAndDiffWatcherUuidPair& lbi_uuid_pair) const;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个函数放在JobBuildAndInferCtxMgr是不合适的。应该属于JobBuildAndInferCtx的接口


const JobSet& job_set() const { return job_set_; }
std::string structure_graph() const;
Expand Down
6 changes: 3 additions & 3 deletions oneflow/core/job/oneflow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ Maybe<void> CompileCurJobOnMaster(Job* job, Plan* improved_plan, bool need_job_c
} else {
PullPlan("complete_plan", &complete_plan);
}
OF_BARRIER();
OF_SESSION_BARRIER();
// Experiment Runtime
{ Runtime experiment_run(complete_plan, job_desc.piece_num_of_experiment_phase(), true); }
// Improve
Expand All @@ -325,7 +325,7 @@ Maybe<void> CompileCurJobOnMaster(Job* job, Plan* improved_plan, bool need_job_c
*improved_plan = *JUST(Improver().Improve(
*Global<AvailableMemDesc>::Get(), naive_plan,
JoinPath(FLAGS_log_dir, ActEventLogger::experiment_act_event_bin_filename())));
OF_BARRIER();
OF_SESSION_BARRIER();
TeePersistentLogStream::Create("improved_plan")->Write(*improved_plan);
}
} else {
Expand Down Expand Up @@ -972,7 +972,7 @@ Maybe<void> CompileAndMergePlanOnMaster(const PbRpf<Job>& conf_jobs, Plan* plan)
TeePersistentLogStream::Create("merged_plan")->Write(*plan);
}
}
OF_BARRIER();
OF_SESSION_BARRIER();
return Maybe<void>::Ok();
}

Expand Down
6 changes: 3 additions & 3 deletions oneflow/core/job/runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,17 @@ Runtime::Runtime(const Plan& plan, size_t total_piece_num, bool is_experiment_ph
HandoutTasks(other_tasks);
runtime_ctx->WaitUntilCntEqualZero("constructing_actor_cnt");
LOG(INFO) << "Actors on this machine constructed";
OF_BARRIER();
OF_SESSION_BARRIER();
LOG(INFO) << "Actors on every machine constructed";
if (Global<CommNet>::Get()) { Global<CommNet>::Get()->RegisterMemoryDone(); }
OF_BARRIER();
OF_SESSION_BARRIER();
runtime_ctx->NewCounter("running_actor_cnt", this_machine_task_num);
SendCmdMsg(source_tasks, ActorCmd::kStart);
}

Runtime::~Runtime() {
Global<RuntimeCtx>::Get()->WaitUntilCntEqualZero("running_actor_cnt");
OF_BARRIER();
OF_SESSION_BARRIER();
DeleteAllGlobal();
}

Expand Down
3 changes: 0 additions & 3 deletions oneflow/core/job/session_global_objects_scope.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ limitations under the License.
#include "oneflow/core/job/job_desc.h"
#include "oneflow/core/job/critical_section_desc.h"
#include "oneflow/core/job/job_build_and_infer_ctx_mgr.h"
#include "oneflow/core/job/lbi_diff_watcher_info.pb.h"
#include "oneflow/core/job/job_set_compile_ctx.h"
#include "oneflow/core/job/runtime_buffer_managers_scope.h"
#include "oneflow/core/framework/load_library.h"
Expand Down Expand Up @@ -89,7 +88,6 @@ Maybe<void> SessionGlobalObjectsScope::Init(const ConfigProto& config_proto) {
Global<CriticalSectionDesc>::New();
Global<InterUserJobInfo>::New();
Global<LazyJobBuildAndInferCtxMgr>::New();
Global<LbiDiffWatcherInfo>::New();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个Global也是非必须的。

Global<JobSetCompileCtx>::New();
Global<RuntimeBufferManagersScope>::New();
}
Expand All @@ -101,7 +99,6 @@ SessionGlobalObjectsScope::~SessionGlobalObjectsScope() {
if (Global<MachineCtx>::Get()->IsThisMachineMaster()) {
Global<RuntimeBufferManagersScope>::Delete();
Global<JobSetCompileCtx>::Delete();
Global<LbiDiffWatcherInfo>::Delete();
Global<LazyJobBuildAndInferCtxMgr>::Delete();
Global<InterUserJobInfo>::Delete();
Global<CriticalSectionDesc>::Delete();
Expand Down
2 changes: 1 addition & 1 deletion oneflow/core/job_rewriter/add_lbi_diff_watcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class AddLbiDiffWatcherOpConfs final : public OpGraphPass {

Maybe<void> AddLbiDiffWatcherOpConfs::Apply(Job* job) const {
JobBuilder job_builder(job);
const auto& map = Global<LbiDiffWatcherInfo>::Get()->job_name2lbi_and_watcher_uuids();
const auto& map = job->helper().lbi_diff_watcher_info().job_name2lbi_and_watcher_uuids();
if (map.find(GlobalJobDesc().job_name()) == map.end()) { return Maybe<void>::Ok(); }
const auto& tag2lbi_relations = job->helper().tag2lbi_relations();
const auto& conf_iter = tag2lbi_relations.find(kProducedLbi2ConsumedDiffLbi);
Expand Down
6 changes: 3 additions & 3 deletions oneflow/core/transport/transport_test_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,9 @@ Maybe<void> TestTransportOn2Machine(const std::string& first_machine_ip,
Global<EpollCommNet>::New();
Global<Transport>::New();

// OF_BARRIER Must call before test,
// OF_ENV_BARRIER Must call before test,
// to ensure that the Global<Transport> on each machine is created
OF_BARRIER_ALL();
OF_ENV_BARRIER();

// Test for correctness
// Each machine will send and receive 100 messages (50 send and 50 recv) alternately.
Expand All @@ -320,7 +320,7 @@ Maybe<void> TestTransportOn2Machine(const std::string& first_machine_ip,

TestThroughput();

OF_BARRIER_ALL();
OF_ENV_BARRIER();
std::cout << "Deleting all global..." << std::endl;
Global<Transport>::Delete();
Global<EpollCommNet>::Delete();
Expand Down
16 changes: 8 additions & 8 deletions oneflow/python/framework/c_api_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,31 +114,31 @@ def IsSessionInited():
return oneflow_internal.IsSessionInited()


def InitGlobalSession(config_proto):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

重命名相关api,使之不滥用

def InitLazyGlobalSession(config_proto):
assert type(config_proto) is job_set_pb.ConfigProto
config_proto_str = text_format.MessageToString(config_proto)
error_str = oneflow_internal.InitGlobalSession(config_proto_str)
error_str = oneflow_internal.InitLazyGlobalSession(config_proto_str)
error = text_format.Parse(error_str, error_util.ErrorProto())
if error.HasField("error_type"):
raise JobBuildAndInferError(error)


def DestroyGlobalSession():
error_str = oneflow_internal.DestroyGlobalSession()
def DestroyLazyGlobalSession():
error_str = oneflow_internal.DestroyLazyGlobalSession()
error = text_format.Parse(error_str, error_util.ErrorProto())
if error.HasField("error_type"):
raise JobBuildAndInferError(error)


def StartGlobalSession():
error_str = oneflow_internal.StartGlobalSession()
def StartLazyGlobalSession():
error_str = oneflow_internal.StartLazyGlobalSession()
error = text_format.Parse(error_str, error_util.ErrorProto())
if error.HasField("error_type"):
raise JobBuildAndInferError(error)


def StopGlobalSession():
error_str = oneflow_internal.StopGlobalSession()
def StopLazyGlobalSession():
error_str = oneflow_internal.StopLazyGlobalSession()
error = text_format.Parse(error_str, error_util.ErrorProto())
if error.HasField("error_type"):
raise JobBuildAndInferError(error)
Expand Down
4 changes: 1 addition & 3 deletions oneflow/python/framework/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ def InterpretScope(session, function_desc, config_proto):
job_conf.job_name = function_desc.job_func.__name__
placement_scope = function_desc.function_attribute.default_placement_scope
if placement_scope is None:
tag_and_dev_ids = placement_util.GetDefaultMachineDeviceIds(
oneflow.env.current_resource()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

原来的代码肯定是不对的。没有出错的原因是lazy的情形下env.current_resource就是返回Global<Resource, ForSession>,但在eager下,Global<Resource, ForSession>并没有得到特别的维护。

)
tag_and_dev_ids = placement_util.GetDefaultMachineDeviceIds(session.resource)
placement_scope = placement_util.GetPlacementScope(*tag_and_dev_ids)
distribute_strategy = function_desc.function_attribute.default_distribute_strategy
if distribute_strategy is None:
Expand Down
18 changes: 14 additions & 4 deletions oneflow/python/framework/session_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def __init__(self):
self.inter_user_job_info_ = None
self.uuid2watch_handler_ = {}
self.config_proto_ = None
self.resource_ = None
self.placement_scope_stack_ = []
self.is_mirrored_strategy_enabled_stack_ = []
self.function_flag_name2default_val_ = {}
Expand Down Expand Up @@ -93,6 +94,13 @@ def config_proto(self):
self.config_proto_ = _GetDefaultConfigProto()
return self.config_proto_

@property
def resource(self):
if self.resource_ is None:
return oneflow.env.current_resource()
else:
return self.resource_

@property
def uuid2watch_handler(self):
return self.uuid2watch_handler_
Expand Down Expand Up @@ -210,14 +218,15 @@ def Init(self):
if not c_api_util.IsEnvInited():
oneflow.env.init()
_TryCompleteConfigProto(self.config_proto)
c_api_util.InitGlobalSession(self.config_proto)
self.resource_ = self.config_proto.resource
if not c_api_util.EagerExecutionEnabled():
c_api_util.InitLazyGlobalSession(self.config_proto)
for job_name, func_desc in self.job_name2function_desc_.items():
compiler.Compile(self, func_desc, self.config_proto)
self.existed_module_names_ = set()
self.job_name2var_name2var_blob_ = dict()
assert len(self.job_name2function_desc_.items()) > 0
c_api_util.StartGlobalSession()
c_api_util.StartLazyGlobalSession()
self.inter_user_job_info_ = c_api_util.GetInterUserJobInfo()
return self

Expand All @@ -232,9 +241,10 @@ def Close(self):
del self.var_name2var_blob_
del self.job_name2module_name2module_
self.ForceReleaseEagerBlobs()
c_api_util.StopGlobalSession()
c_api_util.DestroyGlobalSession()
c_api_util.StopLazyGlobalSession()
c_api_util.DestroyLazyGlobalSession()
self.status_ = SessionStatus.CLOSED
self.resource_ = None

def AddJob(self, function_desc):
assert self.status_ is SessionStatus.OPEN
Expand Down
Loading