diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp index 73d642d677e44f..384e7606f9f58c 100644 --- a/be/src/runtime/data_stream_sender.cpp +++ b/be/src/runtime/data_stream_sender.cpp @@ -164,7 +164,7 @@ class DataStreamSender::Channel { int64_t _packet_seq; // we're accumulating rows into this batch - boost::scoped_ptr _batch; + std::unique_ptr _batch; bool _need_close; int _be_number; @@ -228,6 +228,9 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) { _brpc_request.set_eos(eos); if (batch != nullptr) { + butil::IOBuf& io_buf = _closure->cntl.request_attachment(); + io_buf.append(batch->tuple_data()); + batch->set_tuple_data(""); // to padding the required tuple_data field in PB _brpc_request.set_allocated_row_batch(batch); } _brpc_request.set_packet_seq(_packet_seq++); @@ -270,12 +273,7 @@ Status DataStreamSender::Channel::add_row(TupleRow* row) { } Status DataStreamSender::Channel::send_current_batch(bool eos) { - { - SCOPED_TIMER(_parent->_serialize_batch_timer); - int uncompressed_bytes = _batch->serialize(&_pb_batch); - COUNTER_UPDATE(_parent->_bytes_sent_counter, RowBatch::get_batch_size(_pb_batch)); - COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes); - } + _parent->serialize_batch(_batch.get(), &_pb_batch, 1); _batch->reset(); RETURN_IF_ERROR(send_batch(&_pb_batch, eos)); return Status::OK(); diff --git a/be/src/service/brpc.h b/be/src/service/brpc.h index d3fa30f4811aac..c9325d31e861bf 100644 --- a/be/src/service/brpc.h +++ b/be/src/service/brpc.h @@ -56,3 +56,4 @@ #include #include #include +#include diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 68dfdfd63813ad..07372ba01e1cb7 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -49,6 +49,13 @@ void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cnt google::protobuf::Closure* done) { VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id()) << " node=" << request->node_id(); + brpc::Controller* cntl = static_cast(cntl_base); + if (cntl->request_attachment().size() > 0) { + PRowBatch* batch = (const_cast(request))->mutable_row_batch(); + butil::IOBuf& io_buf = cntl->request_attachment(); + std::string* tuple_data = batch->mutable_tuple_data(); + io_buf.copy_to(tuple_data); + } _exec_env->stream_mgr()->transmit_data(request, &done); if (done != nullptr) { done->Run();