[Enhancement] support per bucket optimize for colocate aggregate

Signed-off-by: stdpain <drfeng08@gmail.com>
stdpain committed Aug 28, 2023
1 parent 94421bb commit 212a550
Showing 44 changed files with 821 additions and 158 deletions.
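
The gist of the change: in a colocate aggregation the input is already partitioned by tablet bucket, so the aggregate can consume one bucket at a time, emit that bucket's results, and reset its state before the next bucket. Peak hash-table memory is then bounded by a single bucket instead of the whole input. A minimal, self-contained sketch of the idea follows; the names in it are illustrative only, not the operators added by this commit.

#include <iostream>
#include <map>
#include <string>
#include <vector>

// Illustrative sketch of per-bucket aggregation: rows arrive grouped by
// bucket, so state can be flushed and cleared at every bucket boundary.
struct Row { int bucket; std::string key; long value; };

int main() {
    // Input already ordered by bucket, as a colocate scan can guarantee.
    std::vector<Row> input = {{0, "a", 1}, {0, "a", 2}, {0, "b", 3},
                              {1, "a", 4}, {1, "c", 5}};
    std::map<std::string, long> groups; // holds one bucket's groups at a time
    int current = input.front().bucket;
    auto flush = [&](int bucket) {
        for (const auto& [k, v] : groups)
            std::cout << "bucket " << bucket << ": " << k << " -> " << v << "\n";
        groups.clear(); // reset state: peak memory is one bucket, not all buckets
    };
    for (const auto& row : input) {
        if (row.bucket != current) { flush(current); current = row.bucket; }
        groups[row.key] += row.value;
    }
    flush(current);
    return 0;
}
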
1 change: 1 addition & 0 deletions be/src/exec/CMakeLists.txt
@@ -222,6 +222,7 @@ set(EXEC_FILES
pipeline/aggregate/repeat/repeat_operator.cpp
pipeline/analysis/analytic_sink_operator.cpp
pipeline/analysis/analytic_source_operator.cpp
pipeline/bucket_process_operator.cpp
pipeline/table_function_operator.cpp
pipeline/assert_num_rows_operator.cpp
pipeline/set/union_passthrough_operator.cpp
36 changes: 29 additions & 7 deletions be/src/exec/aggregate/aggregate_blocking_node.cpp
@@ -14,6 +14,7 @@

#include "exec/aggregate/aggregate_blocking_node.h"

#include <memory>
#include <type_traits>
#include <variant>

@@ -26,14 +27,16 @@
#include "exec/pipeline/aggregate/sorted_aggregate_streaming_source_operator.h"
#include "exec/pipeline/aggregate/spillable_aggregate_blocking_sink_operator.h"
#include "exec/pipeline/aggregate/spillable_aggregate_blocking_source_operator.h"
#include "exec/pipeline/bucket_process_operator.h"
#include "exec/pipeline/chunk_accumulate_operator.h"
#include "exec/pipeline/exchange/exchange_source_operator.h"
#include "exec/pipeline/exchange/local_exchange_source_operator.h"
#include "exec/pipeline/limit_operator.h"
#include "exec/pipeline/noop_sink_operator.h"
#include "exec/pipeline/operator.h"
#include "exec/pipeline/pipeline_builder.h"
#include "exec/pipeline/spill_process_operator.h"
#include "exec/sorted_streaming_aggregator.h"
#include "gutil/casts.h"
#include "runtime/current_thread.h"
#include "simd/simd.h"

@@ -172,7 +175,8 @@ Status AggregateBlockingNode::get_next(RuntimeState* state, ChunkPtr* chunk, boo

template <class AggFactory, class SourceFactory, class SinkFactory>
pipeline::OpFactories AggregateBlockingNode::_decompose_to_pipeline(pipeline::OpFactories& ops_with_sink,
pipeline::PipelineBuilderContext* context) {
pipeline::PipelineBuilderContext* context,
bool per_bucket_optimize) {
using namespace pipeline;

auto workgroup = context->fragment_context()->workgroup();
@@ -206,11 +210,24 @@ pipeline::OpFactories AggregateBlockingNode::_decompose_to_pipeline(pipeline::Op
// Initialize OperatorFactory's fields involving runtime filters.
auto&& rc_rf_probe_collector = std::make_shared<RcRfProbeCollector>(2, std::move(this->runtime_filter_collector()));
this->init_runtime_filter_for_operator(agg_sink_op.get(), context, rc_rf_probe_collector);
auto bucket_process_context_factory = std::make_shared<BucketProcessContextFactory>();
if (per_bucket_optimize) {
agg_sink_op = std::make_shared<BucketProcessSinkOperatorFactory>(
context->next_operator_id(), id(), bucket_process_context_factory, std::move(agg_sink_op));
}

ops_with_sink.push_back(std::move(agg_sink_op));

OpFactories ops_with_source;
// Initialize OperatorFactory's fields involving runtime filters.
this->init_runtime_filter_for_operator(agg_source_op.get(), context, rc_rf_probe_collector);

if (per_bucket_optimize) {
auto bucket_source_operator = std::make_shared<BucketProcessSourceOperatorFactory>(
context->next_operator_id(), id(), bucket_process_context_factory, std::move(agg_source_op));
context->inherit_upstream_source_properties(bucket_source_operator.get(), upstream_source_op);
agg_source_op = std::move(bucket_source_operator);
}
ops_with_source.push_back(std::move(agg_source_op));

if (should_cache) {
@@ -229,6 +246,8 @@ pipeline::OpFactories AggregateBlockingNode::decompose_to_pipeline(pipeline::Pip
auto& agg_node = _tnode.agg_node;

bool sorted_streaming_aggregate = _tnode.agg_node.__isset.use_sort_agg && _tnode.agg_node.use_sort_agg;
bool use_per_bucket_optimize =
_tnode.agg_node.__isset.use_per_bucket_optimize && _tnode.agg_node.use_per_bucket_optimize;
bool has_group_by_keys = agg_node.__isset.grouping_exprs && !_tnode.agg_node.grouping_exprs.empty();
bool could_local_shuffle = context->could_local_shuffle(ops_with_sink);

@@ -262,19 +281,22 @@ pipeline::OpFactories AggregateBlockingNode::decompose_to_pipeline(pipeline::Pip
}
}

use_per_bucket_optimize &= dynamic_cast<LocalExchangeSourceOperatorFactory*>(ops_with_sink.back().get()) == nullptr;

OpFactories ops_with_source;
if (sorted_streaming_aggregate) {
ops_with_source =
_decompose_to_pipeline<StreamingAggregatorFactory, SortedAggregateStreamingSourceOperatorFactory,
SortedAggregateStreamingSinkOperatorFactory>(ops_with_sink, context);
SortedAggregateStreamingSinkOperatorFactory>(ops_with_sink, context, false);
} else {
if (runtime_state()->enable_spill() && runtime_state()->enable_agg_spill() && has_group_by_keys) {
ops_with_source =
_decompose_to_pipeline<AggregatorFactory, SpillableAggregateBlockingSourceOperatorFactory,
SpillableAggregateBlockingSinkOperatorFactory>(ops_with_sink, context);
ops_with_source = _decompose_to_pipeline<AggregatorFactory, SpillableAggregateBlockingSourceOperatorFactory,
SpillableAggregateBlockingSinkOperatorFactory>(
ops_with_sink, context, use_per_bucket_optimize && has_group_by_keys);
} else {
ops_with_source = _decompose_to_pipeline<AggregatorFactory, AggregateBlockingSourceOperatorFactory,
AggregateBlockingSinkOperatorFactory>(ops_with_sink, context);
AggregateBlockingSinkOperatorFactory>(
ops_with_sink, context, use_per_bucket_optimize && has_group_by_keys);
}
}

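
A detail worth noting in the wiring above: the same BucketProcessContextFactory instance is passed to both the sink-side and the source-side wrapper, presumably so that a paired sink and source can coordinate bucket boundaries through shared state. A compilable sketch of that composition, with stub types and simplified constructors rather than the real StarRocks classes:

#include <cassert>
#include <memory>
#include <utility>

// Stub types standing in for the StarRocks factories; only the shape of
// the wrapping done in _decompose_to_pipeline is shown.
struct OperatorFactory { virtual ~OperatorFactory() = default; };
using OperatorFactoryPtr = std::shared_ptr<OperatorFactory>;
struct BucketProcessContextFactory {};

struct BucketProcessSinkOperatorFactory : OperatorFactory {
    BucketProcessSinkOperatorFactory(std::shared_ptr<BucketProcessContextFactory> f, OperatorFactoryPtr op)
            : ctx_factory(std::move(f)), inner(std::move(op)) {}
    std::shared_ptr<BucketProcessContextFactory> ctx_factory;
    OperatorFactoryPtr inner; // the wrapped aggregate sink, left unchanged
};
struct BucketProcessSourceOperatorFactory : OperatorFactory {
    BucketProcessSourceOperatorFactory(std::shared_ptr<BucketProcessContextFactory> f, OperatorFactoryPtr op)
            : ctx_factory(std::move(f)), inner(std::move(op)) {}
    std::shared_ptr<BucketProcessContextFactory> ctx_factory;
    OperatorFactoryPtr inner; // the wrapped aggregate source
};

int main() {
    OperatorFactoryPtr agg_sink = std::make_shared<OperatorFactory>();
    OperatorFactoryPtr agg_source = std::make_shared<OperatorFactory>();
    auto ctx_factory = std::make_shared<BucketProcessContextFactory>();
    // Decorate rather than replace: both wrappers share one context factory,
    // so a sink and its paired source can see the same per-bucket state.
    auto sink = std::make_shared<BucketProcessSinkOperatorFactory>(ctx_factory, std::move(agg_sink));
    auto source = std::make_shared<BucketProcessSourceOperatorFactory>(ctx_factory, std::move(agg_source));
    assert(sink->ctx_factory == source->ctx_factory);
    return 0;
}
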
2 changes: 1 addition & 1 deletion be/src/exec/aggregate/aggregate_blocking_node.h
@@ -35,6 +35,6 @@ class AggregateBlockingNode final : public AggregateBaseNode {
private:
template <class AggFactory, class SourceFactory, class SinkFactory>
pipeline::OpFactories _decompose_to_pipeline(pipeline::OpFactories& ops_with_sink,
pipeline::PipelineBuilderContext* context);
pipeline::PipelineBuilderContext* context, bool per_bucket_optimize);
};
} // namespace starrocks
33 changes: 26 additions & 7 deletions be/src/exec/aggregate/distinct_blocking_node.cpp
@@ -24,7 +24,9 @@
#include "exec/pipeline/aggregate/sorted_aggregate_streaming_sink_operator.h"
#include "exec/pipeline/aggregate/sorted_aggregate_streaming_source_operator.h"
#include "exec/pipeline/aggregate/spillable_aggregate_distinct_blocking_operator.h"
#include "exec/pipeline/bucket_process_operator.h"
#include "exec/pipeline/chunk_accumulate_operator.h"
#include "exec/pipeline/exchange/local_exchange_source_operator.h"
#include "exec/pipeline/limit_operator.h"
#include "exec/pipeline/operator.h"
#include "exec/pipeline/pipeline_builder.h"
@@ -128,7 +130,8 @@ Status DistinctBlockingNode::get_next(RuntimeState* state, ChunkPtr* chunk, bool

template <class AggFactory, class SourceFactory, class SinkFactory>
pipeline::OpFactories DistinctBlockingNode::_decompose_to_pipeline(pipeline::OpFactories& ops_with_sink,
pipeline::PipelineBuilderContext* context) {
pipeline::PipelineBuilderContext* context,
bool per_bucket_optimize) {
using namespace pipeline;

auto workgroup = context->fragment_context()->workgroup();
@@ -157,11 +160,24 @@ pipeline::OpFactories DistinctBlockingNode::_decompose_to_pipeline(pipeline::OpF

auto [agg_sink_op, agg_source_op] = operators_generator(false);

auto bucket_process_context_factory = std::make_shared<BucketProcessContextFactory>();
if (per_bucket_optimize) {
agg_sink_op = std::make_shared<BucketProcessSinkOperatorFactory>(
context->next_operator_id(), id(), bucket_process_context_factory, std::move(agg_sink_op));
}

// Create a shared RefCountedRuntimeFilterCollector
auto&& rc_rf_probe_collector = std::make_shared<RcRfProbeCollector>(2, std::move(this->runtime_filter_collector()));
// Initialize OperatorFactory's fields involving runtime filters.
this->init_runtime_filter_for_operator(agg_sink_op.get(), context, rc_rf_probe_collector);

if (per_bucket_optimize) {
auto bucket_source_operator = std::make_shared<BucketProcessSourceOperatorFactory>(
context->next_operator_id(), id(), bucket_process_context_factory, std::move(agg_source_op));
context->inherit_upstream_source_properties(bucket_source_operator.get(), upstream_source_op);
agg_source_op = std::move(bucket_source_operator);
}

OpFactories ops_with_source;
// Initialize OperatorFactory's fields involving runtime filters.
this->init_runtime_filter_for_operator(agg_source_op.get(), context, rc_rf_probe_collector);
@@ -185,6 +201,8 @@ pipeline::OpFactories DistinctBlockingNode::decompose_to_pipeline(pipeline::Pipe

OpFactories ops_with_sink = _children[0]->decompose_to_pipeline(context);
bool sorted_streaming_aggregate = _tnode.agg_node.__isset.use_sort_agg && _tnode.agg_node.use_sort_agg;
bool use_per_bucket_optimize =
_tnode.agg_node.__isset.use_per_bucket_optimize && _tnode.agg_node.use_per_bucket_optimize;
bool could_local_shuffle = context->could_local_shuffle(ops_with_sink);

auto try_interpolate_local_shuffle = [this, context](auto& ops) {
@@ -199,23 +217,24 @@ pipeline::OpFactories DistinctBlockingNode::decompose_to_pipeline(pipeline::Pipe
if (!sorted_streaming_aggregate) {
ops_with_sink = try_interpolate_local_shuffle(ops_with_sink);
}
use_per_bucket_optimize &= dynamic_cast<LocalExchangeSourceOperatorFactory*>(ops_with_sink.back().get()) == nullptr;

OpFactories ops_with_source;

if (sorted_streaming_aggregate) {
ops_with_source =
_decompose_to_pipeline<StreamingAggregatorFactory, SortedAggregateStreamingSourceOperatorFactory,
SortedAggregateStreamingSinkOperatorFactory>(ops_with_sink, context);
SortedAggregateStreamingSinkOperatorFactory>(ops_with_sink, context, false);
} else {
if (runtime_state()->enable_spill() && runtime_state()->enable_agg_distinct_spill()) {
ops_with_source =
_decompose_to_pipeline<AggregatorFactory, SpillableAggregateDistinctBlockingSourceOperatorFactory,
SpillableAggregateDistinctBlockingSinkOperatorFactory>(ops_with_sink,
context);
SpillableAggregateDistinctBlockingSinkOperatorFactory>(
ops_with_sink, context, use_per_bucket_optimize);
} else {
ops_with_source =
_decompose_to_pipeline<AggregatorFactory, AggregateDistinctBlockingSourceOperatorFactory,
AggregateDistinctBlockingSinkOperatorFactory>(ops_with_sink, context);
ops_with_source = _decompose_to_pipeline<AggregatorFactory, AggregateDistinctBlockingSourceOperatorFactory,
AggregateDistinctBlockingSinkOperatorFactory>(
ops_with_sink, context, use_per_bucket_optimize);
}
}

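
Both nodes also clear the flag when the operator feeding the sink is a local exchange source (the dynamic_cast guard above): a local shuffle redistributes rows across pipeline drivers, so chunks would no longer arrive bucket by bucket and the optimization must be disabled. A small runnable reduction of the same guard idiom, with stub types:

#include <iostream>
#include <memory>
#include <vector>

// Stub factory hierarchy; the real check inspects the last factory in
// ops_with_sink the same way.
struct OperatorFactory { virtual ~OperatorFactory() = default; };
struct LocalExchangeSourceOperatorFactory : OperatorFactory {};
struct ScanOperatorFactory : OperatorFactory {};

bool allow_per_bucket(const std::vector<std::shared_ptr<OperatorFactory>>& ops) {
    // dynamic_cast yields nullptr unless the upstream factory is a local
    // exchange source, in which case the bucket-at-a-time invariant is broken.
    return dynamic_cast<LocalExchangeSourceOperatorFactory*>(ops.back().get()) == nullptr;
}

int main() {
    std::vector<std::shared_ptr<OperatorFactory>> direct{std::make_shared<ScanOperatorFactory>()};
    std::vector<std::shared_ptr<OperatorFactory>> shuffled{std::make_shared<LocalExchangeSourceOperatorFactory>()};
    std::cout << allow_per_bucket(direct) << allow_per_bucket(shuffled) << "\n"; // prints 10
    return 0;
}
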
2 changes: 1 addition & 1 deletion be/src/exec/aggregate/distinct_blocking_node.h
@@ -33,6 +33,6 @@ class DistinctBlockingNode final : public AggregateBaseNode {
private:
template <class AggFactory, class SourceFactory, class SinkFactory>
pipeline::OpFactories _decompose_to_pipeline(pipeline::OpFactories& ops_with_sink,
pipeline::PipelineBuilderContext* context);
pipeline::PipelineBuilderContext* context, bool per_bucket_optimize);
};
} // namespace starrocks
12 changes: 12 additions & 0 deletions be/src/exec/olap_scan_node.cpp
@@ -30,6 +30,7 @@
#include "exprs/expr_context.h"
#include "exprs/runtime_filter_bank.h"
#include "glog/logging.h"
#include "gutil/casts.h"
#include "runtime/current_thread.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
@@ -63,6 +64,10 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
_sorted_by_keys_per_tablet = tnode.olap_scan_node.sorted_by_keys_per_tablet;
}

if (tnode.olap_scan_node.__isset.output_chunk_by_bucket) {
_output_chunk_by_bucket = tnode.olap_scan_node.output_chunk_by_bucket;
}

if (_olap_scan_node.__isset.bucket_exprs) {
const auto& bucket_exprs = _olap_scan_node.bucket_exprs;
_bucket_exprs.resize(bucket_exprs.size());
@@ -392,6 +397,13 @@ StatusOr<pipeline::MorselQueuePtr> OlapScanNode::convert_scan_range_to_morsel_qu
morsels.emplace_back(std::make_unique<pipeline::ScanMorsel>(node_id, scan_range));
}

if (output_chunk_by_bucket()) {
std::sort(morsels.begin(), morsels.end(), [](auto& l, auto& r) {
return down_cast<pipeline::ScanMorsel*>(l.get())->owner_id() <
down_cast<pipeline::ScanMorsel*>(r.get())->owner_id();
});
}

// None tablet to read shouldn't use tablet internal parallel.
if (morsels.empty()) {
return std::make_unique<pipeline::FixedMorselQueue>(std::move(morsels));
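
The morsel sort above is what makes the per-bucket path workable end to end: owner_id() identifies the morsel's tablet bucket, and ordering morsels by it means a scan driver delivers all of one bucket's tablets before starting the next. A runnable reduction of the same sort, with ScanMorsel stubbed:

#include <algorithm>
#include <iostream>
#include <memory>
#include <vector>

// Minimal stand-in for pipeline::ScanMorsel, keeping just the owner id
// (the tablet's bucket) that the comparator uses.
struct ScanMorsel {
    explicit ScanMorsel(int owner) : _owner(owner) {}
    int owner_id() const { return _owner; }
private:
    int _owner;
};

int main() {
    std::vector<std::unique_ptr<ScanMorsel>> morsels;
    int owners[] = {2, 0, 1, 0, 2};
    for (int owner : owners) morsels.push_back(std::make_unique<ScanMorsel>(owner));
    // Same comparator shape as the patch: order morsels by bucket so each
    // bucket's tablets are scanned contiguously.
    std::sort(morsels.begin(), morsels.end(),
              [](auto& l, auto& r) { return l->owner_id() < r->owner_id(); });
    for (auto& m : morsels) std::cout << m->owner_id() << ' '; // 0 0 1 2 2
    std::cout << '\n';
    return 0;
}
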
3 changes: 3 additions & 0 deletions be/src/exec/olap_scan_node.h
@@ -95,6 +95,8 @@ class OlapScanNode final : public starrocks::ScanNode {
return starrocks::ScanNode::io_tasks_per_scan_operator();
}

bool output_chunk_by_bucket() const override { return _output_chunk_by_bucket; }

const std::vector<ExprContext*>& bucket_exprs() const { return _bucket_exprs; }

private:
@@ -198,6 +200,7 @@ class OlapScanNode final : public starrocks::ScanNode {
std::vector<std::vector<RowsetSharedPtr>> _tablet_rowsets;

bool _sorted_by_keys_per_tablet = false;
bool _output_chunk_by_bucket = false;

std::vector<ExprContext*> _bucket_exprs;

@@ -14,6 +14,7 @@

#pragma once

#include <atomic>
#include <utility>

#include "exec/aggregator.h"
@@ -54,7 +55,7 @@ class AggregateBlockingSinkOperator : public Operator {

private:
// Whether prev operator has no output
bool _is_finished = false;
std::atomic_bool _is_finished = false;
// whether enable aggregate group by limit optimize
bool _agg_group_by_with_limit = false;
};
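
The bool-to-std::atomic_bool switch matters once the sink is driven bucket by bucket: _is_finished can then plausibly be cleared by reset_state on one thread while another thread is still polling it, and a plain bool in that position would be a data race under the C++ memory model. A minimal illustration of the pattern, not the operator code itself:

#include <atomic>
#include <iostream>
#include <thread>

// One thread sets the flag, another polls it; std::atomic_bool makes the
// handoff well-defined where a plain bool would be undefined behavior.
std::atomic_bool is_finished{false};

int main() {
    std::thread setter([] { is_finished.store(true); });
    while (!is_finished.load()) {
        // spin until the flag flips; safe because both accesses are atomic
    }
    setter.join();
    std::cout << "observed finished\n";
    return 0;
}
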
@@ -110,6 +110,14 @@ Status SpillableAggregateBlockingSinkOperator::push_chunk(RuntimeState* state, c
return Status::OK();
}

Status SpillableAggregateBlockingSinkOperator::reset_state(RuntimeState* state,
const std::vector<ChunkPtr>& refill_chunks) {
_is_finished = false;
RETURN_IF_ERROR(_aggregator->spiller()->reset_state(state));
RETURN_IF_ERROR(AggregateBlockingSinkOperator::reset_state(state, refill_chunks));
return Status::OK();
}

Status SpillableAggregateBlockingSinkOperator::_spill_all_inputs(RuntimeState* state, const ChunkPtr& chunk) {
// spill all data
DCHECK(!_aggregator->is_none_group_by_exprs());
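
reset_state is the hook that per-bucket processing appears to rely on: once a bucket is drained, the wrapped operators are rewound and reused for the next bucket, and the spillable sink additionally rewinds its spiller before delegating to the base class. A sketch of the override-and-delegate shape, with Status and the base class stubbed (the real method also takes a refill_chunks argument):

#include <iostream>

// Stubbed Status; the real methods return starrocks::Status.
struct Status {
    static Status OK() { return {}; }
};

struct AggregateBlockingSinkOperator {
    virtual ~AggregateBlockingSinkOperator() = default;
    virtual Status reset_state() {
        finished = false; // ready to accept the next bucket's input
        return Status::OK();
    }
    bool finished = true;
};

struct SpillableAggregateBlockingSinkOperator : AggregateBlockingSinkOperator {
    Status reset_state() override {
        std::cout << "spiller rewound\n"; // reset operator-owned resources first
        return AggregateBlockingSinkOperator::reset_state(); // then base flags
    }
};

int main() {
    SpillableAggregateBlockingSinkOperator op;
    op.reset_state();
    std::cout << (op.finished ? "finished" : "reset") << "\n"; // prints reset
    return 0;
}
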
@@ -57,6 +57,8 @@ class SpillableAggregateBlockingSinkOperator : public AggregateBlockingSinkOpera
return 0;
}

Status reset_state(RuntimeState* state, const std::vector<ChunkPtr>& refill_chunks) override;

private:
bool spilled() const { return _aggregator->spiller()->spilled(); }
