add GeneralRoleMaker (PaddlePaddle#22295)
* add GeneralRoleMaker, which is intended for general use
* test=develop
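For orientation, a minimal sketch of how the new role maker is meant to be plugged into fleet. The import paths follow the existing fleet layout; the constructor argument is an assumption, since this diff does not show the role maker's signature:

```python
# Hedged sketch: wiring the new GeneralRoleMaker into fleet init.
# `path` (a shared scratch directory used for rendezvous between
# nodes) is an assumed argument; the real signature may differ.
from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

role = GeneralRoleMaker(path="./rendezvous_tmp")  # argument assumed
fleet.init(role)
```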
xjqbest authored Feb 2, 2020
1 parent 269db0d commit 371f377
Showing 17 changed files with 993 additions and 63 deletions.
7 changes: 7 additions & 0 deletions Dockerfile
@@ -219,6 +219,13 @@ RUN wget -q https://launchpad.net/ubuntu/+archive/primary/+sourcefiles/binutils/
cd binutils-2.27 && \
./configure && make -j && make install && cd .. && rm -rf binutils-2.27 binutils_2.27.orig.tar.gz

RUN wget --no-check-certificate https://pslib.bj.bcebos.com/openmpi-1.4.5.tar.gz && tar -xzf openmpi-1.4.5.tar.gz && \
cd openmpi-1.4.5 && ./configure --prefix=/usr/local && make all -j8 && make install -j8 && \
export LD_LIBRARY_PATH=/usr/local/lib/:$LD_LIBRARY_PATH && export PATH=/usr/local/bin:$PATH && cd .. && \
rm -rf openmpi-1.4.5.tar.gz && pip --no-cache-dir install mpi4py && ln -fs /bin/bash /bin/sh && \
apt-get install libprotobuf-dev -y
RUN pip --no-cache-dir install -U netifaces==0.10.9
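The `netifaces` pin is presumably there so the new role maker can discover a local endpoint IP; a minimal sketch of that pattern (the selection logic here is an illustration, not code from this PR):

```python
import netifaces

def first_non_loopback_ipv4():
    # Scan every interface and return the first IPv4 address
    # that is not the loopback address.
    for iface in netifaces.interfaces():
        for addr in netifaces.ifaddresses(iface).get(netifaces.AF_INET, []):
            if addr["addr"] != "127.0.0.1":
                return addr["addr"]
    return None

print(first_non_loopback_ipv4())
```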

# Older versions of patchelf limited the size of the files being processed; this was fixed in the PR below.
# https://github.com/NixOS/patchelf/commit/ba2695a8110abbc8cc6baf0eea819922ee5007fa
# So install a newer version here.
1 change: 1 addition & 0 deletions paddle/fluid/framework/CMakeLists.txt
@@ -214,6 +214,7 @@ cc_library(parallel_executor SRCS parallel_executor.cc DEPS
graph build_strategy
fast_threaded_ssa_graph_executor variable_helper)

cc_test(dist_multi_trainer_test SRCS dist_multi_trainer_test.cc DEPS executor)
cc_library(prune SRCS prune.cc DEPS framework_proto boost)
cc_test(prune_test SRCS prune_test.cc DEPS op_info prune recurrent_op device_context)
cc_test(var_type_inference_test SRCS var_type_inference_test.cc DEPS op_registry
2 changes: 2 additions & 0 deletions paddle/fluid/framework/data_set.cc
@@ -287,6 +287,7 @@ void DatasetImpl<T>::LocalShuffle() {

template <typename T>
void DatasetImpl<T>::GlobalShuffle(int thread_num) {
#ifdef PADDLE_WITH_PSLIB
VLOG(3) << "DatasetImpl<T>::GlobalShuffle() begin";
platform::Timer timeline;
timeline.Start();
@@ -379,6 +380,7 @@ void DatasetImpl<T>::GlobalShuffle(int thread_num) {
timeline.Pause();
VLOG(3) << "DatasetImpl<T>::GlobalShuffle() end, cost time="
<< timeline.ElapsedSec() << " seconds";
#endif
}

template <typename T>
4 changes: 2 additions & 2 deletions paddle/fluid/framework/dist_multi_trainer.cc
@@ -41,8 +41,8 @@ void DistMultiTrainer::Initialize(const TrainerDesc &trainer_desc,
need_dump_field_ = false;
}
}
mpi_rank_ = trainer_desc.mpi_rank() / 2;
mpi_size_ = trainer_desc.mpi_size() / 2;
mpi_rank_ = trainer_desc.mpi_rank();
mpi_size_ = trainer_desc.mpi_size();
dump_file_num_ = trainer_desc.dump_file_num();
const std::vector<paddle::framework::DataFeed *> readers =
dataset->GetReaders();
56 changes: 56 additions & 0 deletions paddle/fluid/framework/dist_multi_trainer_test.cc
@@ -0,0 +1,56 @@
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <fstream>
#include <iostream>
#include <sstream>
#include "google/protobuf/io/zero_copy_stream_impl.h"
#include "google/protobuf/message.h"
#include "google/protobuf/text_format.h"
#include "gtest/gtest.h"
#include "paddle/fluid/framework/trainer.h"

#if defined _WIN32 || defined __APPLE__
#else
#define _LINUX
#endif

namespace paddle {
namespace framework {
TEST(DistMultiTrainerTest, test1) {
#ifdef _LINUX
std::shared_ptr<DistMultiTrainer> tmp1 = std::make_shared<DistMultiTrainer>();
TrainerDesc t;
t.set_class_name("DistMultiTrainer");
t.set_device_worker_name("DownpourWorker");
t.set_thread_num(1);
auto* m = t.mutable_downpour_param()->add_program_config();
m->set_program_id("123");
std::string str;
str += "name: \"MultiSlotDataFeed\"\nbatch_size: 2\nmulti_slot_desc {\n";
str += "slots {\nname: \"words\"\ntype: \"uint64\"\nis_dense: false\n";
str += "is_used: true\n}\nslots {\nname: \"label\"\ntype: \"uint64\"\n";
str += "is_dense: false\nis_used: true\n}\n}\n";
std::shared_ptr<MultiSlotDataset> dataset =
std::make_shared<MultiSlotDataset>();
dataset->SetFileList(std::vector<std::string>());
dataset->SetThreadNum(1);
dataset->SetTrainerNum(1);
dataset->SetDataFeedDesc(str);
dataset->CreateReaders();
tmp1->Initialize(t, dataset.get());
#endif
}
} // namespace framework
} // namespace paddle
39 changes: 30 additions & 9 deletions paddle/fluid/framework/fleet/gloo_wrapper.cc
@@ -21,6 +21,7 @@ HdfsStore::HdfsStore(const std::string& path) {
path_ = path;
wait_sleep_ms_ = 3000;
wait_timeout_ = std::chrono::seconds(999999999);
retry_times_ = 100;
}

void HdfsStore::set(const std::string& key, const std::vector<char>& data) {
@@ -33,10 +34,27 @@ void HdfsStore::set(const std::string& key, const std::vector<char>& data) {
paddle::framework::fs_remove(path);
}
int err_no = 0;
std::shared_ptr<FILE> fp = paddle::framework::fs_open_write(tmp, &err_no, "");
size_t write_count = fwrite_unlocked(data.data(), 1, data.size(), fp.get());
VLOG(3) << "HdfsStore::set write_count=" << write_count << " key " << key;
fp.reset();
for (int i = 1; i <= retry_times_; ++i) {
std::shared_ptr<FILE> fp =
paddle::framework::fs_open_write(tmp, &err_no, "");
if (err_no != 0) {
VLOG(0) << "fs_open_write failed, retry times " << i << " err no "
<< err_no;
fp.reset();
sleep(wait_sleep_ms_ / 1000);
continue;
}
size_t write_count = fwrite_unlocked(data.data(), 1, data.size(), fp.get());
if (write_count != data.size()) {
VLOG(0) << "fwrite_unlocked failed, retry times " << i << " write_count "
<< write_count << " data.size() " << data.size();
fp.reset();
sleep(2);
continue;
}
fp.reset();
break;
}
paddle::framework::fs_mv(tmp, path);
#endif
}
@@ -131,7 +149,7 @@ }
}
rank_ = rank;
size_ = size;
std::string cmd = std::string("hadoop fs");
std::string cmd = std::string("${HADOOP_HOME}/bin/hadoop fs");
cmd += " -D fs.default.name=" + fs_name;
cmd += " -D hadoop.job.ugi=" + fs_ugi;
paddle::framework::hdfs_set_command(cmd);
@@ -149,16 +167,19 @@
is_initialized_ = true;
}

template void GlooWrapper::AllReduce<int64_t>(
template std::vector<int64_t> GlooWrapper::AllReduce<int64_t>(
std::vector<int64_t>& sendbuf, // NOLINT
std::vector<int64_t>& recvbuf, // NOLINT
const std::string& mode);
template void GlooWrapper::AllReduce<double>(
template std::vector<double> GlooWrapper::AllReduce<double>(
std::vector<double>& sendbuf, // NOLINT
std::vector<double>& recvbuf, // NOLINT
const std::string& mode);
template std::vector<uint64_t> GlooWrapper::AllReduce<uint64_t>(
std::vector<uint64_t>& sendbuf, // NOLINT
const std::string& mode);
template std::vector<int64_t> GlooWrapper::AllGather<int64_t>(
int64_t& input); // NOLINT
template std::vector<uint64_t> GlooWrapper::AllGather<uint64_t>(
uint64_t& input); // NOLINT
template std::vector<double> GlooWrapper::AllGather<double>(
double& input); // NOLINT

7 changes: 5 additions & 2 deletions paddle/fluid/framework/fleet/gloo_wrapper.h
@@ -70,6 +70,7 @@ class HdfsStore {
std::string path_;
int wait_sleep_ms_;
std::chrono::seconds wait_timeout_;
int retry_times_;
};

} // namespace rendezvous
@@ -107,9 +108,10 @@ class GlooWrapper {
}

template <typename T>
void AllReduce(std::vector<T>& sendbuf, std::vector<T>& recvbuf, // NOLINT
const std::string& mode = "sum") {
std::vector<T> AllReduce(std::vector<T>& sendbuf, // NOLINT
const std::string& mode = "sum") { // NOLINT
CHECK_EQ(is_initialized_, true);
std::vector<T> recvbuf(sendbuf.size(), T());
CHECK_EQ(sendbuf.size() == recvbuf.size(), true);
#ifdef PADDLE_WITH_GLOO
gloo::AllreduceOptions opts(context_);
@@ -133,6 +135,7 @@ }
}
gloo::allreduce(opts);
#endif
return recvbuf;
}

template <typename T>
3 changes: 1 addition & 2 deletions paddle/fluid/framework/fleet/test_fleet.cc
@@ -49,8 +49,7 @@ TEST(TEST_GLOO, store_1) {
gw.Size();
gw.Barrier();
std::vector<double> input;
std::vector<double> output;
gw.AllReduce(input, output);
gw.AllReduce(input);
int64_t t;
gw.AllGather(t);
#endif
6 changes: 3 additions & 3 deletions paddle/fluid/pybind/gloo_wrapper_py.cc
@@ -37,12 +37,12 @@ void BindGlooWrapper(py::module* m) {
.def("rank", &framework::GlooWrapper::Rank)
.def("size", &framework::GlooWrapper::Size)
.def("barrier", &framework::GlooWrapper::Barrier)
.def("all_reduce", &framework::GlooWrapper::AllReduce<uint64_t>)
.def("all_reduce", &framework::GlooWrapper::AllReduce<int64_t>)
.def("all_reduce", &framework::GlooWrapper::AllReduce<double>)
.def("all_gather", &framework::GlooWrapper::AllGather<uint64_t>)
.def("all_gather", &framework::GlooWrapper::AllGather<int64_t>)
.def("all_gather", &framework::GlooWrapper::AllGather<double>)
.def("Allreduce", &framework::GlooWrapper::AllReduce<int64_t>)
.def("Allreduce", &framework::GlooWrapper::AllReduce<double>);
.def("all_gather", &framework::GlooWrapper::AllGather<double>);
} // end BindGlooWrapper
} // end namespace pybind
} // end namespace paddle
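The binding change mirrors the signature change in gloo_wrapper.h: all_reduce now returns the reduced vector instead of filling an output argument, and the redundant Allreduce aliases are dropped. A hedged sketch of the resulting Python surface — the wrapper's Python class name and the init arguments are assumptions; only the method names come from the bindings above:

```python
from paddle.fluid import core

# Assumed class name and init signature; only the snake_case method
# names are confirmed by the BindGlooWrapper code above.
gloo = core.Gloo()
gloo.init(0, 1, "/tmp/gloo_store", "", "", "prefix")  # rank, size, path, fs_name, fs_ugi, prefix (assumed)
summed = gloo.all_reduce([1.0, 2.0, 3.0], "sum")  # now returns the reduced vector
gathered = gloo.all_gather(7.0)                   # one entry per rank
```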
16 changes: 8 additions & 8 deletions python/paddle/fluid/dataset.py
@@ -526,7 +526,7 @@ def global_shuffle(self, fleet=None, thread_num=12):
"""
trainer_num = 1
if fleet is not None:
fleet._role_maker._barrier_worker()
fleet._role_maker.barrier_worker()
trainer_num = fleet.worker_num()
if self.fleet_send_batch_size is None:
self.fleet_send_batch_size = 1024
@@ -537,14 +537,14 @@ def global_shuffle(self, fleet=None, thread_num=12):
self.dataset.set_fleet_send_batch_size(self.fleet_send_batch_size)
self.dataset.set_fleet_send_sleep_seconds(self.fleet_send_sleep_seconds)
if fleet is not None:
fleet._role_maker._barrier_worker()
fleet._role_maker.barrier_worker()
self.dataset.global_shuffle(thread_num)
if fleet is not None:
fleet._role_maker._barrier_worker()
fleet._role_maker.barrier_worker()
if self.merge_by_lineid:
self.dataset.merge_by_lineid()
if fleet is not None:
fleet._role_maker._barrier_worker()
fleet._role_maker.barrier_worker()

def release_memory(self):
"""
@@ -599,8 +599,8 @@ def get_memory_data_size(self, fleet=None):
local_data_size = np.array([local_data_size])
if fleet is not None:
global_data_size = local_data_size * 0
fleet._role_maker._node_type_comm.Allreduce(local_data_size,
global_data_size)
fleet._role_maker.all_reduce_worker(local_data_size,
global_data_size)
return global_data_size[0]
return local_data_size[0]

@@ -637,8 +637,8 @@ def get_shuffle_data_size(self, fleet=None):
local_data_size = np.array([local_data_size])
if fleet is not None:
global_data_size = local_data_size * 0
fleet._role_maker._node_type_comm.Allreduce(local_data_size,
global_data_size)
fleet._role_maker.all_reduce_worker(local_data_size,
global_data_size)
return global_data_size[0]
return local_data_size[0]
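Together with the barrier_worker calls above, a typical in-memory global shuffle now reads as follows. This is a sketch with the data-feed setup elided, not a complete recipe:

```python
import paddle.fluid as fluid

# Assumes `fleet` was already initialized with a role maker and that
# the dataset's feed description / use-vars were configured beforehand.
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
dataset.set_filelist(["train_data.txt"])      # placeholder file name
dataset.load_into_memory()
dataset.global_shuffle(fleet, thread_num=12)  # barriers, then shuffles across workers
print(dataset.get_shuffle_data_size(fleet))   # globally reduced example count
```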

16 changes: 16 additions & 0 deletions python/paddle/fluid/incubate/fleet/base/fleet_base.py
@@ -202,6 +202,22 @@ def init(self, role_maker=None):
self._role_maker.generate_role()
self._is_initialized = True

def all_reduce_worker(self, input, output):
"""
All-reduce between workers; only one-dimensional arrays are supported.
Args:
input(list|numpy.array): one-dimensional array
output(list|numpy.array): one-dimensional array that receives the reduced result
"""
self._role_maker.all_reduce_worker(input, output)

def barrier_worker(self):
"""
Barrier across all workers.
"""
self._role_maker.barrier_worker()

@abc.abstractmethod
def init_worker(self):
pass
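The dataset.py changes above already show the calling pattern for these two helpers; in user code it reduces to the sketch below (the counter being aggregated is illustrative):

```python
import numpy as np

# Element-wise reduction of a one-dim array across all workers
# (summation is assumed to be the default reduction mode).
local_count = np.array([123])   # illustrative per-worker value
global_count = local_count * 0  # buffer that receives the result
fleet.barrier_worker()          # ensure every worker has arrived
fleet.all_reduce_worker(local_count, global_count)
print(global_count[0])          # identical total on every worker
```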