Skip to content

Reduce memory copies in communication between trainer and pserver. #9271

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Mar 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 25 additions & 27 deletions benchmark/cluster/vgg16/vgg16_fluid.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
import time
import numpy as np
import paddle.v2 as paddle
import paddle.v2.fluid as fluid
import paddle.v2.fluid.core as core
import paddle.v2.fluid.profiler as profiler
import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.profiler as profiler
import argparse
import functools
import os
from paddle.fluid import debuger


def str2bool(v):
Expand Down Expand Up @@ -182,28 +183,27 @@ def train_loop(exe, trainer_prog):
start_time = time.time()
num_samples = 0
train_pass_acc.reset()
with profiler.profiler("CPU", 'total') as prof:
for batch_id, data in enumerate(train_reader()):
ts = time.time()
img_data = np.array(
map(lambda x: x[0].reshape(data_shape), data)).astype(
"float32")
y_data = np.array(map(lambda x: x[1], data)).astype("int64")
y_data = y_data.reshape([-1, 1])

loss, acc, b_size = exe.run(
trainer_prog,
feed={"pixel": img_data,
"label": y_data},
fetch_list=[avg_cost, batch_acc, batch_size])
iters += 1
num_samples += len(data)
train_pass_acc.add(value=acc, weight=b_size)
print(
"Pass = %d, Iters = %d, Loss = %f, Accuracy = %f, Speed = %.2f img/s"
% (pass_id, iters, loss, acc,
len(data) / (time.time() - ts))
) # The accuracy is the accumulation of batches, but not the current batch.
for batch_id, data in enumerate(train_reader()):
ts = time.time()
img_data = np.array(
map(lambda x: x[0].reshape(data_shape), data)).astype(
"float32")
y_data = np.array(map(lambda x: x[1], data)).astype("int64")
y_data = y_data.reshape([-1, 1])

loss, acc, b_size = exe.run(
trainer_prog,
feed={"pixel": img_data,
"label": y_data},
fetch_list=[avg_cost, batch_acc, batch_size])
iters += 1
num_samples += len(data)
train_pass_acc.add(value=acc, weight=b_size)
print(
"Pass = %d, Iters = %d, Loss = %f, Accuracy = %f, Speed = %.2f img/s"
% (pass_id, iters, loss, acc,
len(data) / (time.time() - ts))
) # The accuracy is the accumulation of batches, but not the current batch.

pass_elapsed = time.time() - start_time
pass_train_acc = train_pass_acc.eval()
Expand Down Expand Up @@ -254,9 +254,7 @@ def train_loop(exe, trainer_prog):
pserver_prog = t.get_pserver_program(current_endpoint)
pserver_startup = t.get_startup_program(current_endpoint,
pserver_prog)
print("starting server side startup")
exe.run(pserver_startup)
print("starting parameter server...")
exe.run(pserver_prog)
elif training_role == "TRAINER":
# Parameter initialization
Expand Down
10 changes: 7 additions & 3 deletions benchmark/cluster/vgg16/vgg16_tf.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,14 +292,18 @@ def test():
return np.mean(test_accs)

config = tf.ConfigProto(
intra_op_parallelism_threads=1, inter_op_parallelism_threads=1)
intra_op_parallelism_threads=1,
inter_op_parallelism_threads=1,
log_device_placement=True)
config.gpu_options.allow_growth = True

hooks = [tf.train.StopAtStepHook(last_step=1000000)]

with tf.train.MonitoredTrainingSession(
master=server.target, is_chief=(args.task_index == 0),
hooks=hooks) as sess:
master=server.target,
is_chief=(args.task_index == 0),
hooks=hooks,
config=config) as sess:
iters, num_samples, start_time = 0, 0, 0.0
for pass_id in range(args.num_passes):
# train
Expand Down
6 changes: 4 additions & 2 deletions paddle/fluid/operators/detail/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
if(WITH_DISTRIBUTE)
grpc_library(sendrecvop_grpc SRCS bytebuffer_stream.cc sendrecvop_utils.cc grpc_client.cc grpc_server.cc PROTO send_recv.proto DEPS lod_tensor selected_rows)
grpc_library(sendrecvop_grpc SRCS bytebuffer_stream.cc sendrecvop_utils.cc grpc_client.cc
grpc_server.cc variable_response.cc PROTO send_recv.proto DEPS lod_tensor selected_rows)
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
set_source_files_properties(test_serde.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_test(serde_test SRCS test_serde.cc DEPS grpc++_unsecure grpc_unsecure gpr cares zlib protobuf sendrecvop_grpc)
cc_test(serde_test SRCS test_serde.cc variable_response.cc DEPS grpc++_unsecure grpc_unsecure gpr
cares zlib protobuf sendrecvop_grpc)
endif()
134 changes: 134 additions & 0 deletions paddle/fluid/operators/detail/bytebuffer_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,107 @@ limitations under the License. */
#include "google/protobuf/io/coded_stream.h"
#include "google/protobuf/io/zero_copy_stream.h"

namespace grpc {
// A ZeroCopyInputStream that reads from grpc_byte_buffer
class GrpcBufferReader final
: public ::google::protobuf::io::ZeroCopyInputStream {
typedef void (CoreCodegenInterface::*OldReaderInitAPI)(
grpc_byte_buffer_reader* reader, grpc_byte_buffer* buffer);
typedef int (CoreCodegenInterface::*NewReaderInitAPI)(
grpc_byte_buffer_reader* reader, grpc_byte_buffer* buffer);
void ReaderInit(OldReaderInitAPI ptr, grpc_byte_buffer_reader* reader,
grpc_byte_buffer* buffer) {
(g_core_codegen_interface->*ptr)(reader, buffer);
}
void ReaderInit(NewReaderInitAPI ptr, grpc_byte_buffer_reader* reader,
grpc_byte_buffer* buffer) {
int result = (g_core_codegen_interface->*ptr)(reader, buffer);
(void)result;
}

public:
explicit GrpcBufferReader(grpc_byte_buffer* buffer)
: byte_count_(0), backup_count_(0) {
ReaderInit(&CoreCodegenInterface::grpc_byte_buffer_reader_init, &reader_,
buffer);
}
~GrpcBufferReader() override {
g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader_);
}

bool Next(const void** data, int* size) override {
if (backup_count_ > 0) {
*data = GRPC_SLICE_START_PTR(slice_) + GRPC_SLICE_LENGTH(slice_) -
backup_count_;
GPR_CODEGEN_ASSERT(backup_count_ <= INT_MAX);
*size = (int)backup_count_;
backup_count_ = 0;
return true;
}
if (!g_core_codegen_interface->grpc_byte_buffer_reader_next(&reader_,
&slice_)) {
return false;
}
g_core_codegen_interface->grpc_slice_unref(slice_);
*data = GRPC_SLICE_START_PTR(slice_);
// On win x64, int is only 32bit
GPR_CODEGEN_ASSERT(GRPC_SLICE_LENGTH(slice_) <= INT_MAX);
byte_count_ += * size = (int)GRPC_SLICE_LENGTH(slice_);
return true;
}

void BackUp(int count) override { backup_count_ = count; }

bool Skip(int count) override {
const void* data;
int size;
while (Next(&data, &size)) {
if (size >= count) {
BackUp(size - count);
return true;
}
// size < count;
count -= size;
}
// error or we have too large count;
return false;
}

::google::protobuf::int64 ByteCount() const override {
return byte_count_ - backup_count_;
}

private:
int64_t byte_count_;
int64_t backup_count_;
grpc_byte_buffer_reader reader_;
grpc_slice slice_;
};

}; // namespace grpc

namespace paddle {
namespace operators {
namespace detail {
// Source is the hook through which a particular RPC implementation exposes
// the bytes it received to ParseFrom.
class Source {
 public:
  virtual ~Source() = default;

  // Returns the stream holding the data to be parsed.
  //
  // Contract:
  //  * May be called more than once, e.g. when ParseFrom has to fall back to
  //    a more expensive parsing method. Each call must yield a stream that is
  //    positioned at the beginning of the serialized message.
  //  * Calling contents() again invalidates whatever an earlier call
  //    returned.
  //  * The Source keeps ownership of the returned stream; callers must not
  //    delete it.
  virtual ::google::protobuf::io::ZeroCopyInputStream* contents() = 0;
};

// A ZeroCopyInputStream that reads from a grpc::ByteBuffer.
class GrpcByteBufferSource
Expand All @@ -46,6 +144,42 @@ class GrpcByteBufferSource
::google::protobuf::int64 byte_count_;
};

// Exposes an already-constructed GrpcByteBufferSource through the Source
// interface.
class GrpcByteBufferSourceWrapper : public Source {
  // Borrowed pointer: nothing in this class deletes it.
  GrpcByteBufferSource* source_;

 public:
  GrpcByteBufferSourceWrapper(GrpcByteBufferSource* source) : source_(source) {}

  // Returns the wrapped stream itself. The same object is returned on every
  // call; no rewinding is performed here.
  ::google::protobuf::io::ZeroCopyInputStream* contents() override {
    return source_;
  }
};

class GrpcByteSource : public Source {
public:
explicit GrpcByteSource(grpc_byte_buffer* buffer) : buffer_(buffer) {}
~GrpcByteSource() override { DeleteStream(); }

typedef ::grpc::GrpcBufferReader Reader;

::google::protobuf::io::ZeroCopyInputStream* contents() override {
DeleteStream();
stream_ = new (&space_) Reader(buffer_);
return stream_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need so many wrappers? Just creating the ::grpc::GrpcBufferReader as a ZeroCopyInputStream could be simpler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So make it not have any abstract type. The interface is simple enough to understand without this abstraction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

没看懂。。。。
Parse函数需要支持ByteBuffer和grpc_byte_buffer两种类型的参数,他们都可以转成ZeroCopyInputStream, 而ZeroCopyInputStream是不能当做参数类型的。

}

private:
// Destroys the Reader currently living in space_, if there is one.
// The Reader was created with placement new, so its destructor is invoked
// explicitly rather than through delete. stream_ is cleared afterwards so
// that a later call (for example from ~GrpcByteSource) cannot run the
// destructor a second time on the same storage.
void DeleteStream() {
  if (stream_ != nullptr) {
    stream_->~Reader();
    stream_ = nullptr;
  }
}

grpc_byte_buffer* buffer_; // Not owned
Reader* stream_ = nullptr; // Points into space_ if non-nullptr
char space_[sizeof(Reader)];
};

} // namespace detail
} // namespace operators
} // namespace paddle
39 changes: 30 additions & 9 deletions paddle/fluid/operators/detail/grpc_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ See the License for the specific language governing permissions and
limitations under the License. */

#include "grpc_client.h"
#include <sys/time.h>
#include "paddle/fluid/framework/threadpool.h"

namespace paddle {
namespace operators {
namespace detail {
Expand All @@ -31,8 +33,9 @@ bool RPCClient::AsyncSendVariable(const std::string& ep,

framework::Async([var_name_val, p_ctx, ep_val, p_scope, time_out, ch, this] {
auto* var = p_scope->FindVar(var_name_val);
sendrecv::VariableMessage req;
SerializeToMessage(var_name_val, var, *p_ctx, &req);

::grpc::ByteBuffer req;
SerializeToByteBuffer(var_name_val, var, *p_ctx, &req);

// varhandle
VarHandle var_h;
Expand All @@ -46,8 +49,11 @@ bool RPCClient::AsyncSendVariable(const std::string& ep,
s->Prepare(var_h, time_out);
s->response_call_back_ = NULL;

auto rpc = s->stub_->AsyncSendVariable(s->context_.get(), req, &cq_);
rpc->Finish(&s->reply_, &s->status_, (void*)s);
auto call = std::move(s->stub_g_.PrepareUnaryCall(
s->context_.get(), "/sendrecv.SendRecvService/SendVariable", req,
&cq_));
call->StartCall();
call->Finish(&s->reply_, &s->status_, (void*)s);
});

req_count_++;
Expand All @@ -56,9 +62,19 @@ bool RPCClient::AsyncSendVariable(const std::string& ep,
}

void ProcGetResponse(const VarHandle& var_h,
const sendrecv::VariableMessage& ret_msg) {
auto* outvar = var_h.scope->FindVar(var_h.name);
DeserializeFromMessage(ret_msg, *var_h.ctx, outvar);
// const sendrecv::VariableMessage& ret_msg) {
const ::grpc::ByteBuffer& ret_msg) {
framework::Variable* outvar = NULL;
DeserializeFromByteBuffer(ret_msg, *var_h.ctx, var_h.scope, outvar);
}

template <typename T>
void RequestToByteBuffer(const T& proto, ::grpc::ByteBuffer* result) {
::grpc::Slice slice(proto.ByteSizeLong());
proto.SerializeWithCachedSizesToArray(
const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(slice.begin())));
::grpc::ByteBuffer tmp(&slice, 1);
result->Swap(&tmp);
}

bool RPCClient::AsyncGetVariable(const std::string& ep,
Expand Down Expand Up @@ -88,8 +104,13 @@ bool RPCClient::AsyncGetVariable(const std::string& ep,
s->Prepare(var_h, time_out);
s->response_call_back_ = ProcGetResponse;

auto rpc = s->stub_->AsyncGetVariable(s->context_.get(), req, &cq_);
rpc->Finish(&s->reply_, &s->status_, (void*)s);
::grpc::ByteBuffer buf;
RequestToByteBuffer<sendrecv::VariableMessage>(req, &buf);

auto call = std::move(s->stub_g_.PrepareUnaryCall(
s->context_.get(), "/sendrecv.SendRecvService/GetVariable", buf, &cq_));
call->StartCall();
call->Finish(&s->reply_, &s->status_, (void*)s);
});

req_count_++;
Expand Down
Loading