Skip to content

Commit

Permalink
[fleet_executor] remove SetCreatingFlag (PaddlePaddle#38539)
Browse files Browse the repository at this point in the history
  • Loading branch information
wangxicoding authored Dec 29, 2021
1 parent 48f061f commit 9171aaa
Show file tree
Hide file tree
Showing 10 changed files with 11 additions and 12 deletions.
1 change: 0 additions & 1 deletion paddle/fluid/distributed/fleet_executor/carrier.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ class Carrier final {
Interceptor* SetInterceptor(int64_t interceptor_id,
std::unique_ptr<Interceptor>);

void SetCreatingFlag(bool flag) {}
void SetMsgBus(const std::shared_ptr<MessageBus>& msg_bus) {
msg_bus_ = msg_bus;
}
Expand Down
3 changes: 3 additions & 0 deletions paddle/fluid/distributed/fleet_executor/task_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <vector>

#include "paddle/fluid/framework/blocking_queue.h"
#include "paddle/fluid/platform/macros.h"

namespace paddle {
namespace distributed {
Expand Down Expand Up @@ -66,6 +67,8 @@ class TaskLoop {
}

private:
DISABLE_COPY_AND_ASSIGN(TaskLoop);

void AbortNotInLoopThread();

static thread_local TaskLoop* thread_local_loop_;
Expand Down
4 changes: 4 additions & 0 deletions paddle/fluid/distributed/fleet_executor/task_loop_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <mutex>
#include <thread>

#include "paddle/fluid/platform/macros.h"

namespace paddle {
namespace distributed {

Expand All @@ -31,6 +33,8 @@ class TaskLoopThread {
TaskLoop* StartLoop();

private:
DISABLE_COPY_AND_ASSIGN(TaskLoopThread);

void Loop();

bool start_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include <memory>
#include <vector>

#include "paddle/fluid/platform/macros.h"

namespace paddle {
namespace distributed {

Expand All @@ -37,6 +39,8 @@ class TaskLoopThreadPool {
std::vector<TaskLoop*> GetAllLoops();

private:
DISABLE_COPY_AND_ASSIGN(TaskLoopThreadPool);

bool start_;
int thread_num_;
std::vector<std::unique_ptr<TaskLoopThread>> threads_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ TEST(ComputeInterceptor, Compute) {
a->SetPlace(place);
a->SetMicroBatchScope(scopes);

carrier.SetCreatingFlag(false);

// start
InterceptorMessage msg;
msg.set_message_type(DATA_IS_READY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ TEST(ComputeInterceptor, Compute) {
carrier.SetInterceptor(1, InterceptorFactory::Create("Compute", 1, node_b));
carrier.SetInterceptor(2, InterceptorFactory::Create("Compute", 2, node_c));

carrier.SetCreatingFlag(false);

InterceptorMessage msg;
msg.set_message_type(DATA_IS_READY);
// test run three times
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ TEST(InterceptorTest, PingPong) {
0, InterceptorFactory::Create("PingPong", 0, nullptr));

carrier.SetInterceptor(1, std::make_unique<PingPongInterceptor>(1, nullptr));
carrier.SetCreatingFlag(false);

InterceptorMessage msg;
a->Send(1, msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ TEST(InterceptorTest, PingPong) {
if (pid == 0) {
Carrier* carrier =
FleetExecutor::CreateCarrier(0, interceptor_id_to_rank);
carrier->SetCreatingFlag(false);
auto msg_bus = std::make_shared<MessageBus>();
carrier->SetMsgBus(msg_bus);
// NOTE: need Init msg_bus after carrier SetMsgBus
Expand All @@ -128,7 +127,6 @@ TEST(InterceptorTest, PingPong) {
} else {
Carrier* carrier =
FleetExecutor::CreateCarrier(1, interceptor_id_to_rank);
carrier->SetCreatingFlag(false);
auto msg_bus = std::make_shared<MessageBus>();
carrier->SetMsgBus(msg_bus);
msg_bus->Init(1, {{0, ip0}, {1, ip1}}, ip1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ TEST(AmplifierInterceptor, Amplifier) {
carrier.SetInterceptor(4, InterceptorFactory::Create("Amplifier", 4, node_e));
carrier.SetInterceptor(5, InterceptorFactory::Create("Compute", 5, node_f));

carrier.SetCreatingFlag(false);

// start
InterceptorMessage msg;
msg.set_message_type(DATA_IS_READY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ TEST(AmplifierInterceptor, Amplifier) {
carrier.SetInterceptor(2, InterceptorFactory::Create("Compute", 2, node_c));
carrier.SetInterceptor(3, InterceptorFactory::Create("Amplifier", 3, node_d));

carrier.SetCreatingFlag(false);

// start
InterceptorMessage msg;
msg.set_message_type(DATA_IS_READY);
Expand Down

0 comments on commit 9171aaa

Please sign in to comment.