
Commit 07a66be

xcpherzmxdream authored and committed
Fix async memory alloc bug & reuse scope variable memory (PaddlePaddle#59)
* fix async alloc bug
* use stream safe alloc
* alloc fix & reuse scope mem
1 parent 58ddc23 commit 07a66be

File tree

7 files changed: +114 lines, -14 lines


paddle/fluid/framework/data_feed.cc

Lines changed: 8 additions & 4 deletions
@@ -2745,15 +2745,17 @@ void SlotRecordInMemoryDataFeed::BuildSlotBatchGPU(const int ins_num, MiniBatchG
         h_tensor_ptrs[j] = float_tensor.data<float>() + float_offset;
         float_offset += total_instance;
       } else {
-        h_tensor_ptrs[j] = pack->float_tensor_vec()[float_zero_slot_index].mutable_data<float>({total_instance, 1}, this->place_);
+        h_tensor_ptrs[j] = pack->float_tensor_vec()[float_zero_slot_index].mutable_data<float>({total_instance, 1},
+                                                                                               this->place_);
         float_zero_slot_index++;
       }
     } else if (info.type[0] == 'u') {  // uint64
       if (total_instance > 0) {
         h_tensor_ptrs[j] = uint64_tensor.data<int64_t>() + uint64_offset;
         uint64_offset += total_instance;
       } else {
-        h_tensor_ptrs[j] = pack->uint64_tensor_vec()[uint64_zero_slot_index].mutable_data<int64_t>({total_instance, 1}, this->place_);
+        h_tensor_ptrs[j] = pack->uint64_tensor_vec()[uint64_zero_slot_index].mutable_data<int64_t>({total_instance, 1},
+                                                                                                   this->place_);
         uint64_zero_slot_index++;
       }
     }
@@ -2869,10 +2871,12 @@ MiniBatchGpuPack* SlotRecordInMemoryDataFeed::get_pack(MiniBatchGpuPack* last_pa


 MiniBatchGpuPack::MiniBatchGpuPack(const paddle::platform::Place& place,
-                                   const std::vector<UsedSlotInfo>& infos) {
+                                   const std::vector<UsedSlotInfo>& infos,
+                                   phi::StreamId stream_id) {
   place_ = place;
   stream_holder_.reset(new platform::stream::CUDAStream(place));
   stream_ = stream_holder_->raw_stream();
+  alloc_stream_id_ = stream_id;

   ins_num_ = 0;
   pv_num_ = 0;
@@ -2892,7 +2896,7 @@ MiniBatchGpuPack::MiniBatchGpuPack(const paddle::platform::Place& place,
   }
   copy_host2device(&gpu_slots_, gpu_used_slots_.data(), gpu_used_slots_.size());

-  slot_buf_ptr_ = memory::AllocShared(place_, used_slot_size_ * sizeof(void*));
+  slot_buf_ptr_ = memory::AllocShared(place_, used_slot_size_ * sizeof(void*), phi_stream());

   int device_id = place_.GetDeviceId();
   VLOG(3) << "begin get batch pack device id: " << device_id;
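
The substantive change in this file is that the pack's device buffers are now allocated through memory::AllocShared with an explicit phi::Stream, so the allocations are owned by a dedicated allocation stream rather than the default stream. A minimal sketch of that call pattern, assuming a CUDA build; the helper name and include paths are illustrative, not part of this commit:

    #include "paddle/fluid/memory/malloc.h"
    #include "paddle/fluid/platform/place.h"
    #include "paddle/phi/core/stream.h"

    // Sketch: allocate a device buffer that is tracked against an explicit
    // stream id (the stream-safe allocator path used by this commit).
    std::shared_ptr<phi::Allocation> AllocOnStream(
        const paddle::platform::CUDAPlace& place,
        phi::StreamId alloc_stream_id,
        size_t num_slots) {
      // Wrapping the raw id in phi::Stream tells the allocator which stream
      // owns the memory, so the free is synchronized against that stream only.
      return paddle::memory::AllocShared(place,
                                         num_slots * sizeof(void*),
                                         phi::Stream(alloc_stream_id));
    }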

paddle/fluid/framework/data_feed.h

Lines changed: 24 additions & 6 deletions
@@ -549,7 +549,8 @@ struct BatchGPUValue {
 class MiniBatchGpuPack {
  public:
   MiniBatchGpuPack(const paddle::platform::Place& place,
-                   const std::vector<UsedSlotInfo>& infos);
+                   const std::vector<UsedSlotInfo>& infos,
+                   phi::StreamId stream_id);
   ~MiniBatchGpuPack();

   bool is_use();
@@ -570,14 +571,14 @@ class MiniBatchGpuPack {
     if (used_float_num_ > 0) {
       int float_total_len = buf_.h_float_lens.back();
       if (float_total_len > 0) {
-        float_tensor_.mutable_data<float>({float_total_len, 1}, this->place_);
+        float_tensor_.mutable_data<float>({float_total_len, 1}, this->place_, phi_stream());
       }
     }
     if (used_uint64_num_ > 0) {
       int uint64_total_len = buf_.h_uint64_lens.back();
       if (uint64_total_len > 0) {
         uint64_tensor_.mutable_data<int64_t>({uint64_total_len, 1},
-                                             this->place_);
+                                             this->place_, phi_stream());
       }
     }
   }
@@ -595,9 +596,9 @@ class MiniBatchGpuPack {

   void resize_gpu_slot_offsets(const size_t slot_total_bytes) {
     if (gpu_slot_offsets_ == nullptr) {
-      gpu_slot_offsets_ = memory::AllocShared(place_, slot_total_bytes);
+      gpu_slot_offsets_ = memory::AllocShared(place_, slot_total_bytes, phi_stream());
     } else if (gpu_slot_offsets_->size() < slot_total_bytes) {
-      auto buf = memory::AllocShared(place_, slot_total_bytes);
+      auto buf = memory::AllocShared(place_, slot_total_bytes, phi_stream());
       gpu_slot_offsets_.swap(buf);
       buf = nullptr;
     }
@@ -613,6 +614,11 @@ class MiniBatchGpuPack {
     return stream_;
   }

+  // only for interface compatibility
+  phi::Stream phi_stream() {
+    return phi::Stream(alloc_stream_id_);
+  }
+
  private:
   void transfer_to_gpu(void);
   void pack_all_data(const SlotRecord* ins_vec, int num);
@@ -666,6 +672,8 @@ class MiniBatchGpuPack {

   std::shared_ptr<phi::Allocation> gpu_slot_offsets_ = nullptr;
   std::shared_ptr<phi::Allocation> slot_buf_ptr_ = nullptr;
+
+  phi::StreamId alloc_stream_id_ {0};
 };
 class MiniBatchGpuPackMgr {
   static const int MAX_DEIVCE_NUM = 16;
@@ -700,14 +708,24 @@ class MiniBatchGpuPackMgr {
         return pack_list_[device_id][i];
       }
     }
-    auto* pack = new MiniBatchGpuPack(place, infos);
+    {
+      std::lock_guard<std::mutex> lock(mutex_);
+      if (!alloc_stream_map_.count(device_id)) {
+        alloc_stream_map_.emplace(device_id, new platform::stream::CUDAStream(place));
+      }
+    }
+    phi::StreamId alloc_stream_id =
+        reinterpret_cast<phi::StreamId>(alloc_stream_map_[device_id]->raw_stream());
+    auto* pack = new MiniBatchGpuPack(place, infos, alloc_stream_id);
     pack->set_use_flag(true);
     pack_list_[device_id].push_back(pack);
     return pack;
   }

  private:
   std::vector<std::vector<MiniBatchGpuPack*>> pack_list_;
+  std::unordered_map<int, std::unique_ptr<platform::stream::CUDAStream>> alloc_stream_map_;
+  std::mutex mutex_;
 };
 // global mgr
 inline MiniBatchGpuPackMgr& BatchGpuPackMgr() {
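
The manager now keeps one long-lived allocation stream per device, created lazily under a mutex, and passes its raw handle down as a phi::StreamId so every pack on that device allocates on the same stream. A standalone sketch of that lock-guarded, lazily populated per-device map, using stand-in types rather than Paddle's platform::stream::CUDAStream:

    #include <cstdint>
    #include <memory>
    #include <mutex>
    #include <unordered_map>

    // Stand-in for a per-device stream; in the commit this is
    // platform::stream::CUDAStream and the id comes from raw_stream().
    struct DeviceStream {
      explicit DeviceStream(int device) : id(0x1000 + device) {}
      uint64_t id;
    };

    class PerDeviceStreamMap {
     public:
      // Returns the id of the single stream associated with device_id,
      // creating it on first use. The lock covers both lookup and insert.
      uint64_t GetOrCreate(int device_id) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = streams_.find(device_id);
        if (it == streams_.end()) {
          it = streams_.emplace(device_id,
                                std::make_unique<DeviceStream>(device_id)).first;
        }
        return it->second->id;
      }

     private:
      std::unordered_map<int, std::unique_ptr<DeviceStream>> streams_;
      std::mutex mutex_;
    };

In the diff itself the read of alloc_stream_map_[device_id] happens after the lock is released; the sketch keeps the whole lookup inside the critical section for simplicity.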

paddle/fluid/framework/device_worker.h

Lines changed: 1 addition & 0 deletions
@@ -643,6 +643,7 @@ class PSGPUWorker : public HogwildWorker {
   // async infershape
   int task_threads_num_ {1};
   int scope_num_ {task_threads_num_ + 1};
+  // int scope_num_ {1};
   std::atomic<int> thread_count_ {0};
   std::atomic<bool> stop_token_ {false};
   std::atomic<bool> pack_is_end_ {false};

paddle/fluid/framework/ps_gpu_worker.cc

Lines changed: 62 additions & 2 deletions
@@ -58,6 +58,41 @@ void PSGPUWorker::CreateDeviceResource(const ProgramDesc& main_prog) {
     for (auto& op : ops_) {
       op->SetIsRuntimeInferShape(true);
     }
+
+    // reusing memory
+    auto input_names = device_reader_->GetInputVarNames();
+    std::set<std::string> input_names_set(input_names.begin(), input_names.end());
+    for (auto& scope : thread_scope_vec_) {
+      std::vector<Variable*> need_reuse;
+      for (auto& var : block.AllVars()) {
+        std::string name = var->Name();
+        if (!var->Persistable()) {
+          if (input_names_set.find(var->Name()) != input_names_set.end()) {
+            continue;
+          }
+          auto* ptr = scope->FindLocalVar(var->Name());
+          PADDLE_ENFORCE_NE(ptr, nullptr,
+              phi::errors::NotFound("The var %s is not found.", var->Name()));
+          need_reuse.push_back(ptr);
+        }
+      }
+      need_reuse_var_vec_[scope] = std::move(need_reuse);
+    }
+    {
+      need_reuse_var_.clear();
+      for (auto& var : block.AllVars()) {
+        std::string name = var->Name();
+        if (!var->Persistable()) {
+          if (input_names_set.find(var->Name()) != input_names_set.end()) {
+            continue;
+          }
+          auto* ptr = thread_scope_->FindLocalVar(var->Name());
+          PADDLE_ENFORCE_NE(ptr, nullptr,
+              phi::errors::NotFound("The var %s is not found.", var->Name()));
+          need_reuse_var_.push_back(ptr);
+        }
+      }
+    }
   }
 }

@@ -400,6 +435,18 @@ void PSGPUWorker::TrainFiles() {
             std::chrono::microseconds(200));
       }
       thread_scope = cur_task.scope;
+      // tensor share buffer
+      std::vector<Variable*>& cur_scope_vars = need_reuse_var_vec_[thread_scope];
+      PADDLE_ENFORCE_EQ(cur_scope_vars.size(), need_reuse_var_.size(),
+                        platform::errors::Fatal(
+                            "reuse vars size must be same."));
+      for (size_t i = 0; i < need_reuse_var_.size(); i++) {
+        Variable* child = cur_scope_vars[i];
+        Variable* parent = need_reuse_var_[i];
+        if (child->IsType<LoDTensor>()) {
+          child->GetMutable<LoDTensor>()->ShareBufferWith(*(parent->GetMutable<LoDTensor>()));
+        }
+      }
     }

     if (cur_batch <= 0) {
@@ -409,9 +456,11 @@ void PSGPUWorker::TrainFiles() {
     total_ins_num += cur_batch;

     if (shape_check_flag_.load()) {
-      VLOG(0) << "Begin OpRunAndShapeCheck... "
+      VLOG(0) << "Begin OpRunAndShapeCheck, "
+              << shape_check_count_.load();
+      if (scope_num_ == 1 || shape_check_count_.fetch_sub(1) <= 0) {
+        VLOG(0) << "End OpRunAndShapeCheck."
               << shape_check_count_.load();
-      if (shape_check_count_.fetch_sub(1) <= 0) {
         shape_check_flag_ = false;
       }
     }
@@ -514,6 +563,17 @@ void PSGPUWorker::TrainFiles() {
     ++batch_cnt;

     if (scope_num_ != 1) {
+      std::vector<Variable*>& cur_scope_vars = need_reuse_var_vec_[thread_scope];
+      PADDLE_ENFORCE_EQ(cur_scope_vars.size(), need_reuse_var_.size(),
+                        platform::errors::Fatal(
+                            "reuse vars size must be same."));
+      for (size_t i = 0; i < need_reuse_var_.size(); i++) {
+        Variable* child = cur_scope_vars[i];
+        Variable* parent = need_reuse_var_[i];
+        if (child->IsType<LoDTensor>()) {
+          parent->GetMutable<LoDTensor>()->ShareBufferWith(*(child->GetMutable<LoDTensor>()));
+        }
+      }
       device_reader_->get_pack(cur_task.pack);
       free_task_queue_.Push(cur_task);
     }
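
The reuse logic pairs each per-task thread scope with the main thread scope: before a batch runs, every non-persistable, non-input variable in the task scope borrows the buffer of the matching main-scope variable, and after the batch the buffer is handed back so the next task starts from the largest allocation seen so far. A minimal sketch of that ShareBufferWith round trip on two tensors (names and control flow are illustrative, not the worker's):

    #include "paddle/phi/core/dense_tensor.h"

    // "parent" stands in for the variable in the main thread scope and
    // "child" for the matching variable in a per-task scope.
    void ReuseScopeBuffer(phi::DenseTensor* parent, phi::DenseTensor* child) {
      // Before the batch: the task-scope tensor points at the main-scope
      // holder instead of allocating its own memory.
      child->ShareBufferWith(*parent);

      // ... ops run on the task scope and may replace or grow child's holder ...

      // After the batch: hand the (possibly larger) buffer back so the main
      // scope keeps it for the next task that reuses this slot.
      parent->ShareBufferWith(*child);
    }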

paddle/fluid/memory/allocation/allocator_facade.cc

Lines changed: 2 additions & 1 deletion
@@ -369,7 +369,7 @@ class AllocatorFacadePrivate {
       bool create_if_not_found = false) {
     if (LIKELY(!IsCUDAGraphCapturing())) {
       if (stream == GetDefaultStream(place)) {
-        VLOG(7) << "Get Allocator by passing in a default stream";
+        VLOG(0) << "Get Allocator by passing in a default stream";
         return GetAllocator(place, /* A non-zero num to choose allocator_ */ 1);
       }
     }
@@ -391,6 +391,7 @@ class AllocatorFacadePrivate {
     /* unique_lock_guard */ {
       std::unique_lock<std::shared_timed_mutex> lock_guard(
           cuda_allocator_mutex_);
+      VLOG(0) << "InitStreamSafeCUDAAllocator of " << reinterpret_cast<uint64_t>(stream);
       InitStreamSafeCUDAAllocator(place, stream);
       return cuda_allocators_[place][stream];
     }

paddle/phi/core/dense_tensor.inl

Lines changed: 5 additions & 0 deletions
@@ -48,6 +48,11 @@ T* mutable_data(const DDim& dims,
                 const phi::Place& place,
                 size_t requested_size = 0);

+template <typename T>
+T* mutable_data(const DDim& dims,
+                const phi::Place& place,
+                const phi::Stream& stream);
+
 void* mutable_data(const phi::Place& place,
                    paddle::experimental::DataType type,
                    size_t requested_size = 0);

paddle/phi/core/dense_tensor_impl.cc

Lines changed: 12 additions & 1 deletion
@@ -167,6 +167,15 @@ inline T* DenseTensor::mutable_data(const DDim& dims,
   return mutable_data<T>(place, requested_size);
 }

+template <typename T>
+inline T* DenseTensor::mutable_data(const DDim& dims,
+                                    const Place& place,
+                                    const phi::Stream& stream) {
+  static_assert(std::is_pod<T>::value, "T must be POD");
+  meta_.dims = dims;
+  return reinterpret_cast<T*>(mutable_data(place, paddle::experimental::CppTypeToDataType<T>::Type(), stream));
+}
+
 template <typename T>
 inline T* DenseTensor::mutable_data(const Place& place, size_t requested_size) {
   static_assert(std::is_pod<T>::value, "T must be POD");
@@ -186,7 +195,9 @@ void DenseTensor::ShareBufferWith(const DenseTensor& tensor) {
   template dtype* DenseTensor::mutable_data(                          \
       const DDim& dims, const Place& place, size_t requested_size);   \
   template dtype* DenseTensor::mutable_data(const Place& place,       \
-      size_t requested_size);
+      size_t requested_size);                                         \
+  template dtype* DenseTensor::mutable_data(                          \
+      const DDim& dims, const Place& place, const phi::Stream& stream);

 LEGACY_DATA_MEMBER_FUNC_INSTANTIATION(bool)
 LEGACY_DATA_MEMBER_FUNC_INSTANTIATION(int8_t)
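
The new overload makes typed allocation stream-aware, so callers such as MiniBatchGpuPack can size a tensor and bind its storage to a specific allocation stream in one call. A hedged usage sketch; the place, stream id, and wrapper function are illustrative and not part of this commit:

    #include "paddle/phi/core/dense_tensor.h"
    #include "paddle/phi/core/stream.h"

    // Sketch: request typed storage for a tensor on an explicit stream,
    // using the overload instantiated above.
    float* MutableDataOnStream(phi::DenseTensor* tensor,
                               const phi::Place& place,
                               phi::StreamId stream_id,
                               int64_t rows) {
      // The allocation goes through the stream-safe allocator keyed by
      // stream_id instead of the device's default stream.
      return tensor->mutable_data<float>({rows, 1}, place, phi::Stream(stream_id));
    }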
