diff --git a/dbms/src/Flash/Mpp/MPPReceiverSet.h b/dbms/src/Flash/Mpp/MPPReceiverSet.h new file mode 100644 index 00000000000..5d36254db1b --- /dev/null +++ b/dbms/src/Flash/Mpp/MPPReceiverSet.h @@ -0,0 +1,58 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB +{ +class CoprocessorReader; +using CoprocessorReaderPtr = std::shared_ptr; +class ExchangeReceiver; +using ExchangeReceiverPtr = std::shared_ptr; +using ExchangeReceiverMap = std::unordered_map; + +class MPPReceiverSet +{ +public: + explicit MPPReceiverSet(const String & req_id) + : log(Logger::get(req_id)) + {} + ~MPPReceiverSet() + { + /// close will close every receiver's internal MPMC queue and avoid blocking risk in waitAllConnectionsDone + close(); + } + void addExchangeReceiver(const String & executor_id, const ExchangeReceiverPtr & exchange_receiver); + void addCoprocessorReader(const CoprocessorReaderPtr & coprocessor_reader); + ExchangeReceiverPtr getExchangeReceiver(const String & executor_id) const; + void cancel(); + void close(); + + int getExternalThreadCnt() const { return external_thread_cnt; } + +private: + /// two kinds of receiver in MPP + /// ExchangeReceiver: receiver data from other MPPTask + /// CoprocessorReader: used in remote read + ExchangeReceiverMap exchange_receiver_map; + std::vector coprocessor_readers; + const LoggerPtr log; + int external_thread_cnt = 0; +}; + +using MPPReceiverSetPtr = std::shared_ptr; + +} // namespace DB diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index a1023497289..12a5b4ef358 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -106,7 +106,97 @@ void MPPTask::registerTunnel(const MPPTaskId & id, MPPTunnelPtr tunnel) if (tunnel_map.find(id) != tunnel_map.end()) throw Exception("the tunnel " + tunnel->id() + " has been registered"); +<<<<<<< HEAD tunnel_map[id] = tunnel; +======= + /// when the receiver task is root task, it should never be local tunnel + bool is_local = context->getSettingsRef().enable_local_tunnel && task_meta.task_id() != -1 + && meta.address() == task_meta.address(); + bool is_async = !is_local && context->getSettingsRef().enable_async_server; + MPPTunnelPtr tunnel = std::make_shared( + task_meta, + task_request.meta(), + timeout, + queue_limit, + is_local, + is_async, + log->identifier()); + + LOG_DEBUG(log, "begin to register the tunnel {}, is_local: {}, is_async: {}", tunnel->id(), is_local, is_async); + + if (status != INITIALIZING) + throw Exception(fmt::format( + "The tunnel {} can not be registered, because the task is not in initializing state", + tunnel->id())); + + MPPTaskId task_id(task_meta); + RUNTIME_CHECK_MSG( + id.gather_id.gather_id == task_id.gather_id.gather_id, + "MPP query has different gather id, should be something wrong in TiDB side"); + tunnel_set_local->registerTunnel(task_id, tunnel); + injectFailPointDuringRegisterTunnel(dag_context->isRootMPPTask()); + } + { + std::unique_lock lock(mtx); + if (status != INITIALIZING) + throw Exception( + fmt::format("The tunnels can not be registered, because the task is not in initializing state")); + tunnel_set = std::move(tunnel_set_local); + } + dag_context->tunnel_set = tunnel_set; +} + +void MPPTask::initExchangeReceivers() +{ + auto receiver_set_local = std::make_shared(log->identifier()); + try + { + dag_context->dag_request.traverse([&](const tipb::Executor & executor) { + if (executor.tp() == tipb::ExecType::TypeExchangeReceiver) + { + assert(executor.has_executor_id()); + const auto & executor_id = executor.executor_id(); + // In order to distinguish different exchange receivers. + auto exchange_receiver = std::make_shared( + std::make_shared( + executor.exchange_receiver(), + dag_context->getMPPTaskMeta(), + context->getTMTContext().getKVCluster(), + context->getTMTContext().getMPPTaskManager(), + context->getSettingsRef().enable_local_tunnel, + context->getSettingsRef().enable_async_grpc_client), + executor.exchange_receiver().encoded_task_meta_size(), + context->getMaxStreams(), + log->identifier(), + executor_id, + executor.fine_grained_shuffle_stream_count(), + context->getSettingsRef()); + + receiver_set_local->addExchangeReceiver(executor_id, exchange_receiver); + + if (status != RUNNING) + throw Exception( + "exchange receiver map can not be initialized, because the task is not in running state"); + } + return true; + }); + } + catch (...) + { + std::lock_guard lock(mtx); + if (status != RUNNING) + throw Exception("exchange receiver map can not be initialized, because the task is not in running state"); + receiver_set = std::move(receiver_set_local); + throw; + } + { + std::lock_guard lock(mtx); + if (status != RUNNING) + throw Exception("exchange receiver map can not be initialized, because the task is not in running state"); + receiver_set = std::move(receiver_set_local); + } + dag_context->setMPPReceiverSet(receiver_set); +>>>>>>> e5fedf703c (Fix potential receiver blocked in deconstruction (#8020)) } std::pair MPPTask::getTunnel(const ::mpp::EstablishMPPConnectionRequest * request)