Skip to content

Commit

Permalink
[Feature] Add local tablet reader and rpc interface (StarRocks#16537)
Browse files Browse the repository at this point in the history
Signed-off-by: Binglin Chang <decstery@gmail.com>
  • Loading branch information
decster authored Feb 1, 2023
1 parent ae11f69 commit 03baf49
Show file tree
Hide file tree
Showing 12 changed files with 473 additions and 146 deletions.
45 changes: 45 additions & 0 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,51 @@ Status PInternalServiceImplBase<T>::_mv_commit_epoch(const pipeline::QueryContex
return Status::OK();
}

// Default handler for the "open local tablet reader" RPC.
// The base service does not implement it: the reply status is set to
// NOT_IMPLEMENTED_ERROR, and `done` is handed to a ClosureGuard for the
// duration of the call.
template <typename T>
void PInternalServiceImplBase<T>::local_tablet_reader_open(google::protobuf::RpcController* controller,
                                                           const PTabletReaderOpenRequest* request,
                                                           PTabletReaderOpenResult* response,
                                                           google::protobuf::Closure* done) {
    ClosureGuard done_guard(done);
    auto* reply_status = response->mutable_status();
    reply_status->set_status_code(TStatusCode::NOT_IMPLEMENTED_ERROR);
}

// Default handler for the "close local tablet reader" RPC.
// The base service does not implement it: the reply status is set to
// NOT_IMPLEMENTED_ERROR, and `done` is handed to a ClosureGuard for the
// duration of the call.
template <typename T>
void PInternalServiceImplBase<T>::local_tablet_reader_close(google::protobuf::RpcController* controller,
                                                            const PTabletReaderCloseRequest* request,
                                                            PTabletReaderCloseResult* response,
                                                            google::protobuf::Closure* done) {
    ClosureGuard done_guard(done);
    auto* reply_status = response->mutable_status();
    reply_status->set_status_code(TStatusCode::NOT_IMPLEMENTED_ERROR);
}

// Default handler for the "multi get" RPC of the local tablet reader.
// The base service does not implement it: the reply status is set to
// NOT_IMPLEMENTED_ERROR, and `done` is handed to a ClosureGuard for the
// duration of the call.
template <typename T>
void PInternalServiceImplBase<T>::local_tablet_reader_multi_get(google::protobuf::RpcController* controller,
                                                                const PTabletReaderMultiGetRequest* request,
                                                                PTabletReaderMultiGetResult* response,
                                                                google::protobuf::Closure* done) {
    ClosureGuard done_guard(done);
    auto* reply_status = response->mutable_status();
    reply_status->set_status_code(TStatusCode::NOT_IMPLEMENTED_ERROR);
}

// Default handler for the "scan open" RPC of the local tablet reader.
// The base service does not implement it: the reply status is set to
// NOT_IMPLEMENTED_ERROR, and `done` is handed to a ClosureGuard for the
// duration of the call.
template <typename T>
void PInternalServiceImplBase<T>::local_tablet_reader_scan_open(google::protobuf::RpcController* controller,
                                                                const PTabletReaderScanOpenRequest* request,
                                                                PTabletReaderScanOpenResult* response,
                                                                google::protobuf::Closure* done) {
    ClosureGuard done_guard(done);
    auto* reply_status = response->mutable_status();
    reply_status->set_status_code(TStatusCode::NOT_IMPLEMENTED_ERROR);
}

// Default handler for the "scan get next" RPC of the local tablet reader.
// The base service does not implement it: the reply status is set to
// NOT_IMPLEMENTED_ERROR, and `done` is handed to a ClosureGuard for the
// duration of the call.
template <typename T>
void PInternalServiceImplBase<T>::local_tablet_reader_scan_get_next(google::protobuf::RpcController* controller,
                                                                    const PTabletReaderScanGetNextRequest* request,
                                                                    PTabletReaderScanGetNextResult* response,
                                                                    google::protobuf::Closure* done) {
    ClosureGuard done_guard(done);
    auto* reply_status = response->mutable_status();
    reply_status->set_status_code(TStatusCode::NOT_IMPLEMENTED_ERROR);
}

// Explicit instantiations of the service template for the two generated
// service base types it is used with.
template class PInternalServiceImplBase<PInternalService>;
template class PInternalServiceImplBase<doris::PBackendService>;

Expand Down
20 changes: 20 additions & 0 deletions be/src/service/internal_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,26 @@ class PInternalServiceImplBase : public T {
const PMVMaintenanceTaskRequest* request, PMVMaintenanceTaskResult* response,
google::protobuf::Closure* done) override;

// RPC handlers for the local tablet reader interface. The definitions in
// be/src/service/internal_service.cpp are placeholders that reply with
// TStatusCode::NOT_IMPLEMENTED_ERROR.

// Handles a PTabletReaderOpenRequest.
void local_tablet_reader_open(google::protobuf::RpcController* controller, const PTabletReaderOpenRequest* request,
PTabletReaderOpenResult* response, google::protobuf::Closure* done) override;

// Handles a PTabletReaderCloseRequest.
void local_tablet_reader_close(google::protobuf::RpcController* controller,
const PTabletReaderCloseRequest* request, PTabletReaderCloseResult* response,
google::protobuf::Closure* done) override;

// Handles a PTabletReaderMultiGetRequest.
void local_tablet_reader_multi_get(google::protobuf::RpcController* controller,
const PTabletReaderMultiGetRequest* request,
PTabletReaderMultiGetResult* response, google::protobuf::Closure* done) override;

// Handles a PTabletReaderScanOpenRequest.
void local_tablet_reader_scan_open(google::protobuf::RpcController* controller,
const PTabletReaderScanOpenRequest* request,
PTabletReaderScanOpenResult* response, google::protobuf::Closure* done) override;

// Handles a PTabletReaderScanGetNextRequest.
void local_tablet_reader_scan_get_next(google::protobuf::RpcController* controller,
const PTabletReaderScanGetNextRequest* request,
PTabletReaderScanGetNextResult* response,
google::protobuf::Closure* done) override;

private:
void _transmit_chunk(::google::protobuf::RpcController* controller,
const ::starrocks::PTransmitChunkParams* request, ::starrocks::PTransmitChunkResult* response,
Expand Down
30 changes: 30 additions & 0 deletions be/src/service/service_be/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,36 @@ void BackendInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcCo
*request, response, done);
}

// Backend handler for the "open local tablet reader" RPC.
//
// There is no backend-specific implementation yet, so forward to the base
// class, which wraps `done` in a ClosureGuard and reports
// NOT_IMPLEMENTED_ERROR. An empty body here would never run `done` and would
// leave `response` untouched, so the RPC would never be completed.
template <typename T>
void BackendInternalServiceImpl<T>::local_tablet_reader_open(google::protobuf::RpcController* controller,
                                                             const PTabletReaderOpenRequest* request,
                                                             PTabletReaderOpenResult* response,
                                                             google::protobuf::Closure* done) {
    PInternalServiceImplBase<T>::local_tablet_reader_open(controller, request, response, done);
}

// Backend handler for the "close local tablet reader" RPC.
//
// There is no backend-specific implementation yet, so forward to the base
// class, which wraps `done` in a ClosureGuard and reports
// NOT_IMPLEMENTED_ERROR. An empty body here would never run `done` and would
// leave `response` untouched, so the RPC would never be completed.
template <typename T>
void BackendInternalServiceImpl<T>::local_tablet_reader_close(google::protobuf::RpcController* controller,
                                                              const PTabletReaderCloseRequest* request,
                                                              PTabletReaderCloseResult* response,
                                                              google::protobuf::Closure* done) {
    PInternalServiceImplBase<T>::local_tablet_reader_close(controller, request, response, done);
}

// Backend handler for the "multi get" RPC of the local tablet reader.
//
// There is no backend-specific implementation yet, so forward to the base
// class, which wraps `done` in a ClosureGuard and reports
// NOT_IMPLEMENTED_ERROR. An empty body here would never run `done` and would
// leave `response` untouched, so the RPC would never be completed.
template <typename T>
void BackendInternalServiceImpl<T>::local_tablet_reader_multi_get(google::protobuf::RpcController* controller,
                                                                  const PTabletReaderMultiGetRequest* request,
                                                                  PTabletReaderMultiGetResult* response,
                                                                  google::protobuf::Closure* done) {
    PInternalServiceImplBase<T>::local_tablet_reader_multi_get(controller, request, response, done);
}

// Backend handler for the "scan open" RPC of the local tablet reader.
//
// There is no backend-specific implementation yet, so forward to the base
// class, which wraps `done` in a ClosureGuard and reports
// NOT_IMPLEMENTED_ERROR. An empty body here would never run `done` and would
// leave `response` untouched, so the RPC would never be completed.
template <typename T>
void BackendInternalServiceImpl<T>::local_tablet_reader_scan_open(google::protobuf::RpcController* controller,
                                                                  const PTabletReaderScanOpenRequest* request,
                                                                  PTabletReaderScanOpenResult* response,
                                                                  google::protobuf::Closure* done) {
    PInternalServiceImplBase<T>::local_tablet_reader_scan_open(controller, request, response, done);
}

// Backend handler for the "scan get next" RPC of the local tablet reader.
//
// There is no backend-specific implementation yet, so forward to the base
// class, which wraps `done` in a ClosureGuard and reports
// NOT_IMPLEMENTED_ERROR. An empty body here would never run `done` and would
// leave `response` untouched, so the RPC would never be completed.
template <typename T>
void BackendInternalServiceImpl<T>::local_tablet_reader_scan_get_next(google::protobuf::RpcController* controller,
                                                                      const PTabletReaderScanGetNextRequest* request,
                                                                      PTabletReaderScanGetNextResult* response,
                                                                      google::protobuf::Closure* done) {
    PInternalServiceImplBase<T>::local_tablet_reader_scan_get_next(controller, request, response, done);
}

// Explicit instantiations of the backend service template for the two
// generated service base types it is used with.
template class BackendInternalServiceImpl<PInternalService>;
template class BackendInternalServiceImpl<doris::PBackendService>;
} // namespace starrocks
16 changes: 16 additions & 0 deletions be/src/service/service_be/internal_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,22 @@ class BackendInternalServiceImpl : public PInternalServiceImplBase<T> {

void tablet_writer_cancel(google::protobuf::RpcController* controller, const PTabletWriterCancelRequest* request,
PTabletWriterCancelResult* response, google::protobuf::Closure* done) override;

// Backend overrides of the local tablet reader RPC handlers declared in
// PInternalServiceImplBase<T>.

// Handles a PTabletReaderOpenRequest.
void local_tablet_reader_open(google::protobuf::RpcController* controller, const PTabletReaderOpenRequest* request,
PTabletReaderOpenResult* response, google::protobuf::Closure* done) override;
// Handles a PTabletReaderCloseRequest.
void local_tablet_reader_close(google::protobuf::RpcController* controller,
const PTabletReaderCloseRequest* request, PTabletReaderCloseResult* response,
google::protobuf::Closure* done) override;
// Handles a PTabletReaderMultiGetRequest.
void local_tablet_reader_multi_get(google::protobuf::RpcController* controller,
const PTabletReaderMultiGetRequest* request,
PTabletReaderMultiGetResult* response, google::protobuf::Closure* done) override;
// Handles a PTabletReaderScanOpenRequest.
void local_tablet_reader_scan_open(google::protobuf::RpcController* controller,
const PTabletReaderScanOpenRequest* request,
PTabletReaderScanOpenResult* response, google::protobuf::Closure* done) override;
// Handles a PTabletReaderScanGetNextRequest.
void local_tablet_reader_scan_get_next(google::protobuf::RpcController* controller,
const PTabletReaderScanGetNextRequest* request,
PTabletReaderScanGetNextResult* response,
google::protobuf::Closure* done) override;
};

} // namespace starrocks
1 change: 1 addition & 0 deletions be/src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ add_library(Storage STATIC
segment_replicate_executor.cpp
metadata_util.cpp
kv_store.cpp
local_tablet_reader.cpp
olap_common.cpp
olap_server.cpp
options.cpp
Expand Down
173 changes: 173 additions & 0 deletions be/src/storage/local_tablet_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "storage/local_tablet_reader.h"

#include "storage/chunk_helper.h"
#include "storage/primary_index.h"
#include "storage/primary_key_encoder.h"
#include "storage/projection_iterator.h"
#include "storage/tablet_reader.h"
#include "storage/tablet_updates.h"

namespace starrocks {

// Creates an unbound reader; init() must attach a tablet before any read.
LocalTabletReader::LocalTabletReader() = default;

// Binds this reader to `tablet` at the given `version`.
// Returns NotSupported when the tablet has no updates() object (i.e. it is
// not a primary-key tablet); otherwise stores both values and returns OK.
Status LocalTabletReader::init(TabletSharedPtr& tablet, int64_t version) {
    const bool is_pk_tablet = tablet->updates() != nullptr;
    if (!is_pk_tablet) {
        return Status::NotSupported("LocalTabletReader only support PK tablet");
    }
    _version = version;
    _tablet = tablet;
    return Status::OK();
}

// Closes the reader. Currently a no-op that always returns OK; the tablet
// reference stored by init() is not released here.
Status LocalTabletReader::close() {
return Status::OK();
}

static void plan_read_by_rssid(const vector<uint64_t>& rowids, vector<bool>& found,
std::map<uint32_t, std::vector<uint32_t>>& rowids_by_rssid, vector<uint32_t>& idxes) {
struct RowidSortEntry {
uint32_t rowid;
uint32_t idx;
RowidSortEntry(uint32_t rowid, uint32_t idx) : rowid(rowid), idx(idx) {}
bool operator<(const RowidSortEntry& rhs) const { return rowid < rhs.rowid; }
};

size_t n = rowids.size();
found.resize(n);
phmap::node_hash_map<uint32_t, std::vector<RowidSortEntry>> sort_entry_by_rssid;
uint32_t idx = 0;
for (uint32_t i = 0; i < n; i++) {
uint64_t v = rowids[i];
uint32_t rssid = v >> 32;
if (rssid == (uint32_t)-1) {
found[i] = false;
} else {
found[i] = true;
uint32_t rowid = v & ROWID_MASK;
sort_entry_by_rssid[rssid].emplace_back(rowid, idx++);
}
}
// construct rowids_by_rssid
for (auto& e : sort_entry_by_rssid) {
std::sort(e.second.begin(), e.second.end());
rowids_by_rssid.emplace(e.first, std::vector<uint32_t>(e.second.size()));
}
idxes.resize(idx);
// iterate rowids_by_rssid by rssid order
uint32_t ridx = 0;
for (auto& e : rowids_by_rssid) {
auto& sort_entries = sort_entry_by_rssid[e.first];
for (uint32_t i = 0; i < sort_entries.size(); i++) {
e.second[i] = sort_entries[i].rowid;
idxes[sort_entries[i].idx] = ridx;
ridx++;
}
}
}

// Point-reads the rows identified by `keys` and returns the requested columns.
//
// keys          : chunk of primary keys; encoded with PrimaryKeyEncoder
//                 against the tablet schema before the index lookup.
// value_columns : names of the columns to read; each must exist in the
//                 tablet schema.
// found         : output, resized to keys.num_rows(); found[i] tells whether
//                 key i exists.
// values        : output chunk; it is reset, then the values of the found
//                 rows are appended column by column in input-key order.
//                 NOTE(review): column i of `values` is assumed to match
//                 value_columns[i] -- confirm against callers.
//
// Returns InvalidArgument for too many keys or an unknown column name,
// InternalError when the tablet's current version differs from the version
// given to init() or the lookup returns an unexpected number of row ids, and
// otherwise propagates any error from the encoder or the tablet updates.
Status LocalTabletReader::multi_get(const Chunk& keys, const std::vector<std::string>& value_columns,
                                    std::vector<bool>& found, Chunk& values) {
    int64_t t_start = MonotonicMillis();
    size_t n = keys.num_rows();
    if (n > UINT32_MAX) {
        return Status::InvalidArgument(
                strings::Substitute("multi_get number of keys exceed limit $0 > $1", n, UINT32_MAX));
    }
    // get read column ids by values_columns
    const auto& tablet_schema = _tablet->tablet_schema();
    std::vector<uint32_t> read_column_ids;
    for (const auto& name : value_columns) {
        auto cid = tablet_schema.field_index(name);
        if (cid == -1) {
            return Status::InvalidArgument(strings::Substitute("multi_get value_column $0 not found", name));
        }
        read_column_ids.push_back(cid);
    }

    // convert keys to pk single column format
    std::unique_ptr<Column> pk_column;
    // Report an encoder setup failure to the caller instead of aborting the
    // whole process: this function already has an error channel.
    RETURN_IF_ERROR(PrimaryKeyEncoder::create_column(*tablet_schema.schema(), &pk_column));
    PrimaryKeyEncoder::encode(*tablet_schema.schema(), keys, 0, keys.num_rows(), pk_column.get());

    // search pks in pk index to get rowids
    EditVersion edit_version;
    std::vector<uint64_t> rowids(n);
    RETURN_IF_ERROR(
            _tablet->updates()->prepare_partial_update_states(_tablet.get(), pk_column, &edit_version, &rowids));
    if (edit_version.major() != _version) {
        return Status::InternalError(
                strings::Substitute("multi_get version not match tablet:$0 current_version:$1 read_version:$2",
                                    _tablet->tablet_id(), edit_version.to_string(), _version));
    }
    if (rowids.size() != n) {
        return Status::InternalError(strings::Substitute("multi_get rowid size not match tablet:$0 $1 != $2",
                                                         _tablet->tablet_id(), rowids.size(), n));
    }

    // sort rowids by rssid, so we can plan&perform read operations by rowset/segment
    std::map<uint32_t, std::vector<uint32_t>> rowids_by_rssid;
    vector<uint32_t> idxes;
    plan_read_by_rssid(rowids, found, rowids_by_rssid, idxes);

    auto read_column_schema = ChunkHelper::convert_schema(tablet_schema, read_column_ids);
    std::vector<std::unique_ptr<Column>> read_columns(read_column_ids.size());
    for (uint32_t i = 0; i < read_columns.size(); ++i) {
        read_columns[i] = ChunkHelper::column_from_field(*read_column_schema.field(i).get())->clone_empty();
    }
    RETURN_IF_ERROR(_tablet->updates()->get_column_values(read_column_ids, false, rowids_by_rssid, &read_columns));

    // reorder read values to input keys' order and put into values output parameter
    values.reset();
    for (size_t col_idx = 0; col_idx < read_column_ids.size(); col_idx++) {
        values.get_column_by_index(col_idx)->append_selective(*read_columns[col_idx], idxes.data(), 0, idxes.size());
    }
    int64_t t_end = MonotonicMillis();
    LOG(INFO) << strings::Substitute("multi_get tablet:$0 version:$1 #columns:$2 #rows:$3 found:$4 time:$5ms",
                                     _tablet->tablet_id(), _version, value_columns.size(), n, idxes.size(),
                                     t_end - t_start);
    return Status::OK();
}

// Opens a scan over the tablet at the version given to init().
//
// value_columns : names of the columns the returned iterator exposes; an
//                 unknown name yields InvalidArgument.
// predicates    : handed to the underlying TabletReader through
//                 TabletReaderParams.
//
// The underlying reader is built on the full tablet schema and wrapped in a
// projection iterator restricted to `value_columns`.
StatusOr<ChunkIteratorPtr> LocalTabletReader::scan(const std::vector<std::string>& value_columns,
                                                   const std::vector<const ColumnPredicate*>& predicates) {
    auto& full_schema = *_tablet->tablet_schema().schema();

    // Resolve each requested column name to its id in the full schema.
    vector<ColumnId> projected_ids;
    projected_ids.reserve(value_columns.size());
    for (auto& column_name : value_columns) {
        auto field_idx = full_schema.get_field_index_by_name(column_name);
        if (field_idx == -1) {
            return Status::InvalidArgument(strings::Substitute("column $0 not found", column_name));
        }
        projected_ids.push_back(field_idx);
    }
    Schema values_schema(&full_schema, projected_ids);

    TabletReaderParams reader_params;
    reader_params.predicates = predicates;

    auto tablet_reader = std::make_shared<TabletReader>(_tablet, Version(0, _version), full_schema);
    RETURN_IF_ERROR(tablet_reader->prepare());
    RETURN_IF_ERROR(tablet_reader->open(reader_params));
    // TODO: remove projection
    return new_projection_iterator(values_schema, tablet_reader);
}

} // namespace starrocks
53 changes: 53 additions & 0 deletions be/src/storage/local_tablet_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "storage/chunk_iterator.h"
#include "storage/tablet.h"

namespace starrocks {

class ColumnPredicate;

// A tablet reader that supports multi-get and scan snapshot reads at a fixed
// version. This class backs the remotely accessible TableReader; do not
// confuse it with TabletReader.
class LocalTabletReader {
public:
LocalTabletReader();

// Binds the reader to `tablet` at `version`. Returns NotSupported when the
// tablet has no updates() object (i.e. it is not a primary-key tablet).
Status init(TabletSharedPtr& tablet, int64_t version);

// Closes the reader; the current implementation always returns OK.
Status close();

// Looks up the rows identified by `keys`. `found` is resized to the number
// of keys and tells which keys exist; `values` receives `value_columns` for
// the found rows, in input-key order.
// For the detailed contract, presumably see TableReader::multi_get (the
// previous comment pointed at TableReader::scan) -- TODO confirm.
Status multi_get(const Chunk& keys, const std::vector<std::string>& value_columns, std::vector<bool>& found,
Chunk& values);

// Returns an iterator over the tablet at the bound version, restricted to
// `value_columns` and with `predicates` passed to the underlying reader.
// For the detailed contract, see TableReader::scan.
StatusOr<ChunkIteratorPtr> scan(const std::vector<std::string>& value_columns,
const std::vector<const ColumnPredicate*>& predicates);

private:
// Tablet bound by init().
TabletSharedPtr _tablet;
// Read version bound by init(); multi_get() requires the tablet's current
// major version to equal it.
int64_t _version{0};
};

// Manages remotely opened LocalTabletReaders.
// Currently only a placeholder: no members besides the constructor.
class LocalTabletReaderManager {
public:
// NOTE(review): no definition of this constructor appears in
// local_tablet_reader.cpp; confirm it is defined elsewhere, otherwise
// instantiating the class will fail at link time.
LocalTabletReaderManager();
};

} // namespace starrocks
Loading

0 comments on commit 03baf49

Please sign in to comment.