Skip to content

Commit

Permalink
[fix](spill) disable partitioned agg when group by limit opt is set
Browse files Browse the repository at this point in the history
  • Loading branch information
mrhhsg committed Jul 17, 2024
1 parent 5670e1a commit f26f8cf
Showing 1 changed file with 13 additions and 5 deletions.
18 changes: 13 additions & 5 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1182,11 +1182,19 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
": group by and output is empty");
}
if (tnode.agg_node.aggregate_functions.empty() && !_runtime_state->enable_agg_spill() &&

const bool group_by_limit_opt =
tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;

/// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
/// If `group_by_limit_opt` is true, then it might not need to spill at all.
const bool enable_spill = _runtime_state->enable_agg_spill() &&
!tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;

if (tnode.agg_node.aggregate_functions.empty() && !enable_spill &&
request.query_options.__isset.enable_distinct_streaming_aggregation &&
request.query_options.enable_distinct_streaming_aggregation &&
!tnode.agg_node.grouping_exprs.empty() &&
!tnode.agg_node.__isset.agg_sort_info_by_group_key) {
!tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt) {
op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
_require_bucket_distribution));
_require_bucket_distribution =
Expand All @@ -1198,7 +1206,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
op.reset(new StreamingAggOperatorX(pool, next_operator_id(), tnode, descs));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
} else {
if (_runtime_state->enable_agg_spill() && !tnode.agg_node.grouping_exprs.empty()) {
if (enable_spill) {
op.reset(new PartitionedAggSourceOperatorX(pool, tnode, next_operator_id(), descs));
} else {
op.reset(new AggSourceOperatorX(pool, tnode, next_operator_id(), descs));
Expand All @@ -1213,7 +1221,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
_dag[downstream_pipeline_id].push_back(cur_pipe->id());

DataSinkOperatorXPtr sink;
if (_runtime_state->enable_agg_spill() && !tnode.agg_node.grouping_exprs.empty()) {
if (enable_spill) {
sink.reset(new PartitionedAggSinkOperatorX(pool, next_sink_operator_id(), tnode,
descs, _require_bucket_distribution));
} else {
Expand Down

0 comments on commit f26f8cf

Please sign in to comment.