Skip to content

Commit

Permalink
[feature](move-memtable)[4/7] add stream sink file writer (apache#23416)
Browse files Browse the repository at this point in the history

Co-authored-by: laihui <1353307710@qq.com>
  • Loading branch information
kaijchen and sollhui authored Aug 24, 2023
1 parent 98d0a2f commit 71071ba
Show file tree
Hide file tree
Showing 8 changed files with 398 additions and 2 deletions.
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,9 @@ DEFINE_Int32(doris_remote_scanner_thread_pool_queue_size, "102400");
// limit the queue of pending batches which will be sent by a single nodechannel
DEFINE_mInt64(nodechannel_pending_queue_max_bytes, "67108864");

// The batch size for sending data by brpc streaming client
DEFINE_mInt64(brpc_streaming_client_batch_bytes, "262144");

// Max waiting time to wait the "plan fragment start" rpc.
// If timeout, the fragment will be cancelled.
// This parameter is usually only used when the FE loses connection,
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,9 @@ DECLARE_Int32(doris_remote_scanner_thread_pool_queue_size);
// limit the queue of pending batches which will be sent by a single nodechannel
DECLARE_mInt64(nodechannel_pending_queue_max_bytes);

// The batch size for sending data by brpc streaming client
DECLARE_mInt64(brpc_streaming_client_batch_bytes);

// Max waiting time to wait the "plan fragment start" rpc.
// If timeout, the fragment will be cancelled.
// This parameter is usually only used when the FE loses connection,
Expand Down
1 change: 1 addition & 0 deletions be/src/io/fs/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace io {
class FileWriter {
public:
FileWriter(Path&& path, FileSystemSPtr fs) : _path(std::move(path)), _fs(fs) {}
FileWriter() = default;
virtual ~FileWriter() = default;

DISALLOW_COPY_AND_ASSIGN(FileWriter);
Expand Down
117 changes: 117 additions & 0 deletions be/src/io/fs/stream_sink_file_writer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "io/fs/stream_sink_file_writer.h"

#include <gen_cpp/internal_service.pb.h>

#include "olap/olap_common.h"
#include "olap/rowset/beta_rowset_writer.h"

namespace doris {
namespace io {

void StreamSinkFileWriter::init(PUniqueId load_id, int64_t partition_id, int64_t index_id,
int64_t tablet_id, int32_t segment_id) {
VLOG_DEBUG << "init stream writer, load id(" << UniqueId(load_id).to_string()
<< "), partition id(" << partition_id << "), index id(" << index_id
<< "), tablet_id(" << tablet_id << "), segment_id(" << segment_id << ")";
_load_id = load_id;
_partition_id = partition_id;
_index_id = index_id;
_tablet_id = tablet_id;
_segment_id = segment_id;

_header.set_src_id(_sender_id);
*_header.mutable_load_id() = _load_id;
_header.set_partition_id(_partition_id);
_header.set_index_id(_index_id);
_header.set_tablet_id(_tablet_id);
_header.set_segment_id(_segment_id);
_header.set_opcode(doris::PStreamHeader::APPEND_DATA);
_append_header();
}

Status StreamSinkFileWriter::appendv(const Slice* data, size_t data_cnt) {
size_t bytes_req = 0;
for (int i = 0; i < data_cnt; i++) {
bytes_req += data[i].get_size();
_buf.append(data[i].get_data(), data[i].get_size());
}
_pending_bytes += bytes_req;
_bytes_appended += bytes_req;

VLOG_DEBUG << "writer appendv, load_id: " << UniqueId(_load_id).to_string()
<< ", index_id: " << _index_id << ", tablet_id: " << _tablet_id
<< ", segment_id: " << _segment_id << ", data_length: " << bytes_req
<< ", current batched bytes: " << _pending_bytes;

if (_pending_bytes >= _max_pending_bytes) {
RETURN_IF_ERROR(_stream_sender(_buf));
_buf.clear();
_append_header();
_pending_bytes = 0;
}

return Status::OK();
}

Status StreamSinkFileWriter::finalize() {
VLOG_DEBUG << "writer finalize, load_id: " << UniqueId(_load_id).to_string()
<< ", index_id: " << _index_id << ", tablet_id: " << _tablet_id
<< ", segment_id: " << _segment_id;
// TODO(zhengyu): update get_inverted_index_file_size into stat
Status status = _stream_sender(_buf);
// send eos
_buf.clear();
_header.set_segment_eos(true);
_append_header();
status = _stream_sender(_buf);
return status;
}

void StreamSinkFileWriter::_append_header() {
size_t header_len = _header.ByteSizeLong();
_buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len));
_buf.append(_header.SerializeAsString());
}

Status StreamSinkFileWriter::send_with_retry(brpc::StreamId stream, butil::IOBuf buf) {
while (true) {
int ret = brpc::StreamWrite(stream, buf);
if (ret == EAGAIN) {
const timespec time = butil::seconds_from_now(60);
int wait_result = brpc::StreamWait(stream, &time);
if (wait_result == 0) {
continue;
} else {
return Status::InternalError("fail to send data when wait stream");
}
} else if (ret == EINVAL) {
return Status::InternalError("fail to send data when stream write");
} else {
return Status::OK();
}
}
}

Status StreamSinkFileWriter::close() {
return Status::OK();
}

} // namespace io
} // namespace doris
94 changes: 94 additions & 0 deletions be/src/io/fs/stream_sink_file_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <brpc/stream.h>
#include <gen_cpp/internal_service.pb.h>

#include <queue>

#include "io/fs/file_writer.h"
#include "util/uid_util.h"

namespace doris {

struct RowsetId;
struct SegmentStatistics;

namespace io {
class StreamSinkFileWriter : public FileWriter {
public:
StreamSinkFileWriter(int sender_id, std::vector<brpc::StreamId> stream_id)
: _sender_id(sender_id), _streams(stream_id) {}

static void deleter(void* data) { ::free(data); }

static Status send_with_retry(brpc::StreamId stream, butil::IOBuf buf);

void init(PUniqueId load_id, int64_t partition_id, int64_t index_id, int64_t tablet_id,
int32_t segment_id);

Status appendv(const Slice* data, size_t data_cnt) override;

Status finalize() override;

Status close() override;

Status abort() override {
return Status::NotSupported("StreamSinkFileWriter::abort() is not supported");
}

Status write_at(size_t offset, const Slice& data) override {
return Status::NotSupported("StreamSinkFileWriter::write_at() is not supported");
}

private:
Status _stream_sender(butil::IOBuf buf) const {
for (auto stream : _streams) {
LOG(INFO) << "writer flushing, load_id: " << UniqueId(_load_id).to_string()
<< ", index_id: " << _index_id << ", tablet_id: " << _tablet_id
<< ", segment_id: " << _segment_id << ", stream id: " << stream
<< ", buf size: " << buf.size();

RETURN_IF_ERROR(send_with_retry(stream, buf));
}
return Status::OK();
}

void _append_header();

private:
PStreamHeader _header;
butil::IOBuf _buf;

std::queue<Slice> _pending_slices;
size_t _max_pending_bytes = config::brpc_streaming_client_batch_bytes;
size_t _pending_bytes;

int _sender_id;
std::vector<brpc::StreamId> _streams;

PUniqueId _load_id;
int64_t _partition_id;
int64_t _index_id;
int64_t _tablet_id;
int32_t _segment_id;
};

} // namespace io
} // namespace doris
Loading

0 comments on commit 71071ba

Please sign in to comment.