[DNM] refine handle table scan #4674

Closed
wants to merge 12 commits
33 changes: 17 additions & 16 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
@@ -34,6 +34,7 @@
#include <Flash/Coprocessor/DAGCodec.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/DAGStorageInterpreter.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Coprocessor/StreamingDAGResponseWriter.h>
@@ -239,10 +240,10 @@ ExpressionActionsPtr generateProjectExpressionActions(
return project;
}

void DAGQueryBlockInterpreter::handleTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline)
void DAGQueryBlockInterpreter::handleTableScan(TiDBStorageTable & storage_table, DAGPipeline & pipeline)
{
bool has_region_to_read = false;
for (const auto physical_table_id : table_scan.getPhysicalTableIDs())
for (const auto physical_table_id : storage_table.getTiDBTableScan().getPhysicalTableIDs())
{
const auto & table_regions_info = dagContext().getTableRegionsInfoByTableID(physical_table_id);
if (!table_regions_info.local_regions.empty() || !table_regions_info.remote_regions.empty())
@@ -252,7 +253,7 @@ void DAGQueryBlockInterpreter::handleTableScan(const TiDBTableScan & table_scan,
}
}
if (!has_region_to_read)
throw TiFlashException(fmt::format("Dag Request does not have region to read for table: {}", table_scan.getLogicalTableID()), Errors::Coprocessor::BadRequest);
throw TiFlashException(fmt::format("Dag Request does not have region to read for table: {}", storage_table.getTiDBTableScan().getLogicalTableID()), Errors::Coprocessor::BadRequest);
// construct pushed down filter conditions.
std::vector<const tipb::Expr *> conditions;
if (query_block.selection)
@@ -261,11 +262,13 @@ void DAGQueryBlockInterpreter::handleTableScan(const TiDBTableScan & table_scan,
conditions.push_back(&condition);
}

DAGStorageInterpreter storage_interpreter(context, query_block, table_scan, conditions, max_streams);
DAGStorageInterpreter storage_interpreter(context, storage_table, query_block.selection_name, conditions, max_streams);
storage_interpreter.execute(pipeline);

analyzer = std::move(storage_interpreter.analyzer);
storage_table.getTiDBReadSnapshot().releaseLearnerReadSnapshot();
storage_table.releaseAlterLocks();

analyzer = std::move(storage_interpreter.analyzer);

auto remote_requests = std::move(storage_interpreter.remote_requests);
auto null_stream_if_empty = std::move(storage_interpreter.null_stream_if_empty);
@@ -290,14 +293,12 @@ void DAGQueryBlockInterpreter::handleTableScan(const TiDBTableScan & table_scan,
remote_read_streams_start_index = 1;
}

/// Theoretically we could move addTableLock to DAGStorageInterpreter, but we don't want the table to be dropped
/// during the lifetime of this query, and sometimes if there is no local region, we will use the RemoteBlockInputStream
/// or even the null_stream to hold the lock, so I would like to keep the addTableLock in DAGQueryBlockInterpreter
pipeline.transform([&](auto & stream) {
// todo do not need to hold all locks in each stream, if the stream is reading from table a
// it only needs to hold the lock of table a
for (auto & lock : storage_interpreter.drop_locks)
stream->addTableLock(lock);
// todo do not need to hold all locks in each stream, if the stream is reading from table a
// it only needs to hold the lock of table a
storage_table.moveDropLocks([&pipeline](const auto & drop_lock) {
pipeline.transform([&drop_lock](const auto & stream) {
stream->addTableLock(drop_lock);
});
});

/// Set the limits and quota for reading data, the speed and time of the query.
@@ -306,7 +307,7 @@ void DAGQueryBlockInterpreter::handleTableScan(const TiDBTableScan & table_scan,

/// handle timezone/duration cast for local and remote table scan.
executeCastAfterTableScan(
table_scan,
storage_table.getTiDBTableScan(),
storage_interpreter.is_need_add_cast_column,
remote_read_streams_start_index,
pipeline);
@@ -1035,8 +1036,8 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
}
else if (query_block.isTableScanSource())
{
TiDBTableScan table_scan(query_block.source, dagContext());
handleTableScan(table_scan, pipeline);
TiDBStorageTable storage_table(query_block.source, query_block.source_name, context, log->identifier());
handleTableScan(storage_table, pipeline);
dagContext().table_scan_executor_id = query_block.source_name;
}
else
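For context, here is a minimal, self-contained C++ sketch of the drop-lock hand-off this hunk moves into `TiDBStorageTable`. It is not the real TiFlash code: every type below is a stand-in, and the `moveDropLocks` body is only an assumption inferred from how the diff calls it (hand each held drop lock to a callback, which attaches it to every stream of the pipeline so the table cannot be dropped while the query runs, then give up ownership).

```cpp
// Hypothetical sketch only; all classes are simplified stand-ins for the
// TiFlash types used in this diff (IBlockInputStream, DAGPipeline,
// TiDBStorageTable, TableLockHolder).
#include <functional>
#include <memory>
#include <utility>
#include <vector>

struct TableLockHolder {};                              // placeholder for the real lock type
using TableLockHolderPtr = std::shared_ptr<TableLockHolder>;

struct BlockInputStream                                 // placeholder for IBlockInputStream
{
    std::vector<TableLockHolderPtr> locks;
    void addTableLock(const TableLockHolderPtr & lock) { locks.push_back(lock); }
};
using BlockInputStreamPtr = std::shared_ptr<BlockInputStream>;

struct DAGPipeline                                      // placeholder for DB::DAGPipeline
{
    std::vector<BlockInputStreamPtr> streams;
    template <typename F>
    void transform(F && f)
    {
        for (auto & stream : streams)
            f(stream);
    }
};

class TiDBStorageTable                                  // only the lock-related part
{
public:
    explicit TiDBStorageTable(std::vector<TableLockHolderPtr> locks_)
        : drop_locks(std::move(locks_))
    {}

    // Assumed behaviour: hand each held drop lock to the consumer, then
    // release ownership here, mirroring storage_table.moveDropLocks(...) above.
    void moveDropLocks(const std::function<void(const TableLockHolderPtr &)> & consumer)
    {
        for (const auto & lock : drop_locks)
            consumer(lock);
        drop_locks.clear();
    }

private:
    std::vector<TableLockHolderPtr> drop_locks;
};

int main()
{
    DAGPipeline pipeline;
    pipeline.streams = {std::make_shared<BlockInputStream>(), std::make_shared<BlockInputStream>()};
    TiDBStorageTable storage_table({std::make_shared<TableLockHolder>()});

    // Same shape as the new code in handleTableScan: every stream ends up
    // holding every drop lock for the lifetime of the query.
    storage_table.moveDropLocks([&pipeline](const auto & drop_lock) {
        pipeline.transform([&drop_lock](const auto & stream) {
            stream->addTableLock(drop_lock);
        });
    });
    return 0;
}
```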
7 changes: 4 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
@@ -16,8 +16,10 @@

#include <DataStreams/BlockIO.h>
#include <Flash/Coprocessor/ChunkCodec.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/DAGStorageInterpreter.h>
#include <Flash/Coprocessor/RemoteRequest.h>
#include <Flash/Coprocessor/TiDBStorageTable.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
@@ -36,7 +38,6 @@ namespace DB
{
class DAGQueryBlock;
class ExchangeReceiver;
class DAGExpressionAnalyzer;

/** build ch plan from dag request: dag executors -> ch plan
*/
@@ -55,7 +56,7 @@ class DAGQueryBlockInterpreter

private:
void executeImpl(DAGPipeline & pipeline);
void handleTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline);
void handleTableScan(TiDBStorageTable & storage_table, DAGPipeline & pipeline);
void executeCastAfterTableScan(
const TiDBTableScan & table_scan,
const std::vector<ExtraCastAfterTSMode> & is_need_add_cast_column,