Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
Signed-off-by: stdpain <drfeng08@gmail.com>
  • Loading branch information
stdpain committed Aug 17, 2023
1 parent 9b81928 commit 4472b99
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 11 deletions.
5 changes: 4 additions & 1 deletion be/src/exec/aggregate/aggregate_blocking_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@
#include "exec/pipeline/aggregate/spillable_aggregate_blocking_source_operator.h"
#include "exec/pipeline/bucket_process_operator.h"
#include "exec/pipeline/chunk_accumulate_operator.h"
#include "exec/pipeline/exchange/exchange_source_operator.h"
#include "exec/pipeline/exchange/local_exchange_source_operator.h"
#include "exec/pipeline/limit_operator.h"
#include "exec/pipeline/noop_sink_operator.h"
#include "exec/pipeline/operator.h"
#include "exec/pipeline/pipeline_builder.h"
#include "exec/pipeline/spill_process_operator.h"
#include "exec/sorted_streaming_aggregator.h"
#include "gutil/casts.h"
#include "runtime/current_thread.h"
#include "simd/simd.h"

Expand Down Expand Up @@ -280,6 +281,8 @@ pipeline::OpFactories AggregateBlockingNode::decompose_to_pipeline(pipeline::Pip
}
}

use_per_bucket_optimize &= dynamic_cast<LocalExchangeSourceOperatorFactory*>(ops_with_sink.back().get()) == nullptr;

OpFactories ops_with_source;
if (sorted_streaming_aggregate) {
ops_with_source =
Expand Down
33 changes: 26 additions & 7 deletions be/src/exec/aggregate/distinct_blocking_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
#include "exec/pipeline/aggregate/sorted_aggregate_streaming_sink_operator.h"
#include "exec/pipeline/aggregate/sorted_aggregate_streaming_source_operator.h"
#include "exec/pipeline/aggregate/spillable_aggregate_distinct_blocking_operator.h"
#include "exec/pipeline/bucket_process_operator.h"
#include "exec/pipeline/chunk_accumulate_operator.h"
#include "exec/pipeline/exchange/local_exchange_source_operator.h"
#include "exec/pipeline/limit_operator.h"
#include "exec/pipeline/operator.h"
#include "exec/pipeline/pipeline_builder.h"
Expand Down Expand Up @@ -128,7 +130,8 @@ Status DistinctBlockingNode::get_next(RuntimeState* state, ChunkPtr* chunk, bool

template <class AggFactory, class SourceFactory, class SinkFactory>
pipeline::OpFactories DistinctBlockingNode::_decompose_to_pipeline(pipeline::OpFactories& ops_with_sink,
pipeline::PipelineBuilderContext* context) {
pipeline::PipelineBuilderContext* context,
bool per_bucket_optimize) {
using namespace pipeline;

auto workgroup = context->fragment_context()->workgroup();
Expand Down Expand Up @@ -157,11 +160,24 @@ pipeline::OpFactories DistinctBlockingNode::_decompose_to_pipeline(pipeline::OpF

auto [agg_sink_op, agg_source_op] = operators_generator(false);

auto bucket_process_context_factory = std::make_shared<BucketProcessContextFactory>();
if (per_bucket_optimize) {
agg_sink_op = std::make_shared<BucketProcessSinkOperatorFactory>(
context->next_operator_id(), id(), bucket_process_context_factory, std::move(agg_sink_op));
}

// Create a shared RefCountedRuntimeFilterCollector
auto&& rc_rf_probe_collector = std::make_shared<RcRfProbeCollector>(2, std::move(this->runtime_filter_collector()));
// Initialize OperatorFactory's fields involving runtime filters.
this->init_runtime_filter_for_operator(agg_sink_op.get(), context, rc_rf_probe_collector);

if (per_bucket_optimize) {
auto bucket_source_operator = std::make_shared<BucketProcessSourceOperatorFactory>(
context->next_operator_id(), id(), bucket_process_context_factory, std::move(agg_source_op));
context->inherit_upstream_source_properties(bucket_source_operator.get(), upstream_source_op);
agg_source_op = std::move(bucket_source_operator);
}

OpFactories ops_with_source;
// Initialize OperatorFactory's fields involving runtime filters.
this->init_runtime_filter_for_operator(agg_source_op.get(), context, rc_rf_probe_collector);
Expand All @@ -185,6 +201,8 @@ pipeline::OpFactories DistinctBlockingNode::decompose_to_pipeline(pipeline::Pipe

OpFactories ops_with_sink = _children[0]->decompose_to_pipeline(context);
bool sorted_streaming_aggregate = _tnode.agg_node.__isset.use_sort_agg && _tnode.agg_node.use_sort_agg;
bool use_per_bucket_optimize =
_tnode.agg_node.__isset.use_per_bucket_optimize && _tnode.agg_node.use_per_bucket_optimize;
bool could_local_shuffle = context->could_local_shuffle(ops_with_sink);

auto try_interpolate_local_shuffle = [this, context](auto& ops) {
Expand All @@ -199,23 +217,24 @@ pipeline::OpFactories DistinctBlockingNode::decompose_to_pipeline(pipeline::Pipe
if (!sorted_streaming_aggregate) {
ops_with_sink = try_interpolate_local_shuffle(ops_with_sink);
}
use_per_bucket_optimize &= dynamic_cast<LocalExchangeSourceOperatorFactory*>(ops_with_sink.back().get()) == nullptr;

OpFactories ops_with_source;

if (sorted_streaming_aggregate) {
ops_with_source =
_decompose_to_pipeline<StreamingAggregatorFactory, SortedAggregateStreamingSourceOperatorFactory,
SortedAggregateStreamingSinkOperatorFactory>(ops_with_sink, context);
SortedAggregateStreamingSinkOperatorFactory>(ops_with_sink, context, false);
} else {
if (runtime_state()->enable_spill() && runtime_state()->enable_agg_distinct_spill()) {
ops_with_source =
_decompose_to_pipeline<AggregatorFactory, SpillableAggregateDistinctBlockingSourceOperatorFactory,
SpillableAggregateDistinctBlockingSinkOperatorFactory>(ops_with_sink,
context);
SpillableAggregateDistinctBlockingSinkOperatorFactory>(
ops_with_sink, context, use_per_bucket_optimize);
} else {
ops_with_source =
_decompose_to_pipeline<AggregatorFactory, AggregateDistinctBlockingSourceOperatorFactory,
AggregateDistinctBlockingSinkOperatorFactory>(ops_with_sink, context);
ops_with_source = _decompose_to_pipeline<AggregatorFactory, AggregateDistinctBlockingSourceOperatorFactory,
AggregateDistinctBlockingSinkOperatorFactory>(
ops_with_sink, context, use_per_bucket_optimize);
}
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/aggregate/distinct_blocking_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ class DistinctBlockingNode final : public AggregateBaseNode {
private:
template <class AggFactory, class SourceFactory, class SinkFactory>
pipeline::OpFactories _decompose_to_pipeline(pipeline::OpFactories& ops_with_sink,
pipeline::PipelineBuilderContext* context);
pipeline::PipelineBuilderContext* context, bool per_bucket_optimize);
};
} // namespace starrocks
3 changes: 1 addition & 2 deletions be/src/exec/pipeline/scan/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,7 @@ StatusOr<ChunkPtr> ScanOperator::pull_chunk(RuntimeState* state) {
std::tuple<int64_t, bool> ScanOperator::_should_emit_eos(const ChunkPtr& chunk) {
auto owner_id = chunk->owner_info().owner_id();
auto is_last_chunk = chunk->owner_info().is_last_chunk();
is_last_chunk &= _ticket_checker != nullptr;
if (is_last_chunk) {
if (is_last_chunk && _ticket_checker != nullptr) {
is_last_chunk = _ticket_checker->leave(owner_id);
}
return {owner_id, is_last_chunk};
Expand Down

0 comments on commit 4472b99

Please sign in to comment.