Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix potential receiver blocked in deconstruction (#8020) #8025

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions dbms/src/Flash/Mpp/MPPReceiverSet.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Flash/Coprocessor/CoprocessorReader.h>

namespace DB
{
/// Forward declarations and shared-pointer aliases for the two receiver kinds
/// stored by MPPReceiverSet (CoprocessorReader and ExchangeReceiver).
class CoprocessorReader;
using CoprocessorReaderPtr = std::shared_ptr<CoprocessorReader>;
class ExchangeReceiver;
using ExchangeReceiverPtr = std::shared_ptr<ExchangeReceiver>;
/// Exchange receivers keyed by a String.
/// NOTE(review): the key is presumably the executor id passed to
/// MPPReceiverSet::addExchangeReceiver / getExchangeReceiver -- confirm in the .cpp.
using ExchangeReceiverMap = std::unordered_map<String, ExchangeReceiverPtr>;

/// Container for the receivers used by an MPP task.
///
/// Two kinds of receiver exist in MPP:
///   - ExchangeReceiver: receives data from other MPPTasks;
///   - CoprocessorReader: used in remote read.
///
/// Every member function other than the constructor, the destructor and
/// getExternalThreadCnt() is only declared here and defined out of line.
class MPPReceiverSet
{
public:
    /// Creates an empty set whose logger is obtained from `req_id`.
    explicit MPPReceiverSet(const String & req_id)
        : log(Logger::get(req_id))
    {
    }

    /// Always closes the set on destruction: close() closes every receiver's
    /// internal MPMC queue, which avoids the risk of blocking in
    /// waitAllConnectionsDone.
    ~MPPReceiverSet() { close(); }

    /// --- registration / lookup -------------------------------------------
    void addExchangeReceiver(const String & executor_id, const ExchangeReceiverPtr & exchange_receiver);
    void addCoprocessorReader(const CoprocessorReaderPtr & coprocessor_reader);
    ExchangeReceiverPtr getExchangeReceiver(const String & executor_id) const;

    /// --- lifecycle -------------------------------------------------------
    void cancel();
    void close();

    /// Returns the current value of the external thread counter.
    int getExternalThreadCnt() const
    {
        return external_thread_cnt;
    }

private:
    /// Exchange receivers, stored in a String-keyed map.
    ExchangeReceiverMap exchange_receiver_map;
    /// Coprocessor readers, stored in insertion-ordered storage.
    std::vector<CoprocessorReaderPtr> coprocessor_readers;
    /// Logger created from the request id given to the constructor.
    const LoggerPtr log;
    /// Starts at zero.
    /// NOTE(review): presumably updated by the add* methods -- confirm in the .cpp.
    int external_thread_cnt = 0;
};

/// Shared-ownership handle to an MPPReceiverSet.
using MPPReceiverSetPtr = std::shared_ptr<MPPReceiverSet>;

} // namespace DB
90 changes: 90 additions & 0 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,97 @@ void MPPTask::registerTunnel(const MPPTaskId & id, MPPTunnelPtr tunnel)
if (tunnel_map.find(id) != tunnel_map.end())
throw Exception("the tunnel " + tunnel->id() + " has been registered");

<<<<<<< HEAD
tunnel_map[id] = tunnel;
=======
/// when the receiver task is root task, it should never be local tunnel
bool is_local = context->getSettingsRef().enable_local_tunnel && task_meta.task_id() != -1
&& meta.address() == task_meta.address();
bool is_async = !is_local && context->getSettingsRef().enable_async_server;
MPPTunnelPtr tunnel = std::make_shared<MPPTunnel>(
task_meta,
task_request.meta(),
timeout,
queue_limit,
is_local,
is_async,
log->identifier());

LOG_DEBUG(log, "begin to register the tunnel {}, is_local: {}, is_async: {}", tunnel->id(), is_local, is_async);

if (status != INITIALIZING)
throw Exception(fmt::format(
"The tunnel {} can not be registered, because the task is not in initializing state",
tunnel->id()));

MPPTaskId task_id(task_meta);
RUNTIME_CHECK_MSG(
id.gather_id.gather_id == task_id.gather_id.gather_id,
"MPP query has different gather id, should be something wrong in TiDB side");
tunnel_set_local->registerTunnel(task_id, tunnel);
injectFailPointDuringRegisterTunnel(dag_context->isRootMPPTask());
}
{
std::unique_lock lock(mtx);
if (status != INITIALIZING)
throw Exception(
fmt::format("The tunnels can not be registered, because the task is not in initializing state"));
tunnel_set = std::move(tunnel_set_local);
}
dag_context->tunnel_set = tunnel_set;
}

/// Builds the receiver set for this task.
///
/// Creates a local MPPReceiverSet, walks the DAG request and adds one
/// ExchangeReceiver for every executor of type TypeExchangeReceiver, then
/// publishes the set to `receiver_set` (under `mtx`) and to `dag_context`.
///
/// Throws Exception when `status` is not RUNNING at any of the checks below;
/// any exception raised while traversing the request is also propagated.
void MPPTask::initExchangeReceivers()
{
// Built locally first; only moved into the member `receiver_set` while holding `mtx`.
auto receiver_set_local = std::make_shared<MPPReceiverSet>(log->identifier());
try
{
dag_context->dag_request.traverse([&](const tipb::Executor & executor) {
if (executor.tp() == tipb::ExecType::TypeExchangeReceiver)
{
assert(executor.has_executor_id());
const auto & executor_id = executor.executor_id();
// In order to distinguish different exchange receivers.
auto exchange_receiver = std::make_shared<ExchangeReceiver>(
std::make_shared<GRPCReceiverContext>(
executor.exchange_receiver(),
dag_context->getMPPTaskMeta(),
context->getTMTContext().getKVCluster(),
context->getTMTContext().getMPPTaskManager(),
context->getSettingsRef().enable_local_tunnel,
context->getSettingsRef().enable_async_grpc_client),
executor.exchange_receiver().encoded_task_meta_size(),
context->getMaxStreams(),
log->identifier(),
executor_id,
executor.fine_grained_shuffle_stream_count(),
context->getSettingsRef());

receiver_set_local->addExchangeReceiver(executor_id, exchange_receiver);

// Checked after each receiver is added; note this read of `status` is not under `mtx`.
if (status != RUNNING)
throw Exception(
"exchange receiver map can not be initialized, because the task is not in running state");
}
// NOTE(review): presumably `true` tells traverse() to keep visiting executors -- confirm.
return true;
});
}
catch (...)
{
// Failure path: if the task is still RUNNING, hand the partially built set to
// `receiver_set` before rethrowing the original exception.
// NOTE(review): when status != RUNNING the throw below replaces the in-flight
// exception with a new one, and the local set is not stored.
std::lock_guard lock(mtx);
if (status != RUNNING)
throw Exception("exchange receiver map can not be initialized, because the task is not in running state");
receiver_set = std::move(receiver_set_local);
throw;
}
{
// Success path: publish the fully built set under the lock.
std::lock_guard lock(mtx);
if (status != RUNNING)
throw Exception("exchange receiver map can not be initialized, because the task is not in running state");
receiver_set = std::move(receiver_set_local);
}
dag_context->setMPPReceiverSet(receiver_set);
// NOTE(review): the next line is an unresolved merge-conflict marker; it and the
// matching markers earlier in this hunk must be resolved before this file can compile.
>>>>>>> e5fedf703c (Fix potential receiver blocked in deconstruction (#8020))
}

std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConnectionRequest * request)
Expand Down