Skip to content

Commit

Permalink
FLASH-489 support key condition for coprocessor query (#261)
Browse files Browse the repository at this point in the history
* support key condition for coprocessor query

* add tests

* remove useless code

* check validation when build RPNElement for function in/notIn

* address comments

* address comments
  • Loading branch information
windtalker authored and zanmato1984 committed Sep 30, 2019
1 parent 8d2576e commit d33a278
Show file tree
Hide file tree
Showing 16 changed files with 854 additions and 604 deletions.
12 changes: 10 additions & 2 deletions dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ DatabaseID MockTiDB::newDataBase(const String & database_name)
return schema_id;
}

TableID MockTiDB::newTable(const String & database_name, const String & table_name, const ColumnsDescription & columns, Timestamp tso)
TableID MockTiDB::newTable(const String & database_name, const String & table_name,
const ColumnsDescription & columns, Timestamp tso, const String & handle_pk_name)
{
std::lock_guard lock(tables_mutex);

Expand All @@ -153,14 +154,21 @@ TableID MockTiDB::newTable(const String & database_name, const String & table_na
table_info.db_name = database_name;
table_info.id = table_id_allocator++;
table_info.name = table_name;
table_info.pk_is_handle = false;

int i = 1;
for (auto & column : columns.getAllPhysical())
{
table_info.columns.emplace_back(reverseGetColumnInfo(column, i++, Field()));
if (handle_pk_name == column.name)
{
if (!column.type->isInteger() && !column.type->isUnsignedInteger())
throw Exception("MockTiDB pk column must be integer or unsigned integer type", ErrorCodes::LOGICAL_ERROR);
table_info.columns.back().setPriKeyFlag();
table_info.pk_is_handle = true;
}
}

table_info.pk_is_handle = false;
table_info.comment = "Mocked.";
table_info.update_timestamp = tso;

Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Debug/MockTiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ class MockTiDB : public ext::singleton<MockTiDB>
using TablePtr = std::shared_ptr<Table>;

public:
TableID newTable(const String & database_name, const String & table_name, const ColumnsDescription & columns, Timestamp tso);
TableID newTable(const String & database_name, const String & table_name,
const ColumnsDescription & columns, Timestamp tso, const String & handle_pk_name);

DatabaseID newDataBase(const String & database_name);

Expand Down
19 changes: 11 additions & 8 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,32 +163,35 @@ void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::un
{
expr->set_sig(tipb::ScalarFuncSig::EQInt);
auto * ft = expr->mutable_field_type();
// TODO: TiDB will infer Int64.
ft->set_tp(TiDB::TypeTiny);
ft->set_tp(TiDB::TypeLongLong);
ft->set_flag(TiDB::ColumnFlagUnsigned);
}
else if (func_name_lowercase == "and")
{
expr->set_sig(tipb::ScalarFuncSig::LogicalAnd);
auto * ft = expr->mutable_field_type();
// TODO: TiDB will infer Int64.
ft->set_tp(TiDB::TypeTiny);
ft->set_tp(TiDB::TypeLongLong);
ft->set_flag(TiDB::ColumnFlagUnsigned);
}
else if (func_name_lowercase == "or")
{
expr->set_sig(tipb::ScalarFuncSig::LogicalOr);
auto * ft = expr->mutable_field_type();
// TODO: TiDB will infer Int64.
ft->set_tp(TiDB::TypeTiny);
ft->set_tp(TiDB::TypeLongLong);
ft->set_flag(TiDB::ColumnFlagUnsigned);
}
else if (func_name_lowercase == "greater")
{
expr->set_sig(tipb::ScalarFuncSig::GTInt);
auto * ft = expr->mutable_field_type();
// TODO: TiDB will infer Int64.
ft->set_tp(TiDB::TypeTiny);
ft->set_tp(TiDB::TypeLongLong);
ft->set_flag(TiDB::ColumnFlagUnsigned);
}
else if (func_name_lowercase == "greaterorequals")
{
expr->set_sig(tipb::ScalarFuncSig::GEInt);
auto *ft = expr->mutable_field_type();
ft->set_tp(TiDB::TypeLongLong);
ft->set_flag(TiDB::ColumnFlagUnsigned);
}
else
Expand Down
10 changes: 7 additions & 3 deletions dbms/src/Debug/dbgFuncMockTiDBTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@ extern const int LOGICAL_ERROR;

void MockTiDBTable::dbgFuncMockTiDBTable(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() != 3)
throw Exception("Args not matched, should be: database-name, table-name, schema-string", ErrorCodes::BAD_ARGUMENTS);
if (args.size() != 3 && args.size() != 4)
throw Exception("Args not matched, should be: database-name, table-name, schema-string [, handle_pk_name]", ErrorCodes::BAD_ARGUMENTS);

const String & database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
const String & table_name = typeid_cast<const ASTIdentifier &>(*args[1]).name;

auto schema_str = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[2]).value);
String handle_pk_name = "";
if (args.size() == 4)
handle_pk_name = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[3]).value);

ASTPtr columns_ast;
ParserColumnDeclarationList schema_parser;
Tokens tokens(schema_str.data(), schema_str.data() + schema_str.length());
Expand All @@ -43,7 +47,7 @@ void MockTiDBTable::dbgFuncMockTiDBTable(Context & context, const ASTs & args, D
= InterpreterCreateQuery::getColumnsDescription(typeid_cast<const ASTExpressionList &>(*columns_ast), context);
auto tso = context.getTMTContext().getPDClient()->getTS();

TableID table_id = MockTiDB::instance().newTable(database_name, table_name, columns, tso);
TableID table_id = MockTiDB::instance().newTable(database_name, table_name, columns, tso, handle_pk_name);

std::stringstream ss;
ss << "mock table #" << table_id;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgTools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ Field convertField(const ColumnInfo & column_info, const Field & field)

void encodeRow(const TiDB::TableInfo & table_info, const std::vector<Field> & fields, std::stringstream & ss)
{
if (table_info.columns.size() != fields.size())
if (table_info.columns.size() != fields.size() + table_info.pk_is_handle)
throw Exception("Encoding row has different sizes between columns and values", ErrorCodes::LOGICAL_ERROR);
for (size_t i = 0; i < fields.size(); i++)
{
Expand All @@ -261,7 +261,7 @@ void insert(const TiDB::TableInfo & table_info, RegionID region_id, HandleID han
fields.emplace_back(field);
idx++;
}
if (fields.size() != table_info.columns.size())
if (fields.size() + table_info.pk_is_handle != table_info.columns.size())
throw Exception("Number of insert values and columns do not match.", ErrorCodes::LOGICAL_ERROR);

TMTContext & tmt = context.getTMTContext();
Expand Down
37 changes: 34 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include <Interpreters/Context.h>
#include <Interpreters/Set.h>
#include <Interpreters/convertFieldToType.h>
#include <Parsers/ASTIdentifier.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/Codec.h>
#include <Storages/Transaction/TypeMapping.h>

Expand Down Expand Up @@ -214,7 +216,7 @@ void constructTZExpr(tipb::Expr & tz_expr, const tipb::DAGRequest & rqst, bool f
}
}

bool hasMeaningfulTZInfo(const tipb::DAGRequest &rqst)
bool hasMeaningfulTZInfo(const tipb::DAGRequest & rqst)
{
if (rqst.has_time_zone_name() && rqst.time_zone_name().length() > 0)
return rqst.time_zone_name() != "UTC";
Expand Down Expand Up @@ -249,7 +251,7 @@ String DAGExpressionAnalyzer::appendTimeZoneCast(
// column with UTC timezone will never be used during agg), all the column with ts datatype will
// convert back to UTC timezone
bool DAGExpressionAnalyzer::appendTimeZoneCastsAfterTS(
ExpressionActionsChain &chain, std::vector<bool> is_ts_column, const tipb::DAGRequest &rqst)
ExpressionActionsChain & chain, std::vector<bool> is_ts_column, const tipb::DAGRequest & rqst)
{
if (!hasMeaningfulTZInfo(rqst))
return false;
Expand Down Expand Up @@ -391,6 +393,35 @@ String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, Expres
return expr_name;
}

/// Walks the expression tree rooted at `expr` and pre-builds the explicit set for every
/// `col_name in (value_list)` function whose left-hand column `storage` reports, through
/// mayBenefitFromIndexForIn, as able to benefit from the index.
///
/// @param expr     Expression to inspect; its children are visited recursively first.
/// @param storage  Storage that is asked whether the left-hand column may benefit from the index.
///
/// Expressions that already have an entry in `prepared_sets` are left untouched.
void DAGExpressionAnalyzer::makeExplicitSetForIndex(const tipb::Expr & expr, const TMTStoragePtr & storage)
{
    // Children first, so IN functions nested below this expression are handled as well.
    for (const auto & child : expr.children())
    {
        makeExplicitSetForIndex(child, storage);
    }
    if (expr.tp() != tipb::ExprType::ScalarFunc)
    {
        return;
    }
    const String & func_name = getFunctionName(expr);
    // only support col_name in (value_list)
    // children_size() is checked before children(0): indexing an empty repeated field is out of bounds,
    // and an IN function without a left-hand argument cannot be turned into a set anyway.
    if (isInOrGlobalInOperator(func_name) && expr.children_size() > 0 && expr.children(0).tp() == tipb::ExprType::ColumnRef
        && !prepared_sets.count(&expr))
    {
        NamesAndTypesList column_list;
        for (const auto & col : getCurrentInputColumns())
        {
            column_list.emplace_back(col.name, col.type);
        }
        // Temporary actions over the current input columns; used to resolve the name and type of the
        // left-hand column.
        ExpressionActionsPtr temp_actions = std::make_shared<ExpressionActions>(column_list, settings);
        String name = getActions(expr.children(0), temp_actions);
        ASTPtr name_ast = std::make_shared<ASTIdentifier>(name);
        if (storage->mayBenefitFromIndexForIn(name_ast))
        {
            // `true` requests an ordered set (the create_ordered_set argument of makeExplicitSet).
            makeExplicitSet(expr, temp_actions->getSampleBlock(), true, name);
        }
    }
}

void DAGExpressionAnalyzer::makeExplicitSet(
const tipb::Expr & expr, const Block & sample_block, bool create_ordered_set, const String & left_arg_name)
{
Expand All @@ -400,7 +431,7 @@ void DAGExpressionAnalyzer::makeExplicitSet(
}
DataTypes set_element_types;
// todo support tuple in, i.e. (a,b) in ((1,2), (3,4)), currently TiDB convert tuple in into a series of or/and/eq exprs
// which means tuple in is never be pushed to coprocessor, but it is quite in-efficient
// which means tuple in is never be pushed to coprocessor, but it is quite in-efficient
set_element_types.push_back(sample_block.getByName(left_arg_name).type);

// todo if this is a single value in, then convert it to equal expr
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <Flash/Coprocessor/DAGUtils.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/ExpressionActions.h>
#include <Storages/Transaction/TMTStorages.h>

namespace DB
{
Expand Down Expand Up @@ -60,11 +61,12 @@ class DAGExpressionAnalyzer : private boost::noncopyable
String getActions(const tipb::Expr & expr, ExpressionActionsPtr & actions);
const std::vector<NameAndTypePair> & getCurrentInputColumns();
void makeExplicitSet(const tipb::Expr & expr, const Block & sample_block, bool create_ordered_set, const String & left_arg_name);
void makeExplicitSetForIndex(const tipb::Expr & expr, const TMTStoragePtr & storage);
String applyFunction(const String & func_name, Names & arg_names, ExpressionActionsPtr & actions);
Int32 getImplicitCastCount() { return implicit_cast_count; };
bool appendTimeZoneCastsAfterTS(ExpressionActionsChain &chain, std::vector<bool> is_ts_column,
const tipb::DAGRequest &rqst);
bool appendTimeZoneCastsAfterTS(ExpressionActionsChain & chain, std::vector<bool> is_ts_column, const tipb::DAGRequest & rqst);
String appendTimeZoneCast(const String & tz_col, const String & ts_col, const String & func_name, ExpressionActionsPtr & actions);
DAGPreparedSets getPreparedSets() { return prepared_sets; }
};

} // namespace DB
23 changes: 23 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryInfo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#pragma once

#include <unordered_map>

#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGQuerySource.h>

namespace DB
{

/// Bundles a DAG query source with its prepared sets and a copy of the source columns.
struct DAGQueryInfo
{
    /// @param dag_             DAG query source; stored by reference, not copied.
    /// @param dag_sets_        Prepared sets; taken by value and moved into the member.
    /// @param source_columns_  Source columns; each (name, type) pair is copied into `source_columns`.
    ///                         Taken by const reference because it is only read here.
    DAGQueryInfo(const DAGQuerySource & dag_, DAGPreparedSets dag_sets_, const std::vector<NameAndTypePair> & source_columns_)
        : dag(dag_), dag_sets(std::move(dag_sets_))
    {
        for (const auto & c : source_columns_)
            source_columns.emplace_back(c.name, c.type);
    }
    /// Not owned: the referenced DAGQuerySource must outlive this object.
    const DAGQuerySource & dag;
    /// Prepared sets handed over at construction.
    DAGPreparedSets dag_sets;
    /// Names and types copied from the constructor's `source_columns_` argument.
    NamesAndTypesList source_columns;
};
} // namespace DB
10 changes: 10 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <DataStreams/PartialSortingBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGQueryInfo.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Interpreters/Aggregator.h>
#include <Parsers/ASTSelectQuery.h>
Expand Down Expand Up @@ -208,9 +209,18 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
if (!checkKeyRanges(dag.getKeyRanges(), table_id, storage->pkIsUInt64()))
throw Exception("Cop request only support full range scan for given region", ErrorCodes::COP_BAD_DAG_REQUEST);

if (dag.hasSelection())
{
for (auto & condition : dag.getSelection().conditions())
{
analyzer->makeExplicitSetForIndex(condition, storage);
}
}
//todo support index in
SelectQueryInfo query_info;
// set query to avoid unexpected NPE
query_info.query = dag.getAST();
query_info.dag_query = std::make_unique<DAGQueryInfo>(dag, analyzer->getPreparedSets(), source_columns);
query_info.mvcc_query_info = std::make_unique<MvccQueryInfo>();
query_info.mvcc_query_info->resolve_locks = true;
query_info.mvcc_query_info->read_tso = settings.read_tso;
Expand Down
Loading

0 comments on commit d33a278

Please sign in to comment.