diff --git a/dbms/src/DataStreams/CreatingSetsBlockInputStream.h b/dbms/src/DataStreams/CreatingSetsBlockInputStream.h index 2f9ad61e4c8..b8e2ee6fe87 100644 --- a/dbms/src/DataStreams/CreatingSetsBlockInputStream.h +++ b/dbms/src/DataStreams/CreatingSetsBlockInputStream.h @@ -17,7 +17,7 @@ #include #include #include -#include /// SubqueriesForSets +#include namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGContext.cpp b/dbms/src/Flash/Coprocessor/DAGContext.cpp index 1f6618d3170..17fb6553eab 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.cpp +++ b/dbms/src/Flash/Coprocessor/DAGContext.cpp @@ -63,6 +63,13 @@ bool DAGContext::allowInvalidDate() const return sql_mode & TiDBSQLMode::ALLOW_INVALID_DATES; } +void DAGContext::addSubquery(const String & subquery_id, SubqueryForSet && subquery) +{ + SubqueriesForSets subqueries_for_sets; + subqueries_for_sets[subquery_id] = std::move(subquery); + subqueries.push_back(std::move(subqueries_for_sets)); +} + std::unordered_map & DAGContext::getProfileStreamsMap() { return profile_streams_map; diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index 30397dc496a..18ad73ec207 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -29,6 +29,7 @@ #include #include #include +#include #include namespace DB @@ -279,6 +280,10 @@ class DAGContext void initExchangeReceiverIfMPP(Context & context, size_t max_streams); const std::unordered_map> & getMPPExchangeReceiverMap() const; + void addSubquery(const String & subquery_id, SubqueryForSet && subquery); + bool hasSubquery() const { return !subqueries.empty(); } + std::vector && moveSubqueries() { return std::move(subqueries); } + const tipb::DAGRequest * dag_request; Int64 compile_time_ns = 0; size_t final_concurrency = 1; @@ -337,6 +342,9 @@ class DAGContext /// key: executor_id of ExchangeReceiver nodes in dag. std::unordered_map> mpp_exchange_receiver_map; bool mpp_exchange_receiver_map_inited = false; + /// vector of SubqueriesForSets(such as join build subquery). + /// The order of the vector is also the order of the subquery. + std::vector subqueries; }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 51cd1bf671f..b4832ff4f17 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -56,13 +56,11 @@ DAGQueryBlockInterpreter::DAGQueryBlockInterpreter( Context & context_, const std::vector & input_streams_vec_, const DAGQueryBlock & query_block_, - size_t max_streams_, - std::vector & subqueries_for_sets_) + size_t max_streams_) : context(context_) , input_streams_vec(input_streams_vec_) , query_block(query_block_) , max_streams(max_streams_) - , subqueries_for_sets(subqueries_for_sets_) , log(Logger::get("DAGQueryBlockInterpreter", dagContext().log ? dagContext().log->identifier() : "")) {} @@ -1023,10 +1021,7 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline) SubqueryForSet right_query; handleJoin(query_block.source->join(), pipeline, right_query); recordProfileStreams(pipeline, query_block.source_name); - - SubqueriesForSets subquries; - subquries[query_block.source_name] = right_query; - subqueries_for_sets.emplace_back(subquries); + dagContext().addSubquery(query_block.source_name, std::move(right_query)); } else if (query_block.source->tp() == tipb::ExecType::TypeExchangeReceiver) { diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h index 35627cd19ee..b681d22188c 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h @@ -47,8 +47,7 @@ class DAGQueryBlockInterpreter Context & context_, const std::vector & input_streams_vec_, const DAGQueryBlock & query_block_, - size_t max_streams_, - std::vector & subqueries_for_sets_); + size_t max_streams_); ~DAGQueryBlockInterpreter() = default; @@ -117,8 +116,6 @@ class DAGQueryBlockInterpreter std::unique_ptr analyzer; - std::vector & subqueries_for_sets; - LoggerPtr log; }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp index 1bfe87e5695..4c67d67e4f9 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp @@ -13,10 +13,11 @@ // limitations under the License. #include -#include +#include +#include #include #include -#include +#include namespace DB { @@ -35,23 +36,27 @@ InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_) } } +DAGContext & InterpreterDAG::dagContext() const +{ + return *context.getDAGContext(); +} + /** executeQueryBlock recursively converts all the children of the DAGQueryBlock and itself (Coprocessor DAG request) * into an array of IBlockInputStream (element of physical executing plan of TiFlash) */ -BlockInputStreams InterpreterDAG::executeQueryBlock(DAGQueryBlock & query_block, std::vector & subqueries_for_sets) +BlockInputStreams InterpreterDAG::executeQueryBlock(DAGQueryBlock & query_block) { std::vector input_streams_vec; for (auto & child : query_block.children) { - BlockInputStreams child_streams = executeQueryBlock(*child, subqueries_for_sets); + BlockInputStreams child_streams = executeQueryBlock(*child); input_streams_vec.push_back(child_streams); } DAGQueryBlockInterpreter query_block_interpreter( context, input_streams_vec, query_block, - max_streams, - subqueries_for_sets); + max_streams); return query_block_interpreter.execute(); } @@ -60,26 +65,23 @@ BlockIO InterpreterDAG::execute() /// Due to learner read, DAGQueryBlockInterpreter may take a long time to build /// the query plan, so we init mpp exchange receiver before executeQueryBlock dagContext().initExchangeReceiverIfMPP(context, max_streams); - /// region_info should base on the source executor, however - /// tidb does not support multi-table dag request yet, so - /// it is ok to use the same region_info for the whole dag request - std::vector subqueries_for_sets; - BlockInputStreams streams = executeQueryBlock(*dag.getRootQueryBlock(), subqueries_for_sets); + + BlockInputStreams streams = executeQueryBlock(*dag.getRootQueryBlock()); DAGPipeline pipeline; pipeline.streams = streams; /// add union to run in parallel if needed - if (context.getDAGContext()->isMPPTask()) + if (dagContext().isMPPTask()) /// MPPTask do not need the returned blocks. executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/true); else executeUnion(pipeline, max_streams, dagContext().log); - if (!subqueries_for_sets.empty()) + if (dagContext().hasSubquery()) { const Settings & settings = context.getSettingsRef(); pipeline.firstStream() = std::make_shared( pipeline.firstStream(), - std::move(subqueries_for_sets), + std::move(dagContext().moveSubqueries()), SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode), dagContext().log->identifier()); } diff --git a/dbms/src/Flash/Coprocessor/InterpreterDAG.h b/dbms/src/Flash/Coprocessor/InterpreterDAG.h index 46b995ef9a6..40f7d8c62cf 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterDAG.h +++ b/dbms/src/Flash/Coprocessor/InterpreterDAG.h @@ -21,16 +21,9 @@ #pragma GCC diagnostic pop #include -#include -#include +#include #include -#include -#include -#include #include -#include -#include -#include namespace DB { @@ -50,9 +43,9 @@ class InterpreterDAG : public IInterpreter BlockIO execute() override; private: - BlockInputStreams executeQueryBlock(DAGQueryBlock & query_block, std::vector & subqueries_for_sets); + BlockInputStreams executeQueryBlock(DAGQueryBlock & query_block); - DAGContext & dagContext() const { return *context.getDAGContext(); } + DAGContext & dagContext() const; Context & context; const DAGQuerySource & dag; diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.h b/dbms/src/Interpreters/ExpressionAnalyzer.h index fb8bea20a8a..3558b0ffc90 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.h +++ b/dbms/src/Interpreters/ExpressionAnalyzer.h @@ -17,6 +17,7 @@ #include #include #include +#include namespace DB @@ -26,21 +27,8 @@ class Context; class ExpressionActions; struct ExpressionActionsChain; -class Join; -using JoinPtr = std::shared_ptr; - -class IAST; -using ASTPtr = std::shared_ptr; - -class Set; -using SetPtr = std::shared_ptr; using PreparedSets = std::unordered_map; -class IBlockInputStream; -using BlockInputStreamPtr = std::shared_ptr; - -class IStorage; -using StoragePtr = std::shared_ptr; using Tables = std::map; class ASTFunction; @@ -48,26 +36,6 @@ class ASTExpressionList; class ASTSelectQuery; -/** Information on what to do when executing a subquery in the [GLOBAL] IN/JOIN section. - */ -struct SubqueryForSet -{ - /// The source is obtained using the InterpreterSelectQuery subquery. - BlockInputStreamPtr source; - - /// If set, build it from result. - SetPtr set; - JoinPtr join; - - /// If set, put the result into the table. - /// This is a temporary table for transferring to remote servers for distributed query processing. - StoragePtr table; -}; - -/// ID of subquery -> what to do with it. -using SubqueriesForSets = std::unordered_map; - - /** Transforms an expression from a syntax tree into a sequence of actions to execute it. * * NOTE: if `ast` is a SELECT query from a table, the structure of this table should not change during the lifetime of ExpressionAnalyzer. diff --git a/dbms/src/Interpreters/SubqueryForSet.h b/dbms/src/Interpreters/SubqueryForSet.h new file mode 100644 index 00000000000..b3c45e948e1 --- /dev/null +++ b/dbms/src/Interpreters/SubqueryForSet.h @@ -0,0 +1,57 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include +#include + +namespace DB +{ +class Join; +using JoinPtr = std::shared_ptr; + +class IAST; +using ASTPtr = std::shared_ptr; + +class Set; +using SetPtr = std::shared_ptr; + +class IBlockInputStream; +using BlockInputStreamPtr = std::shared_ptr; + +class IStorage; +using StoragePtr = std::shared_ptr; + +/** Information on what to do when executing a subquery in the [GLOBAL] IN/JOIN section. + */ +struct SubqueryForSet +{ + /// The source is obtained using the InterpreterSelectQuery subquery. + BlockInputStreamPtr source; + + /// If set, build it from result. + SetPtr set; + JoinPtr join; + + /// If set, put the result into the table. + /// This is a temporary table for transferring to remote servers for distributed query processing. + StoragePtr table; +}; + +/// ID of subquery -> what to do with it. +using SubqueriesForSets = std::unordered_map; +} // namespace DB