Skip to content

Commit

Permalink
basic support for selection/limit/topn executor in InterpreterDAGRequ…
Browse files Browse the repository at this point in the history
…est (#150)
  • Loading branch information
windtalker authored and zanmato1984 committed Aug 2, 2019
1 parent f516f00 commit ead9609
Show file tree
Hide file tree
Showing 13 changed files with 1,552 additions and 855 deletions.
996 changes: 604 additions & 392 deletions dbms/src/Interpreters/CoprocessorBuilderUtils.cpp

Large diffs are not rendered by default.

26 changes: 22 additions & 4 deletions dbms/src/Interpreters/CoprocessorBuilderUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,27 @@

#include <unordered_map>

namespace DB {
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <tipb/select.pb.h>
#pragma GCC diagnostic pop

extern std::unordered_map<tipb::ExprType, String> aggFunMap;
extern std::unordered_map<tipb::ScalarFuncSig, String> scalarFunMap;
#include <Storages/Transaction/Types.h>

}
namespace DB
{

// Helpers for inspecting tipb::Expr nodes of a DAG request.
// Only the declarations are visible here; the notes below describe how
// DAGExpressionAnalyzer::getActions uses them, not their implementation.

// Predicate: getActions treats an expr for which this returns true as a constant
// and decodes it with decodeLiteral().
bool isLiteralExpr(const tipb::Expr & expr);
// Returns the value of a literal expr as a Field; getActions wraps it into a const column.
Field decodeLiteral(const tipb::Expr & expr);
// Predicate: getActions treats an expr for which this returns true as a function call
// whose arguments are expr.children().
bool isFunctionExpr(const tipb::Expr & expr);
// Predicate checked by getActions inside its function branch; such exprs are rejected
// there with "agg function is not supported yet".
bool isAggFunctionExpr(const tipb::Expr & expr);
// Returns the function name that getActions passes to FunctionFactory::instance().get().
// NOTE(review): presumably looked up through aggFunMap / scalarFunMap below - confirm.
const String & getFunctionName(const tipb::Expr & expr);
// Predicate: getActions treats an expr for which this returns true as a reference
// to an input column.
bool isColumnExpr(const tipb::Expr & expr);
// Returns the column id of a column expr; getActions requires it to lie in
// [1, number of current input columns].
ColumnID getColumnID(const tipb::Expr & expr);
// Returns the name getActions uses as the result column name for expr.
String getName(const tipb::Expr & expr, const NamesAndTypesList & current_input_columns);
// Returns a type name for expr; getActions appends it to the "Unsupported expr type: " message.
const String & getTypeName(const tipb::Expr & expr);
// NOTE(review): presumably renders expr as text given the input columns - not used in the
// code visible here, confirm against the definition.
String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col);
// Lookup tables keyed by tipb enum values, defined elsewhere.
// NOTE(review): the String values are presumably function names - confirm.
extern std::unordered_map<tipb::ExprType, String> aggFunMap;
extern std::unordered_map<tipb::ScalarFuncSig, String> scalarFunMap;

} // namespace DB
171 changes: 171 additions & 0 deletions dbms/src/Interpreters/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@

#include <DataTypes/FieldToDataType.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/CoprocessorBuilderUtils.h>
#include <Interpreters/DAGExpressionAnalyzer.h>
#include <Interpreters/convertFieldToType.h>
#include <Storages/Transaction/Codec.h>
#include <Storages/Transaction/TypeMapping.h>

namespace DB
{
/// Builds an analyzer over the given source columns, taking a snapshot of the
/// settings of `context_` at construction time.
DAGExpressionAnalyzer::DAGExpressionAnalyzer(const NamesAndTypesList & source_columns_, const Context & context_)
    : source_columns(source_columns_), settings(context_.getSettings()), context(context_)
{}

/// Appends the actions that evaluate the filter of a Selection executor to the last step of `chain`.
///
/// Multiple conditions are combined into a single LogicalAnd scalar function whose
/// children are copies of the conditions; a single condition is used as is.
///
/// @param chain               chain to extend; initialised from the source columns if it has no steps yet.
/// @param sel                 the Selection executor.
/// @param filter_column_name  out: name of the column holding the filter result (set only when true is returned).
/// @return false if `sel` has no conditions (nothing is appended), true otherwise.
bool DAGExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, const tipb::Selection & sel, String & filter_column_name)
{
    const bool has_multiple_conditions = sel.conditions_size() > 1;
    if (sel.conditions_size() == 0)
    {
        return false;
    }

    tipb::Expr final_condition;
    if (has_multiple_conditions)
    {
        final_condition.set_tp(tipb::ExprType::ScalarFunc);
        final_condition.set_sig(tipb::ScalarFuncSig::LogicalAnd);

        for (const auto & condition : sel.conditions())
        {
            // Copy the message directly instead of round-tripping it through a serialized
            // string, whose parse result was silently ignored.
            final_condition.add_children()->CopyFrom(condition);
        }
    }

    const tipb::Expr & filter = has_multiple_conditions ? final_condition : sel.conditions(0);
    initChain(chain, source_columns);
    filter_column_name = getActions(filter, chain.steps.back().actions);
    // The filter column must survive this step so that it can be consumed afterwards.
    chain.steps.back().required_output.push_back(filter_column_name);
    return true;
}

/// Appends the actions that evaluate the ORDER BY expressions of a TopN executor to the last step of `chain`.
///
/// @param chain               chain to extend; initialised from the current input columns if it has no steps yet.
/// @param topN                the TopN executor.
/// @param order_column_names  out: the result column name of every order-by item, in order, is appended.
/// @return false if `topN` has no order-by items (nothing is appended), true otherwise.
bool DAGExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names)
{
    if (topN.order_by_size() == 0)
    {
        return false;
    }
    // Initialise from the same column set getActions() resolves names against.
    // `aggregated_columns` is never populated in this file, so using it would start a
    // fresh chain (TopN without a preceding Selection) with no input columns at all.
    initChain(chain, getCurrentInputColumns());
    ExpressionActionsChain::Step & step = chain.steps.back();
    for (const tipb::ByItem & by_item : topN.order_by())
    {
        String name = getActions(by_item.expr(), step.actions);
        step.required_output.push_back(name);
        order_column_names.push_back(name);
    }
    return true;
}

/// Returns the columns expressions are currently resolved against (the source columns).
const NamesAndTypesList & DAGExpressionAnalyzer::getCurrentInputColumns()
{
    return source_columns;
}

/// Recursively translates a DAG expression into actions appended to `actions`.
///
/// - literal:  added as a constant column (typed by the expr's field type when present,
///             otherwise by the decoded value);
/// - column:   only validated, no action is added;
/// - function: arguments are translated first, then the function is applied; if the expr
///             carries a field type whose name differs from the function's result type,
///             a `cast` to the expected type is appended and its column is returned instead.
///
/// @param expr     expression to translate.
/// @param actions  actions to append to.
/// @return name of the column that holds the value of `expr`.
/// @throws Exception for out-of-range column ids, aggregate functions, the `in` family of
///         functions, arguments that did not yield a column, and unsupported expr types.
String DAGExpressionAnalyzer::getActions(const tipb::Expr & expr, ExpressionActionsPtr & actions)
{
    String expr_name = getName(expr, getCurrentInputColumns());
    // Literals and function results become new columns; reuse one that has already been added.
    if ((isLiteralExpr(expr) || isFunctionExpr(expr)) && actions->getSampleBlock().has(expr_name))
    {
        return expr_name;
    }
    if (isLiteralExpr(expr))
    {
        Field value = decodeLiteral(expr);
        DataTypePtr type = expr.has_field_type() ? getDataTypeByFieldType(expr.field_type()) : applyVisitor(FieldToDataType(), value);

        ColumnWithTypeAndName column;
        column.column = type->createColumnConst(1, convertFieldToType(value, *type));
        column.name = expr_name;
        column.type = type;

        actions->add(ExpressionAction::addColumn(column));
        return column.name;
    }
    else if (isColumnExpr(expr))
    {
        ColumnID column_id = getColumnID(expr);
        if (column_id < 1 || column_id > static_cast<ColumnID>(getCurrentInputColumns().size()))
        {
            throw Exception("column id out of bound");
        }
        //todo check if the column type need to be cast to field type
        return expr_name;
    }
    else if (isFunctionExpr(expr))
    {
        if (isAggFunctionExpr(expr))
        {
            throw Exception("agg function is not supported yet");
        }
        const String & func_name = getFunctionName(expr);
        if (func_name == "in" || func_name == "notIn" || func_name == "globalIn" || func_name == "globalNotIn")
        {
            // todo support in
            throw Exception(func_name + " is not supported yet");
        }

        const FunctionBuilderPtr & function_builder = FunctionFactory::instance().get(func_name, context);
        Names argument_names;
        DataTypes argument_types;
        for (const auto & child : expr.children())
        {
            String name = getActions(child, actions);
            if (actions->getSampleBlock().has(name))
            {
                argument_names.push_back(name);
                argument_types.push_back(actions->getSampleBlock().getByName(name).type);
            }
            else
            {
                throw Exception("Unknown expr: " + child.DebugString());
            }
        }

        actions->add(ExpressionAction::applyFunction(function_builder, argument_names, expr_name));

        // add cast if needed
        if (!expr.has_field_type())
        {
            return expr_name;
        }
        DataTypePtr expected_type = getDataTypeByFieldType(expr.field_type());
        // Read the result type from the sample block (as done for the arguments above) rather
        // than from the local action object passed to add().
        // NOTE(review): add() presumably resolves the type on its own copy of the action - confirm.
        DataTypePtr actual_type = actions->getSampleBlock().getByName(expr_name).type;
        //todo maybe use a more decent compare method
        if (expected_type->getName() == actual_type->getName())
        {
            return expr_name;
        }

        // need to add cast function
        // first construct the second argument: a string literal holding the target type name
        tipb::Expr type_expr;
        type_expr.set_tp(tipb::ExprType::String);
        std::stringstream ss;
        EncodeCompactBytes(expected_type->getName(), ss);
        type_expr.set_val(ss.str());
        // Must go through mutable_field_type(): field_type() returns a const reference, so
        // assigning it to `auto` would modify a detached copy and leave type_expr untouched.
        // NOTE(review): 0xfe / flag 1 are presumably the string type code and the not-null flag - confirm.
        auto * type_field_type = type_expr.mutable_field_type();
        type_field_type->set_tp(0xfe);
        type_field_type->set_flag(1);
        String type_expr_name = getActions(type_expr, actions);

        String cast_name = "cast";
        const FunctionBuilderPtr & cast_func_builder = FunctionFactory::instance().get(cast_name, context);
        String cast_expr_name = cast_name + "_" + expr_name + "_" + type_expr_name;
        // cast(<function result>, <type name literal>) - not the arguments of the original function.
        Names cast_argument_names;
        cast_argument_names.push_back(expr_name);
        cast_argument_names.push_back(type_expr_name);
        actions->add(ExpressionAction::applyFunction(cast_func_builder, cast_argument_names, cast_expr_name));
        return cast_expr_name;
    }
    else
    {
        throw Exception("Unsupported expr type: " + getTypeName(expr));
    }
}
} // namespace DB
40 changes: 40 additions & 0 deletions dbms/src/Interpreters/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#pragma once

#include <Interpreters/CoprocessorBuilderUtils.h>
#include <Interpreters/ExpressionActions.h>
#include <tipb/executor.pb.h>

namespace DB
{

/** Translates expressions of a DAG request (tipb::Expr) into a sequence of
  * ExpressionActions that evaluates them.
  */
class DAGExpressionAnalyzer : private boost::noncopyable
{
private:
    using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;

    /// Columns produced by the table scan.
    NamesAndTypesList source_columns;
    /// Columns available after aggregation.
    NamesAndTypesList aggregated_columns;
    /// Settings captured from `context` when the analyzer is constructed.
    Settings settings;
    const Context & context;

public:
    DAGExpressionAnalyzer(const NamesAndTypesList & source_columns_, const Context & context_);

    /// Appends the filter of `sel`; returns false when there is nothing to filter on.
    bool appendWhere(ExpressionActionsChain & chain, const tipb::Selection & sel, String & filter_column_name);

    /// Appends the order-by expressions of `topN`; returns false when there are none.
    bool appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names);

    /// Gives an empty chain its first step, built over `columns`; a chain that already has steps is left alone.
    void initChain(ExpressionActionsChain & chain, const NamesAndTypesList & columns) const
    {
        if (!chain.steps.empty())
            return;

        chain.settings = settings;
        chain.steps.emplace_back(std::make_shared<ExpressionActions>(columns, settings));
    }

    /// Appends the actions computing `expr` to `actions` and returns the name of the result column.
    String getActions(const tipb::Expr & expr, ExpressionActionsPtr & actions);

    /// Columns that expressions are resolved against.
    const NamesAndTypesList & getCurrentInputColumns();
};

} // namespace DB
66 changes: 53 additions & 13 deletions dbms/src/Interpreters/DAGQueryInfo.cpp
Original file line number Diff line number Diff line change
@@ -1,27 +1,67 @@

#include <Parsers/ASTSelectQuery.h>
#include <Interpreters/DAGQueryInfo.h>
#include <Interpreters/InterpreterDAGRequest.h>
#include <Parsers/ASTSelectQuery.h>


namespace DB
{

DAGQueryInfo::DAGQueryInfo(const tipb::DAGRequest & dag_request_, CoprocessorContext & coprocessorContext_)
: dag_request(dag_request_), coprocessorContext(coprocessorContext_) {}
// Executor names used in the "Duplicated ... in DAG request" error message.
const String DAGQueryInfo::TS_NAME = "tablescan";
const String DAGQueryInfo::SEL_NAME = "selection";
const String DAGQueryInfo::AGG_NAME = "aggregation";
const String DAGQueryInfo::TOPN_NAME = "topN";
const String DAGQueryInfo::LIMIT_NAME = "limit";

std::tuple<std::string, ASTPtr> DAGQueryInfo::parse(size_t ) {
query = String("cop query");
ast = std::make_shared<ASTSelectQuery>();
((ASTSelectQuery*)ast.get())->is_fake_sel = true;
return std::make_tuple(query, ast);
/// Stores `value` into `index` if `index` is still unset (-1); otherwise throws,
/// naming the executor kind `name` that occurred more than once.
static void assignOrThrowException(Int32 & index, Int32 value, const String & name)
{
    const bool already_assigned = (index != -1);
    if (already_assigned)
        throw Exception("Duplicated " + name + " in DAG request");

    index = value;
}

String DAGQueryInfo::get_query_ignore_error(size_t ) {
return query;
/// Records, for each executor kind, the position of that executor in the DAG request.
/// Throws if a kind is assigned twice or if an executor type is not supported.
DAGQueryInfo::DAGQueryInfo(const tipb::DAGRequest & dag_request_, CoprocessorContext & coprocessorContext_)
    : dag_request(dag_request_), coprocessorContext(coprocessorContext_)
{
    const int executor_count = dag_request.executors_size();
    for (int pos = 0; pos < executor_count; ++pos)
    {
        const auto & executor = dag_request.executors(pos);
        switch (executor.tp())
        {
            case tipb::ExecType::TypeTableScan:
                assignOrThrowException(ts_index, pos, TS_NAME);
                break;
            case tipb::ExecType::TypeSelection:
                assignOrThrowException(sel_index, pos, SEL_NAME);
                break;
            // Both aggregation flavours share one slot.
            case tipb::ExecType::TypeStreamAgg:
            case tipb::ExecType::TypeAggregation:
                assignOrThrowException(agg_index, pos, AGG_NAME);
                break;
            case tipb::ExecType::TypeTopN:
                assignOrThrowException(order_index, pos, TOPN_NAME);
                // No break: control falls through, so a TopN also occupies the limit slot.
                // NOTE(review): presumably intentional (TopN carries its own limit) - confirm;
                // a later Limit executor is then reported as "Duplicated limit".
            case tipb::ExecType::TypeLimit:
                assignOrThrowException(limit_index, pos, LIMIT_NAME);
                break;
            default:
                throw Exception("Unsupported executor in DAG request: " + executor.DebugString());
        }
    }
}

std::unique_ptr<IInterpreter> DAGQueryInfo::getInterpreter(Context & , QueryProcessingStage::Enum ) {
return std::make_unique<InterpreterDAGRequest>(coprocessorContext, dag_request);
}
/// Produces a placeholder query text and a placeholder select AST flagged as fake;
/// the size argument is ignored. Both are stored in the object and returned.
std::tuple<std::string, ASTPtr> DAGQueryInfo::parse(size_t)
{
    query = String("cop query");

    // Keep a typed handle so the flag can be set without a downcast.
    auto fake_select = std::make_shared<ASTSelectQuery>();
    fake_select->is_fake_sel = true;
    ast = fake_select;

    return std::make_tuple(query, ast);
}

/// Returns the stored query text; the size argument is ignored.
String DAGQueryInfo::get_query_ignore_error(size_t)
{
    return query;
}

/// Creates the interpreter for this DAG request; both arguments are ignored.
std::unique_ptr<IInterpreter> DAGQueryInfo::getInterpreter(Context &, QueryProcessingStage::Enum)
{
    std::unique_ptr<IInterpreter> interpreter = std::make_unique<InterpreterDAGRequest>(coprocessorContext, *this);
    return interpreter;
}
} // namespace DB
Loading

0 comments on commit ead9609

Please sign in to comment.