Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Reduce memory usage in full sorter #16937

Merged
merged 1 commit into from
Jan 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions be/src/bench/chunks_sorter_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "column/chunk.h"
#include "column/column_helper.h"
#include "column/datum_tuple.h"
#include "column/vectorized_fwd.h"
#include "common/config.h"
#include "exec/chunks_sorter.h"
#include "exec/chunks_sorter_full_sort.h"
Expand Down Expand Up @@ -284,7 +285,7 @@ static void do_bench(benchmark::State& state, SortAlgorithm sorter_algo, Logical
}
ASSERT_TRUE(eos);
ASSERT_EQ(expected_rows, actual_rows);
sorter->finish(suite._runtime_state.get());
sorter->done(suite._runtime_state.get());
}
state.counters["rows_sorted"] += item_processed;
state.counters["data_size"] += data_size;
Expand Down Expand Up @@ -413,13 +414,13 @@ static void do_merge_columnwise(benchmark::State& state, int num_runs, bool null
int64_t num_rows = 0;
SortDescs sort_desc(std::vector<int>{1, 1, 1}, std::vector<int>{-1, -1, -1});
for (auto _ : state) {
std::vector<ChunkPtr> inputs;
std::vector<ChunkUniquePtr> inputs;
size_t input_rows = num_runs * chunk1->num_rows();
for (int i = 0; i < num_runs; i++) {
if (i % 2 == 0) {
inputs.push_back(chunk1);
inputs.push_back(chunk1->clone_unique());
} else {
inputs.push_back(chunk2);
inputs.push_back(chunk2->clone_unique());
}
}
SortedRuns merged;
Expand Down
10 changes: 0 additions & 10 deletions be/src/exec/chunks_sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,6 @@ void ChunksSorter::setup_runtime(RuntimeProfile* profile) {
profile->add_info_string("SortType", _is_topn ? "TopN" : "All");
}

Status ChunksSorter::finish(RuntimeState* state) {
TRY_CATCH_BAD_ALLOC(RETURN_IF_ERROR(done(state)));
_is_sink_complete = true;
return Status::OK();
}

bool ChunksSorter::sink_complete() {
return _is_sink_complete;
}

StatusOr<ChunkPtr> ChunksSorter::materialize_chunk_before_sort(Chunk* chunk, TupleDescriptor* materialized_tuple_desc,
const SortExecExprs& sort_exec_exprs,
const std::vector<OrderByType>& order_by_types) {
Expand Down
10 changes: 1 addition & 9 deletions be/src/exec/chunks_sorter.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,21 +111,15 @@ class ChunksSorter {
virtual Status update(RuntimeState* state, const ChunkPtr& chunk) = 0;
// Finish seeding Chunk, and get sorted data with the top OFFSET rows skipped.
virtual Status done(RuntimeState* state) = 0;

// get_next only works after done().
virtual Status get_next(ChunkPtr* chunk, bool* eos) = 0;

virtual std::vector<JoinRuntimeFilter*>* runtime_filters(ObjectPool* pool) { return nullptr; }

// Return sorted data in multiple runs (avoids merging them into one big chunk)
virtual SortedRuns get_sorted_runs() = 0;

// Return accurate output rows of this operator
virtual size_t get_output_rows() const = 0;

Status finish(RuntimeState* state);

bool sink_complete();

virtual int64_t mem_usage() const = 0;

protected:
Expand All @@ -145,8 +139,6 @@ class ChunksSorter {
RuntimeProfile::Counter* _sort_timer = nullptr;
RuntimeProfile::Counter* _merge_timer = nullptr;
RuntimeProfile::Counter* _output_timer = nullptr;

std::atomic<bool> _is_sink_complete = false;
};

namespace detail {
Expand Down
8 changes: 2 additions & 6 deletions be/src/exec/chunks_sorter_full_sort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ Status ChunksSorterFullSort::_partial_sort(RuntimeState* state, bool done) {
_sort_permutation.resize(0);
RETURN_IF_ERROR(
sort_and_tie_columns(state->cancelled_ref(), segment.order_by_columns, _sort_desc, &_sort_permutation));
ChunkPtr sorted_chunk = _unsorted_chunk->clone_empty_with_slot(_unsorted_chunk->num_rows());
auto sorted_chunk = _unsorted_chunk->clone_empty_with_slot(_unsorted_chunk->num_rows());
materialize_by_permutation(sorted_chunk.get(), {_unsorted_chunk}, _sort_permutation);
RETURN_IF_ERROR(sorted_chunk->upgrade_if_overflow());

_sorted_chunks.push_back(sorted_chunk);
_sorted_chunks.emplace_back(std::move(sorted_chunk));
_total_rows += _unsorted_chunk->num_rows();
_unsorted_chunk.reset();
}
Expand Down Expand Up @@ -113,10 +113,6 @@ Status ChunksSorterFullSort::get_next(ChunkPtr* chunk, bool* eos) {
return Status::OK();
}

SortedRuns ChunksSorterFullSort::get_sorted_runs() {
return _merged_runs;
}

size_t ChunksSorterFullSort::get_output_rows() const {
return _merged_runs.num_rows();
}
Expand Down
12 changes: 6 additions & 6 deletions be/src/exec/chunks_sorter_full_sort.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include "column/vectorized_fwd.h"
#include "exec/chunks_sorter.h"
#include "exec/sorting/merge.h"
#include "gtest/gtest_prod.h"
Expand All @@ -40,7 +41,6 @@ class ChunksSorterFullSort : public ChunksSorter {
Status done(RuntimeState* state) override;
Status get_next(ChunkPtr* chunk, bool* eos) override;

SortedRuns get_sorted_runs() override;
size_t get_output_rows() const override;

int64_t mem_usage() const override;
Expand All @@ -54,11 +54,11 @@ class ChunksSorterFullSort : public ChunksSorter {
Status _partial_sort(RuntimeState* state, bool done);
Status _merge_sorted(RuntimeState* state);

size_t _total_rows = 0; // Total rows of sorting data
Permutation _sort_permutation; // Temp permutation for sorting
ChunkPtr _unsorted_chunk; // Unsorted chunk, accumulate it to a larger chunk
std::vector<ChunkPtr> _sorted_chunks; // Partial sorted, but not merged
SortedRuns _merged_runs; // After merge
size_t _total_rows = 0; // Total rows of sorting data
Permutation _sort_permutation; // Temp permutation for sorting
ChunkPtr _unsorted_chunk; // Unsorted chunk, accumulate it to a larger chunk
std::vector<ChunkUniquePtr> _sorted_chunks; // Partial sorted, but not merged
SortedRuns _merged_runs; // After merge

// TODO: further tune the buffer parameter
static constexpr size_t kMaxBufferedChunkSize = 1024000; // Max buffer 1024000 rows
Expand Down
4 changes: 0 additions & 4 deletions be/src/exec/chunks_sorter_heap_sort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,6 @@ Status ChunksSorterHeapSort::update(RuntimeState* state, const ChunkPtr& chunk)
return Status::OK();
}

SortedRuns ChunksSorterHeapSort::get_sorted_runs() {
return {SortedRun(_merged_segment.chunk, _merged_segment.order_by_columns)};
}

size_t ChunksSorterHeapSort::get_output_rows() const {
return _merged_segment.chunk->num_rows();
}
Expand Down
1 change: 0 additions & 1 deletion be/src/exec/chunks_sorter_heap_sort.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ class ChunksSorterHeapSort final : public ChunksSorter {
return _sort_heap->size() * _sort_heap->top().data_segment()->mem_usage() / first_rows;
}

SortedRuns get_sorted_runs() override;
size_t get_output_rows() const override;

void setup_runtime(RuntimeProfile* profile) override;
Expand Down
6 changes: 1 addition & 5 deletions be/src/exec/chunks_sorter_topn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,11 @@ Status ChunksSorterTopn::get_next(ChunkPtr* chunk, bool* eos) {
size_t count = std::min(size_t(_state->chunk_size()), _merged_segment.chunk->num_rows() - _next_output_row);
chunk->reset(_merged_segment.chunk->clone_empty(count).release());
(*chunk)->append_safe(*_merged_segment.chunk, _next_output_row, count);
(*chunk)->downgrade();
RETURN_IF_ERROR((*chunk)->downgrade());
_next_output_row += count;
return Status::OK();
}

SortedRuns ChunksSorterTopn::get_sorted_runs() {
return {SortedRun(_merged_segment.chunk, _merged_segment.order_by_columns)};
}

size_t ChunksSorterTopn::get_output_rows() const {
return _merged_segment.chunk->num_rows();
}
Expand Down
1 change: 0 additions & 1 deletion be/src/exec/chunks_sorter_topn.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ class ChunksSorterTopn : public ChunksSorter {
// get_next only works after done().
Status get_next(ChunkPtr* chunk, bool* eos) override;

SortedRuns get_sorted_runs() override;
size_t get_output_rows() const override;

int64_t mem_usage() const override { return _raw_chunks.mem_usage() + _merged_segment.mem_usage(); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ Status PartitionSortSinkOperator::set_finishing(RuntimeState* state) {
_is_finished = true;
return Status::Cancelled("runtime state is cancelled");
}
RETURN_IF_ERROR(_chunks_sorter->finish(state));
RETURN_IF_ERROR(_chunks_sorter->done(state));

// The current partition sort has ended, and
// the last call will trigger LocalMergeSortSourceOperator to start working.
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/sorting/merge.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class SimpleChunkSortCursor;
Status merge_sorted_chunks_two_way(const SortDescs& sort_desc, const SortedRun& left, const SortedRun& right,
Permutation* output);
Status merge_sorted_chunks(const SortDescs& descs, const std::vector<ExprContext*>* sort_exprs,
const std::vector<ChunkPtr>& chunks, SortedRuns* output);
std::vector<ChunkUniquePtr>& chunks, SortedRuns* output);
Status merge_sorted_cursor_cascade(const SortDescs& sort_desc,
std::vector<std::unique_ptr<SimpleChunkSortCursor>>&& cursors,
const ChunkConsumer& consumer);
Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/sorting/merge_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ SortedRun::SortedRun(const ChunkPtr& ichunk, const std::vector<ExprContext*>* ex
}

void SortedRun::reset() {
chunk->reset();
chunk.reset();
orderby.clear();
range = {};
}
Expand Down Expand Up @@ -468,7 +468,7 @@ Status merge_sorted_chunks_two_way(const SortDescs& sort_desc, const SortedRun&
}

Status merge_sorted_chunks(const SortDescs& descs, const std::vector<ExprContext*>* sort_exprs,
const std::vector<ChunkPtr>& chunks, SortedRuns* output) {
std::vector<ChunkUniquePtr>& chunks, SortedRuns* output) {
std::vector<std::unique_ptr<SimpleChunkSortCursor>> cursors;
std::vector<size_t> chunk_index(chunks.size(), 0);

Expand All @@ -486,7 +486,7 @@ Status merge_sorted_chunks(const SortDescs& descs, const std::vector<ExprContext
return false;
}
chunk_index[i]++;
*output = chunks[i]->clone_unique();
*output = std::move(chunks[i]);
return true;
},
sort_exprs));
Expand Down
18 changes: 11 additions & 7 deletions be/test/exec/sorting_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

#include <gtest/gtest.h>

#include <memory>
#include <random>
#include <utility>

#include "column/chunk.h"
#include "column/column_helper.h"
#include "column/vectorized_fwd.h"
#include "exec/sorting/merge.h"
#include "exec/sorting/sort_helper.h"
#include "exec/sorting/sort_permute.h"
Expand Down Expand Up @@ -98,8 +100,8 @@ TEST_P(MergeTestFixture, merge_sorter_chunks_two_way) {
right_columns.push_back(col);
}
}
ChunkPtr left_chunk = std::make_shared<Chunk>(left_columns, map);
ChunkPtr right_chunk = std::make_shared<Chunk>(right_columns, map);
auto left_chunk = std::make_unique<Chunk>(left_columns, map);
auto right_chunk = std::make_unique<Chunk>(right_columns, map);
Permutation perm;
SortDescs sort_desc(std::vector<int>(num_columns, 1), std::vector<int>(num_columns, -1));

Expand All @@ -118,10 +120,12 @@ TEST_P(MergeTestFixture, merge_sorter_chunks_two_way) {
size_t expected_size = left_rows + right_rows;
ChunkPtr output;
SortedRuns output_run;
ASSERT_OK(merge_sorted_chunks(sort_desc, &sort_exprs, {left_chunk, right_chunk}, &output_run));
std::vector<ChunkUniquePtr> chunks;
chunks.emplace_back(std::move(left_chunk));
chunks.emplace_back(std::move(right_chunk));
ASSERT_OK(merge_sorted_chunks(sort_desc, &sort_exprs, chunks, &output_run));
output = output_run.assemble();
ASSERT_EQ(expected_size, output->num_rows());
ASSERT_EQ(left_chunk->num_columns(), output->num_columns());

std::vector<std::vector<int>> output_data;
for (int i = 0; i < output->num_rows(); i++) {
Expand Down Expand Up @@ -257,7 +261,7 @@ TEST(SortingTest, sorted_runs) {

TEST(SortingTest, merge_sorted_chunks) {
auto runtime_state = create_runtime_state();
std::vector<ChunkPtr> input_chunks;
std::vector<ChunkUniquePtr> input_chunks;
Chunk::SlotHashMap slot_map{{0, 0}};

std::vector<std::vector<int>> input_runs = {{-2074, -1691, -1400, -969, -767, -725},
Expand All @@ -269,8 +273,8 @@ TEST(SortingTest, merge_sorted_chunks) {
for (int x : input_numbers) {
column->append_datum(Datum((int32_t)x));
}
ChunkPtr chunk = std::make_shared<Chunk>(Columns{column}, slot_map);
input_chunks.push_back(chunk);
auto chunk = std::make_unique<Chunk>(Columns{column}, slot_map);
input_chunks.emplace_back(std::move(chunk));
}

std::vector<std::unique_ptr<ColumnRef>> exprs;
Expand Down