Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make join as top level operator #6356

Merged
merged 11 commits into from
Dec 5, 2022
31 changes: 19 additions & 12 deletions dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,42 @@ namespace DB
{
HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream(
const BlockInputStreamPtr & input,
const ExpressionActionsPtr & join_probe_actions_,
const JoinPtr & join_,
const String & req_id)
: log(Logger::get(req_id))
, join_probe_actions(join_probe_actions_)
, join(join_)
{
children.push_back(input);
mengxin9014 marked this conversation as resolved.
Show resolved Hide resolved

if (!join_probe_actions || join_probe_actions->getActions().size() != 1
|| join_probe_actions->getActions().back().type != ExpressionAction::Type::JOIN)
{
throw Exception("isn't valid join probe actions", ErrorCodes::LOGICAL_ERROR);
}
}

Block HashJoinProbeBlockInputStream::getTotals()
{
if (auto * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
{
totals = child->getTotals();
join_probe_actions->executeOnTotals(totals);
if (!totals)
{
if (join->hasTotals())
{
for (const auto & name_and_type : child->getHeader().getColumnsWithTypeAndName())
{
auto column = name_and_type.type->createColumn();
column->insertDefault();
totals.insert(ColumnWithTypeAndName(std::move(column), name_and_type.type, name_and_type.name));
}
}
else
return totals; /// There's nothing to JOIN.
}
join->joinTotals(totals);
}

return totals;
}

Block HashJoinProbeBlockInputStream::getHeader() const
{
Block res = children.back()->getHeader();
join_probe_actions->execute(res);
join->joinBlock(res);
return res;
}

Expand All @@ -57,7 +64,7 @@ Block HashJoinProbeBlockInputStream::readImpl()
if (!res)
return res;

join_probe_actions->execute(res);
join->joinBlock(res);

// TODO split block if block.size() > settings.max_block_size
// https://github.com/pingcap/tiflash/issues/3436
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/DataStreams/HashJoinProbeBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <DataStreams/IProfilingBlockInputStream.h>
#include <Interpreters/Join.h>

namespace DB
{
Expand All @@ -31,13 +32,12 @@ class ExpressionActions;
class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream
{
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
static constexpr auto name = "HashJoinProbe";

public:
HashJoinProbeBlockInputStream(
const BlockInputStreamPtr & input,
const ExpressionActionsPtr & join_probe_actions_,
const JoinPtr & join_,
const String & req_id);

String getName() const override { return name; }
Expand All @@ -49,7 +49,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream

private:
const LoggerPtr log;
ExpressionActionsPtr join_probe_actions;
JoinPtr join;
};

} // namespace DB
4 changes: 1 addition & 3 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,6 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline &
for (const auto & p : probe_pipeline.firstStream()->getHeader())
source_columns.emplace_back(p.name, p.type);
DAGExpressionAnalyzer dag_analyzer(std::move(source_columns), context);
ExpressionActionsChain chain;
dag_analyzer.appendJoin(chain, right_query, columns_added_by_join);
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
pipeline.streams = probe_pipeline.streams;
/// add join input stream
if (is_tiflash_right_join)
Expand All @@ -323,7 +321,7 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline &
}
for (auto & stream : pipeline.streams)
{
stream = std::make_shared<HashJoinProbeBlockInputStream>(stream, chain.getLastActions(), log->identifier());
stream = std::make_shared<HashJoinProbeBlockInputStream>(stream, join_ptr, log->identifier());
stream->setExtraInfo(fmt::format("join probe, join_executor_id = {}", query_block.source_name));
}

Expand Down
4 changes: 1 addition & 3 deletions dbms/src/Flash/Planner/plans/PhysicalJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,6 @@ void PhysicalJoin::probeSideTransform(DAGPipeline & probe_pipeline, Context & co
/// probe side streams
assert(probe_pipeline.streams_with_non_joined_data.empty());
executeExpression(probe_pipeline, probe_side_prepare_actions, log, "append join key and join filters for probe side");
auto join_probe_actions = PhysicalPlanHelper::newActions(probe_pipeline.firstStream()->getHeader(), context);
join_probe_actions->add(ExpressionAction::ordinaryJoin(join_ptr, columns_added_by_join));
/// add join input stream
if (has_non_joined)
{
Expand All @@ -210,7 +208,7 @@ void PhysicalJoin::probeSideTransform(DAGPipeline & probe_pipeline, Context & co
String join_probe_extra_info = fmt::format("join probe, join_executor_id = {}", execId());
for (auto & stream : probe_pipeline.streams)
{
stream = std::make_shared<HashJoinProbeBlockInputStream>(stream, join_probe_actions, log->identifier());
stream = std::make_shared<HashJoinProbeBlockInputStream>(stream, join_ptr, log->identifier());
stream->setExtraInfo(join_probe_extra_info);
}
}
Expand Down