Skip to content

Commit

Permalink
Bugfix: fail to capture rowset for version xxx (#234)
Browse files Browse the repository at this point in the history
Issue number #227

The `Tablet::capture_consistent_rowset()` method is used in
`Reader::prepare()` method to get rowsets of specified version.
`Tablet::capture_consistent_rowset()` will check the _stale_rs_meta_map,
so rowsets that have been compacted but not yet been deleted can still be found.

This PR also splits the original `Reader::init` function into two functions:
`Reader::prepare` and `Reader::open`.
`Reader::prepare` is used to fetch specific version of rowsets, while the
`Reader::open` is used to open those rowsets.

The `Reader::prepare` method is usually called before query processing begins, while
the `Reader::open` method is usually called during query processing.

A single query may involve multiple tablets and may take a long time to execute. By
fetching all the rowsets of the tablets sequentially before the query execution, it is
ensured that these rowsets will not be deleted during the execution, while the open
function can be called by multiple threads during the query execution to reduce the
latency of rowset opening.
  • Loading branch information
sduzh authored Sep 15, 2021
1 parent d6eb8ec commit b2a1324
Show file tree
Hide file tree
Showing 16 changed files with 94 additions and 128 deletions.
14 changes: 3 additions & 11 deletions be/src/exec/pipeline/olap_chunk_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,8 @@ Status OlapChunkSource::_get_tablet(const TInternalScanRange* scan_range) {
Status OlapChunkSource::_init_reader_params(const std::vector<OlapScanRange*>& key_ranges,
const std::vector<uint32_t>& scanner_columns,
std::vector<uint32_t>& reader_columns, vectorized::ReaderParams* params) {
params->tablet = _tablet;
params->reader_type = READER_QUERY;
params->skip_aggregation = _skip_aggregation;
params->version = Version(0, _version);
params->profile = _scan_profile;
params->runtime_state = _runtime_state;
params->use_page_cache = !config::disable_storage_page_cache;
Expand Down Expand Up @@ -211,7 +209,7 @@ Status OlapChunkSource::_init_olap_reader(RuntimeState* runtime_state) {
const TabletSchema& tablet_schema = _tablet->tablet_schema();
starrocks::vectorized::Schema child_schema =
ChunkHelper::convert_schema_to_format_v2(tablet_schema, reader_columns);
_reader = std::make_shared<Reader>(std::move(child_schema));
_reader = std::make_shared<Reader>(_tablet, Version(0, _version), std::move(child_schema));
if (reader_columns.size() == scanner_columns.size()) {
_prj_iter = _reader;
} else {
Expand All @@ -223,14 +221,8 @@ Status OlapChunkSource::_init_olap_reader(RuntimeState* runtime_state) {
if (!_un_push_down_conjuncts.empty() || !_un_push_down_predicates.empty()) {
_expr_filter_timer = ADD_TIMER(_scan_profile, "ExprFilterTime");
}
Status res = _reader->init(params);
if (!res.ok()) {
std::stringstream ss;
ss << "failed to initialize storage reader. tablet=" << params.tablet->full_name()
<< ", res=" << res.to_string() << ", backend=" << BackendOptions::get_localhost();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str().c_str());
}
RETURN_IF_ERROR(_reader->prepare());
RETURN_IF_ERROR(_reader->open(params));
return Status::OK();
}

Expand Down
1 change: 1 addition & 0 deletions be/src/exec/pipeline/olap_chunk_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "exprs/expr_context.h"
#include "gen_cpp/InternalService_types.h"
#include "runtime/runtime_state.h"
#include "storage/tablet.h"
#include "storage/vectorized/conjunctive_predicates.h"
#include "storage/vectorized/reader.h"
#include "storage/vectorized/reader_params.h"
Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/vectorized/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,7 @@ void OlapScanNode::_init_counter(RuntimeState* state) {

_scan_profile = _runtime_profile->create_child("SCAN", true, false);

_reader_init_timer = ADD_TIMER(_scan_profile, "ReaderInit");
_capture_rowset_timer = ADD_CHILD_TIMER(_scan_profile, "CaptureRowset", "ReaderInit");
_create_seg_iter_timer = ADD_TIMER(_scan_profile, "CreateSegmentIter");

_read_compressed_counter = ADD_COUNTER(_scan_profile, "CompressedBytesRead", TUnit::BYTES);
_read_uncompressed_counter = ADD_COUNTER(_scan_profile, "UncompressedBytesRead", TUnit::BYTES);
Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/vectorized/olap_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,8 @@ class OlapScanNode final : public starrocks::ScanNode {
RuntimeProfile* _scan_profile = nullptr;

RuntimeProfile::Counter* _scan_timer = nullptr;
RuntimeProfile::Counter* _capture_rowset_timer = nullptr;
RuntimeProfile::Counter* _create_seg_iter_timer = nullptr;
RuntimeProfile::Counter* _tablet_counter = nullptr;
RuntimeProfile::Counter* _reader_init_timer = nullptr;
RuntimeProfile::Counter* _io_timer = nullptr;
RuntimeProfile::Counter* _read_compressed_counter = nullptr;
RuntimeProfile::Counter* _decompress_timer = nullptr;
Expand Down
40 changes: 22 additions & 18 deletions be/src/exec/vectorized/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Status OlapScanner::init(RuntimeState* runtime_state, const OlapScannerParams& p
RETURN_IF_ERROR(_init_reader_params(params.key_ranges));
const TabletSchema& tablet_schema = _tablet->tablet_schema();
Schema child_schema = ChunkHelper::convert_schema_to_format_v2(tablet_schema, _reader_columns);
_reader = std::make_shared<Reader>(std::move(child_schema));
_reader = std::make_shared<Reader>(_tablet, Version(0, _version), std::move(child_schema));
if (_reader_columns.size() == _scanner_columns.size()) {
_prj_iter = _reader;
} else {
Expand All @@ -40,26 +40,32 @@ Status OlapScanner::init(RuntimeState* runtime_state, const OlapScannerParams& p
if (!_conjunct_ctxs.empty() || !_predicates.empty()) {
_expr_filter_timer = ADD_TIMER(_parent->_runtime_profile, "ExprFilterTime");
}
return Status::OK();

Status st = _reader->prepare();
if (!st.ok()) {
std::string msg = strings::Substitute("[$0] fail to prepare tablet reader $1: $2",
BackendOptions::get_localhost(), _tablet->full_name(), st.to_string());
LOG(WARNING) << msg;
return Status::InternalError(msg);
} else {
return Status::OK();
}
}

Status OlapScanner::open([[maybe_unused]] RuntimeState* runtime_state) {
if (_is_open) {
return Status::OK();
} else {
_is_open = true;
Status st = _reader->open(_params);
if (!st.ok()) {
auto msg = strings::Substitute("[$0] fail to open tablet reader $1: $2", BackendOptions::get_localhost(),
_tablet->full_name(), st.to_string());
st = Status::InternalError(msg);
LOG(WARNING) << st;
}
return st;
}

SCOPED_TIMER(_parent->_reader_init_timer);

Status res = _reader->init(_params);
if (!res.ok()) {
std::stringstream ss;
ss << "failed to initialize storage reader. tablet=" << _params.tablet->full_name()
<< ", res=" << res.to_string() << ", backend=" << BackendOptions::get_localhost();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str().c_str());
}
_is_open = true;
return Status::OK();
}

Status OlapScanner::close(RuntimeState* state) {
Expand Down Expand Up @@ -94,10 +100,8 @@ Status OlapScanner::_get_tablet(const TInternalScanRange* scan_range) {
}

Status OlapScanner::_init_reader_params(const std::vector<OlapScanRange*>* key_ranges) {
_params.tablet = _tablet;
_params.reader_type = READER_QUERY;
_params.skip_aggregation = _skip_aggregation;
_params.version = Version(0, _version);
_params.profile = _parent->_scan_profile;
_params.runtime_state = _runtime_state;
// If a agg node is this scan node direct parent
Expand Down Expand Up @@ -238,7 +242,7 @@ void OlapScanner::update_counter() {
if (_has_update_counter) {
return;
}
COUNTER_UPDATE(_parent->_capture_rowset_timer, _reader->stats().capture_rowset_ns);
COUNTER_UPDATE(_parent->_create_seg_iter_timer, _reader->stats().create_segment_iter_ns);
COUNTER_UPDATE(_parent->_rows_read_counter, _num_rows_read);

COUNTER_UPDATE(_parent->_io_timer, _reader->stats().io_ns);
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/vectorized/olap_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "exprs/expr_context.h"
#include "gen_cpp/InternalService_types.h"
#include "runtime/runtime_state.h"
#include "storage/tablet.h"
#include "storage/vectorized/conjunctive_predicates.h"
#include "storage/vectorized/reader.h"
#include "storage/vectorized/reader_params.h"
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ using KeyRange = std::pair<WrapperField*, WrapperField*>;

// ReaderStatistics used to collect statistics when scan data from storage
struct OlapReaderStatistics {
int64_t capture_rowset_ns = 0;
int64_t create_segment_iter_ns = 0;
int64_t io_ns = 0;
int64_t compressed_bytes_read = 0;

Expand Down
4 changes: 0 additions & 4 deletions be/src/storage/rowset/vectorized/rowset_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ class RowsetReadOptions {
// whether rowset should return rows in sorted order.
bool sorted = true;

// columns to load bloom filter index
// including columns in "=" or "in" conditions
const std::set<uint32_t>* load_bf_columns = nullptr;

const DeletePredicates* delete_predicates = nullptr;

const TabletSchema* tablet_schema = nullptr;
Expand Down
18 changes: 5 additions & 13 deletions be/src/storage/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1304,24 +1304,16 @@ StatusOr<Tablet::IteratorList> Tablet::capture_segment_iterators(const Version&
return _updates->read(spec_version.second, schema, options);
}
std::shared_lock rdlock(_meta_lock);
vector<Version> version_path;
std::vector<Version> version_path;
std::vector<RowsetSharedPtr> rowsets;
OLAPStatus res = capture_consistent_versions(spec_version, &version_path);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "Fail to capture consistent versions. err=" << res;
return Status::InternalError("capture consistent versions failed");
}

std::vector<RowsetSharedPtr> rowsets;
rowsets.reserve(version_path.size());
for (auto version : version_path) {
auto it = _rs_version_map.find(version);
if (it == _rs_version_map.end()) {
return Status::InternalError("fail to find rowset for version");
}
const RowsetSharedPtr& rowset = it->second;
if (!rowset->empty()) {
rowsets.emplace_back(rowset);
}
res = _capture_consistent_rowsets_unlocked(version_path, &rowsets);
if (res != OLAP_SUCCESS) {
return Status::InternalError("fail to capture rowset for some version");
}
// Release lock before acquiring segment iterators.
rdlock.unlock();
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class Tablet : public BaseTablet {
bool check_version_exist(const Version& version) const;
void list_versions(std::vector<Version>* versions) const;

// REQUIRE: `obtain_header_rdlock()`ed
OLAPStatus capture_consistent_rowsets(const Version& spec_version, vector<RowsetSharedPtr>* rowsets) const;
OLAPStatus capture_rs_readers(const Version& spec_version, vector<RowsetReaderSharedPtr>* rs_readers) const;
OLAPStatus capture_rs_readers(const vector<Version>& version_path, vector<RowsetReaderSharedPtr>* rs_readers) const;
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/tablet_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class TabletMeta {
const std::unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id,
RowsetTypePB rowset_type, TabletMetaSharedPtr* tablet_meta);

TabletMeta(MemTracker* mem_tracker);
explicit TabletMeta(MemTracker* mem_tracker);
TabletMeta(MemTracker* mem_tracker, int64_t table_id, int64_t partition_id, int64_t tablet_id, int32_t schema_hash,
uint64_t shard_id, const TTabletSchema& tablet_schema, uint32_t next_unique_id,
const std::unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id, const TabletUid& tablet_uid,
Expand Down
7 changes: 3 additions & 4 deletions be/src/storage/vectorized/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,9 @@ Status Compaction::construct_output_rowset_writer() {
Status Compaction::merge_rowsets(MemTracker* mem_tracker, Statistics* stats_output) {
TRACE_COUNTER_SCOPE_LATENCY_US("merge_rowsets_latency_us");
Schema schema = ChunkHelper::convert_schema_to_format_v2(_tablet->tablet_schema());
Reader reader(schema);
Reader reader(_tablet, _output_rs_writer->version(), schema);
ReaderParams reader_params;
reader_params.tablet = _tablet;
reader_params.reader_type = compaction_type();
reader_params.version = _output_rs_writer->version();
reader_params.profile = _runtime_profile.create_child("merge_rowsets");

int64_t num_rows = 0;
Expand All @@ -172,7 +170,8 @@ Status Compaction::merge_rowsets(MemTracker* mem_tracker, Statistics* stats_outp
chunk_size = config::vector_chunk_size;
}
reader_params.chunk_size = chunk_size;
RETURN_IF_ERROR(reader.init(reader_params));
RETURN_IF_ERROR(reader.prepare());
RETURN_IF_ERROR(reader.open(reader_params));

int64_t output_rows = 0;

Expand Down
Loading

0 comments on commit b2a1324

Please sign in to comment.