Skip to content

Commit

Permalink
due to pingcap#5274
Browse files Browse the repository at this point in the history
  • Loading branch information
SeaRise committed Jul 8, 2022
1 parent bbd687b commit 5c5b30e
Showing 1 changed file with 13 additions and 9 deletions.
22 changes: 13 additions & 9 deletions dbms/src/Flash/Planner/plans/PhysicalAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,34 +106,38 @@ void PhysicalAggregation::transformImpl(DAGPipeline & pipeline, Context & contex
is_final_agg);

/// If there are several sources, then we perform parallel aggregation
if (pipeline.streams.size() > 1)
if (pipeline.streams.size() > 1 || pipeline.streams_with_non_joined_data.size() > 1)
{
const Settings & settings = context.getSettingsRef();
BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams, log);
pipeline.firstStream() = std::make_shared<ParallelAggregatingBlockInputStream>(
BlockInputStreamPtr stream = std::make_shared<ParallelAggregatingBlockInputStream>(
pipeline.streams,
stream_with_non_joined_data,
pipeline.streams_with_non_joined_data,
params,
context.getFileProvider(),
true,
max_streams,
settings.aggregation_memory_efficient_merge_threads ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads) : static_cast<size_t>(settings.max_threads),
log->identifier());
pipeline.streams.resize(1);
pipeline.streams_with_non_joined_data.clear();
pipeline.firstStream() = std::move(stream);

// should record for agg before restore concurrency. See #3804.
recordProfileStreams(pipeline, context);
restoreConcurrency(pipeline, context.getDAGContext()->final_concurrency, log);
}
else
{
BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams, log);
BlockInputStreams inputs;
if (!pipeline.streams.empty())
inputs.push_back(pipeline.firstStream());
else
pipeline.streams.resize(1);
if (stream_with_non_joined_data)
inputs.push_back(stream_with_non_joined_data);

if (!pipeline.streams_with_non_joined_data.empty())
inputs.push_back(pipeline.streams_with_non_joined_data.at(0));

pipeline.streams.resize(1);
pipeline.streams_with_non_joined_data.clear();

pipeline.firstStream() = std::make_shared<AggregatingBlockInputStream>(
std::make_shared<ConcatBlockInputStream>(inputs, log->identifier()),
params,
Expand Down

0 comments on commit 5c5b30e

Please sign in to comment.