[GPUPS]Fix pull_thread_pool_size && Code optimize (PaddlePaddle#44)
* [GPUPS]Fix psgpuwrapper initialization (PaddlePaddle#44468)

* Update ps_gpu_wrapper.h

* Update ps_gpu_wrapper.h

* Update ps_gpu_wrapper.cc

* remove Optimizer base class

* remove feature value

* remove featurevalue base class

* fix hbm_thread_pool&pull_thread_pool
zmxdream committed Sep 14, 2022
1 parent de8499c commit 3bd3bb3
Showing 4 changed files with 36 additions and 35 deletions.
21 changes: 16 additions & 5 deletions paddle/fluid/framework/fleet/heter_ps/feature_value.h
@@ -116,7 +116,7 @@ struct FeaturePushValue {
};
*/


/*
class FeatureValueAccessor {
public:
__host__ __device__ FeatureValueAccessor() {}
@@ -132,11 +132,12 @@ class FeatureValueAccessor {
protected:
std::unordered_map<std::string, float> _config;
};
*/

// adagrad: embed_sgd_dim=1, embedx_sgd_dim=1,embedx_dim=n
// adam std: embed_sgd_dim=4, embedx_sgd_dim=n*2+2,embedx_dim=n
// adam shared: embed_sgd_dim=4, embedx_sgd_dim=4,embedx_dim=n
class CommonFeatureValueAccessor : public FeatureValueAccessor {
class CommonFeatureValueAccessor {
public:
struct CommonFeatureValue {

@@ -306,8 +307,7 @@ class CommonFeatureValueAccessor : public FeatureValueAccessor {
__host__ __device__ CommonFeatureValueAccessor() {}
__host__ __device__ ~CommonFeatureValueAccessor() {}


#define DEFINE_GET_INDEX4( instance, field) \
#define DEFINE_GET_INDEX4( instance, field) \
__host__ __device__ int get_##field##_index() { \
return instance.field##Index(); \
}
@@ -323,7 +323,7 @@ class CommonFeatureValueAccessor : public FeatureValueAccessor {
DEFINE_GET_INDEX4(common_feature_value, MfDim)
#endif

__host__ __device__ virtual int Initialize() {
__host__ int Initialize() {

// TODO(zhangminxu): support adam/shared_adam
int optimizer_type = (_config.find("optimizer_type") == _config.end())
@@ -346,6 +346,10 @@ class CommonFeatureValueAccessor : public FeatureValueAccessor {
common_feature_value.optimizer_type_ = optimizer_type;
common_feature_value.embedx_dim = sparse_embedx_dim;

VLOG(0) << "Initialize optimizer type: " << common_feature_value.optimizer_type_
<< " embed_sgd_dim: " << common_feature_value.embed_sgd_dim
<< " embedx_sgd_dim: " << common_feature_value.embedx_sgd_dim;

return 0;
}

@@ -671,7 +675,14 @@ __host__ __device__ std::string ParseToString(const float* v, int param_size) {
for(int j = left; j < right; j++) output[j] = input[j];
}
}

__host__ int Configure(std::unordered_map<std::string, float>& config) {
_config = config;
Initialize();
return 0;
}
public:
std::unordered_map<std::string, float> _config;
CommonFeatureValue common_feature_value;
CommonPushValue common_push_value;
CommonPullValue common_pull_value;
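
Taken together, these hunks replace the virtual FeatureValueAccessor base with a plain CommonFeatureValueAccessor that owns its _config map directly and derives the SGD dimensions in a non-virtual Initialize(), now called from the new Configure(); dropping the virtual base is presumably also why Initialize() loses its __device__ and virtual qualifiers above. A minimal standalone sketch of that flow follows; the numeric optimizer_type codes (1 = adagrad, 3 = adam, 4 = shared adam) and the embedx_dim config key are assumptions for illustration, not taken from this diff.

// Sketch only: mirrors the Configure()/Initialize() pattern of
// CommonFeatureValueAccessor; optimizer codes and config keys are hypothetical.
#include <string>
#include <unordered_map>

struct AccessorSketch {
  std::unordered_map<std::string, float> _config;
  int embed_sgd_dim = 1, embedx_sgd_dim = 1, embedx_dim = 8;

  int Initialize() {
    int optimizer_type = _config.count("optimizer_type")
                             ? static_cast<int>(_config["optimizer_type"]) : 1;
    int n = _config.count("embedx_dim")
                ? static_cast<int>(_config["embedx_dim"]) : 8;
    if (optimizer_type == 1) {         // adagrad: embed 1, embedx 1
      embed_sgd_dim = 1;  embedx_sgd_dim = 1;
    } else if (optimizer_type == 3) {  // adam std: embed 4, embedx 2n+2
      embed_sgd_dim = 4;  embedx_sgd_dim = 2 * n + 2;
    } else {                           // adam shared: embed 4, embedx 4
      embed_sgd_dim = 4;  embedx_sgd_dim = 4;
    }
    embedx_dim = n;
    return 0;
  }

  // Configure() stores the table config and immediately derives the dims,
  // matching the Configure() added in the hunk above.
  int Configure(const std::unordered_map<std::string, float>& config) {
    _config = config;
    return Initialize();
  }
};
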
16 changes: 2 additions & 14 deletions paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h
@@ -24,23 +24,11 @@ namespace paddle {
namespace framework {

template <typename GPUAccessor>
class Optimizer {
public:
Optimizer() {}
~Optimizer() {}

virtual __device__ void dy_mf_update_value(const OptimizerConfig& optimizer_config,
float* ptr,
const float* grad,
curandState& state) = 0;
};

template <typename GPUAccessor>
class SparseAdagradOptimizer : public Optimizer<GPUAccessor> {
class SparseAdagradOptimizer {

public:
SparseAdagradOptimizer() {}
SparseAdagradOptimizer(GPUAccessor& gpu_accessor): Optimizer<GPUAccessor>() {
SparseAdagradOptimizer(GPUAccessor& gpu_accessor) {
gpu_accessor_ = gpu_accessor;
}

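
With the Optimizer&lt;GPUAccessor&gt; base class removed, SparseAdagradOptimizer (and the other optimizers in this header) are standalone templates selected at compile time, so the update path no longer relies on virtual dispatch in device code. A simplified sketch of the resulting pattern, using illustrative names rather than the real Paddle kernels:

// Sketch only: a standalone, non-virtual optimizer template and a caller
// that fixes the concrete optimizer type statically.
template <typename GPUAccessor>
struct SparseAdagradLike {
  GPUAccessor gpu_accessor_;
  explicit SparseAdagradLike(GPUAccessor& acc) : gpu_accessor_(acc) {}

  // In the real header this is a __device__ member that applies the adagrad
  // update to one feature value using the accessor's field offsets.
  void dy_mf_update_value(float* ptr, const float* grad) const {
    // ... update ptr[...] from grad[...] via gpu_accessor_ ...
  }
};

// Callers pick the optimizer at compile time instead of holding a
// base-class pointer, so no vtable is needed on the GPU side.
template <template <typename> class OptimizerT, typename GPUAccessor>
void RunUpdate(GPUAccessor& acc, float* value, const float* grad) {
  OptimizerT<GPUAccessor> opt(acc);
  opt.dy_mf_update_value(value, grad);
}
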
13 changes: 7 additions & 6 deletions paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
@@ -693,12 +693,13 @@ void PSGPUWrapper::BuildPull(std::shared_ptr<HeterContext> gpu_task) {
task_futures.clear();
VLOG(0) << "GpuPs build hbmps done";
}
std::vector<std::vector<int>> prefix_sum;
prefix_sum.resize(device_num);
for (int i = 0; i < device_num; i++) {
prefix_sum[i].resize(thread_keys_shard_num_ + 1);
prefix_sum[i][0] = 0;
}

// std::vector<std::vector<int>> prefix_sum;
// prefix_sum.resize(device_num);
// for (int i = 0; i < device_num; i++) {
// prefix_sum[i].resize(thread_keys_shard_num_ + 1);
// prefix_sum[i][0] = 0;
// }

/*
auto calc_prefix_func = [this, &prefix_sum, &device_keys, &device_vals,
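
This BuildPull() hunk only comments out the per-device prefix_sum buffers, and the calc_prefix_func lambda above is likewise being wrapped in a /* ... */ block, so nothing replaces them here. For context, a generic sketch of what such a table computes — running offsets of the keys each shard contributes to a device — under the assumption that this is how the now-dead code was used:

// Sketch only: prefix[i][s+1] = number of keys device i receives from
// shards 0..s, so shard s writes its keys into [prefix[i][s], prefix[i][s+1]).
#include <cstddef>
#include <vector>

std::vector<std::vector<int>> BuildPrefixSum(
    const std::vector<std::vector<int>>& shard_key_counts /* [device][shard] */) {
  std::vector<std::vector<int>> prefix(shard_key_counts.size());
  for (std::size_t i = 0; i < shard_key_counts.size(); ++i) {
    prefix[i].assign(shard_key_counts[i].size() + 1, 0);
    for (std::size_t s = 0; s < shard_key_counts[i].size(); ++s) {
      prefix[i][s + 1] = prefix[i][s] + shard_key_counts[i][s];
    }
  }
  return prefix;
}
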
21 changes: 11 additions & 10 deletions paddle/fluid/framework/fleet/ps_gpu_wrapper.h
@@ -92,16 +92,6 @@ class PSGPUWrapper {
PSGPUWrapper() {
HeterPs_ = NULL;
sleep_seconds_before_fail_exit_ = 300;
hbm_thread_pool_.resize(thread_keys_shard_num_);
for (size_t i = 0; i < hbm_thread_pool_.size(); i++) {
hbm_thread_pool_[i].reset(new ::ThreadPool(1));
}
pull_thread_pool_.resize(thread_keys_shard_num_);
for (size_t i = 0; i < pull_thread_pool_.size(); i++) {
pull_thread_pool_[i].reset(new ::ThreadPool(1));
}
mg_time_0 = std::vector<double>(8, 0.0);
mg_time_1 = std::vector<double>(8, 0.0);
}

void PullSparse(const paddle::platform::Place& place, const int table_id,
Expand Down Expand Up @@ -270,6 +260,17 @@ class PSGPUWrapper {
thread_keys_shard_num_ = sparse_shard_num;
VLOG(0) << "GPUPS set sparse shard num: " << thread_keys_shard_num_;

hbm_thread_pool_.resize(thread_keys_shard_num_);
for (size_t i = 0; i < hbm_thread_pool_.size(); i++) {
hbm_thread_pool_[i].reset(new ::ThreadPool(1));
}
pull_thread_pool_.resize(thread_keys_shard_num_);
for (size_t i = 0; i < pull_thread_pool_.size(); i++) {
pull_thread_pool_[i].reset(new ::ThreadPool(1));
}
VLOG(0) << "set hbm_thread_pool size: " << hbm_thread_pool_.size()
<< " set pull_thread_pool size: " << pull_thread_pool_.size();

float nonclk_coeff = (config.find("nonclk_coeff") == config.end())
? 1.0
: config["nonclk_coeff"];
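
This last hunk is the actual pull_thread_pool_size fix: hbm_thread_pool_ and pull_thread_pool_ are sized from thread_keys_shard_num_, so constructing them in the PSGPUWrapper constructor used whatever default that member still held, while moving the resize next to thread_keys_shard_num_ = sparse_shard_num sizes them from the configured shard count (the mg_time_0/mg_time_1 initialisations are also dropped from the constructor). A minimal sketch of the corrected ordering, with a stub standing in for ::ThreadPool and simplified member names:

// Sketch only: build the per-shard pools after the shard count is known,
// not in the constructor. ThreadPoolStub stands in for ::ThreadPool.
#include <memory>
#include <vector>

struct ThreadPoolStub {
  explicit ThreadPoolStub(int num_threads) { (void)num_threads; }
};

class WrapperSketch {
 public:
  WrapperSketch() = default;  // no pool construction here any more

  void SetSparseShardNum(int sparse_shard_num) {
    thread_keys_shard_num_ = sparse_shard_num;
    // Size the pools only once the real shard count is known.
    hbm_thread_pool_.resize(thread_keys_shard_num_);
    pull_thread_pool_.resize(thread_keys_shard_num_);
    for (auto& p : hbm_thread_pool_) p.reset(new ThreadPoolStub(1));
    for (auto& p : pull_thread_pool_) p.reset(new ThreadPoolStub(1));
  }

 private:
  int thread_keys_shard_num_ = 0;  // placeholder default; the real default lives in the header
  std::vector<std::unique_ptr<ThreadPoolStub>> hbm_thread_pool_;
  std::vector<std::unique_ptr<ThreadPoolStub>> pull_thread_pool_;
};
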
