Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refine DAGStorageInterpreter #4312

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,10 @@ void DAGQueryBlockInterpreter::handleTableScan(const TiDBTableScan & table_scan,
{
for (const auto & condition : query_block.selection->selection().conditions())
conditions.push_back(&condition);
assert(!query_block.selection_name.empty() && !conditions.empty());
}

DAGStorageInterpreter storage_interpreter(context, query_block, table_scan, conditions, max_streams);
DAGStorageInterpreter storage_interpreter(context, table_scan, query_block.selection_name, conditions, max_streams);
storage_interpreter.execute(pipeline);

analyzer = std::move(storage_interpreter.analyzer);
Expand Down Expand Up @@ -1036,7 +1037,7 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
}
else if (query_block.isTableScanSource())
{
TiDBTableScan table_scan(query_block.source, dagContext());
TiDBTableScan table_scan(query_block.source_name, query_block.source, dagContext());
handleTableScan(table_scan, pipeline);
dagContext().table_scan_executor_id = query_block.source_name;
}
Expand Down
86 changes: 49 additions & 37 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ MakeRegionQueryInfos(
if (r.key_ranges.empty())
{
throw TiFlashException(
"Income key ranges is empty for region: " + std::to_string(r.region_id),
fmt::format("Income key ranges is empty for region: {}", r.region_id),
Errors::Coprocessor::BadRequest);
}
if (region_force_retry.count(id))
Expand Down Expand Up @@ -104,14 +104,16 @@ MakeRegionQueryInfos(
if (!computeMappedTableID(*p.first, table_id_in_range) || table_id_in_range != physical_table_id)
{
throw TiFlashException(
"Income key ranges is illegal for region: " + std::to_string(r.region_id)
+ ", table id in key range is " + std::to_string(table_id_in_range) + ", table id in region is "
+ std::to_string(physical_table_id),
fmt::format(
"Income key ranges is illegal for region: {}, table id in key range is {}, table id in region is {}",
r.region_id,
table_id_in_range,
physical_table_id),
Errors::Coprocessor::BadRequest);
}
if (p.first->compare(*info.range_in_table.first) < 0 || p.second->compare(*info.range_in_table.second) > 0)
throw TiFlashException(
"Income key ranges is illegal for region: " + std::to_string(r.region_id),
fmt::format("Income key ranges is illegal for region: {}", r.region_id),
Errors::Coprocessor::BadRequest);
}
info.required_handle_ranges = r.key_ranges;
Expand All @@ -132,21 +134,25 @@ MakeRegionQueryInfos(

DAGStorageInterpreter::DAGStorageInterpreter(
Context & context_,
const DAGQueryBlock & query_block_,
const TiDBTableScan & table_scan_,
const std::vector<const tipb::Expr *> & conditions_,
const String & pushed_down_selection_name_,
const std::vector<const tipb::Expr *> & pushed_down_conditions_,
size_t max_streams_)
: context(context_)
, query_block(query_block_)
, table_scan(table_scan_)
, conditions(conditions_)
, pushed_down_selection_name(pushed_down_selection_name_)
, pushed_down_conditions(pushed_down_conditions_)
, max_streams(max_streams_)
, log(getMPPTaskLog(*context.getDAGContext(), "DAGStorageInterpreter"))
, logical_table_id(table_scan.getLogicalTableID())
, settings(context.getSettingsRef())
, tmt(context.getTMTContext())
, mvcc_query_info(new MvccQueryInfo(true, settings.read_tso))
{
if (pushed_down_selection_name.empty() != pushed_down_conditions.empty())
{
throw Exception("Both pushed_down_selection_name and pushed_down_conditions should be empty or neither should be empty");
}
}

void DAGStorageInterpreter::execute(DAGPipeline & pipeline)
Expand Down Expand Up @@ -253,7 +259,7 @@ LearnerReadSnapshot DAGStorageInterpreter::doBatchCopLearnerRead()
}
catch (DB::Exception & e)
{
e.addMessage("(while doing learner read for table, logical table_id: " + DB::toString(logical_table_id) + ")");
e.addMessage(fmt::format("(while doing learner read for table, logical table_id: {})", logical_table_id));
throw;
}
}
Expand All @@ -267,7 +273,7 @@ std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSele
/// to avoid null point exception
query_info.query = makeDummyQuery();
query_info.dag_query = std::make_unique<DAGQueryInfo>(
conditions,
pushed_down_conditions,
analyzer->getPreparedSets(),
analyzer->getCurrentInputColumns(),
context.getTimezoneInfo());
Expand Down Expand Up @@ -417,24 +423,26 @@ void DAGStorageInterpreter::doLocalRead(DAGPipeline & pipeline, size_t max_block
else
{
// Throw an exception for TiDB / TiSpark to retry
if (table_id == logical_table_id)
e.addMessage("(while creating InputStreams from storage `" + storage->getDatabaseName() + "`.`" + storage->getTableName()
+ "`, table_id: " + DB::toString(table_id) + ")");
else
e.addMessage("(while creating InputStreams from storage `" + storage->getDatabaseName() + "`.`" + storage->getTableName()
+ "`, table_id: " + DB::toString(table_id) + ", logical_table_id: " + DB::toString(logical_table_id) + ")");
e.addMessage(
fmt::format(
"(while creating InputStreams from storage `{}`.`{}`, table_id: {}{})",
storage->getDatabaseName(),
storage->getTableName(),
table_id,
table_id == logical_table_id ? "" : fmt::format(", logical_table_id: {}", logical_table_id)));
throw;
}
}
catch (DB::Exception & e)
{
/// Other unknown exceptions
if (table_id == logical_table_id)
e.addMessage("(while creating InputStreams from storage `" + storage->getDatabaseName() + "`.`" + storage->getTableName()
+ "`, table_id: " + DB::toString(table_id) + ")");
else
e.addMessage("(while creating InputStreams from storage `" + storage->getDatabaseName() + "`.`" + storage->getTableName()
+ "`, table_id: " + DB::toString(table_id) + ", logical_table_id: " + DB::toString(logical_table_id) + ")");
e.addMessage(
fmt::format(
"(while creating InputStreams from storage `{}`.`{}`, table_id: {}{})",
storage->getDatabaseName(),
storage->getTableName(),
table_id,
table_id == logical_table_id ? "" : fmt::format(", logical_table_id: {}", logical_table_id)));
throw;
}
}
Expand All @@ -450,7 +458,7 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
auto logical_table_storage = tmt.getStorages().get(logical_table_id);
if (!logical_table_storage)
{
throw TiFlashException("Table " + std::to_string(logical_table_id) + " doesn't exist.", Errors::Table::NotExists);
throw TiFlashException(fmt::format("Table {} doesn't exist.", logical_table_id), Errors::Table::NotExists);
}
storages_with_lock[logical_table_id] = {logical_table_storage, logical_table_storage->lockStructureForShare(context.getCurrentQueryId())};
if (table_scan.isPartitionTableScan())
Expand All @@ -460,7 +468,7 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
auto physical_table_storage = tmt.getStorages().get(physical_table_id);
if (!physical_table_storage)
{
throw TiFlashException("Table " + std::to_string(physical_table_id) + " doesn't exist.", Errors::Table::NotExists);
throw TiFlashException(fmt::format("Table {} doesn't exist.", physical_table_id), Errors::Table::NotExists);
}
storages_with_lock[physical_table_id] = {physical_table_storage, physical_table_storage->lockStructureForShare(context.getCurrentQueryId())};
}
Expand All @@ -479,16 +487,20 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
if (!table_store)
{
if (schema_synced)
throw TiFlashException("Table " + std::to_string(table_id) + " doesn't exist.", Errors::Table::NotExists);
throw TiFlashException(fmt::format("Table {} doesn't exist.", table_id), Errors::Table::NotExists);
else
return {{}, {}, {}, false};
}

if (table_store->engineType() != ::TiDB::StorageEngine::TMT && table_store->engineType() != ::TiDB::StorageEngine::DT)
{
throw TiFlashException("Specifying schema_version for non-managed storage: " + table_store->getName()
+ ", table: " + table_store->getTableName() + ", id: " + DB::toString(table_id) + " is not allowed",
Errors::Coprocessor::Internal);
throw TiFlashException(
fmt::format(
"Specifying schema_version for non-managed storage: {}, table: {}, id: {} is not allowed",
table_store->getName(),
table_store->getTableName(),
table_id),
Errors::Coprocessor::Internal);
}

auto lock = table_store->lockStructureForShare(context.getCurrentQueryId());
Expand All @@ -502,9 +514,9 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
auto storage_schema_version = table_store->getTableInfo().schema_version;
// Not allow storage > query in any case, one example is time travel queries.
if (storage_schema_version > query_schema_version)
throw TiFlashException("Table " + std::to_string(table_id) + " schema version " + std::to_string(storage_schema_version)
+ " newer than query schema version " + std::to_string(query_schema_version),
Errors::Table::SchemaVersionError);
throw TiFlashException(
fmt::format("Table {} schema version {} newer than query schema version {}", table_id, storage_schema_version, query_schema_version),
Errors::Table::SchemaVersionError);
// From now on we have storage <= query.
// If schema was synced, it implies that global >= query, as mentioned above we have storage <= query, we are OK to serve.
if (schema_synced)
Expand Down Expand Up @@ -605,10 +617,9 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageIn
// todo handle alias column
if (max_columns_to_read && table_scan.getColumnSize() > max_columns_to_read)
{
throw TiFlashException("Limit for number of columns to read exceeded. "
"Requested: "
+ toString(table_scan.getColumnSize()) + ", maximum: " + toString(max_columns_to_read),
Errors::BroadcastJoin::TooManyColumns);
throw TiFlashException(
fmt::format("Limit for number of columns to read exceeded. Requested: {}, maximum: {}", table_scan.getColumnSize(), max_columns_to_read),
Errors::BroadcastJoin::TooManyColumns);
}

Names required_columns_tmp;
Expand Down Expand Up @@ -687,7 +698,8 @@ void DAGStorageInterpreter::buildRemoteRequests()
*context.getDAGContext(),
table_scan,
storages_with_structure_lock[physical_table_id].storage->getTableInfo(),
query_block.selection,
pushed_down_selection_name,
pushed_down_conditions,
log));
}
}
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ class DAGStorageInterpreter
public:
DAGStorageInterpreter(
Context & context_,
const DAGQueryBlock & query_block_,
const TiDBTableScan & table_scan,
const std::vector<const tipb::Expr *> & conditions_,
const String & pushed_down_selection_name_,
const std::vector<const tipb::Expr *> & pushed_down_conditions_,
size_t max_streams_);

DAGStorageInterpreter(DAGStorageInterpreter &&) = delete;
Expand Down Expand Up @@ -95,9 +95,9 @@ class DAGStorageInterpreter
/// passed from caller, doesn't change during DAGStorageInterpreter's lifetime

Context & context;
const DAGQueryBlock & query_block;
const TiDBTableScan & table_scan;
const std::vector<const tipb::Expr *> & conditions;
const String & pushed_down_selection_name;
const std::vector<const tipb::Expr *> & pushed_down_conditions;
size_t max_streams;
LogWithPrefixPtr log;

Expand Down
11 changes: 6 additions & 5 deletions dbms/src/Flash/Coprocessor/RemoteRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

namespace DB
{
RemoteRequest RemoteRequest::build(const RegionRetryList & retry_regions, DAGContext & dag_context, const TiDBTableScan & table_scan, const TiDB::TableInfo & table_info, const tipb::Executor * selection, LogWithPrefixPtr & log)
RemoteRequest RemoteRequest::build(const RegionRetryList & retry_regions, DAGContext & dag_context, const TiDBTableScan & table_scan, const TiDB::TableInfo & table_info, const String & selection_name, const std::vector<const tipb::Expr *> & conditions, LogWithPrefixPtr & log)
{
auto print_retry_regions = [&retry_regions, &table_info] {
FmtBuffer buffer;
Expand All @@ -36,13 +36,14 @@ RemoteRequest RemoteRequest::build(const RegionRetryList & retry_regions, DAGCon
DAGSchema schema;
tipb::DAGRequest dag_req;
auto * executor = dag_req.mutable_root_executor();
if (selection != nullptr)
if (!conditions.empty())
{
assert(!selection_name.empty());
executor->set_tp(tipb::ExecType::TypeSelection);
executor->set_executor_id(selection->executor_id());
executor->set_executor_id(selection_name);
auto * new_selection = executor->mutable_selection();
for (const auto & condition : selection->selection().conditions())
*new_selection->add_conditions() = condition;
for (const auto & condition : conditions)
*new_selection->add_conditions() = *condition;
executor = new_selection->mutable_child();
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/RemoteRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ struct RemoteRequest
DAGSchema schema;
/// the sorted key ranges
std::vector<pingcap::coprocessor::KeyRange> key_ranges;
static RemoteRequest build(const RegionRetryList & retry_regions, DAGContext & dag_context, const TiDBTableScan & table_scan, const TiDB::TableInfo & table_info, const tipb::Executor * selection, LogWithPrefixPtr & log);
static RemoteRequest build(const RegionRetryList & retry_regions, DAGContext & dag_context, const TiDBTableScan & table_scan, const TiDB::TableInfo & table_info, const String & selection_name, const std::vector<const tipb::Expr *> & conditions, LogWithPrefixPtr & log);
};
} // namespace DB
5 changes: 3 additions & 2 deletions dbms/src/Flash/Coprocessor/TiDBTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

namespace DB
{
TiDBTableScan::TiDBTableScan(const tipb::Executor * table_scan_, const DAGContext & dag_context)
: table_scan(table_scan_)
TiDBTableScan::TiDBTableScan(const String & table_scan_name_, const tipb::Executor * table_scan_, const DAGContext & dag_context)
: table_scan_name(table_scan_name_)
, table_scan(table_scan_)
, is_partition_table_scan(table_scan->tp() == tipb::TypePartitionTableScan)
, columns(is_partition_table_scan ? table_scan->partition_table_scan().columns() : table_scan->tbl_scan().columns())
{
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Flash/Coprocessor/TiDBTableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace DB
class TiDBTableScan
{
public:
TiDBTableScan(const tipb::Executor * table_scan_, const DAGContext & dag_context);
TiDBTableScan(const String & table_scan_name_, const tipb::Executor * table_scan_, const DAGContext & dag_context);
bool isPartitionTableScan() const
{
return is_partition_table_scan;
Expand All @@ -48,10 +48,11 @@ class TiDBTableScan
}
String getTableScanExecutorID() const
{
return table_scan->executor_id();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

table_scan->executor_id() may be empty

return table_scan_name;
}

private:
const String table_scan_name;
const tipb::Executor * table_scan;
bool is_partition_table_scan;
const google::protobuf::RepeatedPtrField<tipb::ColumnInfo> & columns;
Expand Down