Refine GetResultSink and use QueryExecutor for cop/batchCop (#6830)
ref #6518
SeaRise authored Feb 20, 2023
1 parent 9b66b50 commit ff5b4eb
Showing 44 changed files with 377 additions and 376 deletions.
48 changes: 0 additions & 48 deletions dbms/src/Flash/Coprocessor/DAGBlockOutputStream.h

This file was deleted.

102 changes: 102 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
@@ -35,6 +35,98 @@ bool strictSqlMode(UInt64 sql_mode)
return sql_mode & TiDBSQLMode::STRICT_ALL_TABLES || sql_mode & TiDBSQLMode::STRICT_TRANS_TABLES;
}

// for non-mpp(cop/batchCop)
DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
, tidb_host(tidb_host_)
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
, is_mpp_task(false)
, is_root_mpp_task(false)
, is_batch_cop(is_batch_cop_)
, tables_regions_info(std::move(tables_regions_info_))
, log(std::move(log_))
, flags(dag_request->flags())
, sql_mode(dag_request->sql_mode())
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
{
RUNTIME_CHECK((dag_request->executors_size() > 0) != dag_request->has_root_executor());
const auto & root_executor = dag_request->has_root_executor()
? dag_request->root_executor()
: dag_request->executors(dag_request->executors_size() - 1);
return_executor_id = root_executor.has_executor_id();
if (return_executor_id)
root_executor_id = root_executor.executor_id();
initOutputInfo();
}

// for mpp
DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
, return_executor_id(true)
, is_mpp_task(true)
, is_root_mpp_task(is_root_mpp_task_)
, flags(dag_request->flags())
, sql_mode(dag_request->sql_mode())
, mpp_task_meta(meta_)
, mpp_task_id(mpp_task_meta)
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
{
RUNTIME_CHECK(dag_request->has_root_executor() && dag_request->root_executor().has_executor_id());
root_executor_id = dag_request->root_executor().executor_id();
// only mpp task has join executor.
initExecutorIdToJoinIdMap();
initOutputInfo();
}

// for test
DAGContext::DAGContext(UInt64 max_error_count_)
: dag_request(nullptr)
, dummy_ast(makeDummyQuery())
, collect_execution_summaries(false)
, is_mpp_task(false)
, is_root_mpp_task(false)
, flags(0)
, sql_mode(0)
, max_recorded_error_count(max_error_count_)
, warnings(max_recorded_error_count)
, warning_count(0)
{}

// for tests need to run query tasks.
DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, String log_identifier, size_t concurrency)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
, initialize_concurrency(concurrency)
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
, is_mpp_task(false)
, is_root_mpp_task(false)
, log(Logger::get(log_identifier))
, flags(dag_request->flags())
, sql_mode(dag_request->sql_mode())
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
{
RUNTIME_CHECK((dag_request->executors_size() > 0) != dag_request->has_root_executor());
const auto & root_executor = dag_request->has_root_executor()
? dag_request->root_executor()
: dag_request->executors(dag_request->executors_size() - 1);
return_executor_id = root_executor.has_executor_id();
if (return_executor_id)
root_executor_id = root_executor.executor_id();
initOutputInfo();
}

void DAGContext::initOutputInfo()
{
output_field_types = collectOutputFieldTypes(*dag_request);
@@ -53,6 +145,16 @@ void DAGContext::initOutputInfo()
keep_session_timezone_info = encode_type == tipb::EncodeType::TypeChunk || encode_type == tipb::EncodeType::TypeCHBlock;
}

String DAGContext::getRootExecutorId()
{
// If return_executor_id is false, we can get the generated executor_id from list_based_executors_order.
return return_executor_id
? root_executor_id
: (list_based_executors_order.empty()
? ""
: list_based_executors_order.back());
}

bool DAGContext::allowZeroInDate() const
{
return flags & TiDBSQLFlags::IGNORE_ZERO_IN_DATE;
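Note: the non-mpp and test constructors above pick the root executor with the same rule: a request carries either a tree-based plan (`root_executor`) or a list-based plan (`executors`), never both, and for the list form the last entry is the root. A minimal standalone sketch of that rule, using plain stand-in structs instead of the real `tipb::DAGRequest` protobuf types:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Simplified stand-ins for tipb::Executor / tipb::DAGRequest (illustration only).
struct FakeExecutor
{
    bool has_executor_id = false;
    std::string executor_id;
};

struct FakeDAGRequest
{
    bool has_root_executor = false;
    FakeExecutor root_executor;          // tree-based plan
    std::vector<FakeExecutor> executors; // list-based plan
};

// Mirrors the constructor logic: exactly one representation must be present,
// and the root is either root_executor or the last element of the executor list.
std::string pickRootExecutorId(const FakeDAGRequest & req)
{
    assert((!req.executors.empty()) != req.has_root_executor);
    const FakeExecutor & root = req.has_root_executor ? req.root_executor : req.executors.back();
    return root.has_executor_id ? root.executor_id : std::string{};
}

int main()
{
    FakeDAGRequest req;
    req.has_root_executor = true;
    req.root_executor.has_executor_id = true;
    req.root_executor.executor_id = "ExchangeSender_10"; // hypothetical executor id
    return pickRootExecutorId(req) == "ExchangeSender_10" ? 0 : 1;
}
```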
82 changes: 7 additions & 75 deletions dbms/src/Flash/Coprocessor/DAGContext.h
@@ -127,87 +127,16 @@ class DAGContext
{
public:
// for non-mpp(cop/batchCop)
explicit DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
, tidb_host(tidb_host_)
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
, is_mpp_task(false)
, is_root_mpp_task(false)
, is_batch_cop(is_batch_cop_)
, tables_regions_info(std::move(tables_regions_info_))
, log(std::move(log_))
, flags(dag_request->flags())
, sql_mode(dag_request->sql_mode())
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
{
assert(dag_request->has_root_executor() || dag_request->executors_size() > 0);
return_executor_id = dag_request->root_executor().has_executor_id() || dag_request->executors(0).has_executor_id();

initOutputInfo();
}
DAGContext(const tipb::DAGRequest & dag_request_, TablesRegionsInfo && tables_regions_info_, const String & tidb_host_, bool is_batch_cop_, LoggerPtr log_);

// for mpp
DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
, return_executor_id(true)
, is_mpp_task(true)
, is_root_mpp_task(is_root_mpp_task_)
, flags(dag_request->flags())
, sql_mode(dag_request->sql_mode())
, mpp_task_meta(meta_)
, mpp_task_id(mpp_task_meta)
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
{
assert(dag_request->has_root_executor() && dag_request->root_executor().has_executor_id());
// only mpp task has join executor.
initExecutorIdToJoinIdMap();
initOutputInfo();
}
DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_);

// for test
explicit DAGContext(UInt64 max_error_count_)
: dag_request(nullptr)
, dummy_ast(makeDummyQuery())
, collect_execution_summaries(false)
, is_mpp_task(false)
, is_root_mpp_task(false)
, flags(0)
, sql_mode(0)
, max_recorded_error_count(max_error_count_)
, warnings(max_recorded_error_count)
, warning_count(0)
{}
explicit DAGContext(UInt64 max_error_count_);

// for tests need to run query tasks.
explicit DAGContext(const tipb::DAGRequest & dag_request_, String log_identifier, size_t concurrency)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
, initialize_concurrency(concurrency)
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
, is_mpp_task(false)
, is_root_mpp_task(false)
, log(Logger::get(log_identifier))
, flags(dag_request->flags())
, sql_mode(dag_request->sql_mode())
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
{
assert(dag_request->has_root_executor() || dag_request->executors_size() > 0);
return_executor_id = dag_request->root_executor().has_executor_id() || dag_request->executors(0).has_executor_id();

initOutputInfo();
}
DAGContext(const tipb::DAGRequest & dag_request_, String log_identifier, size_t concurrency);

std::unordered_map<String, BlockInputStreams> & getProfileStreamsMap();

@@ -335,6 +264,8 @@ class DAGContext

void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }

String getRootExecutorId();

const tipb::DAGRequest * dag_request;
/// Some existing code inherited from Clickhouse assume that each query must have a valid query string and query ast,
/// dummy_query_string and dummy_ast is used for that
@@ -350,6 +281,7 @@
String tidb_host = "Unknown";
bool collect_execution_summaries{};
bool return_executor_id{};
String root_executor_id = "";
/* const */ bool is_mpp_task = false;
/* const */ bool is_root_mpp_task = false;
/* const */ bool is_batch_cop = false;
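For reference, the newly declared `getRootExecutorId()` (defined in DAGContext.cpp above) returns the recorded `root_executor_id` when the request carries executor ids, and otherwise falls back to the last generated id in `list_based_executors_order`. A compact restatement of that fallback, written as a free function over plain std types rather than the real class:

```cpp
#include <string>
#include <vector>

// Illustration only: mirrors the fallback in DAGContext::getRootExecutorId().
std::string rootExecutorId(
    bool return_executor_id,
    const std::string & root_executor_id,
    const std::vector<std::string> & list_based_executors_order)
{
    if (return_executor_id)
        return root_executor_id;
    // For list-based requests the ids are generated; the last one belongs to the root.
    return list_based_executors_order.empty() ? "" : list_based_executors_order.back();
}
```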
53 changes: 21 additions & 32 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
@@ -17,14 +17,12 @@
#include <DataStreams/BlockIO.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <Flash/Coprocessor/DAGBlockOutputStream.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/ExecutionSummaryCollector.h>
#include <Flash/Coprocessor/StreamWriter.h>
#include <Flash/Coprocessor/StreamingDAGResponseWriter.h>
#include <Flash/Coprocessor/UnaryDAGResponseWriter.h>
#include <Flash/Executor/toRU.h>
#include <Flash/executeQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
@@ -93,10 +91,9 @@ try
auto start_time = Clock::now();
DAGContext & dag_context = *context.getDAGContext();

// TODO use query executor for cop/batch cop.
BlockIO streams = executeAsBlockIO(context, internal);
if (!streams.in || streams.out)
// Only query is allowed, so streams.in must not be null and streams.out must be null
auto query_executor = queryExecute(context, internal);
if (!query_executor)
// Only query is allowed, so query_executor must not be null
throw TiFlashException("DAG is not query.", Errors::Coprocessor::Internal);

auto end_time = Clock::now();
@@ -107,12 +104,14 @@
BlockOutputStreamPtr dag_output_stream = nullptr;
if constexpr (!batch)
{
std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<UnaryDAGResponseWriter>(
auto response_writer = std::make_unique<UnaryDAGResponseWriter>(
dag_response,
context.getSettingsRef().dag_records_per_chunk,
dag_context);
dag_output_stream = std::make_shared<DAGBlockOutputStream>(streams.in->getHeader(), std::move(response_writer));
copyData(*streams.in, *dag_output_stream);
response_writer->prepare(query_executor->getSampleBlock());
query_executor->execute([&response_writer](const Block & block) { response_writer->write(block); }).verify();
response_writer->flush();

if (dag_context.collect_execution_summaries)
{
ExecutionSummaryCollector summary_collector(dag_context);
@@ -136,14 +135,15 @@

auto streaming_writer = std::make_shared<StreamWriter>(writer);
TiDB::TiDBCollators collators;

std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<StreamWriterPtr>>(
auto response_writer = std::make_unique<StreamingDAGResponseWriter<StreamWriterPtr>>(
streaming_writer,
context.getSettingsRef().dag_records_per_chunk,
context.getSettingsRef().batch_send_min_limit,
dag_context);
dag_output_stream = std::make_shared<DAGBlockOutputStream>(streams.in->getHeader(), std::move(response_writer));
copyData(*streams.in, *dag_output_stream);
response_writer->prepare(query_executor->getSampleBlock());
query_executor->execute([&response_writer](const Block & block) { response_writer->write(block); }).verify();
response_writer->flush();

if (dag_context.collect_execution_summaries)
{
ExecutionSummaryCollector summary_collector(dag_context);
@@ -152,7 +152,7 @@
}
}

auto ru = toRU(streams.in->estimateCPUTimeNs());
auto ru = query_executor->collectRequestUnit();
if constexpr (!batch)
{
LOG_INFO(log, "cop finish with request unit: {}", ru);
@@ -181,24 +181,13 @@
}
}

if (auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(streams.in.get()))
{
LOG_DEBUG(
log,
"dag request without encode cost: {} seconds, produce {} rows, {} bytes.",
p_stream->getProfileInfo().execution_time / (double)1000000000,
p_stream->getProfileInfo().rows,
p_stream->getProfileInfo().bytes);

if constexpr (!batch)
{
// Under some test cases, there may be dag response whose size is bigger than INT_MAX, and GRPC can not limit it.
// Throw exception to prevent receiver from getting wrong response.
if (accurate::greaterOp(p_stream->getProfileInfo().bytes, std::numeric_limits<int>::max()))
throw TiFlashException("DAG response is too big, please check config about region size or region merge scheduler",
Errors::Coprocessor::Internal);
}
}
auto runtime_statistics = query_executor->getRuntimeStatistics();
LOG_DEBUG(
log,
"dag request without encode cost: {} seconds, produce {} rows, {} bytes.",
runtime_statistics.execution_time_ns / static_cast<double>(1000000000),
runtime_statistics.rows,
runtime_statistics.bytes);
}
catch (const RegionException & e)
{
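The driver change above replaces the pull-based `copyData(*streams.in, *dag_output_stream)` loop with a push model: the query executor drives execution and hands each produced block to the response writer, which is prepared with the sample block and flushed at the end. A schematic of that flow, using hypothetical simplified types in place of TiFlash's `QueryExecutor` and `UnaryDAGResponseWriter` (the real `execute()` also returns a result that the driver verifies, and the executor exposes request-unit and runtime statistics):

```cpp
#include <functional>
#include <iostream>
#include <vector>

// Hypothetical stand-ins for TiFlash's Block / QueryExecutor / DAGResponseWriter.
struct Block
{
    size_t rows = 0;
};

struct FakeQueryExecutor
{
    std::vector<Block> produced{{10}, {20}, {30}};

    Block getSampleBlock() const { return Block{}; }

    // Push model: the executor owns the loop and invokes the consumer once per block.
    void execute(const std::function<void(const Block &)> & consumer)
    {
        for (const auto & block : produced)
            consumer(block);
    }
};

struct FakeResponseWriter
{
    size_t total_rows = 0;
    void prepare(const Block & /*sample*/) {} // derive schema/codecs from the header
    void write(const Block & block) { total_rows += block.rows; }
    void flush() { std::cout << "flushed " << total_rows << " rows\n"; }
};

int main()
{
    FakeQueryExecutor query_executor;
    FakeResponseWriter response_writer;

    response_writer.prepare(query_executor.getSampleBlock());
    query_executor.execute([&](const Block & block) { response_writer.write(block); });
    response_writer.flush();
}
```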
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp
@@ -32,7 +32,7 @@ DAGResponseWriter::DAGResponseWriter(
&& dag_context.encode_type != tipb::EncodeType::TypeDefault)
{
throw TiFlashException(
"Only Default/Arrow/CHBlock encode type is supported in DAGBlockOutputStream.",
"Only Default/Arrow/CHBlock encode type is supported in DAGResponseWriter.",
Errors::Coprocessor::Unimplemented);
}
}
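This hunk only renames the component in the error text now that DAGBlockOutputStream is gone; as the surrounding context shows, the guard itself still accepts only the three encode types named in the message (TypeDefault, TypeChunk, TypeCHBlock in tipb terms). A hedged sketch of an equivalent check, with a hypothetical enum standing in for `tipb::EncodeType`:

```cpp
#include <stdexcept>

// Hypothetical enum standing in for tipb::EncodeType (illustration only).
enum class EncodeType
{
    TypeDefault,
    TypeChunk,
    TypeCHBlock,
    TypeOther
};

// Sketch of the kind of guard the message above belongs to.
void checkEncodeType(EncodeType encode_type)
{
    if (encode_type != EncodeType::TypeDefault
        && encode_type != EncodeType::TypeChunk
        && encode_type != EncodeType::TypeCHBlock)
        throw std::runtime_error(
            "Only Default/Arrow/CHBlock encode type is supported in DAGResponseWriter.");
}
```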