Skip to content

Commit

Permalink
[Bug] Fix doris pipeline shared scan and top n opt (#19599)
Browse files Browse the repository at this point in the history
  • Loading branch information
zclllyybb authored May 15, 2023
1 parent 554b891 commit 92bf485
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 5 deletions.
11 changes: 11 additions & 0 deletions be/src/exec/olap_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

#pragma once

#include <fmt/core.h>
#include <gen_cpp/Opcodes_types.h>
#include <glog/logging.h>
#include <math.h>

#include "common/logging.h"
Expand Down Expand Up @@ -49,6 +51,15 @@ struct OlapScanRange {
bool end_include;
OlapTuple begin_scan_range;
OlapTuple end_scan_range;

// Renders this scan range as text for debugging: one "(begin, end)" pair per
// line, where the i-th pair combines the i-th component of begin_scan_range
// with the i-th component of end_scan_range.
//
// Returns the formatted text; an empty string when there are no components.
std::string debug_string() const {
    fmt::memory_buffer buf;
    // Both bounds are expected to carry the same number of components.
    DCHECK_EQ(begin_scan_range.size(), end_scan_range.size());
    // DCHECK_EQ is a debug-only check, so do not rely on it for memory safety:
    // only walk the components that exist in BOTH tuples. Using size_t also
    // avoids the signed/unsigned comparison of an int index against size().
    const size_t begin_size = begin_scan_range.size();
    const size_t end_size = end_scan_range.size();
    const size_t pair_count = begin_size < end_size ? begin_size : end_size;
    for (size_t i = 0; i < pair_count; ++i) {
        fmt::format_to(buf, "({}, {})\n", begin_scan_range[i], end_scan_range[i]);
    }
    return fmt::to_string(buf);
}
};

static char encoding_table[] = {'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/olap_tuple.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class OlapTuple {
_nulls.clear();
}

// Read-only element access: returns a copy of the value stored at `index`.
// The index is forwarded to _values unchanged; this method adds no range
// check of its own.
std::string operator[](size_t index) const {
    const auto& value = _values[index];
    return value;
}

private:
friend std::ostream& operator<<(std::ostream& os, const OlapTuple& tuple);

Expand Down
13 changes: 13 additions & 0 deletions be/src/vec/exec/scan/new_olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,13 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
std::vector<std::pair<int, int>> rs_reader_seg_offsets;

while (rs_seg_count_index < rs_seg_count.size()) {
// do not generate a range for segment (0, 0)
if (rs_seg_count[rs_seg_count_index] == 0) {
rs_seg_start_scan = 0;
rs_seg_count_index++;
continue;
}

auto max_add_seg_nums = rs_seg_count[rs_seg_count_index] - rs_seg_start_scan;
rs_readers.emplace_back(rowset_readers_vector[i][rs_seg_count_index]->clone());

Expand Down Expand Up @@ -572,6 +579,12 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
}
}

#ifndef NDEBUG
for (const auto& offset : rs_reader_seg_offsets) {
DCHECK_NE(offset.first, offset.second);
}
#endif

// dispose some segment tail
if (!rs_readers.empty()) {
build_new_scanner(*scan_range, scanner_ranges, rs_readers, rs_reader_seg_offsets);
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/new_olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int
_version(-1),
_scan_range(scan_range),
_key_ranges(key_ranges) {
DCHECK(rs_readers.size() == rs_reader_seg_offsets.size());
_tablet_reader_params.rs_readers = rs_readers;
_tablet_reader_params.rs_readers_segment_offsets = rs_reader_seg_offsets;
_tablet_schema = std::make_shared<TabletSchema>();
Expand Down
5 changes: 4 additions & 1 deletion be/src/vec/olap/block_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params) {
// check if rowsets are non-overlapping
_is_rowsets_overlapping = _rowsets_overlapping(read_params.rs_readers);
_vcollect_iter.init(this, _is_rowsets_overlapping, read_params.read_orderby_key,
read_params.read_orderby_key_reverse);
read_params.read_orderby_key_reverse,
read_params.rs_readers_segment_offsets);

_reader_context.push_down_agg_type_opt = read_params.push_down_agg_type_opt;
std::vector<RowsetReaderSharedPtr> valid_rs_readers;
Expand All @@ -108,12 +109,14 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params) {
bool is_empty = read_params.rs_readers_segment_offsets.empty();
for (int i = 0; i < read_params.rs_readers.size(); ++i) {
auto& rs_reader = read_params.rs_readers[i];

// _vcollect_iter.topn_next() will init rs_reader by itself
if (!_vcollect_iter.use_topn_next()) {
RETURN_IF_ERROR(rs_reader->init(
&_reader_context,
is_empty ? std::pair {0, 0} : read_params.rs_readers_segment_offsets[i]));
}

Status res = _vcollect_iter.add_child(rs_reader);
if (!res.ok() && !res.is<END_OF_FILE>()) {
LOG(WARNING) << "failed to add child to iterator, err=" << res;
Expand Down
17 changes: 14 additions & 3 deletions be/src/vec/olap/vcollect_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,10 @@ VCollectIterator::~VCollectIterator() {
}

void VCollectIterator::init(TabletReader* reader, bool ori_data_overlapping, bool force_merge,
bool is_reverse) {
bool is_reverse,
std::vector<std::pair<int, int>> rs_readers_segment_offsets) {
_reader = reader;

// when aggregate is enabled or key_type is DUP_KEYS, we don't merge
// multiple data to aggregate for better performance
if (_reader->_reader_type == READER_QUERY &&
Expand All @@ -75,19 +77,24 @@ void VCollectIterator::init(TabletReader* reader, bool ori_data_overlapping, boo
_reader->_tablet->enable_unique_key_merge_on_write()))) {
_merge = false;
}

// When data is non-overlapping, there is no need to build a heap to traverse it
if (!ori_data_overlapping) {
_merge = false;
} else if (force_merge) {
_merge = true;
}
_is_reverse = is_reverse;

// use topn_next opt only for DUP_KEYS and UNIQUE_KEYS with MOW
if (_reader->_reader_context.read_orderby_key_limit > 0 &&
(_reader->_tablet->keys_type() == KeysType::DUP_KEYS ||
(_reader->_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_reader->_tablet->enable_unique_key_merge_on_write()))) {
_topn_limit = _reader->_reader_context.read_orderby_key_limit;
// When scanner pooling is combined with a topn-with-limit query, we must keep the segment offsets
// here: the rs_readers are initialized inside this class rather than by the upstream caller, so
// the offsets are needed at the time we init the readers.
_rs_readers_segment_offsets = rs_readers_segment_offsets;
} else {
_topn_limit = 0;
}
Expand Down Expand Up @@ -277,9 +284,13 @@ Status VCollectIterator::_topn_next(Block* block) {
std::reverse(_rs_readers.begin(), _rs_readers.end());
}

for (auto rs_reader : _rs_readers) {
bool segment_empty = _rs_readers_segment_offsets.empty();
for (size_t i = 0; i < _rs_readers.size(); i++) {
const auto& rs_reader = _rs_readers[i];
// init will prune segment by _reader_context.conditions and _reader_context.runtime_conditions
RETURN_IF_ERROR(rs_reader->init(&_reader->_reader_context));
RETURN_IF_ERROR(
rs_reader->init(&_reader->_reader_context,
segment_empty ? std::pair {0, 0} : _rs_readers_segment_offsets[i]));

// read _topn_limit rows from this rs
size_t read_rows = 0;
Expand Down
5 changes: 4 additions & 1 deletion be/src/vec/olap/vcollect_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class VCollectIterator {
// Hold reader point to get reader params
~VCollectIterator();

void init(TabletReader* reader, bool ori_data_overlapping, bool force_merge, bool is_reverse);
void init(TabletReader* reader, bool ori_data_overlapping, bool force_merge, bool is_reverse,
std::vector<std::pair<int, int>> rs_readers_segment_offsets);

Status add_child(RowsetReaderSharedPtr rs_reader);

Expand Down Expand Up @@ -329,7 +330,9 @@ class VCollectIterator {
// for topn next
size_t _topn_limit = 0;
bool _topn_eof = false;
// when we use scanner pooling + query with topn_with_limit, we use it.
std::vector<RowsetReaderSharedPtr> _rs_readers;
std::vector<std::pair<int, int>> _rs_readers_segment_offsets;

// Hold reader point to access read params, such as fetch conditions.
TabletReader* _reader = nullptr;
Expand Down
13 changes: 13 additions & 0 deletions regression-test/data/correctness_p0/test_scan_topn_limit.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select --
29 71
28 72
27 73
26 74
25 75
24 76
23 77
22 78
21 79
20 80

40 changes: 40 additions & 0 deletions regression-test/suites/correctness_p0/test_scan_topn_limit.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression case for ORDER BY ... LIMIT (top-n) scans with the pipeline engine
// enabled and several parallel fragment instances on a DUPLICATE KEY table.
suite("test_scan_topn_limit") {
def tableName = "test_scan_topn_limit"

// Start from a clean slate so reruns do not see rows from a previous run.
sql """ DROP TABLE IF EXISTS ${tableName} """
// Single-bucket, single-replica DUPLICATE KEY table with two int columns.
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
k1 int,
k2 int
)
DUPLICATE KEY(k1, k2)
DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
)
"""

// One INSERT statement per row, with k2 = 100 - k1.
// NOTE(review): `range` is provided outside this file; the expected output
// tops out at k1 = 29, so the upper bound appears to be exclusive - confirm.
// NOTE(review): presumably each single-row INSERT lands in its own rowset,
// which is what makes the scan span many rowsets - confirm.
for (i in range(1, 30)) {
sql """ INSERT INTO ${tableName} values (${i}, 100-${i}) """
}
// Top-10 rows by k1 descending (k2 as tie-breaker), run with the pipeline
// engine on and 4 parallel fragment instances; the result is checked against
// the recorded "select" output.
qt_select """ SELECT /*+ SET_VAR(experimental_enable_pipeline_engine = true,
parallel_fragment_exec_instance_num = 4) */
k1, k2 FROM ${tableName} ORDER BY k1 DESC, k2 LIMIT 10 """
}


0 comments on commit 92bf485

Please sign in to comment.