Skip to content

Commit

Permalink
fix dag & select interpreter
Browse files Browse the repository at this point in the history
Signed-off-by: gengliqi <gengliqiii@gmail.com>
  • Loading branch information
gengliqi committed Jul 8, 2022
1 parent bb913a1 commit 11367da
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 40 deletions.
7 changes: 5 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ void DAGQueryBlockInterpreter::executeAggregation(
max_streams,
settings.aggregation_memory_efficient_merge_threads ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads) : static_cast<size_t>(settings.max_threads),
log->identifier());

pipeline.streams.resize(1);
pipeline.streams_with_non_joined_data.clear();
pipeline.firstStream() = std::move(stream);
Expand All @@ -408,11 +409,13 @@ void DAGQueryBlockInterpreter::executeAggregation(
BlockInputStreams inputs;
if (!pipeline.streams.empty())
inputs.push_back(pipeline.firstStream());
else
pipeline.streams.resize(1);

if (!pipeline.streams_with_non_joined_data.empty())
inputs.push_back(pipeline.streams_with_non_joined_data.at(0));

pipeline.streams.resize(1);
pipeline.streams_with_non_joined_data.clear();

pipeline.firstStream() = std::make_shared<AggregatingBlockInputStream>(
std::make_shared<ConcatBlockInputStream>(inputs, log->identifier()),
params,
Expand Down
39 changes: 28 additions & 11 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,34 @@ void executeUnion(
bool ignore_block,
const String & extra_info)
{
if (pipeline.streams.size() == 1 && pipeline.streams_with_non_joined_data.empty())
return;
BlockInputStreamPtr stream;
if (ignore_block)
stream = std::make_shared<UnionWithoutBlock>(pipeline.streams, pipeline.streams_with_non_joined_data, max_streams, log->identifier());
else
stream = std::make_shared<UnionWithBlock>(pipeline.streams, pipeline.streams_with_non_joined_data, max_streams, log->identifier());
stream->setExtraInfo(extra_info);
pipeline.streams.resize(1);
pipeline.streams_with_non_joined_data.clear();
pipeline.firstStream() = std::move(stream);
switch (pipeline.streams.size() + pipeline.streams_with_non_joined_data.size())
{
case 0:
break;
case 1:
{
if (pipeline.streams.size() == 1)
break;
// The total is 1 and streams is empty, so the only stream is in streams_with_non_joined_data; move it into streams.
pipeline.streams.push_back(pipeline.streams_with_non_joined_data.at(0));
pipeline.streams_with_non_joined_data.clear();
break;
}
default:
{
BlockInputStreamPtr stream;
if (ignore_block)
stream = std::make_shared<UnionWithoutBlock>(pipeline.streams, pipeline.streams_with_non_joined_data, max_streams, log->identifier());
else
stream = std::make_shared<UnionWithBlock>(pipeline.streams, pipeline.streams_with_non_joined_data, max_streams, log->identifier());
stream->setExtraInfo(extra_info);

pipeline.streams.resize(1);
pipeline.streams_with_non_joined_data.clear();
pipeline.firstStream() = std::move(stream);
break;
}
}
}

ExpressionActionsPtr generateProjectExpressionActions(
Expand Down
58 changes: 35 additions & 23 deletions dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -512,13 +512,13 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
{
const auto & join = static_cast<const ASTTableJoin &>(*query.join()->table_join);
if (join.kind == ASTTableJoin::Kind::Full || join.kind == ASTTableJoin::Kind::Right)
pipeline.stream_with_non_joined_data = expressions.before_join->createStreamWithNonJoinedDataIfFullOrRightJoin(
pipeline.streams_with_non_joined_data.push_back(expressions.before_join->createStreamWithNonJoinedDataIfFullOrRightJoin(
pipeline.firstStream()->getHeader(),
0,
1,
settings.max_block_size);
settings.max_block_size));

for (auto & stream : pipeline.streams) /// Applies to all sources except stream_with_non_joined_data.
for (auto & stream : pipeline.streams) /// Applies to all sources except streams_with_non_joined_data.
stream = std::make_shared<ExpressionBlockInputStream>(stream, expressions.before_join, /*req_id=*/"");
}

Expand Down Expand Up @@ -603,7 +603,7 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
if (need_second_distinct_pass
|| query.limit_length
|| query.limit_by_expression_list
|| pipeline.stream_with_non_joined_data)
|| !pipeline.streams_with_non_joined_data.empty())
{
need_merge_streams = true;
}
Expand Down Expand Up @@ -987,11 +987,11 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre
Aggregator::Params params(header, keys, aggregates, overflow_row, settings.max_rows_to_group_by, settings.group_by_overflow_mode, allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0), allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0), settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set, context.getTemporaryPath());

/// If there are several sources, then we perform parallel aggregation
if (pipeline.streams.size() > 1)
if (pipeline.streams.size() > 1 || pipeline.streams_with_non_joined_data.size() > 1)
{
pipeline.firstStream() = std::make_shared<ParallelAggregatingBlockInputStream>(
auto stream = std::make_shared<ParallelAggregatingBlockInputStream>(
pipeline.streams,
BlockInputStreams{pipeline.stream_with_non_joined_data},
pipeline.streams_with_non_joined_data,
params,
file_provider,
final,
Expand All @@ -1001,28 +1001,28 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre
: static_cast<size_t>(settings.max_threads),
/*req_id=*/"");

pipeline.stream_with_non_joined_data = nullptr;
pipeline.streams.resize(1);
pipeline.streams_with_non_joined_data.clear();
pipeline.firstStream() = std::move(stream);
}
else
{
BlockInputStreams inputs;
if (!pipeline.streams.empty())
inputs.push_back(pipeline.firstStream());
else
pipeline.streams.resize(1);

if (pipeline.stream_with_non_joined_data)
inputs.push_back(pipeline.stream_with_non_joined_data);
if (!pipeline.streams_with_non_joined_data.empty())
inputs.push_back(pipeline.streams_with_non_joined_data.at(0));

pipeline.streams.resize(1);
pipeline.streams_with_non_joined_data.clear();

pipeline.firstStream() = std::make_shared<AggregatingBlockInputStream>(
std::make_shared<ConcatBlockInputStream>(inputs, /*req_id=*/""),
params,
file_provider,
final,
/*req_id=*/"");

pipeline.stream_with_non_joined_data = nullptr;
}
}

Expand Down Expand Up @@ -1244,21 +1244,33 @@ void InterpreterSelectQuery::executeDistinct(Pipeline & pipeline, bool before_or

void InterpreterSelectQuery::executeUnion(Pipeline & pipeline)
{
/// If there are still several streams, then we combine them into one
if (pipeline.hasMoreThanOneStream())
switch (pipeline.streams.size() + pipeline.streams_with_non_joined_data.size())
{
pipeline.firstStream() = std::make_shared<UnionBlockInputStream<>>(
case 0:
break;
case 1:
{
if (pipeline.streams.size() == 1)
break;
// The total is 1 and streams is empty, so the only stream is in streams_with_non_joined_data; move it into streams.
pipeline.streams.push_back(pipeline.streams_with_non_joined_data.at(0));
pipeline.streams_with_non_joined_data.clear();
break;
}
default:
{
BlockInputStreamPtr stream = std::make_shared<UnionBlockInputStream<>>(
pipeline.streams,
BlockInputStreams{pipeline.stream_with_non_joined_data},
pipeline.streams_with_non_joined_data,
max_streams,
/*req_id=*/"");
pipeline.stream_with_non_joined_data = nullptr;
;

pipeline.streams.resize(1);
pipeline.streams_with_non_joined_data.clear();
pipeline.firstStream() = std::move(stream);
break;
}
else if (pipeline.stream_with_non_joined_data)
{
pipeline.streams.push_back(pipeline.stream_with_non_joined_data);
pipeline.stream_with_non_joined_data = nullptr;
}
}

Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Interpreters/InterpreterSelectQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class InterpreterSelectQuery : public IInterpreter
* These streams have a special meaning, since reading from them should be done after reading from the main streams.
* They are appended to the main streams in UnionBlockInputStream or ParallelAggregatingBlockInputStream.
*/
BlockInputStreamPtr stream_with_non_joined_data;
BlockInputStreams streams_with_non_joined_data;

BlockInputStreamPtr & firstStream() { return streams.at(0); }

Expand All @@ -105,13 +105,13 @@ class InterpreterSelectQuery : public IInterpreter
for (auto & stream : streams)
transform(stream);

if (stream_with_non_joined_data)
transform(stream_with_non_joined_data);
for (auto & stream : streams_with_non_joined_data)
transform(stream);
}

bool hasMoreThanOneStream() const
{
return streams.size() + (stream_with_non_joined_data ? 1 : 0) > 1;
return streams.size() + streams_with_non_joined_data.size() > 1;
}
};

Expand Down

0 comments on commit 11367da

Please sign in to comment.