Skip to content

Commit

Permalink
[Enhancement] support interpolate passthrough before streaming distin…
Browse files Browse the repository at this point in the history
…ct sink (StarRocks#55242)

Signed-off-by: stdpain <drfeng08@gmail.com>
  • Loading branch information
stdpain authored Jan 24, 2025
1 parent 947e009 commit e85b1a5
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion be/src/exec/aggregate/distinct_streaming_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,17 @@ pipeline::OpFactories DistinctStreamingNode::decompose_to_pipeline(pipeline::Pip
using namespace pipeline;

OpFactories ops_with_sink = _children[0]->decompose_to_pipeline(context);

size_t degree_of_parallelism = context->source_operator(ops_with_sink)->degree_of_parallelism();
auto should_cache = context->should_interpolate_cache_operator(id(), ops_with_sink[0]);
auto* upstream_source_op = context->source_operator(ops_with_sink);

bool could_local_shuffle = !should_cache && !context->enable_group_execution();
if (could_local_shuffle && _tnode.agg_node.__isset.interpolate_passthrough &&
_tnode.agg_node.interpolate_passthrough && context->could_local_shuffle(ops_with_sink)) {
ops_with_sink = context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), ops_with_sink,
degree_of_parallelism, true);
}

auto operators_generator = [this, should_cache, upstream_source_op, context](bool post_cache) {
// shared by sink operator factory and source operator factory
AggregatorFactoryPtr aggregator_factory = std::make_shared<AggregatorFactory>(_tnode);
Expand Down

0 comments on commit e85b1a5

Please sign in to comment.