Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove IDMgr::GetGpuPhyIdFromThrdId/IDMgr::GetDeviceTypeFromThrdId #6169

Merged
merged 6 commits into from
Sep 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ Maybe<SubTskGphBuilderStatus> SliceBoxingSubTskGphBuilder::Build(
thrd_id = Global<IDMgr>::Get()->PickCpuThrdIdEvenly(src_node->machine_id());
} else if (src_node->device_type() == DeviceType::kGPU) {
#ifdef WITH_CUDA
thrd_id = GetBoxingGpuThrdId(src_node->machine_id(), src_node->GpuPhyId(), "D2H");
thrd_id = GetBoxingGpuThrdId(src_node->machine_id(),
src_node->stream_id().device_id().device_index(), "D2H");
#else
UNIMPLEMENTED();
#endif
Expand Down Expand Up @@ -275,7 +276,8 @@ Maybe<SubTskGphBuilderStatus> SliceBoxingSubTskGphBuilder::Build(
} else if (in_pd.device_type() == DeviceType::kGPU) {
#ifdef WITH_CUDA
TaskNode* node = in_nodes.at(in_parallel_ids.at(out_id % in_parallel_ids.size()));
local_concat_thrd_id = GetBoxingGpuThrdId(node->machine_id(), node->GpuPhyId(), "D2H");
local_concat_thrd_id = GetBoxingGpuThrdId(
node->machine_id(), node->stream_id().device_id().device_index(), "D2H");
#else
UNIMPLEMENTED();
#endif
Expand Down Expand Up @@ -336,7 +338,8 @@ Maybe<SubTskGphBuilderStatus> SliceBoxingSubTskGphBuilder::Build(
} else if (in_pd.device_type() == DeviceType::kGPU) {
#ifdef WITH_CUDA
TaskNode* node = in_nodes.at(in_parallel_ids.at(out_id % in_parallel_ids.size()));
local_add_thrd_id = GetBoxingGpuThrdId(node->machine_id(), node->GpuPhyId(), "D2H");
local_add_thrd_id = GetBoxingGpuThrdId(
node->machine_id(), node->stream_id().device_id().device_index(), "D2H");
#else
UNIMPLEMENTED();
#endif
Expand Down Expand Up @@ -382,7 +385,8 @@ Maybe<SubTskGphBuilderStatus> SliceBoxingSubTskGphBuilder::Build(
} else if (in_pd.device_type() == DeviceType::kGPU) {
#ifdef WITH_CUDA
TaskNode* node = in_nodes.at(in_ids_on_machine.front());
local_add_thrd_id = GetBoxingGpuThrdId(node->machine_id(), node->GpuPhyId(), "H2D");
local_add_thrd_id = GetBoxingGpuThrdId(
node->machine_id(), node->stream_id().device_id().device_index(), "H2D");
#else
UNIMPLEMENTED();
#endif
Expand Down
17 changes: 8 additions & 9 deletions oneflow/core/graph/boxing/sub_task_graph_builder_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ bool SubTskGphBuilderUtil::HasEmptySliceIfSplit(int64_t parallel_num,
}

bool SubTskGphBuilderUtil::IsOnSameGPU(const TaskNode* lhs, const TaskNode* rhs) {
return lhs->machine_id() == rhs->machine_id() && lhs->device_type() == DeviceType::kGPU
&& rhs->device_type() == DeviceType::kGPU && lhs->GpuPhyId() == rhs->GpuPhyId();
return lhs->stream_id().device_id() == rhs->stream_id().device_id()
&& lhs->stream_id().device_id().device_type() == DeviceType::kGPU;
}

bool SubTskGphBuilderUtil::IsBoxingS2S(const cfg::SbpParallel& src, const cfg::SbpParallel& dst) {
Expand Down Expand Up @@ -104,19 +104,18 @@ int64_t SubTskGphBuilderUtil::GetDistance(const ParallelDesc& src_parallel_desc,
}

int64_t SubTskGphBuilderUtil::GetDistance(const TaskNode* src, const TaskNode* dst) {
const auto GetDevPhyId = [](const DeviceType device_type, const int64_t thrd_id) -> int64_t {
if (device_type == DeviceType::kGPU) {
return Global<IDMgr>::Get()->GetGpuPhyIdFromThrdId(thrd_id);
} else if (device_type == DeviceType::kCPU) {
const auto GetDevPhyId = [](const TaskNode* node) -> int64_t {
const DeviceId& device_id = node->stream_id().device_id();
if (device_id.device_type() == DeviceType::kCPU) {
return 0;
} else {
UNIMPLEMENTED();
return device_id.device_index();
}
};
const DeviceType src_device_type = src->device_type();
const int64_t src_dev_phy_id = GetDevPhyId(src_device_type, src->thrd_id());
const int64_t src_dev_phy_id = GetDevPhyId(src);
const DeviceType dst_device_type = dst->device_type();
const int64_t dst_dev_phy_id = GetDevPhyId(dst_device_type, dst->thrd_id());
const int64_t dst_dev_phy_id = GetDevPhyId(dst);
return GetDistance(src->machine_id(), src_dev_phy_id, src_device_type, dst->machine_id(),
dst_dev_phy_id, dst_device_type);
}
Expand Down
3 changes: 2 additions & 1 deletion oneflow/core/graph/copy_task_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ void CopyHdTaskNode::InitProducedRegstMemCase(MemoryCase* mem_case) {
if (copy_type_ == CopyHdOpConf::H2D) {
TaskNode::InitProducedRegstMemCase(mem_case);
} else if (copy_type_ == CopyHdOpConf::D2H) {
mem_case->mutable_host_mem()->mutable_cuda_pinned_mem()->set_device_id(GpuPhyId());
mem_case->mutable_host_mem()->mutable_cuda_pinned_mem()->set_device_id(
stream_id().device_id().device_index());
} else {
UNIMPLEMENTED();
}
Expand Down
2 changes: 1 addition & 1 deletion oneflow/core/graph/task_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ void TaskGraph::ForEachGpuDeviceNodes(
HashMap<std::pair<int64_t, int64_t>, HashSet<TaskNode*>> global_dev_phy_id2nodes;
ForEachNode([&](TaskNode* task_node) {
if (task_node->device_type() != DeviceType::kGPU) { return; }
int64_t dev_phy_id = Global<IDMgr>::Get()->GetGpuPhyIdFromThrdId(task_node->thrd_id());
int64_t dev_phy_id = task_node->stream_id().device_id().device_index();
global_dev_phy_id2nodes[{task_node->machine_id(), dev_phy_id}].emplace(task_node);
});
for (const auto& pair : global_dev_phy_id2nodes) { Handler(pair.second); }
Expand Down
17 changes: 10 additions & 7 deletions oneflow/core/graph/task_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,13 @@ std::shared_ptr<RegstDesc> TaskNode::GetSoleConsumedRegst(const std::string& nam
return vec.front();
}

DeviceType TaskNode::device_type() const {
return Global<IDMgr>::Get()->GetDeviceTypeFromThrdId(thrd_id_);
// Returns the StreamId carried by the generated TaskId.
// Fails a CHECK if UpdateTaskId() has not been called yet.
const StreamId& TaskNode::stream_id() const {
  // new_task_id_ is only populated by UpdateTaskId(); querying earlier is a bug.
  CHECK(new_task_id_);
  const TaskId& full_task_id = *new_task_id_;
  return full_task_id.stream_id();
}

DeviceType TaskNode::device_type() const { return stream_id().device_id().device_type(); }

void TaskNode::set_machine_id(int64_t val) {
CHECK_EQ(machine_id_, -1);
machine_id_ = val;
Expand Down Expand Up @@ -310,16 +313,16 @@ void TaskNode::InitProducedRegstMemCase(MemoryCase* mem_case) {
if (device_type() == DeviceType::kCPU) {
mem_case->mutable_host_mem();
} else if (device_type() == DeviceType::kGPU) {
mem_case->mutable_device_cuda_mem()->set_device_id(
Global<IDMgr>::Get()->GetGpuPhyIdFromThrdId(thrd_id_));
mem_case->mutable_device_cuda_mem()->set_device_id(stream_id().device_id().device_index());
} else {
UNIMPLEMENTED();
}
}

void TaskNode::PinConsumedRegstMemCase(MemoryCase* mem_case) {
if (mem_case->has_host_mem() && device_type() == DeviceType::kGPU) {
mem_case->mutable_host_mem()->mutable_cuda_pinned_mem()->set_device_id(GpuPhyId());
mem_case->mutable_host_mem()->mutable_cuda_pinned_mem()->set_device_id(
stream_id().device_id().device_index());
}
}

Expand All @@ -336,8 +339,8 @@ void TaskNode::UpdateTaskId() {
CHECK_NE(machine_id_, -1);
CHECK_NE(thrd_id_, -1);
StreamId stream_id = DeserializeStreamIdFromInt64(thrd_id_);
TaskId task_id = Global<IDMgr>::Get()->GetTaskIdGenerator()->Generate(stream_id);
task_id_ = SerializeTaskIdToInt64(task_id);
new_task_id_.reset(new TaskId(Global<IDMgr>::Get()->GetTaskIdGenerator()->Generate(stream_id)));
task_id_ = SerializeTaskIdToInt64(*new_task_id_);
}

void TaskNode::EraseConsumedRegstsByName(const std::string& name) {
Expand Down
3 changes: 2 additions & 1 deletion oneflow/core/graph/task_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class TaskNode : public Node<TaskNode, TaskEdge> {
int64_t machine_id() const { return machine_id_; }
int64_t thrd_id() const { return thrd_id_; }
int64_t task_id() const { return task_id_; }
const StreamId& stream_id() const;
int64_t chain_id() const { return chain_id_; }
int64_t order_in_graph() const { return order_in_graph_; }
const ExecGraph& exec_gph() const { return exec_gph_; }
Expand All @@ -67,7 +68,6 @@ class TaskNode : public Node<TaskNode, TaskEdge> {
}
DeviceType device_type() const;
virtual const ParallelContext* parallel_ctx() const { return nullptr; }
int64_t GpuPhyId() const { return Global<IDMgr>::Get()->GetGpuPhyIdFromThrdId(thrd_id_); }

// Setters
void set_machine_id(int64_t val);
Expand Down Expand Up @@ -150,6 +150,7 @@ class TaskNode : public Node<TaskNode, TaskEdge> {
int64_t task_id_;
int64_t chain_id_;
int64_t order_in_graph_;
std::unique_ptr<TaskId> new_task_id_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里为什么不是直接成员?因为 TaskId 不能随着 TaskNode 的初始化而初始化,且 TaskId 无默认构造函数吗?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里为什么不是直接成员?因为 TaskId 不能随着 TaskNode 的初始化而初始化,且 TaskId 无默认构造函数吗?


ExecGraph exec_gph_;
HashMap<std::string, std::shared_ptr<RegstDesc>> produced_regsts_;
Expand Down
12 changes: 0 additions & 12 deletions oneflow/core/job/id_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,11 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/job/id_manager.h"
#include "oneflow/core/device/cuda_util.h"
#include "oneflow/core/common/id_util.h"
#include "oneflow/core/graph/id_serialization.h"

namespace oneflow {

DeviceType IDMgr::GetDeviceTypeFromThrdId(int64_t thrd_id) const {
return DeserializeStreamIdFromInt64(thrd_id).device_id().device_type();
}

int64_t IDMgr::GetGpuPhyIdFromThrdId(int64_t thrd_id) const {
StreamId stream_id = DeserializeStreamIdFromInt64(thrd_id);
DeviceId device_id = stream_id.device_id();
CHECK_EQ(device_id.device_type(), DeviceType::kGPU);
return device_id.device_index();
}

// Decodes an actor id (a serialized TaskId) and reports the device type of
// the stream that actor is bound to.
DeviceType IDMgr::GetDeviceTypeFromActorId(int64_t actor_id) const {
  const auto decoded_task_id = DeserializeTaskIdFromInt64(actor_id);
  return decoded_task_id.stream_id().device_id().device_type();
}
Expand Down
4 changes: 0 additions & 4 deletions oneflow/core/job/id_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ class IDMgr final {
int64_t NewMemBlockId() { return mem_block_id_count_++; }
int64_t NewChunkId() { return chunk_id_count_++; }

// GetFromThrdId
DeviceType GetDeviceTypeFromThrdId(int64_t thrd_id) const;
int64_t GetGpuPhyIdFromThrdId(int64_t thrd_id) const;

// Runtime
DeviceType GetDeviceTypeFromActorId(int64_t actor_id) const;
int64_t MachineId4ActorId(int64_t actor_id) const;
Expand Down
5 changes: 3 additions & 2 deletions oneflow/core/job/intra_job_mem_sharing_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,11 @@ void InitMemoryChains(Plan* plan,
HashMap<int64_t, HashMap<int64_t, MemoryChain>>* device2chain2mem_chain) {
for (int64_t i = 0; i < plan->task_size(); ++i) {
TaskProto* task = plan->mutable_task(i);
const StreamId stream_id = PlanUtil::GetStreamId(*task);
int64_t machine_id = task->machine_id();
DeviceType device_type = Global<IDMgr>::Get()->GetDeviceTypeFromThrdId(task->thrd_id());
DeviceType device_type = stream_id.device_id().device_type();
if (device_type != DeviceType::kGPU) { continue; }
int64_t device_id = Global<IDMgr>::Get()->GetGpuPhyIdFromThrdId(task->thrd_id());
int64_t device_id = stream_id.device_id().device_index();
int64_t device_unique_id = GenDeviceUniqueId(machine_id, device_id);
MemoryChain* mem_chain =
&((*device2chain2mem_chain)[device_unique_id][task->task_set_info().chain_id()]);
Expand Down
27 changes: 16 additions & 11 deletions oneflow/core/job/plan_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ limitations under the License.
#include "oneflow/core/job/env_desc.h"
#include "oneflow/core/job/global_for.h"
#include "oneflow/core/common/str_util.h"
#include "oneflow/core/graph/task_node.h"
#include "oneflow/core/graph/plan_task_graph.h"
#include "oneflow/core/graph/boxing/collective_boxing_util.h"
#include "oneflow/core/memory/chunk_manager.h"
#include "oneflow/core/memory/memory_case_util.h"
#include "oneflow/core/register/runtime_register_desc.h"
#include "oneflow/core/persistence/tee_persistent_log_stream.h"
#include "oneflow/core/graph/id_serialization.h"

namespace oneflow {

Expand Down Expand Up @@ -460,10 +460,10 @@ void PlanUtil::ToDotFile(const Plan& plan, const std::string& filepath) {
return;
}
if (pass_tag == kNoPassTag) {
if (Global<IDMgr>::Get()->GetDeviceTypeFromThrdId(task_proto.thrd_id()) == DeviceType::kGPU) {
int64_t device_id = Global<IDMgr>::Get()->GetGpuPhyIdFromThrdId(task_proto.thrd_id());
const StreamId stream_id = PlanUtil::GetStreamId(task_proto);
if (stream_id.device_id().device_type() == DeviceType::kGPU) {
machine_id2job_id_device_id2node_list[task_proto.machine_id()][task_proto.job_id()]
[device_id]
[stream_id.device_id().device_index()]
.push_back(node_def);
machine_id2device_id2node_list_job_ids[task_proto.machine_id()].insert(task_proto.job_id());
} else {
Expand Down Expand Up @@ -757,13 +757,10 @@ struct CollectiveBoxingRequestInfo {

void GetDeviceDesc(const TaskProto* task_proto, boxing::collective::DeviceDesc* device_desc) {
device_desc->set_machine_id(task_proto->machine_id());
const int64_t thrd_id = Global<IDMgr>::Get()->ThrdId4ActorId(task_proto->task_id());
device_desc->set_device_type(Global<IDMgr>::Get()->GetDeviceTypeFromThrdId(thrd_id));
if (device_desc->device_type() == DeviceType::kGPU) {
device_desc->set_device_id(Global<IDMgr>::Get()->GetGpuPhyIdFromThrdId(thrd_id));
} else {
UNIMPLEMENTED();
}
const StreamId stream_id = PlanUtil::GetStreamId(*task_proto);
const DeviceId& device_id = stream_id.device_id();
device_desc->set_device_type(device_id.device_type());
device_desc->set_device_id(device_id.device_index());
}

} // namespace
Expand Down Expand Up @@ -981,4 +978,12 @@ void PlanUtil::PopulateOpAttibute(
}
}

// Decodes the TaskProto's thrd_id field, which holds a serialized StreamId.
/*static*/ StreamId PlanUtil::GetStreamId(const TaskProto& task) {
  const int64_t serialized_stream_id = task.thrd_id();
  return DeserializeStreamIdFromInt64(serialized_stream_id);
}

// Convenience accessor: the physical device index encoded in the task's
// stream id (see GetStreamId above for the decoding step).
/*static*/ int64_t PlanUtil::GetDeviceIndex(const TaskProto& task) {
  const StreamId stream_id = GetStreamId(task);
  return stream_id.device_id().device_index();
}

} // namespace oneflow
3 changes: 3 additions & 0 deletions oneflow/core/job/plan_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ limitations under the License.
#include "oneflow/core/common/util.h"
#include "oneflow/core/job/plan.pb.h"
#include "oneflow/core/job/job.pb.h"
#include "oneflow/core/common/id_util.h"

namespace oneflow {

Expand All @@ -45,6 +46,8 @@ struct PlanUtil {
static void PopulateOpAttibute(
Plan* plan,
const PbMap<int64_t, ::oneflow::OpAttributeRefTable>& job_id2op_attribute_ref_table);
static StreamId GetStreamId(const TaskProto& task);
static int64_t GetDeviceIndex(const TaskProto& task);
};

} // namespace oneflow
Expand Down
3 changes: 2 additions & 1 deletion oneflow/user/summary/plan_to_physical_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ limitations under the License.
#include "oneflow/core/persistence/tee_persistent_log_stream.h"
#include "oneflow/core/job/id_manager.h"
#include "oneflow/core/framework/to_string.h"
#include "oneflow/core/job/plan_util.h"

namespace oneflow {

Expand Down Expand Up @@ -54,7 +55,7 @@ void PlanToPhysicalGraphFile(const Plan& plan) {
node->set_name(task_id2op_name.at(task.task_id()));
const OperatorConf& op_conf =
task.exec_sequence().exec_node(0).kernel_conf().op_attribute().op_conf();
DeviceType device_type = Global<IDMgr>::Get()->GetDeviceTypeFromThrdId(task.thrd_id());
DeviceType device_type = PlanUtil::GetStreamId(task).device_id().device_type();
node->set_device(*CHECK_JUST(DeviceTag4DeviceType(device_type)));
if (op_conf.has_user_conf()) {
const UserOpConf& user_op = op_conf.user_conf();
Expand Down