From 92bf485abd4e171e4795339dba1fb55aaca9c55c Mon Sep 17 00:00:00 2001
From: zclllyybb
Date: Mon, 15 May 2023 10:00:44 +0800
Subject: [PATCH] [Bug] Fix doris pipeline shared scan and top n opt (#19599)

---
 be/src/exec/olap_utils.h                    | 11 +++++
 be/src/olap/olap_tuple.h                    |  2 +
 be/src/vec/exec/scan/new_olap_scan_node.cpp | 13 ++++++
 be/src/vec/exec/scan/new_olap_scanner.cpp   |  1 +
 be/src/vec/olap/block_reader.cpp            |  5 ++-
 be/src/vec/olap/vcollect_iterator.cpp       | 17 ++++++--
 be/src/vec/olap/vcollect_iterator.h         |  5 ++-
 .../correctness_p0/test_scan_topn_limit.out | 13 ++++++
 .../test_scan_topn_limit.groovy             | 40 +++++++++++++++++++
 9 files changed, 102 insertions(+), 5 deletions(-)
 create mode 100644 regression-test/data/correctness_p0/test_scan_topn_limit.out
 create mode 100644 regression-test/suites/correctness_p0/test_scan_topn_limit.groovy

diff --git a/be/src/exec/olap_utils.h b/be/src/exec/olap_utils.h
index 153d4f7963c787..1d6bdf959302b0 100644
--- a/be/src/exec/olap_utils.h
+++ b/be/src/exec/olap_utils.h
@@ -17,7 +17,9 @@
 
 #pragma once
 
+#include <fmt/format.h>
 #include
+#include <string>
 #include
 
 #include "common/logging.h"
@@ -49,6 +51,15 @@ struct OlapScanRange {
     bool end_include;
     OlapTuple begin_scan_range;
     OlapTuple end_scan_range;
+
+    std::string debug_string() const {
+        fmt::memory_buffer buf;
+        DCHECK_EQ(begin_scan_range.size(), end_scan_range.size());
+        for (int i = 0; i < begin_scan_range.size(); i++) {
+            fmt::format_to(buf, "({}, {})\n", begin_scan_range[i], end_scan_range[i]);
+        }
+        return fmt::to_string(buf);
+    }
 };
 
 static char encoding_table[] = {'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
diff --git a/be/src/olap/olap_tuple.h b/be/src/olap/olap_tuple.h
index 69978329fb9bd2..9a3ed19a3aef73 100644
--- a/be/src/olap/olap_tuple.h
+++ b/be/src/olap/olap_tuple.h
@@ -59,6 +59,8 @@ class OlapTuple {
         _nulls.clear();
     }
 
+    std::string operator[](size_t index) const { return _values[index]; }
+
 private:
     friend std::ostream& operator<<(std::ostream& os, const OlapTuple& tuple);
 
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 79d43984612cc7..667213e625ba93 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -536,6 +536,13 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
             std::vector<std::pair<int, int>> rs_reader_seg_offsets;
 
             while (rs_seg_count_index < rs_seg_count.size()) {
+                // do not generate a segment range of (0, 0)
+                if (rs_seg_count[rs_seg_count_index] == 0) {
+                    rs_seg_start_scan = 0;
+                    rs_seg_count_index++;
+                    continue;
+                }
+
                 auto max_add_seg_nums = rs_seg_count[rs_seg_count_index] - rs_seg_start_scan;
                 rs_readers.emplace_back(rowset_readers_vector[i][rs_seg_count_index]->clone());
 
@@ -572,6 +579,12 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
                 }
             }
 
+#ifndef NDEBUG
+            for (const auto& offset : rs_reader_seg_offsets) {
+                DCHECK_NE(offset.first, offset.second);
+            }
+#endif
+
             // dispose some segment tail
             if (!rs_readers.empty()) {
                 build_new_scanner(*scan_range, scanner_ranges, rs_readers, rs_reader_seg_offsets);
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index e87f851126fb98..f8e910dabac9e8 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -67,6 +67,7 @@ NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int
           _version(-1),
          _scan_range(scan_range),
          _key_ranges(key_ranges) {
+    DCHECK(rs_readers.size() == rs_reader_seg_offsets.size());
     _tablet_reader_params.rs_readers = rs_readers;
     _tablet_reader_params.rs_readers_segment_offsets = rs_reader_seg_offsets;
     _tablet_schema = std::make_shared<TabletSchema>();
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index c7f4ebbc25094b..c7a9987121e51c 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -98,7 +98,8 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params) {
     // check if rowsets are noneoverlapping
     _is_rowsets_overlapping = _rowsets_overlapping(read_params.rs_readers);
     _vcollect_iter.init(this, _is_rowsets_overlapping, read_params.read_orderby_key,
-                        read_params.read_orderby_key_reverse);
+                        read_params.read_orderby_key_reverse,
+                        read_params.rs_readers_segment_offsets);
 
     _reader_context.push_down_agg_type_opt = read_params.push_down_agg_type_opt;
     std::vector<RowsetReaderSharedPtr> valid_rs_readers;
@@ -108,12 +109,14 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params) {
     bool is_empty = read_params.rs_readers_segment_offsets.empty();
     for (int i = 0; i < read_params.rs_readers.size(); ++i) {
         auto& rs_reader = read_params.rs_readers[i];
+        // _vcollect_iter.topn_next() will init rs_reader by itself
         if (!_vcollect_iter.use_topn_next()) {
             RETURN_IF_ERROR(rs_reader->init(
                     &_reader_context,
                     is_empty ? std::pair<int, int> {0, 0}
                              : read_params.rs_readers_segment_offsets[i]));
         }
+
         Status res = _vcollect_iter.add_child(rs_reader);
         if (!res.ok() && !res.is<END_OF_FILE>()) {
             LOG(WARNING) << "failed to add child to iterator, err=" << res;
diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp
index 2af1a9ac0d1a69..5f59485291ca57 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -65,8 +65,10 @@ VCollectIterator::~VCollectIterator() {
 }
 
 void VCollectIterator::init(TabletReader* reader, bool ori_data_overlapping, bool force_merge,
-                            bool is_reverse) {
+                            bool is_reverse,
+                            std::vector<std::pair<int, int>> rs_readers_segment_offsets) {
     _reader = reader;
+
     // when aggregate is enabled or key_type is DUP_KEYS, we don't merge
     // multiple data to aggregate for better performance
     if (_reader->_reader_type == READER_QUERY &&
@@ -75,6 +77,7 @@ void VCollectIterator::init(TabletReader* reader, bool ori_data_overlapping, boo
          _reader->_tablet->enable_unique_key_merge_on_write()))) {
         _merge = false;
     }
+
     // When data is none overlapping, no need to build heap to traverse data
     if (!ori_data_overlapping) {
         _merge = false;
@@ -82,12 +85,16 @@ void VCollectIterator::init(TabletReader* reader, bool ori_data_overlapping, boo
         _merge = true;
     }
     _is_reverse = is_reverse;
+
     // use topn_next opt only for DUP_KEYS and UNIQUE_KEYS with MOW
     if (_reader->_reader_context.read_orderby_key_limit > 0 &&
         (_reader->_tablet->keys_type() == KeysType::DUP_KEYS ||
         (_reader->_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
          _reader->_tablet->enable_unique_key_merge_on_write()))) {
        _topn_limit = _reader->_reader_context.read_orderby_key_limit;
+        // When scanner pooling is combined with a topn-with-limit query, we need the segment offsets
+        // here because this method initializes the rs_readers itself rather than the upstream caller.
+        _rs_readers_segment_offsets = rs_readers_segment_offsets;
     } else {
         _topn_limit = 0;
     }
@@ -277,9 +284,13 @@ Status VCollectIterator::_topn_next(Block* block) {
         std::reverse(_rs_readers.begin(), _rs_readers.end());
     }
 
-    for (auto rs_reader : _rs_readers) {
+    bool segment_empty = _rs_readers_segment_offsets.empty();
+    for (size_t i = 0; i < _rs_readers.size(); i++) {
+        const auto& rs_reader = _rs_readers[i];
         // init will prune segment by _reader_context.conditions and _reader_context.runtime_conditions
-        RETURN_IF_ERROR(rs_reader->init(&_reader->_reader_context));
+        RETURN_IF_ERROR(rs_reader->init(
+                &_reader->_reader_context,
+                segment_empty ? std::pair<int, int> {0, 0} : _rs_readers_segment_offsets[i]));
 
         // read _topn_limit rows from this rs
         size_t read_rows = 0;
diff --git a/be/src/vec/olap/vcollect_iterator.h b/be/src/vec/olap/vcollect_iterator.h
index 863173c93260fb..e8162c26ab6b70 100644
--- a/be/src/vec/olap/vcollect_iterator.h
+++ b/be/src/vec/olap/vcollect_iterator.h
@@ -53,7 +53,8 @@ class VCollectIterator {
     // Hold reader point to get reader params
     ~VCollectIterator();
 
-    void init(TabletReader* reader, bool ori_data_overlapping, bool force_merge, bool is_reverse);
+    void init(TabletReader* reader, bool ori_data_overlapping, bool force_merge, bool is_reverse,
+              std::vector<std::pair<int, int>> rs_readers_segment_offsets);
 
     Status add_child(RowsetReaderSharedPtr rs_reader);
 
@@ -329,7 +330,9 @@ class VCollectIterator {
     // for topn next
     size_t _topn_limit = 0;
     bool _topn_eof = false;
+    // used when scanner pooling is combined with a topn-with-limit query
     std::vector<RowsetReaderSharedPtr> _rs_readers;
+    std::vector<std::pair<int, int>> _rs_readers_segment_offsets;
 
     // Hold reader point to access read params, such as fetch conditions.
     TabletReader* _reader = nullptr;
diff --git a/regression-test/data/correctness_p0/test_scan_topn_limit.out b/regression-test/data/correctness_p0/test_scan_topn_limit.out
new file mode 100644
index 00000000000000..908d00b9a5d575
--- /dev/null
+++ b/regression-test/data/correctness_p0/test_scan_topn_limit.out
@@ -0,0 +1,13 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select --
+29	71
+28	72
+27	73
+26	74
+25	75
+24	76
+23	77
+22	78
+21	79
+20	80
+
diff --git a/regression-test/suites/correctness_p0/test_scan_topn_limit.groovy b/regression-test/suites/correctness_p0/test_scan_topn_limit.groovy
new file mode 100644
index 00000000000000..cc819d20667b76
--- /dev/null
+++ b/regression-test/suites/correctness_p0/test_scan_topn_limit.groovy
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_scan_topn_limit") {
+    def tableName = "test_scan_topn_limit"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+          k1 int,
+          k2 int
+        )
+        DUPLICATE KEY(k1, k2)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+          "replication_num" = "1"
+        )
+    """
+
+    for (i in range(1, 30)) {
+        sql """ INSERT INTO ${tableName} values (${i}, 100-${i}) """
+    }
+    qt_select """ SELECT /*+ SET_VAR(experimental_enable_pipeline_engine = true,
+                  parallel_fragment_exec_instance_num = 4) */
+                  k1, k2 FROM ${tableName} ORDER BY k1 DESC, k2 LIMIT 10 """
+}
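
For readers who want the idea behind the _init_scanners change without the surrounding Doris plumbing, below is a minimal, self-contained sketch of the same splitting rule: distribute the segments of several rowsets across scanners, and skip rowsets that have no segments so that no scanner is ever handed an empty (0, 0) segment range, which is the situation the new skip and the added DCHECKs guard against. The names split_segments, ReaderSplit, and segments_per_scanner are illustrative only and do not exist in the Doris code base; the real logic works on cloned RowsetReaders inside NewOlapScanNode::_init_scanners and passes the resulting offsets down to VCollectIterator.

// Illustrative sketch only (not Doris code): split rowset segments across scanners,
// skipping rowsets with zero segments so no scanner receives an empty (0, 0) range.
#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <utility>
#include <vector>

struct ReaderSplit {
    int rowset_index;                 // which rowset this split reads
    std::pair<int, int> seg_offsets;  // [start, end) segment range within that rowset
};

// Give each scanner at most `segments_per_scanner` segments, walking rowsets in order.
std::vector<std::vector<ReaderSplit>> split_segments(const std::vector<int>& rs_seg_count,
                                                     int segments_per_scanner) {
    std::vector<std::vector<ReaderSplit>> scanners;
    std::vector<ReaderSplit> current;
    int budget = segments_per_scanner;
    for (int rs = 0; rs < static_cast<int>(rs_seg_count.size()); ++rs) {
        if (rs_seg_count[rs] == 0) {
            continue;  // the fix: never emit a (0, 0) range for an empty rowset
        }
        int start = 0;
        while (start < rs_seg_count[rs]) {
            int take = std::min(budget, rs_seg_count[rs] - start);
            current.push_back({rs, {start, start + take}});
            start += take;
            budget -= take;
            if (budget == 0) {  // this scanner is full, start the next one
                scanners.push_back(std::move(current));
                current.clear();
                budget = segments_per_scanner;
            }
        }
    }
    if (!current.empty()) {
        scanners.push_back(std::move(current));  // leftover segment tail
    }
    return scanners;
}

int main() {
    // Three rowsets with 3, 0 and 4 segments; each scanner reads at most 2 segments.
    auto scanners = split_segments({3, 0, 4}, 2);
    for (std::size_t s = 0; s < scanners.size(); ++s) {
        for (const auto& split : scanners[s]) {
            std::printf("scanner %zu: rowset %d, segments [%d, %d)\n", s, split.rowset_index,
                        split.seg_offsets.first, split.seg_offsets.second);
        }
    }
    return 0;
}

In this simplified model every produced range has end > start, which is exactly the invariant the added DCHECK_NE(offset.first, offset.second) asserts, and the number of splits always matches the number of reader clones handed to a scanner, mirroring the DCHECK added to the NewOlapScanner constructor.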