[Enhancement] support per bucket optimize for colocate aggregate #29252

Merged · 3 commits · Aug 29, 2023
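
This PR teaches the engine to run a colocate aggregation bucket by bucket: the scan delivers each tablet bucket's chunks contiguously, and the blocking aggregate sink/source pair is wrapped in bucket-process operators that finish, emit, and reset the aggregator at every bucket boundary, so the hash table only ever holds one bucket's groups at a time. A minimal sketch of the idea, using hypothetical `Row`/`HashAggregator` stand-ins rather than StarRocks' chunk and aggregator types:

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for StarRocks' chunk/aggregator machinery.
struct Row {
    int32_t bucket_id;  // colocate bucket the row belongs to
    std::string key;    // group-by key
    int64_t value;      // value to aggregate
};

class HashAggregator {
public:
    void consume(const Row& row) { sums_[row.key] += row.value; }
    // Emit the current groups and clear state for the next bucket.
    std::map<std::string, int64_t> flush() { return std::exchange(sums_, {}); }

private:
    std::map<std::string, int64_t> sums_;
};

// Because a colocate scan can deliver rows bucket by bucket (see the morsel
// sort in olap_scan_node.cpp below), the aggregator only ever holds one
// bucket's groups: finish a bucket, emit its results, reset, continue.
std::vector<std::map<std::string, int64_t>> aggregate_per_bucket(const std::vector<Row>& rows_sorted_by_bucket) {
    std::vector<std::map<std::string, int64_t>> per_bucket_results;
    HashAggregator agg;
    for (size_t i = 0; i < rows_sorted_by_bucket.size(); ++i) {
        agg.consume(rows_sorted_by_bucket[i]);
        bool bucket_ends = i + 1 == rows_sorted_by_bucket.size() ||
                           rows_sorted_by_bucket[i + 1].bucket_id != rows_sorted_by_bucket[i].bucket_id;
        if (bucket_ends) {
            per_bucket_results.push_back(agg.flush());  // emit early, keeping memory bounded per bucket
        }
    }
    return per_bucket_results;
}
```
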
1 change: 1 addition & 0 deletions be/src/exec/CMakeLists.txt
@@ -222,6 +222,7 @@ set(EXEC_FILES
pipeline/aggregate/repeat/repeat_operator.cpp
pipeline/analysis/analytic_sink_operator.cpp
pipeline/analysis/analytic_source_operator.cpp
pipeline/bucket_process_operator.cpp
pipeline/table_function_operator.cpp
pipeline/assert_num_rows_operator.cpp
pipeline/set/union_passthrough_operator.cpp
36 changes: 29 additions & 7 deletions be/src/exec/aggregate/aggregate_blocking_node.cpp
@@ -14,6 +14,7 @@

#include "exec/aggregate/aggregate_blocking_node.h"

#include <memory>
#include <type_traits>
#include <variant>

@@ -26,14 +27,16 @@
#include "exec/pipeline/aggregate/sorted_aggregate_streaming_source_operator.h"
#include "exec/pipeline/aggregate/spillable_aggregate_blocking_sink_operator.h"
#include "exec/pipeline/aggregate/spillable_aggregate_blocking_source_operator.h"
#include "exec/pipeline/bucket_process_operator.h"
#include "exec/pipeline/chunk_accumulate_operator.h"
#include "exec/pipeline/exchange/exchange_source_operator.h"
#include "exec/pipeline/exchange/local_exchange_source_operator.h"
#include "exec/pipeline/limit_operator.h"
#include "exec/pipeline/noop_sink_operator.h"
#include "exec/pipeline/operator.h"
#include "exec/pipeline/pipeline_builder.h"
#include "exec/pipeline/spill_process_operator.h"
#include "exec/sorted_streaming_aggregator.h"
#include "gutil/casts.h"
#include "runtime/current_thread.h"
#include "simd/simd.h"

@@ -172,7 +175,8 @@ Status AggregateBlockingNode::get_next(RuntimeState* state, ChunkPtr* chunk, boo

template <class AggFactory, class SourceFactory, class SinkFactory>
pipeline::OpFactories AggregateBlockingNode::_decompose_to_pipeline(pipeline::OpFactories& ops_with_sink,
pipeline::PipelineBuilderContext* context) {
pipeline::PipelineBuilderContext* context,
bool per_bucket_optimize) {
using namespace pipeline;

auto workgroup = context->fragment_context()->workgroup();
@@ -206,11 +210,24 @@ pipeline::OpFactories AggregateBlockingNode::_decompose_to_pipeline(pipeline::Op
// Initialize OperatorFactory's fields involving runtime filters.
auto&& rc_rf_probe_collector = std::make_shared<RcRfProbeCollector>(2, std::move(this->runtime_filter_collector()));
this->init_runtime_filter_for_operator(agg_sink_op.get(), context, rc_rf_probe_collector);
auto bucket_process_context_factory = std::make_shared<BucketProcessContextFactory>();
if (per_bucket_optimize) {
agg_sink_op = std::make_shared<BucketProcessSinkOperatorFactory>(
context->next_operator_id(), id(), bucket_process_context_factory, std::move(agg_sink_op));
}

ops_with_sink.push_back(std::move(agg_sink_op));

OpFactories ops_with_source;
// Initialize OperatorFactory's fields involving runtime filters.
this->init_runtime_filter_for_operator(agg_source_op.get(), context, rc_rf_probe_collector);

if (per_bucket_optimize) {
auto bucket_source_operator = std::make_shared<BucketProcessSourceOperatorFactory>(
context->next_operator_id(), id(), bucket_process_context_factory, std::move(agg_source_op));
context->inherit_upstream_source_properties(bucket_source_operator.get(), upstream_source_op);
agg_source_op = std::move(bucket_source_operator);
}
ops_with_source.push_back(std::move(agg_source_op));

if (should_cache) {
@@ -229,6 +246,8 @@ pipeline::OpFactories AggregateBlockingNode::decompose_to_pipeline(pipeline::Pip
auto& agg_node = _tnode.agg_node;

bool sorted_streaming_aggregate = _tnode.agg_node.__isset.use_sort_agg && _tnode.agg_node.use_sort_agg;
bool use_per_bucket_optimize =
_tnode.agg_node.__isset.use_per_bucket_optimize && _tnode.agg_node.use_per_bucket_optimize;
bool has_group_by_keys = agg_node.__isset.grouping_exprs && !_tnode.agg_node.grouping_exprs.empty();
bool could_local_shuffle = context->could_local_shuffle(ops_with_sink);

@@ -262,19 +281,22 @@ pipeline::OpFactories AggregateBlockingNode::decompose_to_pipeline(pipeline::Pip
}
}

use_per_bucket_optimize &= dynamic_cast<LocalExchangeSourceOperatorFactory*>(ops_with_sink.back().get()) == nullptr;

OpFactories ops_with_source;
if (sorted_streaming_aggregate) {
ops_with_source =
_decompose_to_pipeline<StreamingAggregatorFactory, SortedAggregateStreamingSourceOperatorFactory,
SortedAggregateStreamingSinkOperatorFactory>(ops_with_sink, context);
SortedAggregateStreamingSinkOperatorFactory>(ops_with_sink, context, false);
} else {
if (runtime_state()->enable_spill() && runtime_state()->enable_agg_spill() && has_group_by_keys) {
ops_with_source =
_decompose_to_pipeline<AggregatorFactory, SpillableAggregateBlockingSourceOperatorFactory,
SpillableAggregateBlockingSinkOperatorFactory>(ops_with_sink, context);
ops_with_source = _decompose_to_pipeline<AggregatorFactory, SpillableAggregateBlockingSourceOperatorFactory,
SpillableAggregateBlockingSinkOperatorFactory>(
ops_with_sink, context, use_per_bucket_optimize && has_group_by_keys);
} else {
ops_with_source = _decompose_to_pipeline<AggregatorFactory, AggregateBlockingSourceOperatorFactory,
AggregateBlockingSinkOperatorFactory>(ops_with_sink, context);
AggregateBlockingSinkOperatorFactory>(
ops_with_sink, context, use_per_bucket_optimize && has_group_by_keys);
}
}

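
In both `_decompose_to_pipeline` specializations the optimization is a pure wrapping step: one `BucketProcessContextFactory` is created, and when `per_bucket_optimize` is set the existing sink and source factories are moved into `BucketProcessSinkOperatorFactory` / `BucketProcessSourceOperatorFactory` decorators that share it. Note also the guard that clears `use_per_bucket_optimize` when the upstream is a `LocalExchangeSourceOperatorFactory`: a local shuffle destroys the bucket-contiguous delivery the wrappers rely on. A simplified sketch of this decorator wiring, with hypothetical factory/context types rather than the real StarRocks API:

```cpp
#include <memory>
#include <utility>

// Hypothetical stand-ins for the real factory/context types.
struct BucketContext {
    int current_bucket = -1;  // shared state so sink and source agree on bucket boundaries
};

struct OperatorFactory {
    virtual ~OperatorFactory() = default;
};

// Decorator holding the inner factory plus the shared per-bucket context,
// mirroring how BucketProcessSinkOperatorFactory wraps agg_sink_op.
struct BucketProcessSinkFactory final : OperatorFactory {
    BucketProcessSinkFactory(std::shared_ptr<BucketContext> c, std::shared_ptr<OperatorFactory> f)
            : ctx(std::move(c)), inner(std::move(f)) {}
    std::shared_ptr<BucketContext> ctx;
    std::shared_ptr<OperatorFactory> inner;
};

struct BucketProcessSourceFactory final : OperatorFactory {
    BucketProcessSourceFactory(std::shared_ptr<BucketContext> c, std::shared_ptr<OperatorFactory> f)
            : ctx(std::move(c)), inner(std::move(f)) {}
    std::shared_ptr<BucketContext> ctx;
    std::shared_ptr<OperatorFactory> inner;
};

// Mirrors the wiring in _decompose_to_pipeline: a single context instance is
// handed to both wrappers; with the flag off, the factories pass through untouched.
void wrap_for_per_bucket(bool per_bucket_optimize, std::shared_ptr<OperatorFactory>& sink,
                         std::shared_ptr<OperatorFactory>& source) {
    if (!per_bucket_optimize) return;
    auto ctx = std::make_shared<BucketContext>();
    sink = std::make_shared<BucketProcessSinkFactory>(ctx, std::move(sink));
    source = std::make_shared<BucketProcessSourceFactory>(ctx, std::move(source));
}
```
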
2 changes: 1 addition & 1 deletion be/src/exec/aggregate/aggregate_blocking_node.h
@@ -35,6 +35,6 @@ class AggregateBlockingNode final : public AggregateBaseNode {
private:
template <class AggFactory, class SourceFactory, class SinkFactory>
pipeline::OpFactories _decompose_to_pipeline(pipeline::OpFactories& ops_with_sink,
pipeline::PipelineBuilderContext* context);
pipeline::PipelineBuilderContext* context, bool per_bucket_optimize);
};
} // namespace starrocks
33 changes: 26 additions & 7 deletions be/src/exec/aggregate/distinct_blocking_node.cpp
@@ -24,7 +24,9 @@
#include "exec/pipeline/aggregate/sorted_aggregate_streaming_sink_operator.h"
#include "exec/pipeline/aggregate/sorted_aggregate_streaming_source_operator.h"
#include "exec/pipeline/aggregate/spillable_aggregate_distinct_blocking_operator.h"
#include "exec/pipeline/bucket_process_operator.h"
#include "exec/pipeline/chunk_accumulate_operator.h"
#include "exec/pipeline/exchange/local_exchange_source_operator.h"
#include "exec/pipeline/limit_operator.h"
#include "exec/pipeline/operator.h"
#include "exec/pipeline/pipeline_builder.h"
@@ -128,7 +130,8 @@ Status DistinctBlockingNode::get_next(RuntimeState* state, ChunkPtr* chunk, bool

template <class AggFactory, class SourceFactory, class SinkFactory>
pipeline::OpFactories DistinctBlockingNode::_decompose_to_pipeline(pipeline::OpFactories& ops_with_sink,
pipeline::PipelineBuilderContext* context) {
pipeline::PipelineBuilderContext* context,
bool per_bucket_optimize) {
using namespace pipeline;

auto workgroup = context->fragment_context()->workgroup();
@@ -157,11 +160,24 @@ pipeline::OpFactories DistinctBlockingNode::_decompose_to_pipeline(pipeline::OpF

auto [agg_sink_op, agg_source_op] = operators_generator(false);

auto bucket_process_context_factory = std::make_shared<BucketProcessContextFactory>();
if (per_bucket_optimize) {
agg_sink_op = std::make_shared<BucketProcessSinkOperatorFactory>(
context->next_operator_id(), id(), bucket_process_context_factory, std::move(agg_sink_op));
}

// Create a shared RefCountedRuntimeFilterCollector
auto&& rc_rf_probe_collector = std::make_shared<RcRfProbeCollector>(2, std::move(this->runtime_filter_collector()));
// Initialize OperatorFactory's fields involving runtime filters.
this->init_runtime_filter_for_operator(agg_sink_op.get(), context, rc_rf_probe_collector);

if (per_bucket_optimize) {
auto bucket_source_operator = std::make_shared<BucketProcessSourceOperatorFactory>(
context->next_operator_id(), id(), bucket_process_context_factory, std::move(agg_source_op));
context->inherit_upstream_source_properties(bucket_source_operator.get(), upstream_source_op);
agg_source_op = std::move(bucket_source_operator);
}

OpFactories ops_with_source;
// Initialize OperatorFactory's fields involving runtime filters.
this->init_runtime_filter_for_operator(agg_source_op.get(), context, rc_rf_probe_collector);
@@ -185,6 +201,8 @@ pipeline::OpFactories DistinctBlockingNode::decompose_to_pipeline(pipeline::Pipe

OpFactories ops_with_sink = _children[0]->decompose_to_pipeline(context);
bool sorted_streaming_aggregate = _tnode.agg_node.__isset.use_sort_agg && _tnode.agg_node.use_sort_agg;
bool use_per_bucket_optimize =
_tnode.agg_node.__isset.use_per_bucket_optimize && _tnode.agg_node.use_per_bucket_optimize;
bool could_local_shuffle = context->could_local_shuffle(ops_with_sink);

auto try_interpolate_local_shuffle = [this, context](auto& ops) {
@@ -199,23 +217,24 @@ pipeline::OpFactories DistinctBlockingNode::decompose_to_pipeline(pipeline::Pipe
if (!sorted_streaming_aggregate) {
ops_with_sink = try_interpolate_local_shuffle(ops_with_sink);
}
use_per_bucket_optimize &= dynamic_cast<LocalExchangeSourceOperatorFactory*>(ops_with_sink.back().get()) == nullptr;

OpFactories ops_with_source;

if (sorted_streaming_aggregate) {
ops_with_source =
_decompose_to_pipeline<StreamingAggregatorFactory, SortedAggregateStreamingSourceOperatorFactory,
SortedAggregateStreamingSinkOperatorFactory>(ops_with_sink, context);
SortedAggregateStreamingSinkOperatorFactory>(ops_with_sink, context, false);
} else {
if (runtime_state()->enable_spill() && runtime_state()->enable_agg_distinct_spill()) {
ops_with_source =
_decompose_to_pipeline<AggregatorFactory, SpillableAggregateDistinctBlockingSourceOperatorFactory,
SpillableAggregateDistinctBlockingSinkOperatorFactory>(ops_with_sink,
context);
SpillableAggregateDistinctBlockingSinkOperatorFactory>(
ops_with_sink, context, use_per_bucket_optimize);
} else {
ops_with_source =
_decompose_to_pipeline<AggregatorFactory, AggregateDistinctBlockingSourceOperatorFactory,
AggregateDistinctBlockingSinkOperatorFactory>(ops_with_sink, context);
ops_with_source = _decompose_to_pipeline<AggregatorFactory, AggregateDistinctBlockingSourceOperatorFactory,
AggregateDistinctBlockingSinkOperatorFactory>(
ops_with_sink, context, use_per_bucket_optimize);
}
}

2 changes: 1 addition & 1 deletion be/src/exec/aggregate/distinct_blocking_node.h
@@ -33,6 +33,6 @@ class DistinctBlockingNode final : public AggregateBaseNode {
private:
template <class AggFactory, class SourceFactory, class SinkFactory>
pipeline::OpFactories _decompose_to_pipeline(pipeline::OpFactories& ops_with_sink,
pipeline::PipelineBuilderContext* context);
pipeline::PipelineBuilderContext* context, bool per_bucket_optimize);
};
} // namespace starrocks
12 changes: 12 additions & 0 deletions be/src/exec/olap_scan_node.cpp
@@ -30,6 +30,7 @@
#include "exprs/expr_context.h"
#include "exprs/runtime_filter_bank.h"
#include "glog/logging.h"
#include "gutil/casts.h"
#include "runtime/current_thread.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
@@ -63,6 +64,10 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
_sorted_by_keys_per_tablet = tnode.olap_scan_node.sorted_by_keys_per_tablet;
}

if (tnode.olap_scan_node.__isset.output_chunk_by_bucket) {
_output_chunk_by_bucket = tnode.olap_scan_node.output_chunk_by_bucket;
}

if (_olap_scan_node.__isset.bucket_exprs) {
const auto& bucket_exprs = _olap_scan_node.bucket_exprs;
_bucket_exprs.resize(bucket_exprs.size());
@@ -392,6 +397,13 @@ StatusOr<pipeline::MorselQueuePtr> OlapScanNode::convert_scan_range_to_morsel_qu
morsels.emplace_back(std::make_unique<pipeline::ScanMorsel>(node_id, scan_range));
}

if (output_chunk_by_bucket()) {
std::sort(morsels.begin(), morsels.end(), [](auto& l, auto& r) {
return down_cast<pipeline::ScanMorsel*>(l.get())->owner_id() <
down_cast<pipeline::ScanMorsel*>(r.get())->owner_id();
});
}

// None tablet to read shouldn't use tablet internal parallel.
if (morsels.empty()) {
return std::make_unique<pipeline::FixedMorselQueue>(std::move(morsels));
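
The sort above is what makes per-bucket processing possible: when `output_chunk_by_bucket` is set, morsels are ordered by their owning tablet so every bucket's scan ranges are consumed contiguously and downstream bucket-process operators see clean bucket boundaries. A self-contained illustration, with a hypothetical `Morsel` type in place of `pipeline::ScanMorsel`:

```cpp
#include <algorithm>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical stand-in for pipeline::ScanMorsel.
struct Morsel {
    int64_t owner_id;  // tablet/bucket that owns this scan range
    int64_t range_id;
};

int main() {
    std::vector<std::unique_ptr<Morsel>> morsels;
    morsels.push_back(std::make_unique<Morsel>(Morsel{2, 0}));
    morsels.push_back(std::make_unique<Morsel>(Morsel{1, 1}));
    morsels.push_back(std::make_unique<Morsel>(Morsel{2, 2}));

    // Group morsels of the same bucket together so the scan emits all chunks
    // of bucket 1, then all chunks of bucket 2, and so on.
    std::sort(morsels.begin(), morsels.end(),
              [](const auto& l, const auto& r) { return l->owner_id < r->owner_id; });
    return 0;
}
```
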
3 changes: 3 additions & 0 deletions be/src/exec/olap_scan_node.h
@@ -95,6 +95,8 @@ class OlapScanNode final : public starrocks::ScanNode {
return starrocks::ScanNode::io_tasks_per_scan_operator();
}

bool output_chunk_by_bucket() const override { return _output_chunk_by_bucket; }

const std::vector<ExprContext*>& bucket_exprs() const { return _bucket_exprs; }

private:
@@ -198,6 +200,7 @@ class OlapScanNode final : public starrocks::ScanNode {
std::vector<std::vector<RowsetSharedPtr>> _tablet_rowsets;

bool _sorted_by_keys_per_tablet = false;
bool _output_chunk_by_bucket = false;

std::vector<ExprContext*> _bucket_exprs;

be/src/exec/pipeline/aggregate/aggregate_blocking_sink_operator.h
@@ -14,6 +14,7 @@

#pragma once

#include <atomic>
#include <utility>

#include "exec/aggregator.h"
@@ -54,7 +55,7 @@ class AggregateBlockingSinkOperator : public Operator {

private:
// Whether prev operator has no output
bool _is_finished = false;
std::atomic_bool _is_finished = false;
// whether enable aggregate group by limit optimize
bool _agg_group_by_with_limit = false;
};
be/src/exec/pipeline/aggregate/spillable_aggregate_blocking_sink_operator.cpp
@@ -110,6 +110,14 @@ Status SpillableAggregateBlockingSinkOperator::push_chunk(RuntimeState* state, c
return Status::OK();
}

Status SpillableAggregateBlockingSinkOperator::reset_state(RuntimeState* state,
const std::vector<ChunkPtr>& refill_chunks) {
_is_finished = false;
RETURN_IF_ERROR(_aggregator->spiller()->reset_state(state));
RETURN_IF_ERROR(AggregateBlockingSinkOperator::reset_state(state, refill_chunks));
return Status::OK();
}

Status SpillableAggregateBlockingSinkOperator::_spill_all_inputs(RuntimeState* state, const ChunkPtr& chunk) {
// spill all data
DCHECK(!_aggregator->is_none_group_by_exprs());
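
`reset_state` is the hook that lets a single sink instance be reused across buckets: when the bucket-process driver finishes one bucket and drains its results, it clears the finished flag and the spiller before feeding the next bucket, which is also why `_is_finished` became a `std::atomic_bool` in the header change above. A hedged sketch of that drive/finish/reset cycle, with hypothetical `Chunk`/`SinkOperator` types:

```cpp
#include <atomic>
#include <vector>

// Hypothetical stand-ins illustrating why reset_state exists.
struct Chunk {};

class SinkOperator {
public:
    void push_chunk(const Chunk&) { /* accumulate into hash table / spiller */ }
    void set_finishing() { _is_finished.store(true); }       // end of current bucket
    bool is_finished() const { return _is_finished.load(); } // polled by the source side
    void reset_state() { _is_finished.store(false); /* also reset spiller state */ }

private:
    // Atomic, mirroring the change in aggregate_blocking_sink_operator.h:
    // the flag is toggled between buckets while another thread may poll it.
    std::atomic_bool _is_finished{false};
};

void drive_buckets(SinkOperator& sink, const std::vector<std::vector<Chunk>>& buckets) {
    for (const auto& bucket : buckets) {
        for (const auto& chunk : bucket) sink.push_chunk(chunk);
        sink.set_finishing();  // the source side drains this bucket's results here
        sink.reset_state();    // make the sink reusable for the next bucket
    }
}
```
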
be/src/exec/pipeline/aggregate/spillable_aggregate_blocking_sink_operator.h
@@ -57,6 +57,8 @@ class SpillableAggregateBlockingSinkOperator : public AggregateBlockingSinkOpera
return 0;
}

Status reset_state(RuntimeState* state, const std::vector<ChunkPtr>& refill_chunks) override;

private:
bool spilled() const { return _aggregator->spiller()->spilled(); }
