[Enhancement] Rank topn prune redundant rows #8108

Merged
merged 6 commits into from
Jul 4, 2022
22 changes: 11 additions & 11 deletions be/src/exec/vectorized/chunks_sorter.cpp
@@ -50,7 +50,7 @@ void DataSegment::init(const std::vector<ExprContext*>* sort_exprs, const ChunkP
Status DataSegment::get_filter_array(std::vector<DataSegment>& data_segments, size_t rows_to_sort,
std::vector<std::vector<uint8_t>>& filter_array,
const std::vector<int>& sort_order_flags, const std::vector<int>& null_first_flags,
uint32_t& least_num, uint32_t& middle_num) {
uint32_t& before_num, uint32_t& in_num) {
size_t dats_segment_size = data_segments.size();
std::vector<CompareVector> compare_results_array(dats_segment_size);

@@ -64,7 +64,7 @@ Status DataSegment::get_filter_array(std::vector<DataSegment>& data_segments, si
// compare with first row of this DataSegment,
// then we set BEFORE_LAST_RESULT and IN_LAST_RESULT at filter_array.
if (rows_to_sort == 1) {
least_num = 0, middle_num = 0;
before_num = 0, in_num = 0;
filter_array.resize(dats_segment_size);
for (size_t i = 0; i < dats_segment_size; ++i) {
size_t rows = data_segments[i].chunk->num_rows();
@@ -73,34 +73,34 @@ Status DataSegment::get_filter_array(std::vector<DataSegment>& data_segments, si
for (size_t j = 0; j < rows; ++j) {
if (compare_results_array[i][j] < 0) {
filter_array[i][j] = DataSegment::BEFORE_LAST_RESULT;
++least_num;
++before_num;
} else {
filter_array[i][j] = DataSegment::IN_LAST_RESULT;
++middle_num;
++in_num;
}
}
}
} else {
std::vector<size_t> first_size_array;
first_size_array.resize(dats_segment_size);

middle_num = 0;
in_num = 0;
filter_array.resize(dats_segment_size);
for (size_t i = 0; i < dats_segment_size; ++i) {
DataSegment& segment = data_segments[i];
size_t rows = segment.chunk->num_rows();
filter_array[i].resize(rows);

size_t local_first_size = middle_num;
size_t local_first_size = in_num;
for (size_t j = 0; j < rows; ++j) {
if (compare_results_array[i][j] <= 0) {
filter_array[i][j] = DataSegment::IN_LAST_RESULT;
++middle_num;
++in_num;
}
}

// obtain number of rows for second compare.
first_size_array[i] = middle_num - local_first_size;
first_size_array[i] = in_num - local_first_size;
}

// second compare with first row of this chunk, use rows from first compare.
@@ -116,19 +116,19 @@ Status DataSegment::get_filter_array(std::vector<DataSegment>& data_segments, si
null_first_flags);
}

least_num = 0;
before_num = 0;
for (size_t i = 0; i < dats_segment_size; ++i) {
DataSegment& segment = data_segments[i];
size_t rows = segment.chunk->num_rows();

for (size_t j = 0; j < rows; ++j) {
if (compare_results_array[i][j] < 0) {
filter_array[i][j] = DataSegment::BEFORE_LAST_RESULT;
++least_num;
++before_num;
}
}
}
middle_num -= least_num;
in_num -= before_num;
}

return Status::OK();
95 changes: 67 additions & 28 deletions be/src/exec/vectorized/chunks_sorter_topn.cpp
@@ -71,6 +71,8 @@ Status ChunksSorterTopn::done(RuntimeState* state) {
RETURN_IF_ERROR(_sort_chunks(state));
}

_rank_pruning();

// skip top OFFSET rows
if (_offset > 0) {
if (_offset > _merged_segment.chunk->num_rows()) {
@@ -213,9 +215,9 @@ void ChunksSorterTopn::_set_permutation_complete(std::pair<Permutation, Permutat
}

// In general, we take the first and last row from _merged_segment:
// step 1: use last row to filter chunk_size rows in segments as two parts(rows < lastRow and rows >= lastRow),
// step 2: use first row to filter all rows < lastRow, result in two parts, the BEFORE is (rows < firstRow), the IN is (rows >= firstRow and rwos < lastRows),
// step 3: set this result in filter_array, BEOFRE(filter_array's value is 2), IN(filter_array's value is 1), others is give up.
// step 1: use last row to filter the chunk_size rows in segments into two parts (rows <= lastRow and rows > lastRow),
// step 2: use first row to filter all rows <= lastRow into two parts, the BEFORE part is (rows < firstRow), the IN part is (firstRow <= rows <= lastRow),
// step 3: record this result in filter_array, BEFORE (filter_array's value is 2), IN (filter_array's value is 1), the others (rows > lastRow) are dropped.
// all this is done in get_filter_array.
//
// and maybe _merged_segment's size is not enough, i.e. < rows_to_sort,
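For context, the partition that `get_filter_array` performs with the renamed `before_num`/`in_num` counters boils down to the standalone sketch below. It is illustrative only: plain `int` keys and a single scalar loop stand in for the real order-by columns, which the actual code compares in two vectorized passes (against the last row, then the first row).

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

// The three buckets produced by get_filter_array (values mirror DataSegment's constants).
enum FilterTag : uint8_t {
    DROPPED = 0,            // row > last row of the merged segment: can never reach the top N
    IN_LAST_RESULT = 1,     // first row <= row <= last row: may still be needed
    BEFORE_LAST_RESULT = 2, // row < first row of the merged segment: definitely needed
};

// Sketch of the partition and the bookkeeping of before_num / in_num.
void partition_rows(const std::vector<int>& rows, int first_row, int last_row,
                    std::vector<uint8_t>& filter, uint32_t& before_num, uint32_t& in_num) {
    filter.assign(rows.size(), DROPPED);
    before_num = in_num = 0;
    for (size_t i = 0; i < rows.size(); ++i) {
        if (rows[i] > last_row) continue; // stays DROPPED
        if (rows[i] < first_row) {
            filter[i] = BEFORE_LAST_RESULT;
            ++before_num;
        } else {
            filter[i] = IN_LAST_RESULT;
            ++in_num;
        }
    }
}

int main() {
    // merged segment already holds the sorted keys [3, 5, 7]; a new chunk arrives:
    std::vector<int> incoming = {1, 4, 7, 9, 2};
    std::vector<uint8_t> filter;
    uint32_t before_num = 0, in_num = 0;
    partition_rows(incoming, /*first_row=*/3, /*last_row=*/7, filter, before_num, in_num);
    std::cout << "before=" << before_num << " in=" << in_num << '\n'; // before=2 in=2 (9 is dropped)
}
```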
@@ -234,7 +236,7 @@ Status ChunksSorterTopn::_filter_and_sort_data(RuntimeState* state, std::pair<Pe

if (_init_merged_segment) {
std::vector<std::vector<uint8_t>> filter_array;
uint32_t least_num, middle_num;
uint32_t before_num, in_num;

// Here are 2 cases:
// case 1: _merged_segment.chunk->num_rows() >= rows_to_sort, which means we already have enough rows,
@@ -245,28 +247,28 @@ Status ChunksSorterTopn::_filter_and_sort_data(RuntimeState* state, std::pair<Pe

if (_merged_segment.chunk->num_rows() >= rows_to_sort) {
RETURN_IF_ERROR(_merged_segment.get_filter_array(segments, rows_to_sort, filter_array, _sort_order_flag,
_null_first_flag, least_num, middle_num));
_null_first_flag, before_num, in_num));
} else {
RETURN_IF_ERROR(_merged_segment.get_filter_array(segments, 1, filter_array, _sort_order_flag,
_null_first_flag, least_num, middle_num));
_null_first_flag, before_num, in_num));
}

timer.stop();
{
ScopedTimer<MonotonicStopWatch> timer(_build_timer);
permutations.first.resize(least_num);
// BEFORE's size is enough, so we ignore IN.
if (least_num >= rows_to_sort) {
permutations.first.resize(before_num);
// BEFORE_LAST_RESULT's size is enough, so we ignore IN_LAST_RESULT.
if (before_num >= rows_to_sort) {
// use filter_array to set permutations.first.
_set_permutation_before(permutations.first, segments.size(), filter_array);
} else if (rows_to_sort > 1 || _topn_type == TTopNType::RANK) {
// If rows_to_sort == 1, here are two cases:
// case 1: _topn_type is TTopNType::ROW_NUMBER, first row and last row is the same identity. so we do nothing.
// case 2: _topn_type is TTopNType::RANK, we need to contain all the rows equal with the last row of merged segment,
// and these equals row maybe exist in the second part
// and these equal rows may exist in the IN_LAST_RESULT part

// BEFORE's size < rows_to_sort, we need set permutations.first and permutations.second.
permutations.second.resize(middle_num);
permutations.second.resize(in_num);

// use filter_array to set permutations.first and permutations.second.
_set_permutation_complete(permutations, segments.size(), filter_array);
@@ -275,12 +277,13 @@ Status ChunksSorterTopn::_filter_and_sort_data(RuntimeState* state, std::pair<Pe
timer.start();
}

return _partial_sort_col_wise(state, permutations, segments, chunk_size, rows_to_sort);
return _partial_sort_col_wise(state, permutations, segments, chunk_size);
}

Status ChunksSorterTopn::_partial_sort_col_wise(RuntimeState* state, std::pair<Permutation, Permutation>& permutations,
DataSegments& segments, const size_t chunk_size,
const size_t rows_to_sort) {
DataSegments& segments, const size_t chunk_size) {
const size_t rows_to_sort = _get_number_of_rows_to_sort();

std::vector<Columns> vertical_chunks;
for (auto& segment : segments) {
vertical_chunks.push_back(segment.order_by_columns);
Expand All @@ -292,20 +295,16 @@ Status ChunksSorterTopn::_partial_sort_col_wise(RuntimeState* state, std::pair<P

size_t first_size = std::min(permutations.first.size(), rows_to_sort);

// Sort the first, then the second
// Sort the first
if (first_size > 0) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(do_sort(permutations.first, first_size));
}

// Sort the second
if (rows_to_sort > first_size) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(do_sort(permutations.second, rows_to_sort - first_size));
} else if (_topn_type == TTopNType::RANK) {
// if _topn_type is TTopNType::RANK, we need to contain all the rows equal with the last row of merged segment,
// and these equals row maybe exist in the second part, we can fetch these part by set rank limit number to 1
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(do_sort(permutations.second, 1));
}

return Status::OK();
@@ -392,27 +391,32 @@ Status ChunksSorterTopn::_hybrid_sort_common(RuntimeState* state, std::pair<Perm
chunks.push_back(segment.chunk);
}

// There are three parts of data
// part1: _merged_segment, the sorted one
// part2: BEFORE_LAST_RESULT, ∀ e ∈ part2, e < smallest(part1)
// part3: IN_LAST_RESULT, ∀ e ∈ part3, smallest(part1) <= e <= largest(part1)

// First, we find elements from part1
if (first_size > 0) {
big_chunk.reset(segments[new_permutation.first[0].chunk_index].chunk->clone_empty(first_size).release());
append_by_permutation(big_chunk.get(), chunks, new_permutation.first);
rows_to_keep -= first_size;
}

// If rows_to_keep > 0, then second_size > 0 is always positive
// If rows_to_keep == 0 and sort type is rank, then we will process second part only if it's not empty
if (rows_to_keep > 0 || (_topn_type == TTopNType::RANK && second_size > 0)) {
// In case of rows_to_keep == 0, through _merged_segment.get_filter_array,
// all the rows that equal to the last value of merged segment are stored in the IN_LAST_RESULT,
// so we also need to consider this part in type rank
// Second, there are two cases:
// case1: rows_to_keep == 0, which means part1 itself suffices, so we don't need to check part2 and part3
// case2: rows_to_keep > 0, which means part1 itself does not suffice, so we need to get more elements from part2 and part3,
// so we need to first merge part2 and part3 and then get elements
if (rows_to_keep > 0) {
if (big_chunk == nullptr) {
big_chunk.reset(segments[new_permutation.second[0].chunk_index].chunk->clone_empty(rows_to_keep).release());
}
size_t sorted_size = _merged_segment.chunk->num_rows();
const size_t sorted_size = _merged_segment.chunk->num_rows();
rows_to_keep = std::min(rows_to_keep, sorted_size + second_size);
if (_topn_type == TTopNType::RANK && sorted_size + second_size > rows_to_keep) {
// For rank type, there may exist a wide equal range, so we need to keep all elements of part2 and part3
rows_to_keep = sorted_size + second_size;
}

RETURN_IF_ERROR(_merge_sort_common(big_chunk, segments, rows_to_keep, sorted_size, new_permutation.second));
}
RETURN_IF_ERROR(big_chunk->upgrade_if_overflow());
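The RANK branch in this hunk widens rows_to_keep so that an equal range crossing the limit survives until `_rank_pruning` trims it after the final merge. A minimal sketch of that adjustment, using a hypothetical free function that is not part of the patch:

```cpp
#include <algorithm>
#include <cstddef>

// Hypothetical helper mirroring the rows_to_keep adjustment above:
// for ROW_NUMBER we clamp to the rows that are actually available; for RANK we keep
// all of the merged segment plus the IN_LAST_RESULT part, so that ties with the
// last qualifying row are not cut off before _rank_pruning runs.
size_t adjust_rows_to_keep(size_t rows_to_keep, size_t sorted_size, size_t second_size, bool is_rank) {
    rows_to_keep = std::min(rows_to_keep, sorted_size + second_size);
    if (is_rank && sorted_size + second_size > rows_to_keep) {
        rows_to_keep = sorted_size + second_size;
    }
    return rows_to_keep;
}
```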
@@ -460,4 +464,39 @@ Status ChunksSorterTopn::_hybrid_sort_first_time(RuntimeState* state, Permutatio
return Status::OK();
}

void ChunksSorterTopn::_rank_pruning() {
if (_topn_type != TTopNType::RANK) {
return;
}
if (!_init_merged_segment) {
return;
}
if (_merged_segment.chunk->num_rows() <= _get_number_of_rows_to_sort()) {
return;
}
DCHECK(!_merged_segment.order_by_columns.empty());

const auto size = _merged_segment.chunk->num_rows();
const auto peer_group_start = _get_number_of_rows_to_sort() - 1;
size_t peer_group_end = size;
bool found = false;

for (int i = peer_group_start + 1; !found && i < size; ++i) {
for (auto& column : _merged_segment.order_by_columns) {
if (column->compare_at(i, i - 1, *column, 1) != 0) {
peer_group_end = i;
found = true;
break;
}
}
}

if (found) {
_merged_segment.chunk->set_num_rows(peer_group_end);
for (auto& column : _merged_segment.order_by_columns) {
column->resize(peer_group_end);
}
}
}

} // namespace starrocks::vectorized
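Stripped of the engine types, the peer-group scan in `_rank_pruning` is equivalent to the following sketch on a plain sorted vector (hypothetical `rank_prune` helper, matching the [1, 2, 3, 3, 3, 4, 5] / limit = 3 example given in the header comment below):

```cpp
#include <cstddef>
#include <iostream>
#include <vector>

// Illustrative counterpart of _rank_pruning: keep the first `limit` rows plus every
// row tied with the row at position limit - 1, drop everything after that peer group.
// Assumes limit >= 1 and the input is already sorted ascending.
std::vector<int> rank_prune(std::vector<int> sorted, size_t limit) {
    if (sorted.size() <= limit) return sorted;
    const size_t peer_group_start = limit - 1;
    size_t peer_group_end = sorted.size();
    for (size_t i = peer_group_start + 1; i < sorted.size(); ++i) {
        if (sorted[i] != sorted[i - 1]) { // first row that differs from its predecessor
            peer_group_end = i;
            break;
        }
    }
    sorted.resize(peer_group_end);
    return sorted;
}

int main() {
    auto kept = rank_prune({1, 2, 3, 3, 3, 4, 5}, /*limit=*/3);
    for (int v : kept) std::cout << v << ' '; // prints: 1 2 3 3 3
    std::cout << '\n';
}
```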
8 changes: 7 additions & 1 deletion be/src/exec/vectorized/chunks_sorter_topn.h
@@ -93,7 +93,13 @@ class ChunksSorterTopn : public ChunksSorter {
DataSegments& segments);

Status _partial_sort_col_wise(RuntimeState* state, std::pair<Permutation, Permutation>& permutations,
DataSegments& segments, const size_t chunk_size, const size_t rows_to_sort);
DataSegments& segments, const size_t chunk_size);

// For rank type topn, it may keep more data than we need during processing,
// therefore, pruning should be performed when processing is finished
// For example, given the sorted set [1, 2, 3, 3, 3, 4, 5] with limit = 3,
// the last two elements [4, 5] should be pruned
void _rank_pruning();

// buffer
struct RawChunks {
2 changes: 1 addition & 1 deletion be/src/exec/vectorized/sorting/compare_column.cpp
@@ -294,4 +294,4 @@ void build_tie_for_column(const ColumnPtr column, Tie* tie) {
DCHECK(st.ok());
}

} // namespace starrocks::vectorized
} // namespace starrocks::vectorized
1 change: 0 additions & 1 deletion be/src/exprs/expr_context.cpp
@@ -191,7 +191,6 @@ StatusOr<ColumnPtr> ExprContext::evaluate(Expr* e, vectorized::Chunk* chunk) {
#ifndef NDEBUG
if (chunk != nullptr) {
chunk->check_or_die();
CHECK(!chunk->is_empty());
}
#endif
try {