[SR-4078] Add exchange merge sort operator #205

Merged · 11 commits · Sep 15, 2021 · Changes from 4 commits
1 change: 1 addition & 0 deletions be/src/exec/CMakeLists.txt
@@ -155,6 +155,7 @@ set(EXEC_FILES
parquet/metadata.cpp
parquet/group_reader.cpp
parquet/file_reader.cpp
pipeline/exchange/exchange_merge_sort_source_operator.cpp
pipeline/exchange/exchange_sink_operator.cpp
pipeline/exchange/exchange_source_operator.cpp
pipeline/exchange/local_exchange.cpp
22 changes: 15 additions & 7 deletions be/src/exec/exchange_node.cpp
@@ -22,6 +22,7 @@
#include "exec/exchange_node.h"

#include "column/chunk.h"
#include "exec/pipeline/exchange/exchange_merge_sort_source_operator.h"
#include "exec/pipeline/exchange/exchange_source_operator.h"
#include "exec/pipeline/limit_operator.h"
#include "exec/pipeline/pipeline_builder.h"
@@ -377,13 +378,20 @@ void ExchangeNode::debug_string(int indentation_level, std::stringstream* out) c
pipeline::OpFactories ExchangeNode::decompose_to_pipeline(pipeline::PipelineBuilderContext* context) {
using namespace pipeline;
OpFactories operators;
auto exchange_operator = std::make_shared<ExchangeSourceOperatorFactory>(context->next_operator_id(), id(),
_num_senders, _input_row_desc);
// A merging ExchangeSourceOperator should not be parallelized.
exchange_operator->set_num_driver_instances(_is_merging ? 1 : context->driver_instance_count());
operators.emplace_back(std::move(exchange_operator));
if (limit() != -1) {
operators.emplace_back(std::make_shared<LimitOperatorFactory>(context->next_operator_id(), id(), limit()));
if (!_is_merging) {
operators.emplace_back(std::make_shared<ExchangeSourceOperatorFactory>(context->next_operator_id(), id(),
_num_senders, _input_row_desc));
if (limit() != -1) {
operators.emplace_back(std::make_shared<LimitOperatorFactory>(context->next_operator_id(), id(), limit()));
}
} else {
operators.emplace_back(std::make_shared<ExchangeMergeSortSourceOperatorFactory>(
context->next_operator_id(), id(), _num_senders, _input_row_desc, &_sort_exec_exprs, _is_asc_order,
_nulls_first, _offset, _limit, _is_merging));

//if (limit() != -1) {
// operators.emplace_back(std::make_shared<LimitOperatorFactory>(context->next_operator_id(), id(), limit()));
//}
}
return operators;
}
126 changes: 126 additions & 0 deletions be/src/exec/pipeline/exchange/exchange_merge_sort_source_operator.cpp
@@ -0,0 +1,126 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021 StarRocks Limited.

#include "exec/pipeline/exchange/exchange_merge_sort_source_operator.h"

#include "runtime/data_stream_mgr.h"
#include "runtime/data_stream_recvr.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/vectorized/sorted_chunks_merger.h"

namespace starrocks::pipeline {
Status ExchangeMergeSortSourceOperator::prepare(RuntimeState* state) {
Operator::prepare(state);
_stream_recvr = state->exec_env()->stream_mgr()->create_recvr(
state, _row_desc, state->fragment_instance_id(), _plan_node_id, _num_sender,
config::exchg_node_buffer_size_bytes, _runtime_profile, _is_merging, nullptr, true);
_stream_recvr->create_merger_for_pipeline(_sort_exec_exprs, &_is_asc_order, &_nulls_first);
return Status::OK();
}

Status ExchangeMergeSortSourceOperator::close(RuntimeState* state) {
Operator::close(state);
return Status::OK();
}

bool ExchangeMergeSortSourceOperator::has_output() const {
return _stream_recvr->is_data_ready();
}

bool ExchangeMergeSortSourceOperator::is_finished() const {
return _num_rows_returned >= _limit || _is_finishing;
}

void ExchangeMergeSortSourceOperator::finish(RuntimeState* state) {
if (_is_finishing) {
return;
}
_is_finishing = true;
return _stream_recvr->close();
}

StatusOr<vectorized::ChunkPtr> ExchangeMergeSortSourceOperator::pull_chunk(RuntimeState* state) {
vectorized::ChunkPtr chunk = std::make_shared<vectorized::Chunk>();
get_chunk(state, &chunk);
return std::move(chunk);
}

Status ExchangeMergeSortSourceOperator::get_chunk(RuntimeState* state, ChunkPtr* chunk) {
if (is_finished()) {
*chunk = nullptr;
return Status::OK();
}

return get_next_merging(state, chunk);
}

Status ExchangeMergeSortSourceOperator::get_next_merging(RuntimeState* state, ChunkPtr* chunk) {
RETURN_IF_ERROR(state->check_query_state("Exchange, while merging next."));
*chunk = nullptr;
if (is_finished()) {
return Status::OK();
}

bool should_exit = false;
if (_num_rows_skipped < _offset) {
ChunkPtr tmp_chunk;
do {
if (!should_exit) {
RETURN_IF_ERROR(_stream_recvr->get_next_for_pipeline(&tmp_chunk, &_is_finishing, &should_exit));
}

if (tmp_chunk) {
_num_rows_skipped += tmp_chunk->num_rows();
} else {
break;
}
} while (!should_exit && _num_rows_skipped < _offset);

if (_num_rows_skipped > _offset) {
int64_t size = _num_rows_skipped - _offset;
int64_t offset_in_chunk = tmp_chunk->num_rows() - size;
if (_limit > 0 && size > _limit) {
size = _limit;
}
*chunk = tmp_chunk->clone_empty_with_slot(size);
for (size_t c = 0; c < tmp_chunk->num_columns(); ++c) {
const ColumnPtr& src = tmp_chunk->get_column_by_index(c);
ColumnPtr& dest = (*chunk)->get_column_by_index(c);
dest->append(*src, offset_in_chunk, size);
// resize the constant column to the same size as the non-constant columns, so Chunk::num_rows()
// can return the right number if this ConstColumn is the first column of the chunk.
if (dest->is_constant()) {
dest->resize(size);
}
}
_num_rows_skipped = _offset;
_num_rows_returned += size;

// the first Chunk will have a size less than config::vector_chunk_size.
return Status::OK();
}

if (!tmp_chunk) {
// check EOS after the (_num_rows_skipped < _offset) loop, so only one chunk can be returned here.
return Status::OK();
}
}

if (!should_exit) {
RETURN_IF_ERROR(_stream_recvr->get_next_for_pipeline(chunk, &_is_finishing, &should_exit));
}

if ((*chunk) != nullptr) {
size_t size_in_chunk = (*chunk)->num_rows();
if (_limit > 0 && size_in_chunk + _num_rows_returned > _limit) {
size_in_chunk -= (size_in_chunk + _num_rows_returned - _limit);
(*chunk)->set_num_rows(size_in_chunk);
}
_num_rows_returned += size_in_chunk;
}

return Status::OK();
}

} // namespace starrocks::pipeline
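
The OFFSET/LIMIT bookkeeping in get_next_merging() is the densest part of this file. Below is a minimal standalone sketch of just that arithmetic, not part of the patch: Chunk is reduced to a plain row count, and the hypothetical slice_chunks() helper mirrors only the _num_rows_skipped/_num_rows_returned accounting, not the column copying or the receiver calls.

#include <cstdint>
#include <iostream>
#include <vector>

// One emitted range inside an incoming chunk.
struct Slice {
    int64_t start; // first row of the chunk to keep
    int64_t len;   // number of rows to keep
};

// Given the row counts of chunks arriving from the merger, compute which rows of
// each chunk survive OFFSET/LIMIT, following the same accounting as
// get_next_merging(): skip whole chunks until OFFSET is crossed, emit the tail of
// the chunk that crosses it, then emit chunks trimmed against LIMIT.
std::vector<Slice> slice_chunks(const std::vector<int64_t>& chunk_sizes, int64_t offset, int64_t limit) {
    std::vector<Slice> out;
    int64_t skipped = 0;
    int64_t returned = 0;
    for (int64_t rows : chunk_sizes) {
        if (limit > 0 && returned >= limit) {
            break; // is_finished(): _num_rows_returned >= _limit
        }
        if (skipped < offset) {
            skipped += rows;
            if (skipped <= offset) {
                continue; // chunk consumed entirely by OFFSET
            }
            // Overshoot: the tail of this chunk holds the first emitted rows,
            // i.e. offset_in_chunk = num_rows - (_num_rows_skipped - _offset).
            int64_t size = skipped - offset;
            int64_t start = rows - size;
            if (limit > 0 && size > limit) {
                size = limit;
            }
            out.push_back({start, size});
            skipped = offset;
            returned += size;
            continue;
        }
        int64_t size = rows;
        if (limit > 0 && returned + size > limit) {
            size = limit - returned; // same trim as set_num_rows() above
        }
        out.push_back({0, size});
        returned += size;
    }
    return out;
}

int main() {
    // OFFSET 5000, LIMIT 3000 over 4096-row chunks: chunk 0 is skipped entirely,
    // chunk 1 contributes 3000 rows starting at row 904, chunk 2 is never emitted.
    for (const Slice& s : slice_chunks({4096, 4096, 4096}, 5000, 3000)) {
        std::cout << "start=" << s.start << " len=" << s.len << "\n";
    }
    return 0;
}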
100 changes: 100 additions & 0 deletions be/src/exec/pipeline/exchange/exchange_merge_sort_source_operator.h
@@ -0,0 +1,100 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021 StarRocks Limited.

#pragma once

#include <atomic>

#include "exec/pipeline/source_operator.h"

namespace starrocks {
class DataStreamRecvr;
class RowDescriptor;
class SortExecExprs;
namespace pipeline {
class ExchangeMergeSortSourceOperator : public SourceOperator {
public:
ExchangeMergeSortSourceOperator(int32_t id, int32_t plan_node_id, int32_t num_sender, const RowDescriptor& row_desc,
SortExecExprs* sort_exec_exprs, std::vector<bool> is_asc_order,
std::vector<bool> nulls_first, int64_t offset, int64_t limit, bool is_merging)
: SourceOperator(id, "exchange_merge_sort_source", plan_node_id),
_num_sender(num_sender),
_row_desc(row_desc),
_sort_exec_exprs(sort_exec_exprs),
_is_asc_order(is_asc_order),
_nulls_first(nulls_first),
_offset(offset),
_limit(limit),
_is_merging(is_merging) {}

~ExchangeMergeSortSourceOperator() override = default;

Status prepare(RuntimeState* state) override;

Status close(RuntimeState* state) override;

bool has_output() const override;

bool is_finished() const override;

void finish(RuntimeState* state) override;

StatusOr<vectorized::ChunkPtr> pull_chunk(RuntimeState* state) override;

private:
Status get_chunk(RuntimeState* state, vectorized::ChunkPtr* chunk);
Status get_next_merging(RuntimeState* state, vectorized::ChunkPtr* chunk);

int32_t _num_sender;
const RowDescriptor& _row_desc;

SortExecExprs* _sort_exec_exprs;
std::vector<bool> _is_asc_order;
std::vector<bool> _nulls_first;

std::shared_ptr<DataStreamRecvr> _stream_recvr;
std::atomic<bool> _is_finishing{false};

int64_t _num_rows_returned = 0;
int64_t _num_rows_skipped = 0;
int64_t _offset;
int64_t _limit;
bool _is_merging;
};

class ExchangeMergeSortSourceOperatorFactory final : public OperatorFactory {
Contributor (review comment): rebase the latest main branch, make ExchangeMergeSortSourceOperatorFactory extend SourceOperatorFactory
public:
ExchangeMergeSortSourceOperatorFactory(int32_t id, int32_t plan_node_id, int32_t num_sender,
const RowDescriptor& row_desc, SortExecExprs* sort_exec_exprs,
std::vector<bool> is_asc_order, std::vector<bool> nulls_first,
int64_t offset, int64_t limit, bool is_merging)
: OperatorFactory(id, plan_node_id),
_num_sender(num_sender),
_row_desc(row_desc),
_sort_exec_exprs(sort_exec_exprs),
_is_asc_order(is_asc_order),
_nulls_first(nulls_first),
_offset(offset),
_limit(limit),
_is_merging(is_merging) {}

~ExchangeMergeSortSourceOperatorFactory() override = default;

OperatorPtr create(int32_t driver_instance_count, int32_t driver_sequence) override {
return std::make_shared<ExchangeMergeSortSourceOperator>(_id, _plan_node_id, _num_sender, _row_desc,
_sort_exec_exprs, _is_asc_order, _nulls_first, _offset,
_limit, _is_merging);
}

private:
int32_t _num_sender;
const RowDescriptor& _row_desc;
SortExecExprs* _sort_exec_exprs;
std::vector<bool> _is_asc_order;
std::vector<bool> _nulls_first;
int64_t _offset;
int64_t _limit;
bool _is_merging;
};

} // namespace pipeline
} // namespace starrocks
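
If the factory is rebased as the review comment above suggests, the change should be mostly mechanical. The sketch below is an assumption, not part of this patch: it presumes SourceOperatorFactory exposes an (id, plan_node_id)-style constructor like OperatorFactory does here, uses the same includes and forward declarations as the header above, and keeps the member list and create() unchanged.

namespace starrocks::pipeline {
// Hypothetical post-rebase shape (assumption); only the base class changes.
class ExchangeMergeSortSourceOperatorFactory final : public SourceOperatorFactory {
public:
    ExchangeMergeSortSourceOperatorFactory(int32_t id, int32_t plan_node_id, int32_t num_sender,
                                           const RowDescriptor& row_desc, SortExecExprs* sort_exec_exprs,
                                           std::vector<bool> is_asc_order, std::vector<bool> nulls_first,
                                           int64_t offset, int64_t limit, bool is_merging)
            : SourceOperatorFactory(id, plan_node_id), // assumed constructor shape
              _num_sender(num_sender),
              _row_desc(row_desc),
              _sort_exec_exprs(sort_exec_exprs),
              _is_asc_order(is_asc_order),
              _nulls_first(nulls_first),
              _offset(offset),
              _limit(limit),
              _is_merging(is_merging) {}

    OperatorPtr create(int32_t driver_instance_count, int32_t driver_sequence) override {
        return std::make_shared<ExchangeMergeSortSourceOperator>(_id, _plan_node_id, _num_sender, _row_desc,
                                                                 _sort_exec_exprs, _is_asc_order, _nulls_first,
                                                                 _offset, _limit, _is_merging);
    }

private:
    // same members as in the diff above
    int32_t _num_sender;
    const RowDescriptor& _row_desc;
    SortExecExprs* _sort_exec_exprs;
    std::vector<bool> _is_asc_order;
    std::vector<bool> _nulls_first;
    int64_t _offset;
    int64_t _limit;
    bool _is_merging;
};
} // namespace starrocks::pipeline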
8 changes: 4 additions & 4 deletions be/src/runtime/data_stream_mgr.cpp
@@ -55,12 +55,12 @@ inline uint32_t DataStreamMgr::get_hash_value(const TUniqueId& fragment_instance
std::shared_ptr<DataStreamRecvr> DataStreamMgr::create_recvr(
RuntimeState* state, const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id, int num_senders, int buffer_size, const std::shared_ptr<RuntimeProfile>& profile,
bool is_merging, std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr) {
bool is_merging, std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr, bool is_pipeline) {
DCHECK(profile != NULL);
VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id << ", node=" << dest_node_id;
std::shared_ptr<DataStreamRecvr> recvr(
new DataStreamRecvr(this, state->instance_mem_tracker(), row_desc, fragment_instance_id, dest_node_id,
num_senders, is_merging, buffer_size, profile, sub_plan_query_statistics_recvr));
std::shared_ptr<DataStreamRecvr> recvr(new DataStreamRecvr(
this, state->instance_mem_tracker(), row_desc, fragment_instance_id, dest_node_id, num_senders, is_merging,
buffer_size, profile, sub_plan_query_statistics_recvr, is_pipeline));
uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
std::lock_guard<std::mutex> l(_lock);
_fragment_stream_set.emplace(fragment_instance_id, dest_node_id);
10 changes: 6 additions & 4 deletions be/src/runtime/data_stream_mgr.h
@@ -77,10 +77,12 @@ class DataStreamMgr {
// single stream.
// Ownership of the receiver is shared between this DataStream mgr instance and the
// caller.
std::shared_ptr<DataStreamRecvr> create_recvr(
RuntimeState* state, const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id, int num_senders, int buffer_size, const std::shared_ptr<RuntimeProfile>& profile,
bool is_merging, std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr);
std::shared_ptr<DataStreamRecvr> create_recvr(RuntimeState* state, const RowDescriptor& row_desc,
const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
int num_senders, int buffer_size,
const std::shared_ptr<RuntimeProfile>& profile, bool is_merging,
std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr,
bool is_pipeline = false);

Status transmit_data(const PTransmitDataParams* request, ::google::protobuf::Closure** done);
