From 9812cc178f903b779c035d5bd24d51a6c43bed78 Mon Sep 17 00:00:00 2001 From: qingshui Date: Sat, 20 Jun 2020 11:01:34 +0800 Subject: [PATCH] Paddlebox Fix DualBox mpi coredump bug (#23) --- paddle/fluid/framework/data_feed.h | 8 +++ paddle/fluid/framework/data_set.cc | 68 ++++++++++----------- paddle/fluid/framework/data_set.h | 5 +- paddle/fluid/framework/fleet/box_wrapper.cc | 21 ++++++- 4 files changed, 64 insertions(+), 38 deletions(-) diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h index 08debb3ce2d64..add8641977272 100644 --- a/paddle/fluid/framework/data_feed.h +++ b/paddle/fluid/framework/data_feed.h @@ -733,6 +733,10 @@ struct SlotValues { std::vector slot_values; std::vector slot_offsets; + ~SlotValues() { + slot_values.shrink_to_fit(); + slot_offsets.shrink_to_fit(); + } void add_values(const T* values, uint32_t num) { if (slot_offsets.empty()) { slot_offsets.push_back(0); @@ -797,6 +801,10 @@ inline SlotRecord make_slotrecord() { return new SlotRecordObject(); } struct SlotPvInstanceObject { std::vector ads; + ~SlotPvInstanceObject() { + ads.clear(); + ads.shrink_to_fit(); + } void merge_instance(SlotRecord ins) { ads.push_back(ins); } }; diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc index b108c8f5be8f6..559c0bf4d6f36 100644 --- a/paddle/fluid/framework/data_set.cc +++ b/paddle/fluid/framework/data_set.cc @@ -1389,6 +1389,20 @@ void MultiSlotDataset::SlotsShuffle( } #ifdef PADDLE_WITH_BOX_PS +class PadBoxSlotDataConsumer : public boxps::DataConsumer { + public: + explicit PadBoxSlotDataConsumer(PadBoxSlotDataset* dataset) + : _dataset(dataset) { + BoxWrapper::data_shuffle_->register_handler(this); + } + virtual ~PadBoxSlotDataConsumer() {} + virtual void on_receive(const int client_id, const char* buff, int len) { + _dataset->ReceiveSuffleData(client_id, buff, len); + } + + private: + PadBoxSlotDataset* _dataset; +}; // paddlebox PadBoxSlotDataset::PadBoxSlotDataset() { mpi_size_ = boxps::MPICluster::Ins().size(); @@ -1398,15 +1412,17 @@ PadBoxSlotDataset::PadBoxSlotDataset() { finished_counter_ = mpi_size_; mpi_flags_.assign(mpi_size_, 1); VLOG(3) << "RegisterClientToClientMsgHandler"; - BoxWrapper::data_shuffle_->register_handler( - [this](int client_id, const char* buf, int len) { - return this->ReceiveSuffleData(client_id, buf, len); - }); + data_consumer_ = reinterpret_cast(new PadBoxSlotDataConsumer(this)); VLOG(3) << "RegisterClientToClientMsgHandler done"; } SlotRecordPool(); } -PadBoxSlotDataset::~PadBoxSlotDataset() {} +PadBoxSlotDataset::~PadBoxSlotDataset() { + if (data_consumer_ != nullptr) { + delete reinterpret_cast(data_consumer_); + data_consumer_ = nullptr; + } +} // create input channel and output channel void PadBoxSlotDataset::CreateChannel() { if (input_channel_ == nullptr) { @@ -1529,7 +1545,9 @@ void PadBoxSlotDataset::MergeInsKeys(const Channel& in) { for (auto& rec : datas) { for (auto& idx : used_fea_index) { uint64_t* feas = rec->slot_uint64_feasigns_.get_values(idx, &num); - agent->AddKeys(feas, num, tid); + if (num > 0) { + agent->AddKeys(feas, num, tid); + } } feed_obj->ExpandSlotRecord(&rec); } @@ -1581,6 +1599,7 @@ void PadBoxSlotDataset::ReleaseMemory() { delete pv; } input_pv_ins_.clear(); + input_pv_ins_.shrink_to_fit(); } timeline.Pause(); VLOG(1) << "DatasetImpl::ReleaseMemory() end, cost time=" @@ -1602,7 +1621,6 @@ void PadBoxSlotDataset::ShuffleData(std::vector* shuffle_threads, std::vector loc_datas; std::vector releases; std::vector ars(mpi_size_); - std::vector> rets(mpi_size_); while (input_channel_->Read(data)) { for (auto& t : data) { @@ -1631,41 +1649,26 @@ void PadBoxSlotDataset::ShuffleData(std::vector* shuffle_threads, if (i == mpi_rank_) { continue; } - if (ars[i].Length() == 0) { + auto& ar = ars[i]; + if (ar.Length() == 0) { continue; } - rets[i] = BoxWrapper::data_shuffle_->send_message(i, ars[i].Buffer(), - ars[i].Length()); + BoxWrapper::data_shuffle_->send_message(i, ar.Buffer(), ar.Length()); + ar.Clear(); } - for (int i = 0; i < mpi_size_; ++i) { - if (i == mpi_rank_) { - continue; - } - rets[i].wait(); - ars[i].Clear(); - } data.clear(); loc_datas.clear(); } - + VLOG(3) << "end shuffle thread id = " << tid; // only one thread send finish notify if (--shuffle_counter_ == 0) { // send closed - paddle::framework::BinaryArchive ar; - for (int i = 0; i < mpi_size_; ++i) { - if (i == mpi_rank_) { - continue; - } - rets[i] = BoxWrapper::data_shuffle_->send_message(i, ar.Buffer(), - ar.Length()); - } - for (int i = 0; i < mpi_size_; ++i) { if (i == mpi_rank_) { continue; } - rets[i].wait(); + BoxWrapper::data_shuffle_->send_message(i, NULL, 0); } // local closed channel if (--finished_counter_ == 0) { @@ -1680,12 +1683,6 @@ void PadBoxSlotDataset::ReceiveSuffleData(int client_id, const char* buf, VLOG(3) << "ReceiveFromClient client_id=" << client_id << ", msg length=" << len; if (len == 0) { - return; - } - - paddle::framework::BinaryArchive ar; - ar.SetReadBuffer(const_cast(buf), len, nullptr); - if (ar.Cursor() == ar.Finish()) { if (mpi_flags_[client_id]) { mpi_flags_[client_id] = 0; --finished_counter_; @@ -1696,6 +1693,9 @@ void PadBoxSlotDataset::ReceiveSuffleData(int client_id, const char* buf, return; } + paddle::framework::BinaryArchive ar; + ar.SetReadBuffer(const_cast(buf), len, nullptr); + int offset = 0; const int max_fetch_num = 1000; std::vector data; diff --git a/paddle/fluid/framework/data_set.h b/paddle/fluid/framework/data_set.h index 16c98085b92c2..e049e906e98b8 100644 --- a/paddle/fluid/framework/data_set.h +++ b/paddle/fluid/framework/data_set.h @@ -357,7 +357,9 @@ class PadBoxSlotDataset : public DatasetImpl { // shuffle data virtual void ShuffleData(std::vector* shuffle_threads, int thread_num = -1); - virtual void ReceiveSuffleData(int client_id, const char* msg, int len); + + public: + virtual void ReceiveSuffleData(const int client_id, const char* msg, int len); private: void MergeInsKeys(const Channel& in); @@ -371,6 +373,7 @@ class PadBoxSlotDataset : public DatasetImpl { std::vector input_pv_ins_; int shuffle_thread_num_ = 10; std::atomic shuffle_counter_{0}; + void* data_consumer_ = nullptr; }; #endif diff --git a/paddle/fluid/framework/fleet/box_wrapper.cc b/paddle/fluid/framework/fleet/box_wrapper.cc index 34f773097c112..a1be28a090e39 100644 --- a/paddle/fluid/framework/fleet/box_wrapper.cc +++ b/paddle/fluid/framework/fleet/box_wrapper.cc @@ -32,6 +32,10 @@ int BoxWrapper::expand_embed_dim_ = 0; void BasicAucCalculator::compute() { double* table[2] = {&_table[0][0], &_table[1][0]}; + if (boxps::MPICluster::Ins().size() > 1) { + boxps::MPICluster::Ins().allreduce_sum(table[0], _table_size); + boxps::MPICluster::Ins().allreduce_sum(table[1], _table_size); + } double area = 0; double fp = 0; @@ -51,10 +55,21 @@ void BasicAucCalculator::compute() { _auc = area / (fp * tp); } - _mae = _local_abserr / (fp + tp); - _rmse = sqrt(_local_sqrerr / (fp + tp)); + if (boxps::MPICluster::Ins().size() > 1) { + // allreduce sum + double local_err[3] = {_local_abserr, _local_sqrerr, _local_pred}; + boxps::MPICluster::Ins().allreduce_sum(local_err, 3); + + _mae = local_err[0] / (fp + tp); + _rmse = sqrt(local_err[1] / (fp + tp)); + _predicted_ctr = local_err[2] / (fp + tp); + } else { + _mae = _local_abserr / (fp + tp); + _rmse = sqrt(_local_sqrerr / (fp + tp)); + _predicted_ctr = _local_pred / (fp + tp); + } _actual_ctr = tp / (fp + tp); - _predicted_ctr = _local_pred / (fp + tp); + _size = fp + tp; }