Skip to content

Commit 9d0a1eb

Browse files
authored
[cherry]Add callback after TensorCopy (#30123) (#30268)
* change to tensor copy sync
* change to tensor copy sync
* make copy_to safe when use TensorCopy
* refine code
* add ut
* add cudapinned garbagecollector
* add testcase: cpu place -> cuda pinned place
1 parent 284bae9 commit 9d0a1eb

File tree

7 files changed

+167
-17
lines changed

7 files changed

+167
-17
lines changed

paddle/fluid/framework/garbage_collector.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,15 @@ void StreamGarbageCollector::ClearCallback(
107107
const std::function<void()> &callback) {
108108
callback_manager_->AddCallback(callback);
109109
}
110+
111+
CUDAPinnedGarbageCollector::CUDAPinnedGarbageCollector(
112+
const platform::CUDAPinnedPlace &place, size_t max_memory_size)
113+
: GarbageCollector(place, max_memory_size) {}
114+
115+
void CUDAPinnedGarbageCollector::ClearCallback(
116+
const std::function<void()> &callback) {
117+
callback();
118+
}
110119
#endif
111120

112121
int64_t GetEagerDeletionThreshold() {

paddle/fluid/framework/garbage_collector.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,15 @@ class StreamGarbageCollector : public GarbageCollector {
119119
cudaStream_t stream_;
120120
std::unique_ptr<platform::StreamCallbackManager> callback_manager_;
121121
};
122+
123+
class CUDAPinnedGarbageCollector : public GarbageCollector {
124+
public:
125+
CUDAPinnedGarbageCollector(const platform::CUDAPinnedPlace &place,
126+
size_t max_memory_size);
127+
128+
protected:
129+
void ClearCallback(const std::function<void()> &callback) override;
130+
};
122131
#endif
123132

124133
template <typename Container>

paddle/fluid/imperative/layer.cc

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "paddle/fluid/imperative/infer_var_type_context.h"
2626
#include "paddle/fluid/imperative/op_base.h"
2727
#include "paddle/fluid/imperative/prepared_operator.h"
28+
#include "paddle/fluid/imperative/tracer.h"
2829
#include "paddle/fluid/operators/math/math_function.h"
2930
#include "paddle/fluid/platform/device_context.h"
3031
#include "paddle/fluid/platform/enforce.h"
@@ -231,9 +232,9 @@ std::shared_ptr<VarBase> VarBase::NewVarBase(const platform::Place& dst_place,
231232
true, platform::errors::InvalidArgument(
232233
"Variable is not initialized or Variable's type is not "
233234
"LoDTensor or SelectedRows when getting numpy tensor"));
235+
234236
if (Var().IsType<framework::LoDTensor>()) {
235237
auto& src_tensor = Var().Get<framework::LoDTensor>();
236-
237238
// TODO(Jiabin): change this after move unique_name generator to CXX
238239
auto new_var = std::make_shared<VarBase>(
239240
true, Name() + std::to_string(copied_counter_++));
@@ -252,10 +253,8 @@ std::shared_ptr<VarBase> VarBase::NewVarBase(const platform::Place& dst_place,
252253
platform::DeviceContextPool::Instance().Get(src_place)->Wait();
253254
}
254255
}
255-
256-
if (platform::is_gpu_place(dst_place)) {
257-
VLOG(3) << "copy tensor " << Name() << " from gpu";
258-
}
256+
VLOG(4) << "copy tensor " << Name() << " from " << Place() << " to "
257+
<< dst_place;
259258
return new_var;
260259
} else {
261260
auto& src_selected_rows = Var().Get<framework::SelectedRows>();
@@ -276,9 +275,8 @@ std::shared_ptr<VarBase> VarBase::NewVarBase(const platform::Place& dst_place,
276275
}
277276
dst_selected_rows->set_height(src_selected_rows.height());
278277
dst_selected_rows->set_rows(src_selected_rows.rows());
279-
if (platform::is_gpu_place(dst_place)) {
280-
VLOG(3) << "copy selected rows " << Name() << " from gpu";
281-
}
278+
VLOG(4) << "copy tensor " << Name() << " from " << Place() << " to "
279+
<< dst_place;
282280
return new_var;
283281
}
284282
}

paddle/fluid/imperative/tracer.cc

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,78 @@ static void PassStopGradient(const NameVarBaseMap& outs, bool generate_grad) {
5656
}
5757
}
5858

59+
void IncreaseVarbaseReferenceCountUntilCopyComplete(
60+
const std::shared_ptr<imperative::VarBase>& var,
61+
const platform::Place& place) {
62+
// Note(zhiqiu): Follow the logic of TensorCopy to determine the place that we
63+
// need to add callback, see tensor_utils.cc:245
64+
auto place_ = platform::is_gpu_place(place) ? place : var->Place();
65+
66+
auto tracer = imperative::GetCurrentTracer();
67+
auto gc = tracer->MutableGarbageCollectorIfNotExists(place_);
68+
69+
// Note(zhiqiu): This is an empty callback, the only way is to "reference"
70+
// var, so it will not be destructed until the kernels launched at current
71+
// stream of given place is finished.
72+
auto callback = [var, place_]() {
73+
VLOG(4) << "Run callback of var:" << var->Name() << " at place " << place_;
74+
};
75+
76+
gc->DirectClearCallback(callback);
77+
}
78+
79+
paddle::framework::GarbageCollector* Tracer::MutableGarbageCollectorIfNotExists(
80+
const platform::Place& place) {
81+
// if not exists, create a new GarbageCollector at given place
82+
if (gcs_.count(place) == 0) {
83+
std::unique_ptr<framework::GarbageCollector> gc;
84+
if (platform::is_gpu_place(place)) {
85+
#ifdef PADDLE_WITH_CUDA
86+
gc.reset(new framework::DefaultStreamGarbageCollector(
87+
BOOST_GET_CONST(platform::CUDAPlace, place), 0));
88+
89+
VLOG(10) << "Created GarbageCollector at " << place;
90+
#else
91+
PADDLE_THROW(platform::errors::PermissionDenied(
92+
"Paddle can't use CUDA device since it's not compiled with CUDA,"
93+
"Please recompile or reinstall Paddle with GPU support."));
94+
#endif
95+
} else if (platform::is_cuda_pinned_place(place)) {
96+
#ifdef PADDLE_WITH_CUDA
97+
gc.reset(new framework::CUDAPinnedGarbageCollector(
98+
BOOST_GET_CONST(platform::CUDAPinnedPlace, place), 0));
99+
100+
VLOG(10) << "Created GarbageCollector at " << place;
101+
#else
102+
PADDLE_THROW(platform::errors::PermissionDenied(
103+
"Paddle can't use CUDAPinned device since it's not compiled with "
104+
"CUDA,"
105+
"Please recompile or reinstall Paddle with GPU support."));
106+
#endif
107+
} else if (platform::is_xpu_place(place)) {
108+
#if defined(PADDLE_WITH_XPU)
109+
gc.reset(new framework::XPUGarbageCollector(
110+
BOOST_GET_CONST(platform::XPUPlace, place), 0));
111+
VLOG(10) << "Created GarbageCollector at " << place;
112+
#else
113+
PADDLE_THROW(platform::errors::PermissionDenied(
114+
"Paddle can't use XPU device since it's not compiled with XPU,"
115+
"Please recompile or reinstall Paddle with XPU support."));
116+
#endif
117+
} else if (platform::is_cpu_place(place)) {
118+
gc.reset(new framework::CPUGarbageCollector(
119+
BOOST_GET_CONST(platform::CPUPlace, place), 0));
120+
VLOG(10) << "Created GarbageCollector at " << place;
121+
} else {
122+
PADDLE_THROW(platform::errors::PreconditionNotMet(
123+
"Unsupported place for garbage collection"));
124+
}
125+
gcs_.emplace(place, std::move(gc));
126+
}
127+
128+
return gcs_.at(place).get();
129+
}
130+
59131
void Tracer::TraceOp(const std::string& type, const NameVarBaseMap& ins,
60132
const NameVarBaseMap& outs, framework::AttributeMap attrs,
61133
const platform::Place& place, bool trace_backward) {

paddle/fluid/imperative/tracer.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616

1717
#include <atomic>
1818
#include <future> // NOLINT
19+
#include <map>
1920
#include <memory>
2021
#include <string>
2122
#include <unordered_map>
2223
#include <vector>
2324

2425
#include "ThreadPool.h"
26+
#include "paddle/fluid/framework/garbage_collector.h"
2527
#include "paddle/fluid/imperative/basic_engine.h"
2628
#include "paddle/fluid/imperative/jit/program_desc_tracer.h"
2729
#include "paddle/fluid/imperative/layer.h"
@@ -30,6 +32,10 @@
3032
namespace paddle {
3133
namespace imperative {
3234

35+
using GarbageCollectorMap =
36+
std::map<platform::Place,
37+
std::unique_ptr<paddle::framework::GarbageCollector>>;
38+
3339
class UniqueNameGenerator {
3440
public:
3541
explicit UniqueNameGenerator(std::string prefix = "") : prefix_(prefix) {}
@@ -102,6 +108,9 @@ class Tracer {
102108

103109
bool IsAutoCastEnabled() const { return enable_autocast_; }
104110

111+
paddle::framework::GarbageCollector* MutableGarbageCollectorIfNotExists(
112+
const platform::Place& place);
113+
105114
private:
106115
std::unique_ptr<BasicEngine> basic_engine_;
107116
std::unique_ptr<jit::ProgramDescTracer> program_desc_tracer_;
@@ -110,11 +119,15 @@ class Tracer {
110119
platform::Place expected_place_;
111120
bool has_grad_{true};
112121
bool enable_autocast_{false};
122+
GarbageCollectorMap gcs_;
113123
};
114124

115125
// To access static variable current_tracer
116126
const std::shared_ptr<Tracer>& GetCurrentTracer();
117127
void SetCurrentTracer(const std::shared_ptr<Tracer>& tracer_);
128+
void IncreaseVarbaseReferenceCountUntilCopyComplete(
129+
const std::shared_ptr<imperative::VarBase>& var,
130+
const platform::Place& place);
118131

119132
} // namespace imperative
120133
} // namespace paddle

paddle/fluid/pybind/imperative.cc

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1060,21 +1060,52 @@ void BindImperative(py::module *m_ptr) {
10601060
)DOC")
10611061
.def("copy_", &imperative::VarBase::CopyFrom)
10621062
.def("_copy_to",
1063-
[](const imperative::VarBase &self, const platform::CPUPlace &place,
1064-
bool blocking) { return self.NewVarBase(place, blocking); },
1063+
[](const std::shared_ptr<imperative::VarBase> &self,
1064+
const platform::CPUPlace &place, bool blocking) {
1065+
auto new_var = self->NewVarBase(place, blocking);
1066+
// Note(zhiqiu): Since NewVarBase may use GpuCopyAsync to
1067+
// copy data from the tensor of self to the tensor of new varbase,
1068+
// we need to ensure that the varbase self is not destructed until
1069+
// the GpuCopyAsync is completed. Otherwise, the memory may be
1070+
// freed
1071+
// when varbase self is destructed.
1072+
// To do that, we increase the reference count of self by 1 and
1073+
// add a cuda event to wait the GpuCopyAsync's completion.
1074+
if (!blocking) {
1075+
IncreaseVarbaseReferenceCountUntilCopyComplete(self, place);
1076+
}
1077+
return new_var;
1078+
},
10651079
py::return_value_policy::copy)
10661080
.def("_copy_to",
1067-
[](const imperative::VarBase &self,
1068-
const platform::CUDAPinnedPlace &place,
1069-
bool blocking) { return self.NewVarBase(place, blocking); },
1081+
[](const std::shared_ptr<imperative::VarBase> &self,
1082+
const platform::CUDAPinnedPlace &place, bool blocking) {
1083+
auto new_var = self->NewVarBase(place, blocking);
1084+
if (!blocking) {
1085+
IncreaseVarbaseReferenceCountUntilCopyComplete(self, place);
1086+
}
1087+
return new_var;
1088+
},
10701089
py::return_value_policy::copy)
10711090
.def("_copy_to",
1072-
[](const imperative::VarBase &self, const platform::XPUPlace &place,
1073-
bool blocking) { return self.NewVarBase(place, blocking); },
1091+
[](const std::shared_ptr<imperative::VarBase> &self,
1092+
const platform::XPUPlace &place, bool blocking) {
1093+
auto new_var = self->NewVarBase(place, blocking);
1094+
if (!blocking) {
1095+
IncreaseVarbaseReferenceCountUntilCopyComplete(self, place);
1096+
}
1097+
return new_var;
1098+
},
10741099
py::return_value_policy::copy)
10751100
.def("_copy_to",
1076-
[](const imperative::VarBase &self, const platform::CUDAPlace &place,
1077-
bool blocking) { return self.NewVarBase(place, blocking); },
1101+
[](const std::shared_ptr<imperative::VarBase> &self,
1102+
const platform::CUDAPlace &place, bool blocking) {
1103+
auto new_var = self->NewVarBase(place, blocking);
1104+
if (!blocking) {
1105+
IncreaseVarbaseReferenceCountUntilCopyComplete(self, place);
1106+
}
1107+
return new_var;
1108+
},
10781109
py::return_value_policy::copy)
10791110
.def("value", [](imperative::VarBase &self) { return self.MutableVar(); },
10801111
py::return_value_policy::reference)

python/paddle/fluid/tests/unittests/test_var_base.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,24 @@ def _test_place(place):
156156
_test_place(core.CUDAPlace(0))
157157
_test_place("gpu:0")
158158

159+
def test_to_tensor_change_place(self):
160+
if core.is_compiled_with_cuda():
161+
a_np = np.random.rand(1024, 1024)
162+
with paddle.fluid.dygraph.guard(core.CPUPlace()):
163+
a = paddle.to_tensor(a_np, place=paddle.CUDAPinnedPlace())
164+
a = paddle.to_tensor(a)
165+
self.assertEqual(a.place.__repr__(), "CPUPlace")
166+
167+
with paddle.fluid.dygraph.guard(core.CUDAPlace(0)):
168+
a = paddle.to_tensor(a_np, place=paddle.CUDAPinnedPlace())
169+
a = paddle.to_tensor(a)
170+
self.assertEqual(a.place.__repr__(), "CUDAPlace(0)")
171+
172+
with paddle.fluid.dygraph.guard(core.CUDAPlace(0)):
173+
a = paddle.to_tensor(a_np, place=paddle.CPUPlace())
174+
a = paddle.to_tensor(a, place=paddle.CUDAPinnedPlace())
175+
self.assertEqual(a.place.__repr__(), "CUDAPinnedPlace")
176+
159177
def test_to_variable(self):
160178
with fluid.dygraph.guard():
161179
var = fluid.dygraph.to_variable(self.array, name="abc")

0 commit comments

Comments (0)