Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

basic support for selection/limit/topn executor in InterpreterDAGRequest #150

Merged
merged 1 commit into from
Aug 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
996 changes: 604 additions & 392 deletions dbms/src/Interpreters/CoprocessorBuilderUtils.cpp

Large diffs are not rendered by default.

26 changes: 22 additions & 4 deletions dbms/src/Interpreters/CoprocessorBuilderUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,27 @@

#include <unordered_map>

namespace DB {
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <tipb/select.pb.h>
#pragma GCC diagnostic pop

extern std::unordered_map<tipb::ExprType, String> aggFunMap;
extern std::unordered_map<tipb::ScalarFuncSig, String> scalarFunMap;
#include <Storages/Transaction/Types.h>

}
namespace DB
{

bool isLiteralExpr(const tipb::Expr & expr);
Field decodeLiteral(const tipb::Expr & expr);
bool isFunctionExpr(const tipb::Expr & expr);
bool isAggFunctionExpr(const tipb::Expr & expr);
const String & getFunctionName(const tipb::Expr & expr);
bool isColumnExpr(const tipb::Expr & expr);
ColumnID getColumnID(const tipb::Expr & expr);
String getName(const tipb::Expr & expr, const NamesAndTypesList & current_input_columns);
const String & getTypeName(const tipb::Expr & expr);
String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col);
extern std::unordered_map<tipb::ExprType, String> aggFunMap;
extern std::unordered_map<tipb::ScalarFuncSig, String> scalarFunMap;

} // namespace DB
171 changes: 171 additions & 0 deletions dbms/src/Interpreters/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@

#include <DataTypes/FieldToDataType.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/CoprocessorBuilderUtils.h>
#include <Interpreters/DAGExpressionAnalyzer.h>
#include <Interpreters/convertFieldToType.h>
#include <Storages/Transaction/Codec.h>
#include <Storages/Transaction/TypeMapping.h>

namespace DB
{
DAGExpressionAnalyzer::DAGExpressionAnalyzer(const NamesAndTypesList & source_columns_, const Context & context_)
: source_columns(source_columns_), context(context_)
{
settings = context.getSettings();
}

bool DAGExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, const tipb::Selection & sel, String & filter_column_name)
{
if (sel.conditions_size() == 0)
{
return false;
}
tipb::Expr final_condition;
if (sel.conditions_size() > 1)
{
final_condition.set_tp(tipb::ExprType::ScalarFunc);
final_condition.set_sig(tipb::ScalarFuncSig::LogicalAnd);

for (auto & condition : sel.conditions())
{
auto c = final_condition.add_children();
c->ParseFromString(condition.SerializeAsString());
}
}

const tipb::Expr & filter = sel.conditions_size() > 1 ? final_condition : sel.conditions(0);
initChain(chain, source_columns);
filter_column_name = getActions(filter, chain.steps.back().actions);
chain.steps.back().required_output.push_back(filter_column_name);
return true;
}

bool DAGExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names)
{
if (topN.order_by_size() == 0)
{
return false;
}
initChain(chain, aggregated_columns);
ExpressionActionsChain::Step & step = chain.steps.back();
for (const tipb::ByItem & byItem : topN.order_by())
{
String name = getActions(byItem.expr(), step.actions);
step.required_output.push_back(name);
order_column_names.push_back(name);
}
return true;
}

const NamesAndTypesList & DAGExpressionAnalyzer::getCurrentInputColumns() { return source_columns; }

String DAGExpressionAnalyzer::getActions(const tipb::Expr & expr, ExpressionActionsPtr & actions)
{
String expr_name = getName(expr, getCurrentInputColumns());
if ((isLiteralExpr(expr) || isFunctionExpr(expr)) && actions->getSampleBlock().has(expr_name))
{
return expr_name;
}
if (isLiteralExpr(expr))
{
Field value = decodeLiteral(expr);
DataTypePtr type = expr.has_field_type() ? getDataTypeByFieldType(expr.field_type()) : applyVisitor(FieldToDataType(), value);

ColumnWithTypeAndName column;
column.column = type->createColumnConst(1, convertFieldToType(value, *type));
column.name = expr_name;
column.type = type;

actions->add(ExpressionAction::addColumn(column));
return column.name;
}
else if (isColumnExpr(expr))
{
ColumnID columnId = getColumnID(expr);
if (columnId < 1 || columnId > (ColumnID)getCurrentInputColumns().size())
{
throw Exception("column id out of bound");
}
//todo check if the column type need to be cast to field type
return expr_name;
}
else if (isFunctionExpr(expr))
{
if (isAggFunctionExpr(expr))
{
throw Exception("agg function is not supported yet");
}
const String & func_name = getFunctionName(expr);
if (func_name == "in" || func_name == "notIn" || func_name == "globalIn" || func_name == "globalNotIn")
{
// todo support in
throw Exception(func_name + " is not supported yet");
}

const FunctionBuilderPtr & function_builder = FunctionFactory::instance().get(func_name, context);
Names argument_names;
DataTypes argument_types;
for (auto & child : expr.children())
{
String name = getActions(child, actions);
if (actions->getSampleBlock().has(name))
{
argument_names.push_back(name);
argument_types.push_back(actions->getSampleBlock().getByName(name).type);
}
else
{
throw Exception("Unknown expr: " + child.DebugString());
}
}

const ExpressionAction & applyFunction = ExpressionAction::applyFunction(function_builder, argument_names, expr_name);
actions->add(applyFunction);
// add cast if needed
if (expr.has_field_type())
{
DataTypePtr expected_type = getDataTypeByFieldType(expr.field_type());
DataTypePtr actual_type = applyFunction.result_type;
//todo maybe use a more decent compare method
if (expected_type->getName() != actual_type->getName())
{
// need to add cast function
// first construct the second argument
tipb::Expr type_expr;
type_expr.set_tp(tipb::ExprType::String);
std::stringstream ss;
EncodeCompactBytes(expected_type->getName(), ss);
type_expr.set_val(ss.str());
auto type_field_type = type_expr.field_type();
type_field_type.set_tp(0xfe);
type_field_type.set_flag(1);
String name = getActions(type_expr, actions);
String cast_name = "cast";
const FunctionBuilderPtr & cast_func_builder = FunctionFactory::instance().get(cast_name, context);
String cast_expr_name = cast_name + "_" + expr_name + "_" + getName(type_expr, getCurrentInputColumns());
Names cast_argument_names;
cast_argument_names.push_back(expr_name);
cast_argument_names.push_back(getName(type_expr, getCurrentInputColumns()));
const ExpressionAction & apply_cast_function
= ExpressionAction::applyFunction(cast_func_builder, argument_names, cast_expr_name);
actions->add(apply_cast_function);
return cast_expr_name;
}
else
{
return expr_name;
}
}
else
{
return expr_name;
}
}
else
{
throw Exception("Unsupported expr type: " + getTypeName(expr));
}
}
} // namespace DB
40 changes: 40 additions & 0 deletions dbms/src/Interpreters/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#pragma once

#include <Interpreters/CoprocessorBuilderUtils.h>
#include <Interpreters/ExpressionActions.h>
#include <tipb/executor.pb.h>

namespace DB
{

/** Transforms an expression from DAG expression into a sequence of actions to execute it.
*
*/
class DAGExpressionAnalyzer : private boost::noncopyable
{
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
// all columns from table scan
NamesAndTypesList source_columns;
// all columns after aggregation
NamesAndTypesList aggregated_columns;
Settings settings;
const Context & context;

public:
DAGExpressionAnalyzer(const NamesAndTypesList & source_columns_, const Context & context_);
bool appendWhere(ExpressionActionsChain & chain, const tipb::Selection & sel, String & filter_column_name);
bool appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names);
void initChain(ExpressionActionsChain & chain, const NamesAndTypesList & columns) const
{
if (chain.steps.empty())
{
chain.settings = settings;
chain.steps.emplace_back(std::make_shared<ExpressionActions>(columns, settings));
}
}
String getActions(const tipb::Expr & expr, ExpressionActionsPtr & actions);
const NamesAndTypesList & getCurrentInputColumns();
};

} // namespace DB
66 changes: 53 additions & 13 deletions dbms/src/Interpreters/DAGQueryInfo.cpp
Original file line number Diff line number Diff line change
@@ -1,27 +1,67 @@

#include <Parsers/ASTSelectQuery.h>
#include <Interpreters/DAGQueryInfo.h>
#include <Interpreters/InterpreterDAGRequest.h>
#include <Parsers/ASTSelectQuery.h>


namespace DB
{

DAGQueryInfo::DAGQueryInfo(const tipb::DAGRequest & dag_request_, CoprocessorContext & coprocessorContext_)
: dag_request(dag_request_), coprocessorContext(coprocessorContext_) {}
const String DAGQueryInfo::TS_NAME("tablescan");
const String DAGQueryInfo::SEL_NAME("selection");
const String DAGQueryInfo::AGG_NAME("aggregation");
const String DAGQueryInfo::TOPN_NAME("topN");
const String DAGQueryInfo::LIMIT_NAME("limit");

std::tuple<std::string, ASTPtr> DAGQueryInfo::parse(size_t ) {
query = String("cop query");
ast = std::make_shared<ASTSelectQuery>();
((ASTSelectQuery*)ast.get())->is_fake_sel = true;
return std::make_tuple(query, ast);
static void assignOrThrowException(Int32 & index, Int32 value, const String & name)
{
if (index != -1)
{
throw Exception("Duplicated " + name + " in DAG request");
}
index = value;
}

String DAGQueryInfo::get_query_ignore_error(size_t ) {
return query;
DAGQueryInfo::DAGQueryInfo(const tipb::DAGRequest & dag_request_, CoprocessorContext & coprocessorContext_)
: dag_request(dag_request_), coprocessorContext(coprocessorContext_)
{
for (int i = 0; i < dag_request.executors_size(); i++)
{
switch (dag_request.executors(i).tp())
{
case tipb::ExecType::TypeTableScan:
assignOrThrowException(ts_index, i, TS_NAME);
break;
case tipb::ExecType::TypeSelection:
assignOrThrowException(sel_index, i, SEL_NAME);
break;
case tipb::ExecType::TypeStreamAgg:
case tipb::ExecType::TypeAggregation:
assignOrThrowException(agg_index, i, AGG_NAME);
break;
case tipb::ExecType::TypeTopN:
assignOrThrowException(order_index, i, TOPN_NAME);
case tipb::ExecType::TypeLimit:
assignOrThrowException(limit_index, i, LIMIT_NAME);
break;
default:
throw Exception("Unsupported executor in DAG request: " + dag_request.executors(i).DebugString());
}
}
}

std::unique_ptr<IInterpreter> DAGQueryInfo::getInterpreter(Context & , QueryProcessingStage::Enum ) {
return std::make_unique<InterpreterDAGRequest>(coprocessorContext, dag_request);
}
std::tuple<std::string, ASTPtr> DAGQueryInfo::parse(size_t)
{
query = String("cop query");
ast = std::make_shared<ASTSelectQuery>();
((ASTSelectQuery *)ast.get())->is_fake_sel = true;
return std::make_tuple(query, ast);
}

String DAGQueryInfo::get_query_ignore_error(size_t) { return query; }

std::unique_ptr<IInterpreter> DAGQueryInfo::getInterpreter(Context &, QueryProcessingStage::Enum)
{
return std::make_unique<InterpreterDAGRequest>(coprocessorContext, *this);
}
} // namespace DB
Loading