diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 7998e153741bbd..d8a4afe9aa4cf0 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -894,6 +894,9 @@ DEFINE_Int32(doris_remote_scanner_thread_pool_queue_size, "102400"); // limit the queue of pending batches which will be sent by a single nodechannel DEFINE_mInt64(nodechannel_pending_queue_max_bytes, "67108864"); +// The batch size for sending data by brpc streaming client +DEFINE_mInt64(brpc_streaming_client_batch_bytes, "262144"); + // Max waiting time to wait the "plan fragment start" rpc. // If timeout, the fragment will be cancelled. // This parameter is usually only used when the FE loses connection, diff --git a/be/src/common/config.h b/be/src/common/config.h index a9f69e2a947ad9..fbc680f2297e9b 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -938,6 +938,9 @@ DECLARE_Int32(doris_remote_scanner_thread_pool_queue_size); // limit the queue of pending batches which will be sent by a single nodechannel DECLARE_mInt64(nodechannel_pending_queue_max_bytes); +// The batch size for sending data by brpc streaming client +DECLARE_mInt64(brpc_streaming_client_batch_bytes); + // Max waiting time to wait the "plan fragment start" rpc. // If timeout, the fragment will be cancelled. 
// This parameter is usually only used when the FE loses connection,
diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h
index 03d4d8baced391..012f0a743030dd 100644
--- a/be/src/io/fs/file_writer.h
+++ b/be/src/io/fs/file_writer.h
@@ -32,6 +32,7 @@ namespace io {
 class FileWriter {
 public:
     FileWriter(Path&& path, FileSystemSPtr fs) : _path(std::move(path)), _fs(fs) {}
+    FileWriter() = default;
     virtual ~FileWriter() = default;
 
     DISALLOW_COPY_AND_ASSIGN(FileWriter);
diff --git a/be/src/io/fs/stream_sink_file_writer.cpp b/be/src/io/fs/stream_sink_file_writer.cpp
new file mode 100644
index 00000000000000..ff2667b9fa3af1
--- /dev/null
+++ b/be/src/io/fs/stream_sink_file_writer.cpp
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/stream_sink_file_writer.h"
+
+#include <gen_cpp/internal_service.pb.h> // NOTE(review): name lost in extraction; confirm
+
+#include "olap/olap_common.h"
+#include "olap/rowset/beta_rowset_writer.h"
+
+namespace doris {
+namespace io {
+
+// Binds the writer to one segment of a load and queues the first message header.
+// appendv() only appends payload, so it relies on the header queued here.
+void StreamSinkFileWriter::init(PUniqueId load_id, int64_t partition_id, int64_t index_id,
+                                int64_t tablet_id, int32_t segment_id) {
+    VLOG_DEBUG << "init stream writer, load id(" << UniqueId(load_id).to_string()
+               << "), partition id(" << partition_id << "), index id(" << index_id
+               << "), tablet_id(" << tablet_id << "), segment_id(" << segment_id << ")";
+    _load_id = load_id;
+    _partition_id = partition_id;
+    _index_id = index_id;
+    _tablet_id = tablet_id;
+    _segment_id = segment_id;
+    // Start from an empty batch; the byte counter must be defined before appendv() adds to it.
+    _buf.clear();
+    _pending_bytes = 0;
+
+    _header.set_src_id(_sender_id);
+    *_header.mutable_load_id() = _load_id;
+    _header.set_partition_id(_partition_id);
+    _header.set_index_id(_index_id);
+    _header.set_tablet_id(_tablet_id);
+    _header.set_segment_id(_segment_id);
+    _header.set_opcode(doris::PStreamHeader::APPEND_DATA);
+    _append_header();
+}
+
+// Buffers every slice and, once the batch reaches _max_pending_bytes, sends the batch to all
+// streams and starts a new one. Returns the first send error, if any.
+Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) {
+    size_t bytes_req = 0;
+    for (size_t i = 0; i < data_cnt; i++) {
+        bytes_req += data[i].get_size();
+        _buf.append(data[i].get_data(), data[i].get_size());
+    }
+    _pending_bytes += bytes_req;
+    _bytes_appended += bytes_req;
+
+    VLOG_DEBUG << "writer appendv, load_id: " << UniqueId(_load_id).to_string()
+               << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id
+               << ", segment_id: " << _segment_id << ", data_length: " << bytes_req
+               << ", current batched bytes: " << _pending_bytes;
+
+    if (_pending_bytes >= _max_pending_bytes) {
+        RETURN_IF_ERROR(_stream_sender(_buf));
+        _buf.clear();
+        _append_header(); // every batch starts with its own header
+        _pending_bytes = 0;
+    }
+
+    return Status::OK();
+}
+
+// Sends whatever is still buffered, then a header-only message flagged segment_eos.
+// The first failure is returned; the EOS marker is not sent after a failed data flush.
+Status StreamSinkFileWriter::finalize() {
+    VLOG_DEBUG << "writer finalize, load_id: " << UniqueId(_load_id).to_string()
+               << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id
+               << ", segment_id: " << _segment_id;
+    // TODO(zhengyu): update get_inverted_index_file_size into stat
+    RETURN_IF_ERROR(_stream_sender(_buf));
+    _pending_bytes = 0;
+    // send eos
+    _buf.clear();
+    _header.set_segment_eos(true);
+    _append_header();
+    return _stream_sender(_buf);
+}
+
+// Appends the framing for one message: the header's byte length followed by the header itself.
+void StreamSinkFileWriter::_append_header() {
+    // The length prefix is the raw bytes of a size_t, so the receiver must read the same layout.
+    size_t header_len = _header.ByteSizeLong();
+    _buf.append(reinterpret_cast<const char*>(&header_len), sizeof(header_len));
+    _buf.append(_header.SerializeAsString());
+}
+
+// Writes buf to the stream, waiting (up to 60s per attempt) whenever brpc reports EAGAIN.
+// Any other non-zero result from StreamWrite is reported as an error.
+Status StreamSinkFileWriter::send_with_retry(brpc::StreamId stream, butil::IOBuf buf) {
+    while (true) {
+        const int ret = brpc::StreamWrite(stream, buf);
+        if (ret == 0) {
+            return Status::OK();
+        }
+        if (ret != EAGAIN) {
+            return Status::InternalError("fail to send data when stream write");
+        }
+        const timespec time = butil::seconds_from_now(60);
+        if (brpc::StreamWait(stream, &time) != 0) {
+            return Status::InternalError("fail to send data when wait stream");
+        }
+    }
+}
+
+// No-op: close() does not flush; buffered data is only sent by appendv()/finalize().
+Status StreamSinkFileWriter::close() {
+    return Status::OK();
+}
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/fs/stream_sink_file_writer.h b/be/src/io/fs/stream_sink_file_writer.h
new file mode 100644
index 00000000000000..074a2f0f5add3e
--- /dev/null
+++ b/be/src/io/fs/stream_sink_file_writer.h
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. 
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <brpc/stream.h>                 // NOTE(review): name lost in extraction; confirm
+#include <gen_cpp/internal_service.pb.h> // NOTE(review): name lost in extraction; confirm
+
+#include <queue> // NOTE(review): name lost in extraction; confirm
+
+#include "io/fs/file_writer.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+struct RowsetId;
+struct SegmentStatistics;
+
+namespace io {
+// FileWriter that batches appended data and writes it to a set of brpc streams.
+// Usage as exercised by the tests: init() first, then appendv() calls, then finalize().
+class StreamSinkFileWriter : public FileWriter {
+public:
+    StreamSinkFileWriter(int sender_id, std::vector<brpc::StreamId> stream_id)
+            : _sender_id(sender_id), _streams(std::move(stream_id)) {}
+
+    static void deleter(void* data) { ::free(data); }
+
+    static Status send_with_retry(brpc::StreamId stream, butil::IOBuf buf);
+
+    void init(PUniqueId load_id, int64_t partition_id, int64_t index_id, int64_t tablet_id,
+              int32_t segment_id);
+
+    Status appendv(const Slice* data, size_t data_cnt) override;
+
+    Status finalize() override;
+
+    Status close() override;
+
+    Status abort() override {
+        return Status::NotSupported("StreamSinkFileWriter::abort() is not supported");
+    }
+
+    Status write_at(size_t offset, const Slice& data) override {
+        return Status::NotSupported("StreamSinkFileWriter::write_at() is not supported");
+    }
+
+private:
+    // Sends the same buffer to every stream in order; stops at the first failure.
+    Status _stream_sender(const butil::IOBuf& buf) const {
+        for (auto stream : _streams) {
+            LOG(INFO) << "writer flushing, load_id: " << UniqueId(_load_id).to_string()
+                      << ", index_id: " << _index_id << ", tablet_id: " << _tablet_id
+                      << ", segment_id: " << _segment_id << ", stream id: " << stream
+                      << ", buf size: " << buf.size();
+
+            RETURN_IF_ERROR(send_with_retry(stream, buf));
+        }
+        return Status::OK();
+    }
+
+    void _append_header();
+
+    PStreamHeader _header;
+    butil::IOBuf _buf;
+    size_t _max_pending_bytes = config::brpc_streaming_client_batch_bytes;
+    size_t _pending_bytes = 0; // payload bytes buffered since the last flush
+
+    int _sender_id;
+    std::vector<brpc::StreamId> _streams;
+
+    PUniqueId _load_id;
+    int64_t _partition_id = 0;
+    int64_t _index_id = 0;
+    int64_t _tablet_id = 0;
+    int32_t _segment_id = 0;
+};
+
+} // namespace io
+} // namespace doris
diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp b/be/test/io/fs/stream_sink_file_writer_test.cpp
new file mode 100644
index 00000000000000..e894cbaf912c33
--- /dev/null
+++ b/be/test/io/fs/stream_sink_file_writer_test.cpp
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/stream_sink_file_writer.h"
+
+#include <brpc/channel.h> // NOTE(review): name lost in extraction; confirm
+#include <brpc/server.h>  // NOTE(review): name lost in extraction; confirm
+
+#include "gtest/gtest_pred_impl.h"
+#include "olap/olap_common.h"
+#include "util/debug/leakcheck_disabler.h"
+#include "util/faststring.h"
+
+namespace doris {
+
+#ifndef CHECK_STATUS_OK
+#define CHECK_STATUS_OK(stmt)                   \
+    do {                                        \
+        Status _status_ = (stmt);               \
+        ASSERT_TRUE(_status_.ok()) << _status_; \
+    } while (false)
+#endif
+
+DEFINE_string(connection_type, "", "Connection type. 
Available values: single, pooled, short");
+DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
+DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
+DEFINE_int32(idle_timeout_s, -1,
+             "Connection will be closed if there is no "
+             "read/write operations during the last `idle_timeout_s'");
+
+// Fixture: starts a local brpc server that accepts one stream and opens a client stream to it.
+class StreamSinkFileWriterTest : public testing::Test {
+    class MockStreamSinkFileReceiver : public brpc::StreamInputHandler {
+    public:
+        int on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[],
+                                 size_t size) override {
+            std::stringstream str;
+            for (size_t i = 0; i < size; ++i) {
+                str << "msg[" << i << "]=" << *messages[i];
+            }
+            LOG(INFO) << "Received from Stream=" << id << ": " << str.str();
+            return 0;
+        }
+        void on_idle_timeout(brpc::StreamId id) override {
+            LOG(INFO) << "Stream=" << id << " has no data transmission for a while";
+        }
+        void on_closed(brpc::StreamId id) override { LOG(INFO) << "Stream=" << id << " is closed"; }
+    };
+
+    class StreamingSinkFileService : public PBackendService {
+    public:
+        StreamingSinkFileService() : _sd(brpc::INVALID_STREAM_ID) {}
+        ~StreamingSinkFileService() override { brpc::StreamClose(_sd); }
+        void open_stream_sink(google::protobuf::RpcController* controller,
+                              const POpenStreamSinkRequest*,
+                              POpenStreamSinkResponse* response,
+                              google::protobuf::Closure* done) override {
+            brpc::ClosureGuard done_guard(done);
+
+            brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+            brpc::StreamOptions stream_options;
+            stream_options.handler = &_receiver;
+            CHECK_EQ(0, brpc::StreamAccept(&_sd, *cntl, &stream_options));
+            Status::OK().to_protobuf(response->mutable_status());
+        }
+
+    private:
+        MockStreamSinkFileReceiver _receiver;
+        brpc::StreamId _sd;
+    };
+
+public:
+    StreamSinkFileWriterTest() { srand(time(nullptr)); }
+    ~StreamSinkFileWriterTest() override = default;
+
+protected:
+    void SetUp() override {
+        // init server first so the address is listening before the client connects
+        CHECK_EQ(0, _server.AddService(&_stream_service, brpc::SERVER_DOESNT_OWN_SERVICE));
+        brpc::ServerOptions server_options;
+        server_options.idle_timeout_sec = FLAGS_idle_timeout_s;
+        {
+            debug::ScopedLeakCheckDisabler disable_lsan;
+            CHECK_EQ(0, _server.Start("127.0.0.1:18946", &server_options));
+        }
+
+        // init channel; the options must be handed to Init() to take effect
+        brpc::Channel channel;
+        brpc::ChannelOptions options;
+        options.protocol = brpc::PROTOCOL_BAIDU_STD;
+        options.connection_type = FLAGS_connection_type;
+        options.timeout_ms = FLAGS_timeout_ms;
+        options.max_retry = FLAGS_max_retry;
+        CHECK_EQ(0, channel.Init("127.0.0.1:18946", &options));
+
+        // init stream connect
+        PBackendService_Stub stub(&channel);
+        brpc::Controller cntl;
+        brpc::StreamId stream;
+        CHECK_EQ(0, brpc::StreamCreate(&stream, cntl, nullptr));
+
+        POpenStreamSinkRequest request;
+        POpenStreamSinkResponse response;
+        request.mutable_load_id()->set_hi(1);
+        request.mutable_load_id()->set_lo(1);
+        stub.open_stream_sink(&cntl, &request, &response, nullptr);
+
+        brpc::Join(cntl.call_id());
+        _stream = stream;
+        ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
+    }
+
+    void TearDown() override {
+        CHECK_EQ(0, brpc::StreamClose(_stream));
+        CHECK_EQ(0, _server.Stop(1000));
+        CHECK_EQ(0, _server.Join());
+    }
+
+    // Declared before _server so the service outlives the server that references it.
+    StreamingSinkFileService _stream_service;
+    brpc::StreamId _stream = brpc::INVALID_STREAM_ID;
+    brpc::Server _server;
+};
+
+TEST_F(StreamSinkFileWriterTest, TestInit) {
+    std::vector<brpc::StreamId> stream_ids {_stream};
+    io::StreamSinkFileWriter writer(0, stream_ids);
+    PUniqueId load_id;
+    load_id.set_hi(1);
+    load_id.set_lo(2);
+    writer.init(load_id, 3, 4, 5, 6);
+}
+
+TEST_F(StreamSinkFileWriterTest, TestAppend) {
+    std::vector<brpc::StreamId> stream_ids {_stream};
+    io::StreamSinkFileWriter writer(0, stream_ids);
+    PUniqueId load_id;
+    load_id.set_hi(1);
+    load_id.set_lo(2);
+    writer.init(load_id, 3, 4, 5, 6);
+    std::vector<Slice> slices {"hello"};
+    CHECK_STATUS_OK(writer.appendv(&slices[0], slices.size()));
+}
+
+TEST_F(StreamSinkFileWriterTest, TestFinalize) {
+    std::vector<brpc::StreamId> stream_ids {_stream};
+    io::StreamSinkFileWriter writer(0, stream_ids);
+    PUniqueId load_id;
+    load_id.set_hi(1);
+    load_id.set_lo(2);
+    writer.init(load_id, 3, 4, 5, 6);
+    CHECK_STATUS_OK(writer.finalize());
+}
+} // namespace doris
diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md
index 6dc86646dbb138..e350d188cca342 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -1485,4 +1485,9 @@ Indicates how many tablets failed to load in the data directory. At the same tim
 #### `user_files_secure_path`
 
 * Description: The storage directory for files queried by `local` table valued functions.
-* Default value: `${DORIS_HOME}`
\ No newline at end of file
+* Default value: `${DORIS_HOME}`
+
+#### `brpc_streaming_client_batch_bytes`
+
+* Description: The batch size (in bytes) for sending data by brpc streaming client
+* Default value: 262144
\ No newline at end of file
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md
index 2b9b97af6ed8d3..3c7574d2976eab 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -1514,4 +1514,9 @@ load tablets from header failed, failed tablets size: xxx, path=xxx
 #### `user_files_secure_path`
 
 * 描述: `local` 表函数查询的文件的存储目录。
-* 默认值: `${DORIS_HOME}`
\ No newline at end of file
+* 默认值: `${DORIS_HOME}`
+
+#### `brpc_streaming_client_batch_bytes`
+
+* 描述: brpc streaming 客户端发送数据时的攒批大小(字节)
+* 默认值: 262144
\ No newline at end of file