Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#8020
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
yibin87 authored and ti-chi-bot committed Aug 25, 2023
1 parent d8cb780 commit fd29812
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 0 deletions.
58 changes: 58 additions & 0 deletions dbms/src/Flash/Mpp/MPPReceiverSet.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Flash/Coprocessor/CoprocessorReader.h>

namespace DB
{
class CoprocessorReader;
using CoprocessorReaderPtr = std::shared_ptr<CoprocessorReader>;
class ExchangeReceiver;
using ExchangeReceiverPtr = std::shared_ptr<ExchangeReceiver>;
using ExchangeReceiverMap = std::unordered_map<String, ExchangeReceiverPtr>;

class MPPReceiverSet
{
public:
explicit MPPReceiverSet(const String & req_id)
: log(Logger::get(req_id))
{}
~MPPReceiverSet()
{
/// close will close every receiver's internal MPMC queue and avoid blocking risk in waitAllConnectionsDone
close();
}
void addExchangeReceiver(const String & executor_id, const ExchangeReceiverPtr & exchange_receiver);
void addCoprocessorReader(const CoprocessorReaderPtr & coprocessor_reader);
ExchangeReceiverPtr getExchangeReceiver(const String & executor_id) const;
void cancel();
void close();

int getExternalThreadCnt() const { return external_thread_cnt; }

private:
/// two kinds of receiver in MPP
/// ExchangeReceiver: receiver data from other MPPTask
/// CoprocessorReader: used in remote read
ExchangeReceiverMap exchange_receiver_map;
std::vector<CoprocessorReaderPtr> coprocessor_readers;
const LoggerPtr log;
int external_thread_cnt = 0;
};

using MPPReceiverSetPtr = std::shared_ptr<MPPReceiverSet>;

} // namespace DB
90 changes: 90 additions & 0 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,97 @@ void MPPTask::registerTunnel(const MPPTaskId & id, MPPTunnelPtr tunnel)
if (tunnel_map.find(id) != tunnel_map.end())
throw Exception("the tunnel " + tunnel->id() + " has been registered");

<<<<<<< HEAD
tunnel_map[id] = tunnel;
=======
/// when the receiver task is root task, it should never be local tunnel
bool is_local = context->getSettingsRef().enable_local_tunnel && task_meta.task_id() != -1
&& meta.address() == task_meta.address();
bool is_async = !is_local && context->getSettingsRef().enable_async_server;
MPPTunnelPtr tunnel = std::make_shared<MPPTunnel>(
task_meta,
task_request.meta(),
timeout,
queue_limit,
is_local,
is_async,
log->identifier());

LOG_DEBUG(log, "begin to register the tunnel {}, is_local: {}, is_async: {}", tunnel->id(), is_local, is_async);

if (status != INITIALIZING)
throw Exception(fmt::format(
"The tunnel {} can not be registered, because the task is not in initializing state",
tunnel->id()));

MPPTaskId task_id(task_meta);
RUNTIME_CHECK_MSG(
id.gather_id.gather_id == task_id.gather_id.gather_id,
"MPP query has different gather id, should be something wrong in TiDB side");
tunnel_set_local->registerTunnel(task_id, tunnel);
injectFailPointDuringRegisterTunnel(dag_context->isRootMPPTask());
}
{
std::unique_lock lock(mtx);
if (status != INITIALIZING)
throw Exception(
fmt::format("The tunnels can not be registered, because the task is not in initializing state"));
tunnel_set = std::move(tunnel_set_local);
}
dag_context->tunnel_set = tunnel_set;
}

void MPPTask::initExchangeReceivers()
{
auto receiver_set_local = std::make_shared<MPPReceiverSet>(log->identifier());
try
{
dag_context->dag_request.traverse([&](const tipb::Executor & executor) {
if (executor.tp() == tipb::ExecType::TypeExchangeReceiver)
{
assert(executor.has_executor_id());
const auto & executor_id = executor.executor_id();
// In order to distinguish different exchange receivers.
auto exchange_receiver = std::make_shared<ExchangeReceiver>(
std::make_shared<GRPCReceiverContext>(
executor.exchange_receiver(),
dag_context->getMPPTaskMeta(),
context->getTMTContext().getKVCluster(),
context->getTMTContext().getMPPTaskManager(),
context->getSettingsRef().enable_local_tunnel,
context->getSettingsRef().enable_async_grpc_client),
executor.exchange_receiver().encoded_task_meta_size(),
context->getMaxStreams(),
log->identifier(),
executor_id,
executor.fine_grained_shuffle_stream_count(),
context->getSettingsRef());

receiver_set_local->addExchangeReceiver(executor_id, exchange_receiver);

if (status != RUNNING)
throw Exception(
"exchange receiver map can not be initialized, because the task is not in running state");
}
return true;
});
}
catch (...)
{
std::lock_guard lock(mtx);
if (status != RUNNING)
throw Exception("exchange receiver map can not be initialized, because the task is not in running state");
receiver_set = std::move(receiver_set_local);
throw;
}
{
std::lock_guard lock(mtx);
if (status != RUNNING)
throw Exception("exchange receiver map can not be initialized, because the task is not in running state");
receiver_set = std::move(receiver_set_local);
}
dag_context->setMPPReceiverSet(receiver_set);
>>>>>>> e5fedf703c (Fix potential receiver blocked in deconstruction (#8020))
}

std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConnectionRequest * request)
Expand Down

0 comments on commit fd29812

Please sign in to comment.