Skip to content

Commit

Permalink
make join as top level operator (#6356)
Browse files Browse the repository at this point in the history
close #6351
  • Loading branch information
mengxin9014 authored Dec 5, 2022
1 parent f3832a1 commit e255112
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 67 deletions.
31 changes: 20 additions & 11 deletions dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,44 @@ namespace DB
{
HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream(
const BlockInputStreamPtr & input,
const ExpressionActionsPtr & join_probe_actions_,
const JoinPtr & join_,
const String & req_id)
: log(Logger::get(req_id))
, join_probe_actions(join_probe_actions_)
, join(join_)
{
children.push_back(input);

if (!join_probe_actions || join_probe_actions->getActions().size() != 1
|| join_probe_actions->getActions().back().type != ExpressionAction::Type::JOIN)
{
throw Exception("isn't valid join probe actions", ErrorCodes::LOGICAL_ERROR);
}
RUNTIME_CHECK_MSG(join != nullptr, "join ptr should not be null.");
}

Block HashJoinProbeBlockInputStream::getTotals()
{
if (auto * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
{
totals = child->getTotals();
join_probe_actions->executeOnTotals(totals);
if (!totals)
{
if (join->hasTotals())
{
for (const auto & name_and_type : child->getHeader().getColumnsWithTypeAndName())
{
auto column = name_and_type.type->createColumn();
column->insertDefault();
totals.insert(ColumnWithTypeAndName(std::move(column), name_and_type.type, name_and_type.name));
}
}
else
return totals; /// There's nothing to JOIN.
}
join->joinTotals(totals);
}

return totals;
}

Block HashJoinProbeBlockInputStream::getHeader() const
{
Block res = children.back()->getHeader();
join_probe_actions->execute(res);
join->joinBlock(res);
return res;
}

Expand All @@ -57,7 +66,7 @@ Block HashJoinProbeBlockInputStream::readImpl()
if (!res)
return res;

join_probe_actions->execute(res);
join->joinBlock(res);

// TODO split block if block.size() > settings.max_block_size
// https://github.com/pingcap/tiflash/issues/3436
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/DataStreams/HashJoinProbeBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <DataStreams/IProfilingBlockInputStream.h>
#include <Interpreters/Join.h>

namespace DB
{
Expand All @@ -31,13 +32,12 @@ class ExpressionActions;
class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream
{
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
static constexpr auto name = "HashJoinProbe";

public:
HashJoinProbeBlockInputStream(
const BlockInputStreamPtr & input,
const ExpressionActionsPtr & join_probe_actions_,
const JoinPtr & join_,
const String & req_id);

String getName() const override { return name; }
Expand All @@ -49,7 +49,7 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream

private:
const LoggerPtr log;
ExpressionActionsPtr join_probe_actions;
JoinPtr join;
};

} // namespace DB
10 changes: 0 additions & 10 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -910,16 +910,6 @@ String DAGExpressionAnalyzer::appendDurationCast(
return applyFunction(func_name, {dur_expr, fsp_expr}, actions, nullptr);
}

void DAGExpressionAnalyzer::appendJoin(
ExpressionActionsChain & chain,
SubqueryForSet & join_query,
const NamesAndTypesList & columns_added_by_join) const
{
initChain(chain, getCurrentInputColumns());
ExpressionActionsPtr actions = chain.getLastActions();
actions->add(ExpressionAction::ordinaryJoin(join_query.join, columns_added_by_join));
}

std::pair<bool, Names> DAGExpressionAnalyzer::buildJoinKey(
const ExpressionActionsPtr & actions,
const google::protobuf::RepeatedPtrField<tipb::Expr> & keys,
Expand Down
5 changes: 0 additions & 5 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,6 @@ class DAGExpressionAnalyzer : private boost::noncopyable

ExpressionActionsChain::Step & initAndGetLastStep(ExpressionActionsChain & chain) const;

void appendJoin(
ExpressionActionsChain & chain,
SubqueryForSet & join_query,
const NamesAndTypesList & columns_added_by_join) const;

// Generate a project action for non-root DAGQueryBlock,
// to keep the schema of Block and tidb-schema the same, and
// guarantee that left/right block of join don't have duplicated column names.
Expand Down
5 changes: 1 addition & 4 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline &
const Block & right_input_header = input_streams_vec[1].back()->getHeader();

String match_helper_name = tiflash_join.genMatchHelperName(left_input_header, right_input_header);
NamesAndTypesList columns_added_by_join = tiflash_join.genColumnsAddedByJoin(build_pipeline.firstStream()->getHeader(), match_helper_name);
NamesAndTypes join_output_columns = tiflash_join.genJoinOutputColumns(left_input_header, right_input_header, match_helper_name);

/// add necessary transformation if the join key is an expression
Expand Down Expand Up @@ -302,8 +301,6 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline &
for (const auto & p : probe_pipeline.firstStream()->getHeader())
source_columns.emplace_back(p.name, p.type);
DAGExpressionAnalyzer dag_analyzer(std::move(source_columns), context);
ExpressionActionsChain chain;
dag_analyzer.appendJoin(chain, right_query, columns_added_by_join);
pipeline.streams = probe_pipeline.streams;
/// add join input stream
if (is_tiflash_right_join)
Expand All @@ -323,7 +320,7 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline &
}
for (auto & stream : pipeline.streams)
{
stream = std::make_shared<HashJoinProbeBlockInputStream>(stream, chain.getLastActions(), log->identifier());
stream = std::make_shared<HashJoinProbeBlockInputStream>(stream, join_ptr, log->identifier());
stream->setExtraInfo(fmt::format("join probe, join_executor_id = {}", query_block.source_name));
}

Expand Down
18 changes: 0 additions & 18 deletions dbms/src/Flash/Coprocessor/JoinInterpreterHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,24 +300,6 @@ NamesAndTypes TiFlashJoin::genColumnsForOtherJoinFilter(
return columns_for_other_join_filter;
}

/// all the columns from build side streams should be added after join, even for the join key.
NamesAndTypesList TiFlashJoin::genColumnsAddedByJoin(
const Block & build_side_header,
const String & match_helper_name) const
{
NamesAndTypesList columns_added_by_join;
bool make_nullable = isTiFlashLeftJoin();
for (auto const & p : build_side_header)
{
columns_added_by_join.emplace_back(p.name, make_nullable ? makeNullable(p.type) : p.type);
}
if (!match_helper_name.empty())
{
columns_added_by_join.emplace_back(match_helper_name, Join::match_helper_type);
}
return columns_added_by_join;
}

NamesAndTypes TiFlashJoin::genJoinOutputColumns(
const Block & left_input_header,
const Block & right_input_header,
Expand Down
7 changes: 0 additions & 7 deletions dbms/src/Flash/Coprocessor/JoinInterpreterHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,6 @@ struct TiFlashJoin
/// return "" for everything else.
String genMatchHelperName(const Block & header1, const Block & header2) const;

/// columns_added_by_join
/// = join_output_columns - probe_side_columns
/// = build_side_columns + match_helper_name
NamesAndTypesList genColumnsAddedByJoin(
const Block & build_side_header,
const String & match_helper_name) const;

/// The columns output by join will be:
/// {columns of left_input, columns of right_input, match_helper_name}
NamesAndTypes genJoinOutputColumns(
Expand Down
6 changes: 1 addition & 5 deletions dbms/src/Flash/Planner/plans/PhysicalJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ PhysicalPlanNodePtr PhysicalJoin::build(
const Block & build_side_header = build_plan->getSampleBlock();

String match_helper_name = tiflash_join.genMatchHelperName(left_input_header, right_input_header);
NamesAndTypesList columns_added_by_join = tiflash_join.genColumnsAddedByJoin(build_side_header, match_helper_name);
NamesAndTypes join_output_schema = tiflash_join.genJoinOutputColumns(left_input_header, right_input_header, match_helper_name);

auto & dag_context = *context.getDAGContext();
Expand Down Expand Up @@ -171,7 +170,6 @@ PhysicalPlanNodePtr PhysicalJoin::build(
probe_plan,
build_plan,
join_ptr,
columns_added_by_join,
probe_side_prepare_actions,
build_side_prepare_actions,
is_tiflash_right_join,
Expand All @@ -191,8 +189,6 @@ void PhysicalJoin::probeSideTransform(DAGPipeline & probe_pipeline, Context & co
/// probe side streams
assert(probe_pipeline.streams_with_non_joined_data.empty());
executeExpression(probe_pipeline, probe_side_prepare_actions, log, "append join key and join filters for probe side");
auto join_probe_actions = PhysicalPlanHelper::newActions(probe_pipeline.firstStream()->getHeader(), context);
join_probe_actions->add(ExpressionAction::ordinaryJoin(join_ptr, columns_added_by_join));
/// add join input stream
if (has_non_joined)
{
Expand All @@ -210,7 +206,7 @@ void PhysicalJoin::probeSideTransform(DAGPipeline & probe_pipeline, Context & co
String join_probe_extra_info = fmt::format("join probe, join_executor_id = {}", execId());
for (auto & stream : probe_pipeline.streams)
{
stream = std::make_shared<HashJoinProbeBlockInputStream>(stream, join_probe_actions, log->identifier());
stream = std::make_shared<HashJoinProbeBlockInputStream>(stream, join_ptr, log->identifier());
stream->setExtraInfo(join_probe_extra_info);
}
}
Expand Down
4 changes: 0 additions & 4 deletions dbms/src/Flash/Planner/plans/PhysicalJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,13 @@ class PhysicalJoin : public PhysicalBinary
const PhysicalPlanNodePtr & probe_,
const PhysicalPlanNodePtr & build_,
const JoinPtr & join_ptr_,
const NamesAndTypesList & columns_added_by_join_,
const ExpressionActionsPtr & probe_side_prepare_actions_,
const ExpressionActionsPtr & build_side_prepare_actions_,
bool has_non_joined_,
const Block & sample_block_,
const FineGrainedShuffle & fine_grained_shuffle_)
: PhysicalBinary(executor_id_, PlanType::Join, schema_, req_id, probe_, build_)
, join_ptr(join_ptr_)
, columns_added_by_join(columns_added_by_join_)
, probe_side_prepare_actions(probe_side_prepare_actions_)
, build_side_prepare_actions(build_side_prepare_actions_)
, has_non_joined(has_non_joined_)
Expand Down Expand Up @@ -76,8 +74,6 @@ class PhysicalJoin : public PhysicalBinary
private:
JoinPtr join_ptr;

NamesAndTypesList columns_added_by_join;

ExpressionActionsPtr probe_side_prepare_actions;
ExpressionActionsPtr build_side_prepare_actions;

Expand Down

0 comments on commit e255112

Please sign in to comment.