diff --git a/dbms/src/Flash/Planner/plans/PhysicalAggregation.cpp b/dbms/src/Flash/Planner/plans/PhysicalAggregation.cpp index 45e4586dd18..db3cd9c3534 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalAggregation.cpp +++ b/dbms/src/Flash/Planner/plans/PhysicalAggregation.cpp @@ -106,13 +106,12 @@ void PhysicalAggregation::transformImpl(DAGPipeline & pipeline, Context & contex is_final_agg); /// If there are several sources, then we perform parallel aggregation - if (pipeline.streams.size() > 1) + if (pipeline.streams.size() > 1 || pipeline.streams_with_non_joined_data.size() > 1) { const Settings & settings = context.getSettingsRef(); - BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams, log); - pipeline.firstStream() = std::make_shared( + BlockInputStreamPtr stream = std::make_shared( pipeline.streams, - stream_with_non_joined_data, + pipeline.streams_with_non_joined_data, params, context.getFileProvider(), true, @@ -120,20 +119,25 @@ void PhysicalAggregation::transformImpl(DAGPipeline & pipeline, Context & contex settings.aggregation_memory_efficient_merge_threads ? static_cast(settings.aggregation_memory_efficient_merge_threads) : static_cast(settings.max_threads), log->identifier()); pipeline.streams.resize(1); + pipeline.streams_with_non_joined_data.clear(); + pipeline.firstStream() = std::move(stream); + // should record for agg before restore concurrency. See #3804. recordProfileStreams(pipeline, context); restoreConcurrency(pipeline, context.getDAGContext()->final_concurrency, log); } else { - BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams, log); BlockInputStreams inputs; if (!pipeline.streams.empty()) inputs.push_back(pipeline.firstStream()); - else - pipeline.streams.resize(1); - if (stream_with_non_joined_data) - inputs.push_back(stream_with_non_joined_data); + + if (!pipeline.streams_with_non_joined_data.empty()) + inputs.push_back(pipeline.streams_with_non_joined_data.at(0)); + + pipeline.streams.resize(1); + pipeline.streams_with_non_joined_data.clear(); + pipeline.firstStream() = std::make_shared( std::make_shared(inputs, log->identifier()), params,