Skip to content

Commit

Permalink
refine SubqueryForSet (#4623)
Browse files Browse the repository at this point in the history
ref #4118
  • Loading branch information
SeaRise authored Apr 18, 2022
1 parent bd50b0e commit 66f45c7
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 69 deletions.
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/CreatingSetsBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <Common/MemoryTracker.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Flash/Mpp/MPPTaskId.h>
#include <Interpreters/ExpressionAnalyzer.h> /// SubqueriesForSets
#include <Interpreters/SubqueryForSet.h>


namespace DB
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ bool DAGContext::allowInvalidDate() const
return sql_mode & TiDBSQLMode::ALLOW_INVALID_DATES;
}

void DAGContext::addSubquery(const String & subquery_id, SubqueryForSet && subquery)
{
SubqueriesForSets subqueries_for_sets;
subqueries_for_sets[subquery_id] = std::move(subquery);
subqueries.push_back(std::move(subqueries_for_sets));
}

std::unordered_map<String, BlockInputStreams> & DAGContext::getProfileStreamsMap()
{
return profile_streams_map;
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/TablesRegionsInfo.h>
#include <Flash/Mpp/MPPTaskId.h>
#include <Interpreters/SubqueryForSet.h>
#include <Storages/Transaction/TiDB.h>

namespace DB
Expand Down Expand Up @@ -279,6 +280,10 @@ class DAGContext
void initExchangeReceiverIfMPP(Context & context, size_t max_streams);
const std::unordered_map<String, std::shared_ptr<ExchangeReceiver>> & getMPPExchangeReceiverMap() const;

void addSubquery(const String & subquery_id, SubqueryForSet && subquery);
bool hasSubquery() const { return !subqueries.empty(); }
std::vector<SubqueriesForSets> && moveSubqueries() { return std::move(subqueries); }

const tipb::DAGRequest * dag_request;
Int64 compile_time_ns = 0;
size_t final_concurrency = 1;
Expand Down Expand Up @@ -337,6 +342,9 @@ class DAGContext
/// key: executor_id of ExchangeReceiver nodes in dag.
std::unordered_map<String, std::shared_ptr<ExchangeReceiver>> mpp_exchange_receiver_map;
bool mpp_exchange_receiver_map_inited = false;
/// vector of SubqueriesForSets(such as join build subquery).
/// The order of the vector is also the order of the subquery.
std::vector<SubqueriesForSets> subqueries;
};

} // namespace DB
9 changes: 2 additions & 7 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,11 @@ DAGQueryBlockInterpreter::DAGQueryBlockInterpreter(
Context & context_,
const std::vector<BlockInputStreams> & input_streams_vec_,
const DAGQueryBlock & query_block_,
size_t max_streams_,
std::vector<SubqueriesForSets> & subqueries_for_sets_)
size_t max_streams_)
: context(context_)
, input_streams_vec(input_streams_vec_)
, query_block(query_block_)
, max_streams(max_streams_)
, subqueries_for_sets(subqueries_for_sets_)
, log(Logger::get("DAGQueryBlockInterpreter", dagContext().log ? dagContext().log->identifier() : ""))
{}

Expand Down Expand Up @@ -1023,10 +1021,7 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
SubqueryForSet right_query;
handleJoin(query_block.source->join(), pipeline, right_query);
recordProfileStreams(pipeline, query_block.source_name);

SubqueriesForSets subquries;
subquries[query_block.source_name] = right_query;
subqueries_for_sets.emplace_back(subquries);
dagContext().addSubquery(query_block.source_name, std::move(right_query));
}
else if (query_block.source->tp() == tipb::ExecType::TypeExchangeReceiver)
{
Expand Down
5 changes: 1 addition & 4 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ class DAGQueryBlockInterpreter
Context & context_,
const std::vector<BlockInputStreams> & input_streams_vec_,
const DAGQueryBlock & query_block_,
size_t max_streams_,
std::vector<SubqueriesForSets> & subqueries_for_sets_);
size_t max_streams_);

~DAGQueryBlockInterpreter() = default;

Expand Down Expand Up @@ -117,8 +116,6 @@ class DAGQueryBlockInterpreter

std::unique_ptr<DAGExpressionAnalyzer> analyzer;

std::vector<SubqueriesForSets> & subqueries_for_sets;

LoggerPtr log;
};
} // namespace DB
30 changes: 16 additions & 14 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
// limitations under the License.

#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <Flash/Coprocessor/DAGBlockOutputStream.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/InterpreterDAG.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Interpreters/Aggregator.h>
#include <Interpreters/Context.h>

namespace DB
{
Expand All @@ -35,23 +36,27 @@ InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_)
}
}

DAGContext & InterpreterDAG::dagContext() const
{
return *context.getDAGContext();
}

/** executeQueryBlock recursively converts all the children of the DAGQueryBlock and itself (Coprocessor DAG request)
* into an array of IBlockInputStream (element of physical executing plan of TiFlash)
*/
BlockInputStreams InterpreterDAG::executeQueryBlock(DAGQueryBlock & query_block, std::vector<SubqueriesForSets> & subqueries_for_sets)
BlockInputStreams InterpreterDAG::executeQueryBlock(DAGQueryBlock & query_block)
{
std::vector<BlockInputStreams> input_streams_vec;
for (auto & child : query_block.children)
{
BlockInputStreams child_streams = executeQueryBlock(*child, subqueries_for_sets);
BlockInputStreams child_streams = executeQueryBlock(*child);
input_streams_vec.push_back(child_streams);
}
DAGQueryBlockInterpreter query_block_interpreter(
context,
input_streams_vec,
query_block,
max_streams,
subqueries_for_sets);
max_streams);
return query_block_interpreter.execute();
}

Expand All @@ -60,26 +65,23 @@ BlockIO InterpreterDAG::execute()
/// Due to learner read, DAGQueryBlockInterpreter may take a long time to build
/// the query plan, so we init mpp exchange receiver before executeQueryBlock
dagContext().initExchangeReceiverIfMPP(context, max_streams);
/// region_info should base on the source executor, however
/// tidb does not support multi-table dag request yet, so
/// it is ok to use the same region_info for the whole dag request
std::vector<SubqueriesForSets> subqueries_for_sets;
BlockInputStreams streams = executeQueryBlock(*dag.getRootQueryBlock(), subqueries_for_sets);

BlockInputStreams streams = executeQueryBlock(*dag.getRootQueryBlock());
DAGPipeline pipeline;
pipeline.streams = streams;

/// add union to run in parallel if needed
if (context.getDAGContext()->isMPPTask())
if (dagContext().isMPPTask())
/// MPPTask do not need the returned blocks.
executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/true);
else
executeUnion(pipeline, max_streams, dagContext().log);
if (!subqueries_for_sets.empty())
if (dagContext().hasSubquery())
{
const Settings & settings = context.getSettingsRef();
pipeline.firstStream() = std::make_shared<CreatingSetsBlockInputStream>(
pipeline.firstStream(),
std::move(subqueries_for_sets),
std::move(dagContext().moveSubqueries()),
SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode),
dagContext().log->identifier());
}
Expand Down
13 changes: 3 additions & 10 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,9 @@
#pragma GCC diagnostic pop

#include <DataStreams/BlockIO.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/IInterpreter.h>
#include <Storages/RegionQueryInfo.h>
#include <Storages/Transaction/Collator.h>
#include <Storages/Transaction/TMTStorages.h>

namespace DB
{
Expand All @@ -50,9 +43,9 @@ class InterpreterDAG : public IInterpreter
BlockIO execute() override;

private:
BlockInputStreams executeQueryBlock(DAGQueryBlock & query_block, std::vector<SubqueriesForSets> & subqueries_for_sets);
BlockInputStreams executeQueryBlock(DAGQueryBlock & query_block);

DAGContext & dagContext() const { return *context.getDAGContext(); }
DAGContext & dagContext() const;

Context & context;
const DAGQuerySource & dag;
Expand Down
34 changes: 1 addition & 33 deletions dbms/src/Interpreters/ExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Core/Block.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/Settings.h>
#include <Interpreters/SubqueryForSet.h>


namespace DB
Expand All @@ -26,48 +27,15 @@ class Context;
class ExpressionActions;
struct ExpressionActionsChain;

class Join;
using JoinPtr = std::shared_ptr<Join>;

class IAST;
using ASTPtr = std::shared_ptr<IAST>;

class Set;
using SetPtr = std::shared_ptr<Set>;
using PreparedSets = std::unordered_map<IAST *, SetPtr>;

class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;

class IStorage;
using StoragePtr = std::shared_ptr<IStorage>;
using Tables = std::map<String, StoragePtr>;

class ASTFunction;
class ASTExpressionList;
class ASTSelectQuery;


/** Information on what to do when executing a subquery in the [GLOBAL] IN/JOIN section.
*/
struct SubqueryForSet
{
/// The source is obtained using the InterpreterSelectQuery subquery.
BlockInputStreamPtr source;

/// If set, build it from result.
SetPtr set;
JoinPtr join;

/// If set, put the result into the table.
/// This is a temporary table for transferring to remote servers for distributed query processing.
StoragePtr table;
};

/// ID of subquery -> what to do with it.
using SubqueriesForSets = std::unordered_map<String, SubqueryForSet>;


/** Transforms an expression from a syntax tree into a sequence of actions to execute it.
*
* NOTE: if `ast` is a SELECT query from a table, the structure of this table should not change during the lifetime of ExpressionAnalyzer.
Expand Down
57 changes: 57 additions & 0 deletions dbms/src/Interpreters/SubqueryForSet.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <common/types.h>

#include <memory>
#include <unordered_map>

namespace DB
{
class Join;
using JoinPtr = std::shared_ptr<Join>;

class IAST;
using ASTPtr = std::shared_ptr<IAST>;

class Set;
using SetPtr = std::shared_ptr<Set>;

class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;

class IStorage;
using StoragePtr = std::shared_ptr<IStorage>;

/** Information on what to do when executing a subquery in the [GLOBAL] IN/JOIN section.
*/
struct SubqueryForSet
{
/// The source is obtained using the InterpreterSelectQuery subquery.
BlockInputStreamPtr source;

/// If set, build it from result.
SetPtr set;
JoinPtr join;

/// If set, put the result into the table.
/// This is a temporary table for transferring to remote servers for distributed query processing.
StoragePtr table;
};

/// ID of subquery -> what to do with it.
using SubqueriesForSets = std::unordered_map<String, SubqueryForSet>;
} // namespace DB

0 comments on commit 66f45c7

Please sign in to comment.