Skip to content

Commit

Permalink
[BUG] Using attachement strategy of brpc to send packet with big size. (
Browse files Browse the repository at this point in the history
apache#4237)

Using attachement strategy of brpc to send packet with big size.
BRPC send packet should serialize it first and then send it.
If we send one batch with big size, it will encounter a connection failed.
So we can use attachment strategy to bypass the problem and eliminate
the serialization cost.
  • Loading branch information
chaoyli authored Aug 5, 2020
1 parent bfb8c65 commit 120f30b
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 7 deletions.
12 changes: 5 additions & 7 deletions be/src/runtime/data_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class DataStreamSender::Channel {
int64_t _packet_seq;

// we're accumulating rows into this batch
boost::scoped_ptr<RowBatch> _batch;
std::unique_ptr<RowBatch> _batch;

bool _need_close;
int _be_number;
Expand Down Expand Up @@ -228,6 +228,9 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) {

_brpc_request.set_eos(eos);
if (batch != nullptr) {
butil::IOBuf& io_buf = _closure->cntl.request_attachment();
io_buf.append(batch->tuple_data());
batch->set_tuple_data(""); // to padding the required tuple_data field in PB
_brpc_request.set_allocated_row_batch(batch);
}
_brpc_request.set_packet_seq(_packet_seq++);
Expand Down Expand Up @@ -270,12 +273,7 @@ Status DataStreamSender::Channel::add_row(TupleRow* row) {
}

Status DataStreamSender::Channel::send_current_batch(bool eos) {
{
SCOPED_TIMER(_parent->_serialize_batch_timer);
int uncompressed_bytes = _batch->serialize(&_pb_batch);
COUNTER_UPDATE(_parent->_bytes_sent_counter, RowBatch::get_batch_size(_pb_batch));
COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes);
}
_parent->serialize_batch(_batch.get(), &_pb_batch, 1);
_batch->reset();
RETURN_IF_ERROR(send_batch(&_pb_batch, eos));
return Status::OK();
Expand Down
1 change: 1 addition & 0 deletions be/src/service/brpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,4 @@
#include <brpc/closure_guard.h>
#include <brpc/reloadable_flags.h>
#include <brpc/protocol.h>
#include <butil/strings/string_piece.h>
7 changes: 7 additions & 0 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ void PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController* cnt
google::protobuf::Closure* done) {
VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id())
<< " node=" << request->node_id();
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
if (cntl->request_attachment().size() > 0) {
PRowBatch* batch = (const_cast<PTransmitDataParams*>(request))->mutable_row_batch();
butil::IOBuf& io_buf = cntl->request_attachment();
std::string* tuple_data = batch->mutable_tuple_data();
io_buf.copy_to(tuple_data);
}
_exec_env->stream_mgr()->transmit_data(request, &done);
if (done != nullptr) {
done->Run();
Expand Down

0 comments on commit 120f30b

Please sign in to comment.