Skip to content

Commit

Permalink
Paddlebox add paddlebox dataset gflags (PaddlePaddle#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
qingshui authored Jun 22, 2020
1 parent 73adc86 commit fbaa076
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 17 deletions.
34 changes: 27 additions & 7 deletions paddle/fluid/framework/data_feed.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ limitations under the License. */
#include "paddle/fluid/framework/fleet/fleet_wrapper.h"
#include "paddle/fluid/platform/monitor.h"
#include "paddle/fluid/platform/timer.h"

#include "paddle/fluid/string/string_helper.h"
#ifdef PADDLE_WITH_BOX_PS
#include <dlfcn.h>
extern "C" {
Expand Down Expand Up @@ -1801,7 +1801,22 @@ void SlotPaddleBoxDataFeed::Init(const DataFeedDesc& data_feed_desc) {

rank_offset_name_ = data_feed_desc.rank_offset();
pv_batch_size_ = data_feed_desc.pv_batch_size();

// fprintf(stdout, "rank_offset_name: [%s]\n", rank_offset_name_.c_str());
size_t pos = pipe_command_.find(".so");
if (pos != std::string::npos) {
pos = pipe_command_.rfind('|');
if (pos == std::string::npos) {
parser_so_path_ = pipe_command_;
pipe_command_.clear();
} else {
parser_so_path_ = pipe_command_.substr(pos + 1);
pipe_command_ = pipe_command_.substr(0, pos);
}
parser_so_path_ = paddle::string::erase_spaces(parser_so_path_);
} else {
parser_so_path_.clear();
}
}
void SlotPaddleBoxDataFeed::GetUsedSlotIndex(
std::vector<int>* used_slot_index) {
Expand Down Expand Up @@ -2325,7 +2340,7 @@ SlotInsParserMgr& global_parser_pool() {

void SlotPaddleBoxDataFeed::LoadIntoMemory() {
VLOG(3) << "LoadIntoMemory() begin, thread_id=" << thread_id_;
if (this->pipe_command_.find(".so") != std::string::npos) {
if (!parser_so_path_.empty()) {
LoadIntoMemoryByLib();
} else {
LoadIntoMemoryByCommand();
Expand All @@ -2334,11 +2349,11 @@ void SlotPaddleBoxDataFeed::LoadIntoMemory() {

void SlotPaddleBoxDataFeed::LoadIntoMemoryByLib(void) {
paddle::framework::ISlotParser* parser =
global_parser_pool().Get(this->pipe_command_, all_slots_info_);
global_parser_pool().Get(parser_so_path_, all_slots_info_);
CHECK(parser != nullptr);

boxps::PaddleDataReader* reader = nullptr;
if (BoxWrapper::GetInstance()->UseAfsApi()) {
if (BoxWrapper::GetInstance()->UseAfsApi() && pipe_command_.empty()) {
reader =
boxps::PaddleDataReader::New(BoxWrapper::GetInstance()->GetFileMgr());
}
Expand Down Expand Up @@ -2394,15 +2409,20 @@ void SlotPaddleBoxDataFeed::LoadIntoMemoryByLib(void) {
}
};
int lines = 0;
if (BoxWrapper::GetInstance()->UseAfsApi()) {
if (BoxWrapper::GetInstance()->UseAfsApi() && pipe_command_.empty()) {
while (reader->open(filename) < 0) {
sleep(1);
}
lines = line_reader.read_api(reader, func);
reader->close();
} else {
int err_no = 0;
this->fp_ = fs_open_read(filename, &err_no, "");
if (BoxWrapper::GetInstance()->UseAfsApi()) {
this->fp_ = BoxWrapper::GetInstance()->OpenReadFile(
filename, this->pipe_command_);
} else {
int err_no = 0;
this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
}
CHECK(this->fp_ != nullptr);
__fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER);
lines = line_reader.read_file(this->fp_.get(), func);
Expand Down
4 changes: 3 additions & 1 deletion paddle/fluid/framework/data_feed.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ limitations under the License. */

USE_INT_STAT(STAT_total_feasign_num_in_mem);
USE_INT_STAT(STAT_slot_pool_size);
DECLARE_int32(padbox_record_pool_max_size);
namespace paddle {
namespace framework {

Expand Down Expand Up @@ -871,7 +872,7 @@ class SlotObjAllocator {

class SlotObjPool {
public:
SlotObjPool() : max_capacity_(10000000) {
SlotObjPool() : max_capacity_(FLAGS_padbox_record_pool_max_size) {
ins_chan_ = MakeChannel<SlotRecord>();
thread_ = std::thread([this]() { run(); });
}
Expand Down Expand Up @@ -1229,6 +1230,7 @@ class SlotPaddleBoxDataFeed : public DataFeed {
std::vector<AllSlotInfo> all_slots_info_;
std::vector<UsedSlotInfo> used_slots_info_;
std::vector<size_t> slot_value_offsets_;
std::string parser_so_path_;
};

template <class AR, class T>
Expand Down
4 changes: 2 additions & 2 deletions paddle/fluid/framework/data_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1529,8 +1529,8 @@ void PadBoxSlotDataset::MergeInsKeys(const Channel<SlotRecord>& in) {
boxps::PSAgentBase* agent = boxps_ptr->GetAgent();

int thread_num = boxps_ptr->GetFeedpassThreadNum();
if (thread_num > 10) {
thread_num = 10;
if (thread_num > FLAGS_padbox_dataset_merge_thread_num) {
thread_num = FLAGS_padbox_dataset_merge_thread_num;
}
std::mutex mutex;
for (int tid = 0; tid < thread_num; ++tid) {
Expand Down
4 changes: 3 additions & 1 deletion paddle/fluid/framework/data_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include <vector>

#include "paddle/fluid/framework/data_feed.h"
DECLARE_int32(padbox_dataset_shuffle_thread_num);
DECLARE_int32(padbox_dataset_merge_thread_num);

namespace paddle {
namespace framework {
Expand Down Expand Up @@ -371,7 +373,7 @@ class PadBoxSlotDataset : public DatasetImpl<SlotRecord> {
int mpi_size_ = 1;
int mpi_rank_ = 0;
std::vector<SlotPvInstance> input_pv_ins_;
int shuffle_thread_num_ = 10;
int shuffle_thread_num_ = FLAGS_padbox_dataset_shuffle_thread_num;
std::atomic<int> shuffle_counter_{0};
void* data_consumer_ = nullptr;
};
Expand Down
6 changes: 6 additions & 0 deletions paddle/fluid/platform/flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -475,3 +475,9 @@ DEFINE_double(local_exe_sub_scope_limit, 256.0, // MBytes
"The default value is 256 MBytes.");

DEFINE_int32(fix_dayid, 0, "Whether fix dayid in PaddleBox");
DEFINE_int32(padbox_record_pool_max_size, 2000000,
"PadBoxSlotDataset slot record pool max size");
DEFINE_int32(padbox_dataset_shuffle_thread_num, 10,
"PadBoxSlotDataset shuffle thread num");
DEFINE_int32(padbox_dataset_merge_thread_num, 10,
"PadBoxSlotDataset shuffle thread num");
23 changes: 17 additions & 6 deletions python/paddle/fluid/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,23 @@ def __bootstrap__():

if core.is_compiled_with_cuda():
read_env_flags += [
'fraction_of_gpu_memory_to_use', 'initial_gpu_memory_in_mb',
'reallocate_gpu_memory_in_mb', 'cudnn_deterministic',
'enable_cublas_tensor_op_math', 'conv_workspace_size_limit',
'cudnn_exhaustive_search', 'selected_gpus', 'sync_nccl_allreduce',
'cudnn_batchnorm_spatial_persistent', 'gpu_allocator_retry_time',
'local_exe_sub_scope_limit', 'gpu_memory_limit_mb', 'fix_dayid'
'fraction_of_gpu_memory_to_use',
'initial_gpu_memory_in_mb',
'reallocate_gpu_memory_in_mb',
'cudnn_deterministic',
'enable_cublas_tensor_op_math',
'conv_workspace_size_limit',
'cudnn_exhaustive_search',
'selected_gpus',
'sync_nccl_allreduce',
'cudnn_batchnorm_spatial_persistent',
'gpu_allocator_retry_time',
'local_exe_sub_scope_limit',
'gpu_memory_limit_mb',
'fix_dayid',
'padbox_record_pool_max_size',
'padbox_dataset_shuffle_thread_num',
'padbox_dataset_merge_thread_num',
]
core.init_gflags(["--tryfromenv=" + ",".join(read_env_flags)])
core.init_glog(sys.argv[0])
Expand Down

0 comments on commit fbaa076

Please sign in to comment.