Skip to content

Commit 2f18834

Browse files
authored
[fleet_executor] Remove runtime graph, all scheduler on python side (#38261)
1 parent 8c9c81c commit 2f18834

File tree

10 files changed

+165
-366
lines changed

10 files changed

+165
-366
lines changed

paddle/fluid/distributed/fleet_executor/carrier.cc

+6-7
Original file line numberDiff line numberDiff line change
@@ -240,13 +240,12 @@ void Carrier::CreateInterceptors() {
240240
task_node->run_at_offset(), task_node->run_per_steps()));
241241

242242
std::unique_ptr<Interceptor> interceptor;
243-
if (task_node->type().empty()) {
244-
// TODO(wangxi): delete this in future
245-
interceptor.reset(new Interceptor(interceptor_id, task_node));
246-
} else {
247-
interceptor = InterceptorFactory::Create(task_node->type(),
248-
interceptor_id, task_node);
249-
}
243+
PADDLE_ENFORCE_NE(task_node->type().empty(), true,
244+
platform::errors::NotFound(
245+
"Cannot found type for task node with id %lld",
246+
task_node->task_id()));
247+
interceptor = InterceptorFactory::Create(task_node->type(), interceptor_id,
248+
task_node);
250249
interceptor->SetPlace(place_);
251250
interceptor->SetMiniBatchScope(minibatch_scope_);
252251
interceptor->SetMicroBatchScope(microbatch_scopes_);

paddle/fluid/distributed/fleet_executor/fleet_executor.cc

+21-24
Original file line numberDiff line numberDiff line change
@@ -48,32 +48,29 @@ void FleetExecutor::Init(
4848
const framework::ProgramDesc& program_desc, framework::Scope* scope,
4949
const platform::Place& place, const std::vector<TaskNode*>& task_nodes,
5050
const std::unordered_map<int64_t, int64_t>& task_id_to_rank) {
51-
if (task_nodes.size() == 0) {
52-
LOG(INFO) << "fleet executor will use c++ side scheduler construction.";
53-
runtime_graph_ = std::make_shared<RuntimeGraph>(program_desc, exe_desc_);
54-
} else {
55-
LOG(INFO) << "fleet executor has been set dependency on python side.";
56-
// TODO(fleet_exe devs): the unused_vars should be got from run time graph
57-
std::vector<std::unique_ptr<framework::OperatorBase>> ops;
58-
for (auto task_node : task_nodes) {
59-
for (auto op : task_node->ops()) {
60-
ops.emplace_back(std::unique_ptr<framework::OperatorBase>(op));
61-
}
62-
}
63-
auto unused_vars = framework::GetUnusedVars(program_desc.Block(0), ops, {});
64-
runtime_graph_ = std::make_shared<RuntimeGraph>();
65-
std::unordered_map<int64_t, TaskNode*> interceptor_id_to_task;
66-
for (auto task_node : task_nodes) {
67-
task_node->SetUnusedVars(unused_vars);
68-
int64_t interceptor_id = task_node->task_id();
69-
interceptor_id_to_task.emplace(interceptor_id, task_node);
70-
}
71-
runtime_graph_->SetInterceptorIdToRank(task_id_to_rank);
72-
runtime_graph_->SetInterceptorIdToNode(interceptor_id_to_task);
73-
for (auto& unique_op : ops) {
74-
unique_op.release();
51+
PADDLE_ENFORCE_GT(task_nodes.size(), 0,
52+
platform::errors::InvalidArgument(
53+
"Fleet executor is inited with empty task node"));
54+
// TODO(fleet_exe devs): the unused_vars should be got from run time graph
55+
std::vector<std::unique_ptr<framework::OperatorBase>> ops;
56+
for (auto task_node : task_nodes) {
57+
for (auto op : task_node->ops()) {
58+
ops.emplace_back(std::unique_ptr<framework::OperatorBase>(op));
7559
}
7660
}
61+
auto unused_vars = framework::GetUnusedVars(program_desc.Block(0), ops, {});
62+
runtime_graph_ = std::make_shared<RuntimeGraph>();
63+
std::unordered_map<int64_t, TaskNode*> interceptor_id_to_task;
64+
for (auto task_node : task_nodes) {
65+
task_node->SetUnusedVars(unused_vars);
66+
int64_t interceptor_id = task_node->task_id();
67+
interceptor_id_to_task.emplace(interceptor_id, task_node);
68+
}
69+
runtime_graph_->SetInterceptorIdToRank(task_id_to_rank);
70+
runtime_graph_->SetInterceptorIdToNode(interceptor_id_to_task);
71+
for (auto& unique_op : ops) {
72+
unique_op.release();
73+
}
7774
root_scope_ = scope;
7875
place_ = place;
7976
PADDLE_ENFORCE_NOT_NULL(root_scope_, platform::errors::InvalidArgument(

paddle/fluid/distributed/fleet_executor/fleet_executor_desc.proto

+1-5
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,5 @@ message RankInfo {
2323
message FleetExecutorDesc {
2424
optional int64 cur_rank = 1 [ default = 0 ]; // Rank id of current processor
2525
repeated RankInfo cluster_info = 2;
26-
optional int32 dp_degree = 3 [ default = 1 ];
27-
optional int32 mp_degree = 4 [ default = 1 ];
28-
optional int32 pp_degree = 5 [ default = 1 ];
29-
optional int64 num_micro_batches = 6 [ default = 1 ];
30-
optional int64 num_slots = 7 [ default = 1 ];
26+
optional int64 num_micro_batches = 3 [ default = 1 ];
3127
}

paddle/fluid/distributed/fleet_executor/runtime_graph.cc

+2-287
Original file line numberDiff line numberDiff line change
@@ -14,300 +14,15 @@
1414

1515
#include "paddle/fluid/distributed/fleet_executor/runtime_graph.h"
1616
#include "paddle/fluid/distributed/fleet_executor/task_node.h"
17-
#include "paddle/fluid/framework/executor_gc_helper.h"
18-
#include "paddle/fluid/framework/op_registry.h"
19-
#include "paddle/fluid/framework/operator.h"
20-
#include "paddle/fluid/framework/program_desc.h"
2117

2218
namespace paddle {
2319
namespace distributed {
24-
namespace {
25-
26-
using OperatorBase = RuntimeGraph::OperatorBase;
27-
using OpRole = paddle::framework::OpRole;
28-
using OpRegistry = paddle::framework::OpRegistry;
29-
using ProgramDesc = paddle::framework::ProgramDesc;
30-
31-
bool IsForward(int32_t op_role) {
32-
return (op_role == static_cast<int32_t>(OpRole::kForward)) ||
33-
(op_role == (static_cast<int32_t>(OpRole::kForward) |
34-
static_cast<int32_t>(OpRole::kLoss)));
35-
}
36-
37-
bool IsLRSched(int32_t op_role) {
38-
return op_role == static_cast<int32_t>(OpRole::kLRSched);
39-
}
40-
41-
bool IsBackward(int32_t op_role) {
42-
return (op_role == static_cast<int32_t>(OpRole::kBackward)) ||
43-
(op_role == (static_cast<int32_t>(OpRole::kBackward) |
44-
static_cast<int32_t>(OpRole::kLoss)));
45-
}
46-
47-
bool IsOptimize(int32_t op_role) {
48-
return op_role == static_cast<int32_t>(OpRole::kOptimize);
49-
}
50-
51-
struct DistCoord {
52-
int32_t dp_idx;
53-
int32_t pp_idx;
54-
int32_t mp_idx;
55-
};
56-
57-
class DistCoordSys final {
58-
public:
59-
DistCoordSys(int32_t dp_degree, int32_t pp_degree, int32_t mp_degree)
60-
: dp_degree_(dp_degree), pp_degree_(pp_degree), mp_degree_(mp_degree) {}
61-
DistCoord RankToCoord(int64_t rank) const;
62-
int64_t CoordToRank(const DistCoord& coord) const;
63-
64-
private:
65-
DISABLE_COPY_AND_ASSIGN(DistCoordSys);
66-
bool InvalidCoord(const DistCoord& coord) const;
67-
int32_t dp_degree_;
68-
int32_t pp_degree_;
69-
int32_t mp_degree_;
70-
};
71-
72-
DistCoord DistCoordSys::RankToCoord(int64_t rank) const {
73-
DistCoord coord;
74-
coord.mp_idx = rank % mp_degree_;
75-
rank /= mp_degree_;
76-
coord.pp_idx = rank % pp_degree_;
77-
rank /= pp_degree_;
78-
coord.dp_idx = rank % dp_degree_;
79-
return coord;
80-
}
81-
82-
int64_t DistCoordSys::CoordToRank(const DistCoord& coord) const {
83-
if (InvalidCoord(coord)) {
84-
return -1;
85-
}
86-
return coord.dp_idx * pp_degree_ * mp_degree_ + coord.pp_idx * mp_degree_ +
87-
coord.mp_idx;
88-
}
89-
90-
bool DistCoordSys::InvalidCoord(const DistCoord& coord) const {
91-
return coord.mp_idx < 0 || coord.mp_idx >= mp_degree_ || coord.pp_idx < 0 ||
92-
coord.pp_idx >= pp_degree_ || coord.dp_idx < 0 ||
93-
coord.dp_idx >= dp_degree_;
94-
}
95-
96-
} // namespace
97-
98-
std::vector<OpRole> RuntimeGraph::functionality_order = {
99-
OpRole::kLRSched, OpRole::kForward, OpRole::kBackward, OpRole::kOptimize};
100-
101-
RuntimeGraph::RuntimeGraph(const ProgramDesc& program,
102-
const FleetExecutorDesc& exe_desc)
103-
: exe_desc_(exe_desc) {
104-
if (exe_desc.pp_degree() == 1) {
105-
OriginProgramCompile(program);
106-
} else {
107-
SplitProgramBasedFunctionality(program);
108-
AssignTaskToIntercepter();
109-
FakeDependence();
110-
FakeRuntimeInfo();
111-
}
112-
}
113-
114-
void RuntimeGraph::OriginProgramCompile(const ProgramDesc& program) {
115-
int64_t cur_rank = exe_desc_.cur_rank();
116-
int64_t max_run_times = exe_desc_.num_micro_batches();
117-
int64_t max_slot_nums = exe_desc_.num_slots();
118-
119-
auto task_node = std::make_unique<TaskNode>(program, cur_rank, max_run_times,
120-
max_slot_nums);
121-
// TODO(wangxi): add skip vars
122-
auto unused_vars =
123-
framework::GetUnusedVars(program.Block(0), task_node->unique_ops(), {});
124-
task_node->SetType("Compute");
125-
task_node->SetUnusedVars(unused_vars);
126-
127-
task_nodes_.emplace_back(std::move(task_node));
128-
int64_t task_id = task_nodes_[0]->task_id();
129-
intercepter_id_to_rank_.insert({task_id, cur_rank});
130-
intercepter_id_to_node_.insert({task_id, task_nodes_[0].get()});
131-
}
132-
133-
void RuntimeGraph::SplitProgramBasedFunctionality(const ProgramDesc& program) {
134-
for (const auto& op_desc : program.Block(0).AllOps()) {
135-
ops_.emplace_back(OpRegistry::CreateOp(*op_desc));
136-
}
137-
// TODO(wangxi): how to gc pipeline backward send
138-
auto unused_vars = framework::GetUnusedVars(program.Block(0), ops_, {});
139-
140-
std::unordered_map<int32_t, std::vector<OperatorBase*>> role_to_ops;
141-
for (const auto& op : ops_) {
142-
int32_t op_role = op->Attr<int32_t>("op_role");
143-
OpRole new_op_role;
144-
if (IsLRSched(op_role)) {
145-
new_op_role = OpRole::kLRSched;
146-
} else if (IsForward(op_role)) {
147-
new_op_role = OpRole::kForward;
148-
} else if (IsBackward(op_role)) {
149-
new_op_role = OpRole::kBackward;
150-
} else if (IsOptimize(op_role)) {
151-
new_op_role = OpRole::kOptimize;
152-
} else {
153-
PADDLE_THROW(platform::errors::PreconditionNotMet(
154-
"The op %s is None of LRSched, Forward, Backward or Optimize.",
155-
op->Type()));
156-
}
157-
int32_t new_op_role_id = static_cast<int32_t>(new_op_role);
158-
if (role_to_ops.find(new_op_role_id) == role_to_ops.end()) {
159-
role_to_ops.insert({new_op_role_id, {}});
160-
}
161-
role_to_ops.at(new_op_role_id).emplace_back(op.get());
162-
}
163-
164-
int64_t cur_rank = exe_desc_.cur_rank();
165-
DistCoordSys coord_sys(exe_desc_.dp_degree(), exe_desc_.pp_degree(),
166-
exe_desc_.mp_degree());
167-
const auto& coord = coord_sys.RankToCoord(cur_rank);
168-
int pipeline_stage = coord.pp_idx;
169-
int64_t num_pipeline_stages = exe_desc_.pp_degree();
170-
171-
// TODO(fleet_executor dev): start up steps should be a config `num_slots`
172-
int64_t start_up_steps = num_pipeline_stages - pipeline_stage;
173-
int64_t num_micro_batches = exe_desc_.num_micro_batches();
174-
int64_t task_id = cur_rank * functionality_order.size();
175-
for (std::size_t i = 0; i < functionality_order.size(); ++i) {
176-
VLOG(3) << "Runtime graph is creating task node for: " << task_id << ".";
177-
OpRole role = functionality_order[i];
178-
int32_t role_id = static_cast<int64_t>(role);
179-
int64_t max_run_times = num_micro_batches;
180-
int64_t max_slot_nums = start_up_steps;
181-
// NOTE: use short path, each interceptor should run for max_run_times
182-
std::vector<OperatorBase*> task_ops{};
183-
if (role_to_ops.find(role_id) != role_to_ops.end()) {
184-
task_ops = role_to_ops.at(role_id);
185-
}
186-
std::unique_ptr<TaskNode> task_node = std::make_unique<TaskNode>(
187-
role_id, task_ops, cur_rank, task_id, max_run_times, max_slot_nums);
188-
if (IsLRSched(role_id) || IsOptimize(role_id)) {
189-
task_node->SetType("Amplifier");
190-
if (IsLRSched(role_id)) {
191-
task_node->SetRunPerSteps(max_run_times);
192-
} else {
193-
task_node->SetRunAtOffset(max_run_times - 1);
194-
task_node->SetRunPerSteps(max_run_times);
195-
}
196-
} else {
197-
task_node->SetType("Compute");
198-
}
199-
task_node->SetUnusedVars(unused_vars);
200-
task_nodes_.emplace_back(std::move(task_node));
201-
++task_id;
202-
}
203-
}
204-
205-
void RuntimeGraph::FakeDependence() {
206-
int64_t cur_rank = exe_desc_.cur_rank();
207-
DistCoordSys coord_sys(exe_desc_.dp_degree(), exe_desc_.pp_degree(),
208-
exe_desc_.mp_degree());
209-
const auto& coord = coord_sys.RankToCoord(cur_rank);
210-
DistCoord upstream_coord = coord, downstream_coord = coord;
211-
upstream_coord.pp_idx -= 1;
212-
downstream_coord.pp_idx += 1;
213-
int64_t pp_upstream = coord_sys.CoordToRank(upstream_coord);
214-
int64_t pp_downstream = coord_sys.CoordToRank(downstream_coord);
215-
bool is_first_stage = (pp_upstream == -1);
216-
bool is_last_stage = (pp_downstream == -1);
217-
218-
int32_t num_of_functionality = functionality_order.size();
219-
// lr(1:m) -> forward -> backward -> (m:1)optimize
220-
// ↑ ↓
221-
// lr(1:m) -> forward -> backward -> (m:1)optimize
222-
// ↑ ↓
223-
// lr(1:m) -> forward -> backward -> (m:1)optimize
224-
for (std::size_t i = 0; i < task_nodes_.size(); ++i) {
225-
auto& node = task_nodes_[i];
226-
bool is_forward = IsForward(node->role());
227-
bool is_backward = IsBackward(node->role());
228-
229-
int64_t cur_id = cur_rank * num_of_functionality + i;
230-
int64_t prev_id = cur_id - 1;
231-
int64_t next_id = cur_id + 1;
232-
233-
int64_t upstream_id = pp_upstream * num_of_functionality + i;
234-
int64_t downstream_id = pp_downstream * num_of_functionality + i;
235-
236-
// 1F1B, last stage pp_buff_size should be 1, while first stage
237-
// pp_buff_size should be pp_degree
238-
int64_t pp_buff_size = exe_desc_.pp_degree() - coord.pp_idx;
239-
240-
std::vector<std::pair<int64_t, int64_t>> ups;
241-
std::vector<std::pair<int64_t, int64_t>> downs;
242-
243-
if (i != 0) { // not lr
244-
int64_t buff_size = is_backward ? pp_buff_size : 2;
245-
ups.emplace_back(prev_id, buff_size);
246-
}
247-
if (i != task_nodes_.size() - 1) { // not optimize
248-
int64_t buff_size = is_forward ? pp_buff_size : 2;
249-
downs.emplace_back(next_id, buff_size);
250-
}
251-
252-
if (is_forward) {
253-
if (!is_first_stage) {
254-
ups.emplace_back(upstream_id, 2);
255-
}
256-
if (!is_last_stage) {
257-
downs.emplace_back(downstream_id, 2);
258-
}
259-
} else if (is_backward) {
260-
if (!is_last_stage) {
261-
ups.emplace_back(downstream_id, 2);
262-
}
263-
if (!is_first_stage) {
264-
downs.emplace_back(upstream_id, 2);
265-
}
266-
}
267-
268-
for (auto up : ups) {
269-
VLOG(3) << "Task(" << cur_id << ") AddUpstream Task(" << up.first
270-
<< ") with buff_size=" << up.second;
271-
node->AddUpstreamTask(up.first, up.second);
272-
}
273-
for (auto down : downs) {
274-
VLOG(3) << "Task(" << cur_id << ") AddDownstream Task(" << down.first
275-
<< ") with buff_size=" << down.second;
276-
node->AddDownstreamTask(down.first, down.second);
277-
}
278-
}
279-
}
280-
281-
void RuntimeGraph::AssignTaskToIntercepter() {
282-
for (const auto& task : task_nodes_) {
283-
int64_t intercepter_id = task->task_id();
284-
VLOG(3) << "Runtime graph is assigning task to interceptor: "
285-
<< intercepter_id << " with type: " << task->type() << ".";
286-
if (intercepter_id_to_node_.find(intercepter_id) !=
287-
intercepter_id_to_node_.end()) {
288-
PADDLE_THROW(platform::errors::PreconditionNotMet(
289-
"Repeated intercepter id: %d", intercepter_id));
290-
}
291-
intercepter_id_to_node_.insert({intercepter_id, task.get()});
292-
}
293-
}
294-
295-
void RuntimeGraph::FakeRuntimeInfo() {
296-
int64_t nrank = exe_desc_.cluster_info().size();
297-
int32_t num_of_functionality = functionality_order.size();
298-
for (int64_t i = 0; i < nrank; ++i) {
299-
for (int32_t j = 0; j < num_of_functionality; ++j) {
300-
int64_t intercepter_id = i * num_of_functionality + j;
301-
intercepter_id_to_rank_.insert({intercepter_id, i});
302-
}
303-
}
304-
}
30520

30621
std::string RuntimeGraph::DebugString() const {
30722
std::ostringstream os;
30823
os << "\nRuntime Graph Debug: \n";
309-
for (const auto& task : task_nodes_) {
310-
os << task->DebugString();
24+
for (const auto& pair : intercepter_id_to_node_) {
25+
os << pair.second->DebugString();
31126
os << "\n";
31227
}
31328
return os.str();

0 commit comments

Comments
 (0)