Skip to content

Commit

Permalink
Add MPPReceiverSet, which includes ExchangeReceiver and CoprocessorRe…
Browse files Browse the repository at this point in the history
…ader (#5175)

ref #5095
  • Loading branch information
windtalker authored Jun 22, 2022
1 parent bfceb28 commit e14c677
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 26 deletions.
14 changes: 11 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,20 @@ void DAGContext::attachBlockIO(const BlockIO & io_)
io = io_;
}

const std::unordered_map<String, std::shared_ptr<ExchangeReceiver>> & DAGContext::getMPPExchangeReceiverMap() const
ExchangeReceiverPtr DAGContext::getMPPExchangeReceiver(const String & executor_id) const
{
if (!isMPPTask())
throw TiFlashException("mpp_exchange_receiver_map is used in mpp only", Errors::Coprocessor::Internal);
RUNTIME_ASSERT(mpp_exchange_receiver_map != nullptr, log, "MPPTask without exchange receiver map");
return *mpp_exchange_receiver_map;
RUNTIME_ASSERT(mpp_receiver_set != nullptr, log, "MPPTask without receiver set");
return mpp_receiver_set->getExchangeReceiver(executor_id);
}

void DAGContext::addCoprocessorReader(const CoprocessorReaderPtr & coprocessor_reader)
{
if (!isMPPTask())
return;
RUNTIME_ASSERT(mpp_receiver_set != nullptr, log, "MPPTask without receiver set");
return mpp_receiver_set->addCoprocessorReader(coprocessor_reader);
}

bool DAGContext::containsRegionsInfoForTable(Int64 table_id) const
Expand Down
20 changes: 13 additions & 7 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,13 @@ namespace DB
class Context;
class MPPTunnelSet;
class ExchangeReceiver;
using ExchangeReceiverMap = std::unordered_map<String, std::shared_ptr<ExchangeReceiver>>;
using ExchangeReceiverMapPtr = std::shared_ptr<std::unordered_map<String, std::shared_ptr<ExchangeReceiver>>>;
using ExchangeReceiverPtr = std::shared_ptr<ExchangeReceiver>;
/// key: executor_id of ExchangeReceiver nodes in dag.
using ExchangeReceiverMap = std::unordered_map<String, ExchangeReceiverPtr>;
class MPPReceiverSet;
using MPPReceiverSetPtr = std::shared_ptr<MPPReceiverSet>;
class CoprocessorReader;
using CoprocessorReaderPtr = std::shared_ptr<CoprocessorReader>;

class Join;
using JoinPtr = std::shared_ptr<Join>;
Expand Down Expand Up @@ -304,11 +309,12 @@ class DAGContext

bool columnsForTestEmpty() { return columns_for_test_map.empty(); }

const std::unordered_map<String, std::shared_ptr<ExchangeReceiver>> & getMPPExchangeReceiverMap() const;
void setMPPExchangeReceiverMap(ExchangeReceiverMapPtr & exchange_receiver_map)
ExchangeReceiverPtr getMPPExchangeReceiver(const String & executor_id) const;
void setMPPReceiverSet(const MPPReceiverSetPtr & receiver_set)
{
mpp_exchange_receiver_map = exchange_receiver_map;
mpp_receiver_set = receiver_set;
}
void addCoprocessorReader(const CoprocessorReaderPtr & coprocessor_reader);

void addSubquery(const String & subquery_id, SubqueryForSet && subquery);
bool hasSubquery() const { return !subqueries.empty(); }
Expand Down Expand Up @@ -369,8 +375,8 @@ class DAGContext
ConcurrentBoundedQueue<tipb::Error> warnings;
/// warning_count is the actual warning count during the entire execution
std::atomic<UInt64> warning_count;
/// key: executor_id of ExchangeReceiver nodes in dag.
ExchangeReceiverMapPtr mpp_exchange_receiver_map;

MPPReceiverSetPtr mpp_receiver_set;
/// vector of SubqueriesForSets(such as join build subquery).
/// The order of the vector is also the order of the subquery.
std::vector<SubqueriesForSets> subqueries;
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -481,14 +481,14 @@ void DAGQueryBlockInterpreter::recordProfileStreams(DAGPipeline & pipeline, cons

void DAGQueryBlockInterpreter::handleExchangeReceiver(DAGPipeline & pipeline)
{
auto it = dagContext().getMPPExchangeReceiverMap().find(query_block.source_name);
if (unlikely(it == dagContext().getMPPExchangeReceiverMap().end()))
auto exchange_receiver = dagContext().getMPPExchangeReceiver(query_block.source_name);
if (unlikely(exchange_receiver == nullptr))
throw Exception("Can not find exchange receiver for " + query_block.source_name, ErrorCodes::LOGICAL_ERROR);
// todo choose a more reasonable stream number
auto & exchange_receiver_io_input_streams = dagContext().getInBoundIOInputStreamsMap()[query_block.source_name];
for (size_t i = 0; i < max_streams; ++i)
{
BlockInputStreamPtr stream = std::make_shared<ExchangeReceiverInputStream>(it->second, log->identifier(), query_block.source_name);
BlockInputStreamPtr stream = std::make_shared<ExchangeReceiverInputStream>(exchange_receiver, log->identifier(), query_block.source_name);
exchange_receiver_io_input_streams.push_back(stream);
stream = std::make_shared<SquashingBlockInputStream>(stream, 8192, 0, log->identifier());
stream->setExtraInfo("squashing after exchange receiver");
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ void DAGStorageInterpreter::buildRemoteStreams(std::vector<RemoteRequest> && rem
std::vector<pingcap::coprocessor::copTask> tasks(all_tasks.begin() + task_start, all_tasks.begin() + task_end);

auto coprocessor_reader = std::make_shared<CoprocessorReader>(schema, cluster, tasks, has_enforce_encode_type, 1);
context.getDAGContext()->addCoprocessorReader(coprocessor_reader);
BlockInputStreamPtr input = std::make_shared<CoprocessorBlockInputStream>(coprocessor_reader, log->identifier(), table_scan.getTableScanExecutorID());
pipeline.streams.push_back(input);
task_start = task_end;
Expand Down
48 changes: 48 additions & 0 deletions dbms/src/Flash/Mpp/MPPReceiverSet.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Mpp/MPPReceiverSet.h>

namespace DB
{
void MPPReceiverSet::addExchangeReceiver(const String & executor_id, const ExchangeReceiverPtr & exchange_receiver)
{
RUNTIME_ASSERT(exchange_receiver_map.find(executor_id) == exchange_receiver_map.end(), log, "Duplicate executor_id: {} in DAGRequest", executor_id);
exchange_receiver_map[executor_id] = exchange_receiver;
}

void MPPReceiverSet::addCoprocessorReader(const CoprocessorReaderPtr & coprocessor_reader)
{
coprocessor_readers.push_back(coprocessor_reader);
}

ExchangeReceiverPtr MPPReceiverSet::getExchangeReceiver(const String & executor_id) const
{
auto it = exchange_receiver_map.find(executor_id);
if (unlikely(it == exchange_receiver_map.end()))
return nullptr;
return it->second;
}

void MPPReceiverSet::cancel()
{
for (auto & it : exchange_receiver_map)
{
it.second->cancel();
}
for (auto & cop_reader : coprocessor_readers)
cop_reader->cancel();
}
} // namespace DB
44 changes: 44 additions & 0 deletions dbms/src/Flash/Mpp/MPPReceiverSet.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Flash/Coprocessor/CoprocessorReader.h>
#include <Flash/Coprocessor/DAGContext.h>

namespace DB
{
class MPPReceiverSet
{
public:
explicit MPPReceiverSet(const String & req_id)
: log(Logger::get("MPPReceiverSet", req_id))
{}
void addExchangeReceiver(const String & executor_id, const ExchangeReceiverPtr & exchange_receiver);
void addCoprocessorReader(const CoprocessorReaderPtr & coprocessor_reader);
ExchangeReceiverPtr getExchangeReceiver(const String & executor_id) const;
void cancel();

private:
/// two kinds of receiver in MPP
/// ExchangeReceiver: receiver data from other MPPTask
/// CoprocessorReader: used in remote read
ExchangeReceiverMap exchange_receiver_map;
std::vector<CoprocessorReaderPtr> coprocessor_readers;
const LoggerPtr log;
};

using MPPReceiverSetPtr = std::shared_ptr<MPPReceiverSet>;

} // namespace DB
17 changes: 7 additions & 10 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request)

void MPPTask::initExchangeReceivers()
{
mpp_exchange_receiver_map = std::make_shared<ExchangeReceiverMap>();
receiver_set = std::make_shared<MPPReceiverSet>(log->identifier());
traverseExecutors(&dag_req, [&](const tipb::Executor & executor) {
if (executor.tp() == tipb::ExecType::TypeExchangeReceiver)
{
Expand All @@ -147,22 +147,19 @@ void MPPTask::initExchangeReceivers()
if (status != RUNNING)
throw Exception("exchange receiver map can not be initialized, because the task is not in running state");

(*mpp_exchange_receiver_map)[executor_id] = exchange_receiver;
receiver_set->addExchangeReceiver(executor_id, exchange_receiver);
new_thread_count_of_exchange_receiver += exchange_receiver->computeNewThreadCount();
}
return true;
});
dag_context->setMPPExchangeReceiverMap(mpp_exchange_receiver_map);
dag_context->setMPPReceiverSet(receiver_set);
}

void MPPTask::cancelAllExchangeReceivers()
void MPPTask::cancelAllReceivers()
{
if (likely(mpp_exchange_receiver_map != nullptr))
if (likely(receiver_set != nullptr))
{
for (auto & it : *mpp_exchange_receiver_map)
{
it.second->cancel();
}
receiver_set->cancel();
}
}

Expand Down Expand Up @@ -393,7 +390,7 @@ void MPPTask::runImpl()
else
{
context->getProcessList().sendCancelToQuery(context->getCurrentQueryId(), context->getClientInfo().current_user, true);
cancelAllExchangeReceivers();
cancelAllReceivers();
writeErrToAllTunnels(err_msg);
}
LOG_FMT_INFO(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds());
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Flash/Mpp/MPPTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Common/MemoryTracker.h>
#include <DataStreams/BlockIO.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Mpp/MPPReceiverSet.h>
#include <Flash/Mpp/MPPTaskId.h>
#include <Flash/Mpp/MPPTaskStatistics.h>
#include <Flash/Mpp/MPPTunnel.h>
Expand Down Expand Up @@ -109,7 +110,7 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

void initExchangeReceivers();

void cancelAllExchangeReceivers();
void cancelAllReceivers();

tipb::DAGRequest dag_req;

Expand All @@ -126,8 +127,8 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>
MPPTaskId id;

MPPTunnelSetPtr tunnel_set;
/// key: executor_id of ExchangeReceiver nodes in dag.
ExchangeReceiverMapPtr mpp_exchange_receiver_map;

MPPReceiverSetPtr receiver_set;

int new_thread_count_of_exchange_receiver = 0;

Expand Down

0 comments on commit e14c677

Please sign in to comment.