Skip to content

Commit

Permalink
[Improvement](topn) runtime prune for topn query (apache#15558)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaokang authored Jan 5, 2023
1 parent d36b937 commit 9d1f02c
Show file tree
Hide file tree
Showing 23 changed files with 538 additions and 2 deletions.
5 changes: 5 additions & 0 deletions be/src/olap/iterators.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "olap/column_predicate.h"
#include "olap/olap_common.h"
#include "olap/tablet_schema.h"
#include "runtime/runtime_state.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"

Expand Down Expand Up @@ -94,12 +95,16 @@ class StorageReadOptions {

TabletSchemaSPtr tablet_schema = nullptr;
bool record_rowids = false;
// whether the topn runtime-pruning optimization is enabled for this read
bool use_topn_opt = false;
// used for special optimization for query : ORDER BY key DESC LIMIT n
bool read_orderby_key_reverse = false;
// columns for orderby keys
std::vector<uint32_t>* read_orderby_key_columns = nullptr;
IOContext io_ctx;
vectorized::VExpr* remaining_vconjunct_root = nullptr;
// runtime state
RuntimeState* runtime_state = nullptr;
};

class RowwiseIterator {
Expand Down
7 changes: 7 additions & 0 deletions be/src/olap/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params,
_reader_context.version = read_params.version;
_reader_context.tablet_schema = _tablet_schema;
_reader_context.need_ordered_result = need_ordered_result;
_reader_context.use_topn_opt = read_params.use_topn_opt;
_reader_context.read_orderby_key_reverse = read_params.read_orderby_key_reverse;
_reader_context.return_columns = &_return_columns;
_reader_context.read_orderby_key_columns =
Expand Down Expand Up @@ -521,6 +522,12 @@ void TabletReader::_init_conditions_param_except_leafnode_of_andnode(
_col_preds_except_leafnode_of_andnode.push_back(predicate);
}
}

if (read_params.use_topn_opt) {
auto& runtime_predicate =
read_params.runtime_state->get_query_fragments_ctx()->get_runtime_predicate();
runtime_predicate.set_tablet_schema(_tablet_schema);
}
}

ColumnPredicate* TabletReader::_parse_to_predicate(
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ class TabletReader {

// used for compaction to record row ids
bool record_rowids = false;
// whether the topn runtime-pruning optimization is enabled for this read
bool use_topn_opt = false;
// used for special optimization for query : ORDER BY key LIMIT n
bool read_orderby_key = false;
// used for special optimization for query : ORDER BY key DESC LIMIT n
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,11 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
_read_options.use_page_cache = read_context->use_page_cache;
_read_options.tablet_schema = read_context->tablet_schema;
_read_options.record_rowids = read_context->record_rowids;
_read_options.use_topn_opt = read_context->use_topn_opt;
_read_options.read_orderby_key_reverse = read_context->read_orderby_key_reverse;
_read_options.read_orderby_key_columns = read_context->read_orderby_key_columns;
_read_options.io_ctx.reader_type = read_context->reader_type;
_read_options.runtime_state = read_context->runtime_state;

// load segments
RETURN_NOT_OK(SegmentLoader::instance()->load_segments(
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/rowset_reader_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ struct RowsetReaderContext {
ReaderType reader_type = READER_QUERY;
Version version {-1, -1};
TabletSchemaSPtr tablet_schema = nullptr;
// whether the topn runtime-pruning optimization is enabled for this read
bool use_topn_opt = false;
// whether rowset should return ordered rows.
bool need_ordered_result = true;
// used for special optimization for query : ORDER BY key DESC LIMIT n
Expand Down
18 changes: 18 additions & 0 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,24 @@ Status Segment::new_iterator(const Schema& schema, const StorageReadOptions& rea
}
}

if (read_options.use_topn_opt) {
auto query_ctx = read_options.runtime_state->get_query_fragments_ctx();
auto runtime_predicate = query_ctx->get_runtime_predicate().get_predictate();
if (runtime_predicate) {
int32_t uid =
read_options.tablet_schema->column(runtime_predicate->column_id()).unique_id();
AndBlockColumnPredicate and_predicate;
auto single_predicate = new SingleColumnBlockPredicate(runtime_predicate.get());
and_predicate.add_column_predicate(single_predicate);
if (!_column_readers.at(uid)->match_condition(&and_predicate)) {
// any condition not satisfied, return.
iter->reset(new EmptySegmentIterator(schema));
read_options.stats->filtered_segment_number++;
return Status::OK();
}
}
}

RETURN_IF_ERROR(load_index());
if (read_options.col_id_to_del_predicates.empty() &&
read_options.push_down_agg_type_opt != TPushAggOp::NONE) {
Expand Down
38 changes: 37 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,14 @@ Status SegmentIterator::_get_row_ranges_by_column_conditions() {
RETURN_IF_ERROR(_apply_bitmap_index());
RETURN_IF_ERROR(_apply_inverted_index());

std::shared_ptr<doris::ColumnPredicate> runtime_predicate = nullptr;
if (_opts.use_topn_opt) {
auto query_ctx = _opts.runtime_state->get_query_fragments_ctx();
runtime_predicate = query_ctx->get_runtime_predicate().get_predictate();
}

if (!_row_bitmap.isEmpty() &&
(!_opts.col_id_to_predicates.empty() ||
(runtime_predicate || !_opts.col_id_to_predicates.empty() ||
_opts.delete_condition_predicates->num_of_column_predicate() > 0)) {
RowRanges condition_row_ranges = RowRanges::create_single(_segment->num_rows());
RETURN_IF_ERROR(_get_row_ranges_from_conditions(&condition_row_ranges));
Expand Down Expand Up @@ -372,6 +378,26 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row
&zone_map_row_ranges);
}

std::shared_ptr<doris::ColumnPredicate> runtime_predicate = nullptr;
if (_opts.use_topn_opt) {
auto query_ctx = _opts.runtime_state->get_query_fragments_ctx();
runtime_predicate = query_ctx->get_runtime_predicate().get_predictate();
if (runtime_predicate) {
int32_t cid = _opts.tablet_schema->column(runtime_predicate->column_id()).unique_id();
AndBlockColumnPredicate and_predicate;
auto single_predicate = new SingleColumnBlockPredicate(runtime_predicate.get());
and_predicate.add_column_predicate(single_predicate);

RowRanges column_rp_row_ranges = RowRanges::create_single(num_rows());
RETURN_IF_ERROR(_column_iterators[_schema.unique_id(cid)]->get_row_ranges_by_zone_map(
&and_predicate, nullptr, &column_rp_row_ranges));

// intersect the row ranges of the different columns to get the final zone-map row ranges
RowRanges::ranges_intersection(zone_map_row_ranges, column_rp_row_ranges,
&zone_map_row_ranges);
}
}

pre_size = condition_row_ranges->count();
RowRanges::ranges_intersection(*condition_row_ranges, zone_map_row_ranges,
condition_row_ranges);
Expand Down Expand Up @@ -910,6 +936,16 @@ void SegmentIterator::_vec_init_lazy_materialization() {
}
}

// add runtime predicate to _col_predicates
if (_opts.use_topn_opt) {
auto& runtime_predicate =
_opts.runtime_state->get_query_fragments_ctx()->get_runtime_predicate();
_runtime_predicate = runtime_predicate.get_predictate();
if (_runtime_predicate) {
_col_predicates.push_back(_runtime_predicate.get());
}
}

if (!_col_predicates.empty() || !del_cond_id_set.empty()) {
std::set<ColumnId> short_cir_pred_col_id_set; // using set for distinct cid
std::set<ColumnId> vec_pred_col_id_set;
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ class SegmentIterator : public RowwiseIterator {
std::vector<roaring::Roaring> _pred_except_leafnode_of_andnode_evaluate_result;
std::unique_ptr<ColumnPredicateInfo> _column_predicate_info;

std::shared_ptr<ColumnPredicate> _runtime_predicate {nullptr};

// row schema of the key to seek
// only used in `_get_row_ranges_by_keys`
std::unique_ptr<Schema> _seek_schema;
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ set(RUNTIME_FILES
row_batch.cpp
runtime_state.cpp
runtime_filter_mgr.cpp
runtime_predicate.cpp
string_value.cpp
jsonb_value.cpp
thread_context.cpp
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/query_fragments_ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "runtime/datetime_value.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/runtime_predicate.h"
#include "util/pretty_printer.h"
#include "util/threadpool.h"
#include "vec/runtime/shared_hash_table_controller.h"
Expand Down Expand Up @@ -118,6 +119,8 @@ class QueryFragmentsCtx {
return _shared_hash_table_controller;
}

vectorized::RuntimePredicate& get_runtime_predicate() { return _runtime_predicate; }

public:
TUniqueId query_id;
DescriptorTbl* desc_tbl;
Expand Down Expand Up @@ -161,6 +164,8 @@ class QueryFragmentsCtx {
std::atomic<bool> _is_cancelled {false};

std::shared_ptr<vectorized::SharedHashTableController> _shared_hash_table_controller;

vectorized::RuntimePredicate _runtime_predicate;
};

} // namespace doris
153 changes: 153 additions & 0 deletions be/src/runtime/runtime_predicate.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "runtime/runtime_predicate.h"

#include "olap/predicate_creator.h"

namespace doris {

namespace vectorized {

// Prepares the runtime predicate for an ORDER BY column of primitive type `type`.
//
// Holds _rwlock exclusively for the whole call. Once a call has succeeded,
// later calls return OK without changing anything. Otherwise a fresh MemPool is
// installed and _get_value_fn is bound to the getter matching `type`; update()
// later uses that getter to turn the current extreme value into the string
// stored in the condition it builds.
//
// Returns InvalidArgument when `type` has no getter listed below. In that case
// _inited stays false.
Status RuntimePredicate::init(const PrimitiveType type) {
    std::lock_guard<std::shared_mutex> exclusive_guard(_rwlock);

    // Already set up by an earlier successful call.
    if (_inited) {
        return Status::OK();
    }

    _predicate_mem_pool.reset(new MemPool());

    // Bind the value getter that corresponds to the column type.
    switch (type) {
    case PrimitiveType::TYPE_BOOLEAN:
        _get_value_fn = get_bool_value;
        break;
    case PrimitiveType::TYPE_TINYINT:
        _get_value_fn = get_tinyint_value;
        break;
    case PrimitiveType::TYPE_SMALLINT:
        _get_value_fn = get_smallint_value;
        break;
    case PrimitiveType::TYPE_INT:
        _get_value_fn = get_int_value;
        break;
    case PrimitiveType::TYPE_BIGINT:
        _get_value_fn = get_bigint_value;
        break;
    case PrimitiveType::TYPE_LARGEINT:
        _get_value_fn = get_largeint_value;
        break;
    case PrimitiveType::TYPE_FLOAT:
        _get_value_fn = get_float_value;
        break;
    case PrimitiveType::TYPE_DOUBLE:
        _get_value_fn = get_double_value;
        break;
    case PrimitiveType::TYPE_STRING:
        _get_value_fn = get_string_value;
        break;
    case PrimitiveType::TYPE_DATEV2:
        _get_value_fn = get_datev2_value;
        break;
    case PrimitiveType::TYPE_DATETIMEV2:
        _get_value_fn = get_datetimev2_value;
        break;
    case PrimitiveType::TYPE_DATE:
        _get_value_fn = get_date_value;
        break;
    case PrimitiveType::TYPE_DATETIME:
        _get_value_fn = get_datetime_value;
        break;
    case PrimitiveType::TYPE_DECIMAL32:
        _get_value_fn = get_decimal32_value;
        break;
    case PrimitiveType::TYPE_DECIMAL64:
        _get_value_fn = get_decimal64_value;
        break;
    case PrimitiveType::TYPE_DECIMALV2:
        _get_value_fn = get_decimalv2_value;
        break;
    case PrimitiveType::TYPE_DECIMAL128I:
        _get_value_fn = get_decimal128_value;
        break;
    default:
        return Status::InvalidArgument("unsupported runtime predicate type {}", type);
    }

    _inited = true;
    return Status::OK();
}

// Offers a new candidate bound `value` for the ORDER BY column `col_name` and,
// when it tightens the current bound, rebuilds the column predicate from it.
//
// Params:
//   value      - candidate bound for the order-by column.
//   col_name   - name of the order-by column, looked up in _tablet_schema.
//   is_reverse - true: keep the largest value seen and build a ">=" condition;
//                false: keep the smallest value seen and build a "<=" condition.
//
// Holds _rwlock exclusively for the whole call. Always returns OK: the
// predicate is only used to prune data, so when it cannot be (re)built the
// update is skipped instead of failing the caller.
Status RuntimePredicate::update(const Field& value, const String& col_name, bool is_reverse) {
    std::unique_lock<std::shared_mutex> wlock(_rwlock);

    // Skip, without touching any state, whenever the predicate cannot be built safely:
    //  - a null value cannot be used as a comparison bound;
    //  - _get_value_fn is only assigned by a successful init();
    //  - _tablet_schema is dereferenced below and may not have been supplied yet
    //    (presumably through set_tablet_schema()).
    if (value.is_null() || !_inited || !_tablet_schema) {
        return Status::OK();
    }

    // The first value always becomes the bound; afterwards only a strictly
    // tighter value (larger when reversed, smaller otherwise) replaces it.
    const bool tightens = UNLIKELY(_orderby_extrem.is_null()) ||
                          (is_reverse ? (value > _orderby_extrem) : (value < _orderby_extrem));
    if (!tightens) {
        return Status::OK();
    }
    _orderby_extrem = value;

    TCondition condition;
    condition.__set_column_name(col_name);
    condition.__set_column_unique_id(_tablet_schema->column(col_name).unique_id());
    condition.__set_condition_op(is_reverse ? ">=" : "<=");

    // Convert the new bound to its string form and use it as the only condition value.
    condition.condition_values.push_back(_get_value_fn(_orderby_extrem));

    // Replace the previously published predicate with one built from the new condition.
    _predictate.reset(
            parse_to_predicate(_tablet_schema, condition, _predicate_mem_pool.get(), false));

    return Status::OK();
}

} // namespace vectorized
} // namespace doris
Loading

0 comments on commit 9d1f02c

Please sign in to comment.