Skip to content

Commit

Permalink
This is an automated cherry-pick of #8020
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
yibin87 authored and ti-chi-bot committed Aug 25, 2023
1 parent 453088e commit 63d4be6
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 0 deletions.
5 changes: 5 additions & 0 deletions dbms/src/Flash/Mpp/MPPReceiverSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ class MPPReceiverSet
explicit MPPReceiverSet(const String & req_id)
: log(Logger::get(req_id))
{}
~MPPReceiverSet()
{
/// close will close every receiver's internal MPMC queue and avoid blocking risk in waitAllConnectionsDone
close();
}
void addExchangeReceiver(const String & executor_id, const ExchangeReceiverPtr & exchange_receiver);
void addCoprocessorReader(const CoprocessorReaderPtr & coprocessor_reader);
ExchangeReceiverPtr getExchangeReceiver(const String & executor_id) const;
Expand Down
45 changes: 45 additions & 0 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request)
void MPPTask::initExchangeReceivers()
{
auto receiver_set_local = std::make_shared<MPPReceiverSet>(log->identifier());
<<<<<<< HEAD
traverseExecutors(&dag_req, [&](const tipb::Executor & executor) {
if (executor.tp() == tipb::ExecType::TypeExchangeReceiver)
{
Expand Down Expand Up @@ -185,6 +186,50 @@ void MPPTask::initExchangeReceivers()
});
{
std::unique_lock lock(tunnel_and_receiver_mu);
=======
try
{
dag_context->dag_request.traverse([&](const tipb::Executor & executor) {
if (executor.tp() == tipb::ExecType::TypeExchangeReceiver)
{
assert(executor.has_executor_id());
const auto & executor_id = executor.executor_id();
// In order to distinguish different exchange receivers.
auto exchange_receiver = std::make_shared<ExchangeReceiver>(
std::make_shared<GRPCReceiverContext>(
executor.exchange_receiver(),
dag_context->getMPPTaskMeta(),
context->getTMTContext().getKVCluster(),
context->getTMTContext().getMPPTaskManager(),
context->getSettingsRef().enable_local_tunnel,
context->getSettingsRef().enable_async_grpc_client),
executor.exchange_receiver().encoded_task_meta_size(),
context->getMaxStreams(),
log->identifier(),
executor_id,
executor.fine_grained_shuffle_stream_count(),
context->getSettingsRef());

receiver_set_local->addExchangeReceiver(executor_id, exchange_receiver);

if (status != RUNNING)
throw Exception(
"exchange receiver map can not be initialized, because the task is not in running state");
}
return true;
});
}
catch (...)
{
std::lock_guard lock(mtx);
if (status != RUNNING)
throw Exception("exchange receiver map can not be initialized, because the task is not in running state");
receiver_set = std::move(receiver_set_local);
throw;
}
{
std::lock_guard lock(mtx);
>>>>>>> e5fedf703c (Fix potential receiver blocked in deconstruction (#8020))
if (status != RUNNING)
throw Exception("exchange receiver map can not be initialized, because the task is not in running state");
receiver_set = std::move(receiver_set_local);
Expand Down

0 comments on commit 63d4be6

Please sign in to comment.