Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Enhancement] Make scan metrics not duplicated for tablet internal pa…
Browse files Browse the repository at this point in the history
…rallel

Signed-off-by: zihe.liu <ziheliu1024@gmail.com>
ZiheLiu committed Oct 25, 2023
1 parent 73bf022 commit 547326e
Showing 27 changed files with 888 additions and 148 deletions.
4 changes: 4 additions & 0 deletions be/src/connector/lake_connector.cpp
Original file line number Diff line number Diff line change
@@ -132,6 +132,7 @@ class LakeDataSource final : public DataSource {
RuntimeProfile::Counter* _seg_zm_filtered_counter = nullptr;
RuntimeProfile::Counter* _seg_rt_filtered_counter = nullptr;
RuntimeProfile::Counter* _sk_filtered_counter = nullptr;
RuntimeProfile::Counter* _rows_after_sk_filtered_counter = nullptr;
RuntimeProfile::Counter* _block_seek_timer = nullptr;
RuntimeProfile::Counter* _block_seek_counter = nullptr;
RuntimeProfile::Counter* _block_load_timer = nullptr;
@@ -542,6 +543,8 @@ void LakeDataSource::init_counter(RuntimeState* state) {
_zm_filtered_counter =
ADD_CHILD_COUNTER(_runtime_profile, "ZoneMapIndexFilterRows", TUnit::UNIT, segment_init_name);
_sk_filtered_counter = ADD_CHILD_COUNTER(_runtime_profile, "ShortKeyFilterRows", TUnit::UNIT, segment_init_name);
_rows_after_sk_filtered_counter =
ADD_CHILD_COUNTER(_runtime_profile, "RemainingRowsAfterShortKeyFilter", TUnit::UNIT, segment_init_name);
_rows_key_range_counter =
ADD_CHILD_COUNTER(_runtime_profile, "ShortKeyRangeNumber", TUnit::UNIT, segment_init_name);
_column_iterator_init_timer = ADD_CHILD_TIMER(_runtime_profile, "ColumnIteratorInit", segment_init_name);
@@ -657,6 +660,7 @@ void LakeDataSource::update_counter() {
COUNTER_UPDATE(_zm_filtered_counter, _reader->stats().rows_stats_filtered);
COUNTER_UPDATE(_bf_filtered_counter, _reader->stats().rows_bf_filtered);
COUNTER_UPDATE(_sk_filtered_counter, _reader->stats().rows_key_range_filtered);
COUNTER_UPDATE(_rows_after_sk_filtered_counter, _reader->stats().rows_after_key_range);
COUNTER_UPDATE(_rows_key_range_counter, _reader->stats().rows_key_range_num);

COUNTER_UPDATE(_bi_filtered_counter, _reader->stats().rows_bitmap_index_filtered);
50 changes: 47 additions & 3 deletions be/src/exec/pipeline/scan/morsel.cpp
Original file line number Diff line number Diff line change
@@ -35,12 +35,49 @@ namespace starrocks::pipeline {

const std::vector<RowsetSharedPtr> Morsel::kEmptyRowsets;

// Scan morsel that additionally carries a RowidRangeOptionPtr and forwards it to the
// tablet reader parameters via init_tablet_reader_params().
class PhysicalSplitScanMorsel final : public ScanMorsel {
public:
    // Takes ownership of `rowid_range_option` (moved into the member).
    PhysicalSplitScanMorsel(int32_t plan_node_id, const TScanRange& scan_range, RowidRangeOptionPtr rowid_range_option)
            : ScanMorsel(plan_node_id, scan_range), _rowid_range_option(std::move(rowid_range_option)) {}

    ~PhysicalSplitScanMorsel() override = default;

    // Defined out of line in this file.
    void init_tablet_reader_params(TabletReaderParams* params) override;

    // Metric names reported by this morsel kind as "skip min/max".
    // OlapChunkSource::_get_counter_min_max_type maps names found here to SKIP_ALL.
    const std::unordered_set<std::string>& skip_min_max_metrics() const override {
        static const std::unordered_set<std::string> kSkippedMetrics = {"ShortKeyFilterRows",
                                                                        "SegmentZoneMapFilterRows"};
        return kSkippedMetrics;
    }

private:
    RowidRangeOptionPtr _rowid_range_option;
};

// Scan morsel that additionally carries a ShortKeyRangesOptionPtr and forwards it to the
// tablet reader parameters via init_tablet_reader_params().
class LogicalSplitScanMorsel final : public ScanMorsel {
public:
    // Takes ownership of `short_key_ranges_option` (moved into the member).
    LogicalSplitScanMorsel(int32_t plan_node_id, const TScanRange& scan_range,
                           ShortKeyRangesOptionPtr short_key_ranges_option)
            : ScanMorsel(plan_node_id, scan_range), _short_key_ranges_option(std::move(short_key_ranges_option)) {}

    ~LogicalSplitScanMorsel() override = default;

    // Defined out of line in this file.
    void init_tablet_reader_params(TabletReaderParams* params) override;

    // Metric names reported by this morsel kind as "skip min/max".
    // OlapChunkSource::_get_counter_min_max_type maps names found here to SKIP_ALL.
    const std::unordered_set<std::string>& skip_min_max_metrics() const override {
        static const std::unordered_set<std::string> kSkippedMetrics = {"ShortKeyFilterRows",
                                                                        "SegmentZoneMapFilterRows"};
        return kSkippedMetrics;
    }

private:
    ShortKeyRangesOptionPtr _short_key_ranges_option;
};

// Copies this morsel's rowid range option pointer into the tablet reader parameters.
void PhysicalSplitScanMorsel::init_tablet_reader_params(TabletReaderParams* params) {
    auto& reader_params = *params;
    reader_params.rowid_range_option = _rowid_range_option;
}

// Copies this morsel's short key ranges option pointer into the tablet reader parameters.
//
// Only `_short_key_ranges_option` is forwarded: LogicalSplitScanMorsel declares no
// `_short_key_ranges` member, so the former assignment from it was a stale leftover that
// could not compile. The ranges themselves travel inside the option object
// (Rowset::get_segment_iterators reads `short_key_ranges_option->short_key_ranges`).
void LogicalSplitScanMorsel::init_tablet_reader_params(TabletReaderParams* params) {
    params->short_key_ranges_option = _short_key_ranges_option;
}

/// MorselQueueFactory.
@@ -277,7 +314,9 @@ StatusOr<RowidRangeOptionPtr> PhysicalSplitMorselQueue::_try_get_split_from_sing
<< "[range=" << taken_range.to_string() << "] ";

num_taken_rows += taken_range.span_size();
rowid_range->add(_cur_rowset(), _cur_segment(), std::make_shared<SparseRange<>>(std::move(taken_range)));
rowid_range->add(_cur_rowset(), _cur_segment(), std::make_shared<SparseRange<>>(std::move(taken_range)),
_is_first_split_of_segment);
_is_first_split_of_segment = false;

if (_is_last_split_of_current_morsel()) {
return rowid_range;
@@ -403,6 +442,8 @@ bool PhysicalSplitMorselQueue::_next_segment() {
}

Status PhysicalSplitMorselQueue::_init_segment() {
_is_first_split_of_segment = true;

// Load the meta of the new rowset and the index of the new segment.
if (0 == _segment_idx) {
// Read a new tablet.
@@ -588,7 +629,9 @@ StatusOr<MorselPtr> LogicalSplitMorselQueue::try_get() {

auto* scan_morsel = down_cast<ScanMorsel*>(_morsels[_tablet_idx].get());
auto morsel = std::make_unique<LogicalSplitScanMorsel>(
scan_morsel->get_plan_node_id(), *(scan_morsel->get_scan_range()), std::move(short_key_ranges));
scan_morsel->get_plan_node_id(), *(scan_morsel->get_scan_range()),
std::make_shared<ShortKeyRangesOption>(std::move(short_key_ranges), _is_first_split_of_tablet));
_is_first_split_of_tablet = false;
morsel->set_rowsets(_tablet_rowsets[_tablet_idx]);
_inc_num_splits(_is_last_split_of_current_morsel());
return morsel;
@@ -730,6 +773,7 @@ Status LogicalSplitMorselQueue::_init_tablet() {
_block_ranges_per_seek_range.clear();
_num_rest_blocks_per_seek_range.clear();
_range_idx = 0;
_is_first_split_of_tablet = true;

if (_tablet_idx == 0) {
// All the tablets have the same schema, so parse seek range with the first table schema.
34 changes: 7 additions & 27 deletions be/src/exec/pipeline/scan/morsel.h
Original file line number Diff line number Diff line change
@@ -91,6 +91,11 @@ class Morsel {
}
}

// Base implementation: returns an empty set, i.e. no metric is marked as "skip min/max".
// Morsel subclasses may override this to name specific metrics.
virtual const std::unordered_set<std::string>& skip_min_max_metrics() const {
    static const std::unordered_set<std::string> kNoMetrics{};
    return kNoMetrics;
}

private:
int32_t _plan_node_id;
int64_t _from_version = 0;
@@ -139,33 +144,6 @@ class ScanMorsel : public Morsel {
int64_t _version = 0;
};

// Scan morsel that stores a RowidRangeOptionPtr next to the base ScanMorsel state.
// NOTE(review): the hunk header for this region shrinks it from 33 to 6 lines, so this
// declaration appears to be the removed (pre-change) side of the diff; the morsel.cpp
// section carries a declaration of the same class. Confirm before relying on this copy.
class PhysicalSplitScanMorsel final : public ScanMorsel {
public:
// Moves `rowid_range_option` into the member; the other arguments go to ScanMorsel.
PhysicalSplitScanMorsel(int32_t plan_node_id, const TScanRange& scan_range, RowidRangeOptionPtr rowid_range_option)
: ScanMorsel(plan_node_id, scan_range), _rowid_range_option(std::move(rowid_range_option)) {}

~PhysicalSplitScanMorsel() override = default;

// Declared here, defined out of line.
void init_tablet_reader_params(TabletReaderParams* params) override;

private:
// Option handed over at construction time.
RowidRangeOptionPtr _rowid_range_option;
};

// Scan morsel that stores a vector of ShortKeyRangeOptionPtr next to the base ScanMorsel state.
// NOTE(review): the hunk header for this region shrinks it from 33 to 6 lines, so this
// declaration appears to be the removed (pre-change) side of the diff; the morsel.cpp
// section carries a declaration of the same class that holds a ShortKeyRangesOptionPtr
// instead. Confirm before relying on this copy.
class LogicalSplitScanMorsel final : public ScanMorsel {
public:
// Moves `short_key_ranges` into the member; the other arguments go to ScanMorsel.
LogicalSplitScanMorsel(int32_t plan_node_id, const TScanRange& scan_range,
std::vector<ShortKeyRangeOptionPtr> short_key_ranges)
: ScanMorsel(plan_node_id, scan_range), _short_key_ranges(std::move(short_key_ranges)) {}

~LogicalSplitScanMorsel() override = default;

// Declared here, defined out of line.
void init_tablet_reader_params(TabletReaderParams* params) override;

private:
// Ranges handed over at construction time.
std::vector<ShortKeyRangeOptionPtr> _short_key_ranges;
};

/// MorselQueueFactory.
class MorselQueueFactory {
public:
@@ -418,6 +396,7 @@ class PhysicalSplitMorselQueue final : public SplitMorselQueue {
std::vector<std::vector<RowsetSharedPtr>> _tablet_rowsets;

bool _has_init_any_segment = false;
bool _is_first_split_of_segment = true;

size_t _rowset_idx = 0;
size_t _segment_idx = 0;
@@ -480,6 +459,7 @@ class LogicalSplitMorselQueue final : public SplitMorselQueue {
std::vector<std::vector<RowsetSharedPtr>> _tablet_rowsets;

bool _has_init_any_tablet = false;
bool _is_first_split_of_tablet = true;

// Used to allocate memory for _tablet_seek_ranges.
MemPool _mempool;
19 changes: 17 additions & 2 deletions be/src/exec/pipeline/scan/olap_chunk_source.cpp
Original file line number Diff line number Diff line change
@@ -81,6 +81,15 @@ Status OlapChunkSource::prepare(RuntimeState* state) {
return Status::OK();
}

// Chooses the min/max type for the counter named `metric_name`:
// SKIP_ALL when the current morsel lists the name in skip_min_max_metrics(),
// MIN_MAX_ALL otherwise.
TCounterMinMaxType::type OlapChunkSource::_get_counter_min_max_type(const std::string& metric_name) {
    const auto& skipped = _morsel->skip_min_max_metrics();
    const bool should_skip = skipped.count(metric_name) > 0;
    return should_skip ? TCounterMinMaxType::SKIP_ALL : TCounterMinMaxType::MIN_MAX_ALL;
}

void OlapChunkSource::_init_counter(RuntimeState* state) {
_bytes_read_counter = ADD_COUNTER(_runtime_profile, "BytesRead", TUnit::BYTES);
_rows_read_counter = ADD_COUNTER(_runtime_profile, "RowsRead", TUnit::UNIT);
@@ -108,12 +117,17 @@ void OlapChunkSource::_init_counter(RuntimeState* state) {
_bi_filtered_counter = ADD_CHILD_COUNTER(_runtime_profile, "BitmapIndexFilterRows", TUnit::UNIT, segment_init_name);
_bf_filtered_counter = ADD_CHILD_COUNTER(_runtime_profile, "BloomFilterFilterRows", TUnit::UNIT, segment_init_name);
_seg_zm_filtered_counter =
ADD_CHILD_COUNTER(_runtime_profile, "SegmentZoneMapFilterRows", TUnit::UNIT, segment_init_name);
ADD_CHILD_COUNTER_SKIP_MIN_MAX(_runtime_profile, "SegmentZoneMapFilterRows", TUnit::UNIT,
_get_counter_min_max_type("SegmentZoneMapFilterRows"), segment_init_name);
_seg_rt_filtered_counter =
ADD_CHILD_COUNTER(_runtime_profile, "SegmentRuntimeZoneMapFilterRows", TUnit::UNIT, segment_init_name);
_zm_filtered_counter =
ADD_CHILD_COUNTER(_runtime_profile, "ZoneMapIndexFilterRows", TUnit::UNIT, segment_init_name);
_sk_filtered_counter = ADD_CHILD_COUNTER(_runtime_profile, "ShortKeyFilterRows", TUnit::UNIT, segment_init_name);
_sk_filtered_counter =
ADD_CHILD_COUNTER_SKIP_MIN_MAX(_runtime_profile, "ShortKeyFilterRows", TUnit::UNIT,
_get_counter_min_max_type("ShortKeyFilterRows"), segment_init_name);
_rows_after_sk_filtered_counter =
ADD_CHILD_COUNTER(_runtime_profile, "RemainingRowsAfterShortKeyFilter", TUnit::UNIT, segment_init_name);
_column_iterator_init_timer = ADD_CHILD_TIMER(_runtime_profile, "ColumnIteratorInit", segment_init_name);
_bitmap_index_iterator_init_timer = ADD_CHILD_TIMER(_runtime_profile, "BitmapIndexIteratorInit", segment_init_name);
_zone_map_filter_timer = ADD_CHILD_TIMER(_runtime_profile, "ZoneMapIndexFiter", segment_init_name);
@@ -481,6 +495,7 @@ void OlapChunkSource::_update_counter() {
COUNTER_UPDATE(_zm_filtered_counter, _reader->stats().rows_stats_filtered);
COUNTER_UPDATE(_bf_filtered_counter, _reader->stats().rows_bf_filtered);
COUNTER_UPDATE(_sk_filtered_counter, _reader->stats().rows_key_range_filtered);
COUNTER_UPDATE(_rows_after_sk_filtered_counter, _reader->stats().rows_after_key_range);
COUNTER_UPDATE(_rows_key_range_counter, _reader->stats().rows_key_range_num);

COUNTER_UPDATE(_read_pages_num_counter, _reader->stats().total_pages_num);
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/scan/olap_chunk_source.h
Original file line number Diff line number Diff line change
@@ -59,6 +59,7 @@ class OlapChunkSource final : public ChunkSource {
Status _init_scanner_columns(std::vector<uint32_t>& scanner_columns);
Status _init_unused_output_columns(const std::vector<std::string>& unused_output_columns);
Status _init_olap_reader(RuntimeState* state);
TCounterMinMaxType::type _get_counter_min_max_type(const std::string& metric_name);
void _init_counter(RuntimeState* state);
Status _init_global_dicts(TabletReaderParams* params);
Status _read_chunk_from_storage([[maybe_unused]] RuntimeState* state, Chunk* chunk);
@@ -134,6 +135,7 @@ class OlapChunkSource final : public ChunkSource {
RuntimeProfile::Counter* _seg_zm_filtered_counter = nullptr;
RuntimeProfile::Counter* _seg_rt_filtered_counter = nullptr;
RuntimeProfile::Counter* _sk_filtered_counter = nullptr;
RuntimeProfile::Counter* _rows_after_sk_filtered_counter = nullptr;
RuntimeProfile::Counter* _block_seek_timer = nullptr;
RuntimeProfile::Counter* _block_seek_counter = nullptr;
RuntimeProfile::Counter* _block_load_timer = nullptr;
1 change: 1 addition & 0 deletions be/src/storage/olap_common.h
Original file line number Diff line number Diff line change
@@ -249,6 +249,7 @@ struct OlapReaderStatistics {

int64_t segment_stats_filtered = 0;
int64_t rows_key_range_filtered = 0;
int64_t rows_after_key_range = 0;
int64_t rows_key_range_num = 0;
int64_t rows_stats_filtered = 0;
int64_t rows_bf_filtered = 0;
13 changes: 7 additions & 6 deletions be/src/storage/rowset/rowid_range_option.cpp
Original file line number Diff line number Diff line change
@@ -21,30 +21,31 @@

namespace starrocks {

void RowidRangeOption::add(const Rowset* rowset, const Segment* segment, SparseRangePtr rowid_range) {
void RowidRangeOption::add(const Rowset* rowset, const Segment* segment, SparseRangePtr rowid_range,
bool is_first_split_of_segment) {
auto rowset_it = rowid_range_per_segment_per_rowset.find(rowset->rowset_id());
if (rowset_it == rowid_range_per_segment_per_rowset.end()) {
rowset_it = rowid_range_per_segment_per_rowset.emplace(rowset->rowset_id(), SetgmentRowidRangeMap()).first;
}

auto& segment_map = rowset_it->second;
segment_map.emplace(segment->id(), std::move(rowid_range));
segment_map.emplace(segment->id(), SegmentSplit{std::move(rowid_range), is_first_split_of_segment});
}

bool RowidRangeOption::match_rowset(const Rowset* rowset) const {
bool RowidRangeOption::contains_rowset(const Rowset* rowset) const {
return rowid_range_per_segment_per_rowset.find(rowset->rowset_id()) != rowid_range_per_segment_per_rowset.end();
}

SparseRangePtr RowidRangeOption::get_segment_rowid_range(const Rowset* rowset, const Segment* segment) {
RowidRangeOption::SegmentSplit RowidRangeOption::get_segment_rowid_range(const Rowset* rowset, const Segment* segment) {
auto rowset_it = rowid_range_per_segment_per_rowset.find(rowset->rowset_id());
if (rowset_it == rowid_range_per_segment_per_rowset.end()) {
return nullptr;
return {nullptr, false};
}

auto& segment_map = rowset_it->second;
auto segment_it = segment_map.find(segment->id());
if (segment_it == segment_map.end()) {
return nullptr;
return {nullptr, false};
}
return segment_it->second;
}
13 changes: 9 additions & 4 deletions be/src/storage/rowset/rowid_range_option.h
Original file line number Diff line number Diff line change
@@ -27,15 +27,20 @@ class Segment;
// It represents a specific rowid range on the segment with `segment_id` of the rowset with `rowset_id`.
struct RowidRangeOption {
public:
// One split of a segment: the rowid range to read plus a flag telling whether this is the
// first split taken from that segment.
//
// Both members carry default initializers so that a default-constructed SegmentSplit is
// well defined (null range, flag false) instead of holding an indeterminate bool. The
// struct remains an aggregate (C++14 and later), so `SegmentSplit{range, flag}` and
// `return {nullptr, false};` keep working.
struct SegmentSplit {
    SparseRangePtr row_id_range = nullptr;
    bool is_first_split_of_segment = false;
};

RowidRangeOption() = default;

void add(const Rowset* rowset, const Segment* segment, SparseRangePtr rowid_range);
void add(const Rowset* rowset, const Segment* segment, SparseRangePtr rowid_range, bool is_first_split_of_segment);

bool match_rowset(const Rowset* rowset) const;
SparseRangePtr get_segment_rowid_range(const Rowset* rowset, const Segment* segment);
bool contains_rowset(const Rowset* rowset) const;
SegmentSplit get_segment_rowid_range(const Rowset* rowset, const Segment* segment);

public:
using SetgmentRowidRangeMap = std::unordered_map<uint64_t, SparseRangePtr>;
using SetgmentRowidRangeMap = std::unordered_map<uint64_t, SegmentSplit>;
using RowsetRowidRangeMap = std::map<RowsetId, SetgmentRowidRangeMap>;

RowsetRowidRangeMap rowid_range_per_segment_per_rowset;
16 changes: 13 additions & 3 deletions be/src/storage/rowset/rowset.cpp
Original file line number Diff line number Diff line change
@@ -51,6 +51,7 @@
#include "storage/merge_iterator.h"
#include "storage/projection_iterator.h"
#include "storage/rowset/rowid_range_option.h"
#include "storage/rowset/short_key_range_option.h"
#include "storage/storage_engine.h"
#include "storage/tablet_manager.h"
#include "storage/tablet_meta_manager.h"
@@ -598,7 +599,9 @@ Status Rowset::get_segment_iterators(const Schema& schema, const RowsetReadOptio
seg_options.tablet_id = rowset_meta()->tablet_id();
seg_options.rowsetid = rowset_meta()->rowset_id();
seg_options.dcg_loader = std::make_shared<LocalDeltaColumnGroupLoader>(options.meta);
seg_options.short_key_ranges = options.short_key_ranges;
if (options.short_key_ranges_option != nullptr) {
seg_options.short_key_ranges = options.short_key_ranges_option->short_key_ranges;
}
if (options.runtime_state != nullptr) {
seg_options.is_cancelled = &options.runtime_state->cancelled_ref();
}
@@ -626,10 +629,17 @@ Status Rowset::get_segment_iterators(const Schema& schema, const RowsetReadOptio
}

if (options.rowid_range_option != nullptr) {
seg_options.rowid_range_option = options.rowid_range_option->get_segment_rowid_range(this, seg_ptr.get());
if (seg_options.rowid_range_option == nullptr) {
auto [rowid_range, is_first_split_of_segment] =
options.rowid_range_option->get_segment_rowid_range(this, seg_ptr.get());
if (rowid_range == nullptr) {
continue;
}
seg_options.rowid_range_option = std::move(rowid_range);
seg_options.is_first_split_of_segment = is_first_split_of_segment;
} else if (options.short_key_ranges_option != nullptr) {
seg_options.is_first_split_of_segment = options.short_key_ranges_option->is_first_split_of_tablet;
} else {
seg_options.is_first_split_of_segment = true;
}

auto res = seg_ptr->new_iterator(segment_schema, seg_options);
6 changes: 3 additions & 3 deletions be/src/storage/rowset/rowset_options.h
Original file line number Diff line number Diff line change
@@ -37,11 +37,11 @@ class TabletSchema;
class ColumnPredicate;
class DeletePredicates;
struct RowidRangeOption;
struct ShortKeyRangeOption;
struct ShortKeyRangesOption;

class RowsetReadOptions {
using RowidRangeOptionPtr = std::shared_ptr<RowidRangeOption>;
using ShortKeyRangeOptionPtr = std::shared_ptr<ShortKeyRangeOption>;
using ShortKeyRangesOptionPtr = std::shared_ptr<ShortKeyRangesOption>;
using PredicateList = std::vector<const ColumnPredicate*>;

public:
@@ -74,7 +74,7 @@ class RowsetReadOptions {
const std::unordered_set<uint32_t>* unused_output_column_ids = nullptr;

RowidRangeOptionPtr rowid_range_option = nullptr;
std::vector<ShortKeyRangeOptionPtr> short_key_ranges;
ShortKeyRangesOptionPtr short_key_ranges_option = nullptr;

OlapRuntimeScanRangePruner runtime_range_pruner;

4 changes: 3 additions & 1 deletion be/src/storage/rowset/segment.cpp
Original file line number Diff line number Diff line change
@@ -252,7 +252,9 @@ StatusOr<ChunkIteratorPtr> Segment::_new_iterator(const Schema& schema, const Se
if (!_column_readers.at(column_unique_id)->segment_zone_map_filter(pair.second)) {
// skip segment zonemap filter when this segment has column files link to it.
if (tablet_column.is_key() || _use_segment_zone_map_filter(read_options)) {
read_options.stats->segment_stats_filtered += _column_readers.at(column_unique_id)->num_rows();
if (read_options.is_first_split_of_segment) {
read_options.stats->segment_stats_filtered += _column_readers.at(column_unique_id)->num_rows();
}
return Status::EndOfFile(strings::Substitute("End of file $0, empty iterator", _fname));
} else {
break;
Loading

0 comments on commit 547326e

Please sign in to comment.