- 
                Notifications
    You must be signed in to change notification settings 
- Fork 5.9k
[fleet_executor] Parse pipeline config #37319
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -27,4 +27,6 @@ message FleetExecutorDesc { | |
| optional int32 dp_degree = 4 [ default = 1 ]; | ||
| optional int32 mp_degree = 5 [ default = 1 ]; | ||
| optional int32 pp_degree = 6 [ default = 1 ]; | ||
| optional int64 num_micro_batches = 7 [ default = 1 ]; | ||
| optional int64 num_slots = 8 [ default = 1 ]; | ||
| There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 这个是啥 There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 就是一次最多能跑多少步 | ||
| } | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -96,6 +96,9 @@ class Interceptor { | |
| // local mailbox, written by FetchRemoteMailbox() | ||
| // read by PoolTheMailbox() | ||
| std::queue<InterceptorMessage> local_mailbox_; | ||
|  | ||
| int64_t already_run_times_{0}; | ||
| There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 这个建议加到后面的compute_interceptor中 There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 这个是为了fake run准备的,留着吧,后面的子类可以不用? | ||
| int64_t used_slot_nums_{0}; | ||
| }; | ||
|  | ||
| } // namespace distributed | ||
|  | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -136,16 +136,31 @@ void RuntimeGraph::SplitProgramBasedFunctionality(const ProgramDesc& program) { | |
| role_to_ops.at(new_op_role_id).emplace_back(op.get()); | ||
| } | ||
| int64_t cur_rank = exe_desc_.cur_rank(); | ||
| DistCoordSys coord_sys(exe_desc_.dp_degree(), exe_desc_.pp_degree(), | ||
| exe_desc_.mp_degree()); | ||
| const auto& coord = coord_sys.RankToCoord(cur_rank); | ||
| int pipeline_stage = coord.pp_idx; | ||
| int64_t num_pipeline_stages = exe_desc_.pp_degree(); | ||
| // TODO(fleet_executor dev): start up steps should be a config `num_slots` | ||
| int64_t start_up_steps = num_pipeline_stages - pipeline_stage - 1; | ||
| int64_t num_micro_batches = exe_desc_.num_micro_batches(); | ||
| int64_t task_id = cur_rank * functionality_order.size(); | ||
| for (std::size_t i = 0; i < functionality_order.size(); ++i) { | ||
| OpRole role = functionality_order[i]; | ||
| int64_t role_id = static_cast<int64_t>(role); | ||
| int64_t max_run_times = num_micro_batches; | ||
| int64_t max_slot_nums = start_up_steps; | ||
| if (IsLRSched(role_id) || IsOptimize(role_id)) { | ||
| max_run_times = 1; | ||
| max_slot_nums = 1; | ||
| } | ||
| if (role_to_ops.find(role_id) == role_to_ops.end()) { | ||
| task_nodes_.emplace_back( | ||
| TaskNode::CreateEmptyTaskNode(role_id, cur_rank, task_id)); | ||
| task_nodes_.emplace_back(TaskNode::CreateEmptyTaskNode( | ||
| There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 后续可能需要有ComputeTaskNode There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 没理解,为啥要单搞一个新的tasknode出来? | ||
| role_id, cur_rank, task_id, max_run_times, max_slot_nums)); | ||
| } else { | ||
| task_nodes_.emplace_back(TaskNode::CreateTaskNode( | ||
| role_id, role_to_ops.at(role_id), cur_rank, task_id)); | ||
| task_nodes_.emplace_back( | ||
| TaskNode::CreateTaskNode(role_id, role_to_ops.at(role_id), cur_rank, | ||
| task_id, max_run_times, max_slot_nums)); | ||
| } | ||
| ++task_id; | ||
| } | ||
|  | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
大概不是num_micro_steps?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个在Python端就是global batch size / micro batch size,所以叫num_micro_batches?一共有多少个micro batch?其实就是num_micro_steps