Skip to content

Commit

Permalink
[feature](move-memtable) support auto partition in sink v2 (apache#26914
Browse files Browse the repository at this point in the history
)
  • Loading branch information
kaijchen authored Nov 14, 2023
1 parent 0a9d71e commit f6a9914
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 36 deletions.
32 changes: 30 additions & 2 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,32 @@ void LoadStream::_report_result(StreamId stream, const Status& st,
}
}

void LoadStream::_report_schema(StreamId stream, const PStreamHeader& hdr) {
butil::IOBuf buf;
PWriteStreamSinkResponse response;
Status st = Status::OK();
for (const auto& req : hdr.tablets()) {
TabletManager* tablet_mgr = StorageEngine::instance()->tablet_manager();
TabletSharedPtr tablet = tablet_mgr->get_tablet(req.tablet_id());
if (tablet == nullptr) {
st = Status::NotFound("Tablet {} not found", req.tablet_id());
break;
}
auto resp = response.add_tablet_schemas();
resp->set_index_id(req.index_id());
resp->set_enable_unique_key_merge_on_write(tablet->enable_unique_key_merge_on_write());
tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema());
}
st.to_protobuf(response.mutable_status());

buf.append(response.SerializeAsString());
int ret = brpc::StreamWrite(stream, buf);
// TODO: handle EAGAIN
if (ret != 0) {
LOG(INFO) << "stream write report schema " << ret << ": " << std::strerror(ret);
}
}

void LoadStream::_parse_header(butil::IOBuf* const message, PStreamHeader& hdr) {
butil::IOBufAsZeroCopyInputStream wrapper(*message);
hdr.ParseFromZeroCopyStream(&wrapper);
Expand Down Expand Up @@ -458,12 +484,14 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf*
case PStreamHeader::CLOSE_LOAD: {
std::vector<int64_t> success_tablet_ids;
std::vector<int64_t> failed_tablet_ids;
std::vector<PTabletID> tablets_to_commit(hdr.tablets_to_commit().begin(),
hdr.tablets_to_commit().end());
std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), hdr.tablets().end());
auto st = close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablet_ids);
_report_result(id, st, success_tablet_ids, failed_tablet_ids);
brpc::StreamClose(id);
} break;
case PStreamHeader::GET_SCHEMA: {
_report_schema(id, hdr);
} break;
default:
LOG(WARNING) << "unexpected stream message " << hdr.opcode();
DCHECK(false);
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ class LoadStream : public brpc::StreamInputHandler {
void _report_result(StreamId stream, const Status& st,
const std::vector<int64_t>& success_tablet_ids,
const std::vector<int64_t>& failed_tablet_ids);
void _report_schema(StreamId stream, const PStreamHeader& hdr);

// report failure for one message
void _report_failure(StreamId stream, const Status& status, const PStreamHeader& header) {
Expand Down
72 changes: 67 additions & 5 deletions be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "vec/sink/load_stream_stub.h"

#include <sstream>

#include "olap/rowset/rowset_writer.h"
#include "util/brpc_client_cache.h"
#include "util/network_util.h"
Expand Down Expand Up @@ -73,6 +75,12 @@ int LoadStreamStub::LoadStreamReplyHandler::on_received_messages(brpc::StreamId
<< status;
}
}

if (response.tablet_schemas_size() > 0) {
std::vector<PTabletSchemaWithIndex> schemas(response.tablet_schemas().begin(),
response.tablet_schemas().end());
_stub->add_schema(schemas);
}
}
return 0;
}
Expand Down Expand Up @@ -206,12 +214,65 @@ Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commi
{
std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
for (const auto& tablet : _tablets_to_commit) {
*header.add_tablets_to_commit() = tablet;
*header.add_tablets() = tablet;
}
}
return _encode_and_send(header);
}

// GET_SCHEMA
Status LoadStreamStub::get_schema(const std::vector<PTabletID>& tablets) {
PStreamHeader header;
*header.mutable_load_id() = _load_id;
header.set_src_id(_src_id);
header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
std::ostringstream oss;
oss << "fetching tablet schema from stream " << _stream_id << ", load id: " << _load_id
<< ", tablet id:";
for (const auto& tablet : tablets) {
*header.add_tablets() = tablet;
oss << " " << tablet.tablet_id();
}
LOG(INFO) << oss.str();
return _encode_and_send(header);
}

void LoadStreamStub::add_schema(const std::vector<PTabletSchemaWithIndex>& schemas) {
std::lock_guard<bthread::Mutex> lock(_mutex);
for (const auto& schema : schemas) {
auto tablet_schema = std::make_unique<TabletSchema>();
tablet_schema->init_from_pb(schema.tablet_schema());
_tablet_schema_for_index->emplace(schema.index_id(), std::move(tablet_schema));
_enable_unique_mow_for_index->emplace(schema.index_id(),
schema.enable_unique_key_merge_on_write());
}
_schema_cv.notify_all();
}

Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id,
int64_t timeout_ms) {
if (_tablet_schema_for_index->contains(index_id)) {
return Status::OK();
}
PTabletID tablet;
tablet.set_partition_id(partition_id);
tablet.set_index_id(index_id);
tablet.set_tablet_id(tablet_id);
RETURN_IF_ERROR(get_schema({tablet}));

MonotonicStopWatch watch;
watch.start();
while (!_tablet_schema_for_index->contains(index_id) &&
watch.elapsed_time() / 1000 / 1000 < timeout_ms) {
static_cast<void>(wait_for_new_schema(100));
}

if (!_tablet_schema_for_index->contains(index_id)) {
return Status::TimedOut("timeout to get tablet schema for index {}", index_id);
}
return Status::OK();
}

Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const Slice> data) {
butil::IOBuf buf;
size_t header_len = header.ByteSizeLong();
Expand All @@ -224,21 +285,22 @@ Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const S
buf.append(slice.get_data(), slice.get_size());
}
bool eos = header.opcode() == doris::PStreamHeader::CLOSE_LOAD;
return _send_with_buffer(buf, eos);
bool get_schema = header.opcode() == doris::PStreamHeader::GET_SCHEMA;
return _send_with_buffer(buf, eos || get_schema);
}

Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, bool eos) {
Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, bool sync) {
butil::IOBuf output;
std::unique_lock<decltype(_buffer_mutex)> buffer_lock(_buffer_mutex);
_buffer.append(buf);
if (!eos && _buffer.size() < config::brpc_streaming_client_batch_bytes) {
if (!sync && _buffer.size() < config::brpc_streaming_client_batch_bytes) {
return Status::OK();
}
output.swap(_buffer);
// acquire send lock while holding buffer lock, to ensure the message order
std::lock_guard<decltype(_send_mutex)> send_lock(_send_mutex);
buffer_lock.unlock();
VLOG_DEBUG << "send buf size : " << output.size() << ", eos: " << eos;
VLOG_DEBUG << "send buf size : " << output.size() << ", sync: " << sync;
return _send_with_retry(output);
}

Expand Down
27 changes: 25 additions & 2 deletions be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ class LoadStreamStub {
private:
class LoadStreamReplyHandler : public brpc::StreamInputHandler {
public:
LoadStreamReplyHandler(LoadStreamStub* stub) : _stub(stub) {}

int on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[],
size_t size) override;

Expand Down Expand Up @@ -130,6 +132,8 @@ class LoadStreamStub {
bthread::Mutex _failed_tablets_mutex;
std::vector<int64_t> _success_tablets;
std::vector<int64_t> _failed_tablets;

LoadStreamStub* _stub;
};

public:
Expand Down Expand Up @@ -167,6 +171,9 @@ class LoadStreamStub {
// CLOSE_LOAD
Status close_load(const std::vector<PTabletID>& tablets_to_commit);

// GET_SCHEMA
Status get_schema(const std::vector<PTabletID>& tablets);

// wait remote to close stream,
// remote will close stream when it receives CLOSE_LOAD
Status close_wait(int64_t timeout_ms = 0) {
Expand All @@ -176,6 +183,21 @@ class LoadStreamStub {
return _handler.close_wait(timeout_ms);
}

Status wait_for_schema(int64_t partition_id, int64_t index_id, int64_t tablet_id,
int64_t timeout_ms = 60000);

Status wait_for_new_schema(int64_t timeout_ms) {
std::unique_lock<bthread::Mutex> lock(_mutex);
if (timeout_ms > 0) {
int ret = _schema_cv.wait_for(lock, timeout_ms * 1000);
return ret == 0 ? Status::OK() : Status::Error<true>(ret, "wait schema update timeout");
}
_schema_cv.wait(lock);
return Status::OK();
};

void add_schema(const std::vector<PTabletSchemaWithIndex>& schemas);

std::shared_ptr<TabletSchema> tablet_schema(int64_t index_id) const {
return (*_tablet_schema_for_index)[index_id];
}
Expand All @@ -196,7 +218,7 @@ class LoadStreamStub {

private:
Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data = {});
Status _send_with_buffer(butil::IOBuf& buf, bool eos = false);
Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
Status _send_with_retry(butil::IOBuf& buf);

protected:
Expand All @@ -216,8 +238,9 @@ class LoadStreamStub {
brpc::StreamId _stream_id;
int64_t _src_id = -1; // source backend_id
int64_t _dst_id = -1; // destination backend_id
LoadStreamReplyHandler _handler;
LoadStreamReplyHandler _handler {this};

bthread::ConditionVariable _schema_cv;
std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
};
Expand Down
Loading

0 comments on commit f6a9914

Please sign in to comment.