From ead960966b736386c1f70ece4f4bcb156386f851 Mon Sep 17 00:00:00 2001 From: xufei Date: Fri, 2 Aug 2019 13:58:33 +0800 Subject: [PATCH] basic support for selection/limit/topn executor in InterpreterDAGRequest (#150) --- .../Interpreters/CoprocessorBuilderUtils.cpp | 996 +++++++++++------- .../Interpreters/CoprocessorBuilderUtils.h | 26 +- .../Interpreters/DAGExpressionAnalyzer.cpp | 171 +++ dbms/src/Interpreters/DAGExpressionAnalyzer.h | 40 + dbms/src/Interpreters/DAGQueryInfo.cpp | 66 +- dbms/src/Interpreters/DAGQueryInfo.h | 61 +- dbms/src/Interpreters/DAGStringConverter.cpp | 318 +++--- dbms/src/Interpreters/DAGStringConverter.h | 40 +- .../Interpreters/InterpreterDAGRequest.cpp | 502 +++++---- dbms/src/Interpreters/InterpreterDAGRequest.h | 59 +- dbms/src/Server/cop_test.cpp | 104 +- dbms/src/Storages/Transaction/TypeMapping.cpp | 17 +- dbms/src/Storages/Transaction/TypeMapping.h | 7 + 13 files changed, 1552 insertions(+), 855 deletions(-) create mode 100644 dbms/src/Interpreters/DAGExpressionAnalyzer.cpp create mode 100644 dbms/src/Interpreters/DAGExpressionAnalyzer.h diff --git a/dbms/src/Interpreters/CoprocessorBuilderUtils.cpp b/dbms/src/Interpreters/CoprocessorBuilderUtils.cpp index d8058b5c1bc..de720e0c7b4 100644 --- a/dbms/src/Interpreters/CoprocessorBuilderUtils.cpp +++ b/dbms/src/Interpreters/CoprocessorBuilderUtils.cpp @@ -1,403 +1,615 @@ #include + #include -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wunused-parameter" -#include -#pragma GCC diagnostic pop #include +#include +#include + +namespace DB +{ + +bool isFunctionExpr(const tipb::Expr & expr) +{ + switch (expr.tp()) + { + case tipb::ExprType::ScalarFunc: + case tipb::ExprType::Count: + case tipb::ExprType::Sum: + case tipb::ExprType::Avg: + case tipb::ExprType::Min: + case tipb::ExprType::Max: + case tipb::ExprType::First: + case tipb::ExprType::GroupConcat: + case tipb::ExprType::Agg_BitAnd: + case tipb::ExprType::Agg_BitOr: + case tipb::ExprType::Agg_BitXor: + case tipb::ExprType::Std: + case tipb::ExprType::Stddev: + case tipb::ExprType::StddevPop: + case tipb::ExprType::StddevSamp: + case tipb::ExprType::VarPop: + case tipb::ExprType::VarSamp: + case tipb::ExprType::Variance: + case tipb::ExprType::JsonArrayAgg: + case tipb::ExprType::JsonObjectAgg: + return true; + default: + return false; + } +} -namespace DB { +const String & getFunctionName(const tipb::Expr & expr) +{ + if (isAggFunctionExpr(expr)) + { + if (!aggFunMap.count(expr.tp())) + { + throw Exception(tipb::ExprType_Name(expr.tp()) + " is not supported."); + } + return aggFunMap[expr.tp()]; + } + else + { + if (!scalarFunMap.count(expr.sig())) + { + throw Exception(tipb::ScalarFuncSig_Name(expr.sig()) + " is not supported."); + } + return scalarFunMap[expr.sig()]; + } +} - std::unordered_map aggFunMap( +String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col) +{ + std::stringstream ss; + size_t cursor = 1; + Int64 columnId = 0; + String func_name; + Field f; + switch (expr.tp()) + { + case tipb::ExprType::Null: + return "NULL"; + case tipb::ExprType::Int64: + return std::to_string(DecodeInt(cursor, expr.val())); + case tipb::ExprType::Uint64: + return std::to_string(DecodeInt(cursor, expr.val())); + case tipb::ExprType::Float32: + case tipb::ExprType::Float64: + return std::to_string(DecodeFloat64(cursor, expr.val())); + case tipb::ExprType::String: + // + return expr.val(); + case tipb::ExprType::Bytes: + return DecodeBytes(cursor, expr.val()); + case tipb::ExprType::ColumnRef: + columnId = DecodeInt(cursor, expr.val()); + if (columnId < 1 || columnId > (ColumnID)input_col.size()) { - {tipb::ExprType::Count, "count"}, - {tipb::ExprType::Sum, "sum"}, - {tipb::ExprType::Avg, "avg"}, - {tipb::ExprType::Min, "min"}, - {tipb::ExprType::Max, "max"}, - {tipb::ExprType::First, "any"}, - //{tipb::ExprType::GroupConcat, ""}, - //{tipb::ExprType::Agg_BitAnd, ""}, - //{tipb::ExprType::Agg_BitOr, ""}, - //{tipb::ExprType::Agg_BitXor, ""}, - //{tipb::ExprType::Std, ""}, - //{tipb::ExprType::Stddev, ""}, - //{tipb::ExprType::StddevPop, ""}, - //{tipb::ExprType::StddevSamp, ""}, - //{tipb::ExprType::VarPop, ""}, - //{tipb::ExprType::VarSamp, ""}, - //{tipb::ExprType::Variance, ""}, - //{tipb::ExprType::JsonArrayAgg, ""}, - //{tipb::ExprType::JsonObjectAgg, ""}, + throw Exception("out of bound"); } - ); - - std::unordered_map scalarFunMap( + return input_col.getNames()[columnId - 1]; + case tipb::ExprType::Count: + case tipb::ExprType::Sum: + case tipb::ExprType::Avg: + case tipb::ExprType::Min: + case tipb::ExprType::Max: + case tipb::ExprType::First: + if (!aggFunMap.count(expr.tp())) + { + throw Exception("not supported"); + } + func_name = aggFunMap.find(expr.tp())->second; + break; + case tipb::ExprType::ScalarFunc: + if (!scalarFunMap.count(expr.sig())) { - {tipb::ScalarFuncSig::CastIntAsInt, "cast"}, - {tipb::ScalarFuncSig::CastIntAsReal, "cast"}, - {tipb::ScalarFuncSig::CastIntAsString, "cast"}, - {tipb::ScalarFuncSig::CastIntAsDecimal, "cast"}, - {tipb::ScalarFuncSig::CastIntAsTime, "cast"}, - {tipb::ScalarFuncSig::CastIntAsDuration, "cast"}, - {tipb::ScalarFuncSig::CastIntAsJson, "cast"}, - - {tipb::ScalarFuncSig::CastRealAsInt, "cast"}, - {tipb::ScalarFuncSig::CastRealAsReal, "cast"}, - {tipb::ScalarFuncSig::CastRealAsString, "cast"}, - {tipb::ScalarFuncSig::CastRealAsDecimal, "cast"}, - {tipb::ScalarFuncSig::CastRealAsTime, "cast"}, - {tipb::ScalarFuncSig::CastRealAsDuration, "cast"}, - {tipb::ScalarFuncSig::CastRealAsJson, "cast"}, - - {tipb::ScalarFuncSig::CastDecimalAsInt, "cast"}, - {tipb::ScalarFuncSig::CastDecimalAsReal, "cast"}, - {tipb::ScalarFuncSig::CastDecimalAsString, "cast"}, - {tipb::ScalarFuncSig::CastDecimalAsDecimal, "cast"}, - {tipb::ScalarFuncSig::CastDecimalAsTime, "cast"}, - {tipb::ScalarFuncSig::CastDecimalAsDuration, "cast"}, - {tipb::ScalarFuncSig::CastDecimalAsJson, "cast"}, - - {tipb::ScalarFuncSig::CastStringAsInt, "cast"}, - {tipb::ScalarFuncSig::CastStringAsReal, "cast"}, - {tipb::ScalarFuncSig::CastStringAsString, "cast"}, - {tipb::ScalarFuncSig::CastStringAsDecimal, "cast"}, - {tipb::ScalarFuncSig::CastStringAsTime, "cast"}, - {tipb::ScalarFuncSig::CastStringAsDuration, "cast"}, - {tipb::ScalarFuncSig::CastStringAsJson, "cast"}, - - {tipb::ScalarFuncSig::CastTimeAsInt, "cast"}, - {tipb::ScalarFuncSig::CastTimeAsReal, "cast"}, - {tipb::ScalarFuncSig::CastTimeAsString, "cast"}, - {tipb::ScalarFuncSig::CastTimeAsDecimal, "cast"}, - {tipb::ScalarFuncSig::CastTimeAsTime, "cast"}, - {tipb::ScalarFuncSig::CastTimeAsDuration, "cast"}, - {tipb::ScalarFuncSig::CastTimeAsJson, "cast"}, - - {tipb::ScalarFuncSig::CastDurationAsInt, "cast"}, - {tipb::ScalarFuncSig::CastDurationAsReal, "cast"}, - {tipb::ScalarFuncSig::CastDurationAsString, "cast"}, - {tipb::ScalarFuncSig::CastDurationAsDecimal, "cast"}, - {tipb::ScalarFuncSig::CastDurationAsTime, "cast"}, - {tipb::ScalarFuncSig::CastDurationAsDuration, "cast"}, - {tipb::ScalarFuncSig::CastDurationAsJson, "cast"}, - - {tipb::ScalarFuncSig::CastJsonAsInt, "cast"}, - {tipb::ScalarFuncSig::CastJsonAsReal, "cast"}, - {tipb::ScalarFuncSig::CastJsonAsString, "cast"}, - {tipb::ScalarFuncSig::CastJsonAsDecimal, "cast"}, - {tipb::ScalarFuncSig::CastJsonAsTime, "cast"}, - {tipb::ScalarFuncSig::CastJsonAsDuration, "cast"}, - {tipb::ScalarFuncSig::CastJsonAsJson, "cast"}, - - {tipb::ScalarFuncSig::CoalesceInt, "coalesce"}, - {tipb::ScalarFuncSig::CoalesceReal, "coalesce"}, - {tipb::ScalarFuncSig::CoalesceString, "coalesce"}, - {tipb::ScalarFuncSig::CoalesceDecimal, "coalesce"}, - {tipb::ScalarFuncSig::CoalesceTime, "coalesce"}, - {tipb::ScalarFuncSig::CoalesceDuration, "coalesce"}, - {tipb::ScalarFuncSig::CoalesceJson, "coalesce"}, - - {tipb::ScalarFuncSig::LTInt, "less"}, - {tipb::ScalarFuncSig::LTReal, "less"}, - {tipb::ScalarFuncSig::LTString, "less"}, - {tipb::ScalarFuncSig::LTDecimal, "less"}, - {tipb::ScalarFuncSig::LTTime, "less"}, - {tipb::ScalarFuncSig::LTDuration, "less"}, - {tipb::ScalarFuncSig::LTJson, "less"}, - - {tipb::ScalarFuncSig::LEInt, "lessOrEquals"}, - {tipb::ScalarFuncSig::LEReal, "lessOrEquals"}, - {tipb::ScalarFuncSig::LEString, "lessOrEquals"}, - {tipb::ScalarFuncSig::LEDecimal, "lessOrEquals"}, - {tipb::ScalarFuncSig::LETime, "lessOrEquals"}, - {tipb::ScalarFuncSig::LEDuration, "lessOrEquals"}, - {tipb::ScalarFuncSig::LEJson, "lessOrEquals"}, - - {tipb::ScalarFuncSig::GTInt, "greater"}, - {tipb::ScalarFuncSig::GTReal, "greater"}, - {tipb::ScalarFuncSig::GTString, "greater"}, - {tipb::ScalarFuncSig::GTDecimal, "greater"}, - {tipb::ScalarFuncSig::GTTime, "greater"}, - {tipb::ScalarFuncSig::GTDuration, "greater"}, - {tipb::ScalarFuncSig::GTJson, "greater"}, - - {tipb::ScalarFuncSig::GreatestInt, "greatest"}, - {tipb::ScalarFuncSig::GreatestReal, "greatest"}, - {tipb::ScalarFuncSig::GreatestString, "greatest"}, - {tipb::ScalarFuncSig::GreatestDecimal, "greatest"}, - {tipb::ScalarFuncSig::GreatestTime, "greatest"}, - - {tipb::ScalarFuncSig::LeastInt, "least"}, - {tipb::ScalarFuncSig::LeastReal, "least"}, - {tipb::ScalarFuncSig::LeastString, "least"}, - {tipb::ScalarFuncSig::LeastDecimal, "least"}, - {tipb::ScalarFuncSig::LeastTime, "least"}, - - //{tipb::ScalarFuncSig::IntervalInt, "cast"}, - //{tipb::ScalarFuncSig::IntervalReal, "cast"}, - - {tipb::ScalarFuncSig::GEInt, "greaterOrEquals"}, - {tipb::ScalarFuncSig::GEReal, "greaterOrEquals"}, - {tipb::ScalarFuncSig::GEString, "greaterOrEquals"}, - {tipb::ScalarFuncSig::GEDecimal, "greaterOrEquals"}, - {tipb::ScalarFuncSig::GETime, "greaterOrEquals"}, - {tipb::ScalarFuncSig::GEDuration, "greaterOrEquals"}, - {tipb::ScalarFuncSig::GEJson, "greaterOrEquals"}, - - {tipb::ScalarFuncSig::EQInt, "equals"}, - {tipb::ScalarFuncSig::EQReal, "equals"}, - {tipb::ScalarFuncSig::EQString, "equals"}, - {tipb::ScalarFuncSig::EQDecimal, "equals"}, - {tipb::ScalarFuncSig::EQTime, "equals"}, - {tipb::ScalarFuncSig::EQDuration, "equals"}, - {tipb::ScalarFuncSig::EQJson, "equals"}, - - {tipb::ScalarFuncSig::NEInt, "notEquals"}, - {tipb::ScalarFuncSig::NEReal, "notEquals"}, - {tipb::ScalarFuncSig::NEString, "notEquals"}, - {tipb::ScalarFuncSig::NEDecimal, "notEquals"}, - {tipb::ScalarFuncSig::NETime, "notEquals"}, - {tipb::ScalarFuncSig::NEDuration, "notEquals"}, - {tipb::ScalarFuncSig::NEJson, "notEquals"}, - - //{tipb::ScalarFuncSig::NullEQInt, "cast"}, - //{tipb::ScalarFuncSig::NullEQReal, "cast"}, - //{tipb::ScalarFuncSig::NullEQString, "cast"}, - //{tipb::ScalarFuncSig::NullEQDecimal, "cast"}, - //{tipb::ScalarFuncSig::NullEQTime, "cast"}, - //{tipb::ScalarFuncSig::NullEQDuration, "cast"}, - //{tipb::ScalarFuncSig::NullEQJson, "cast"}, - - {tipb::ScalarFuncSig::PlusReal, "plus"}, - {tipb::ScalarFuncSig::PlusDecimal, "plus"}, - {tipb::ScalarFuncSig::PlusInt, "plus"}, - - {tipb::ScalarFuncSig::MinusReal, "minus"}, - {tipb::ScalarFuncSig::MinusDecimal, "minus"}, - {tipb::ScalarFuncSig::MinusInt, "minus"}, - - {tipb::ScalarFuncSig::MultiplyReal, "multiply"}, - {tipb::ScalarFuncSig::MultiplyDecimal, "multiply"}, - {tipb::ScalarFuncSig::MultiplyInt, "multiply"}, - - {tipb::ScalarFuncSig::DivideReal, "divide"}, - {tipb::ScalarFuncSig::DivideDecimal, "divide"}, - {tipb::ScalarFuncSig::IntDivideInt, "intDiv"}, - {tipb::ScalarFuncSig::IntDivideDecimal, "divide"}, - - {tipb::ScalarFuncSig::ModReal, "modulo"}, - {tipb::ScalarFuncSig::ModDecimal, "modulo"}, - {tipb::ScalarFuncSig::ModInt, "modulo"}, - - {tipb::ScalarFuncSig::MultiplyIntUnsigned, "multiply"}, - - {tipb::ScalarFuncSig::AbsInt, "abs"}, - {tipb::ScalarFuncSig::AbsUInt, "abs"}, - {tipb::ScalarFuncSig::AbsReal, "abs"}, - {tipb::ScalarFuncSig::AbsDecimal, "abs"}, - - {tipb::ScalarFuncSig::CeilIntToDec, "ceil"}, - {tipb::ScalarFuncSig::CeilIntToInt, "ceil"}, - {tipb::ScalarFuncSig::CeilDecToInt, "ceil"}, - {tipb::ScalarFuncSig::CeilDecToDec, "ceil"}, - {tipb::ScalarFuncSig::CeilReal, "ceil"}, - - {tipb::ScalarFuncSig::FloorIntToDec, "floor"}, - {tipb::ScalarFuncSig::FloorIntToInt, "floor"}, - {tipb::ScalarFuncSig::FloorDecToInt, "floor"}, - {tipb::ScalarFuncSig::FloorDecToDec, "floor"}, - {tipb::ScalarFuncSig::FloorReal, "floor"}, - - {tipb::ScalarFuncSig::RoundReal, "round"}, - {tipb::ScalarFuncSig::RoundInt, "round"}, - {tipb::ScalarFuncSig::RoundDec, "round"}, - //{tipb::ScalarFuncSig::RoundWithFracReal, "cast"}, - //{tipb::ScalarFuncSig::RoundWithFracInt, "cast"}, - //{tipb::ScalarFuncSig::RoundWithFracDec, "cast"}, - - {tipb::ScalarFuncSig::Log1Arg, "log"}, - //{tipb::ScalarFuncSig::Log2Args, "cast"}, - {tipb::ScalarFuncSig::Log2, "log2"}, - {tipb::ScalarFuncSig::Log10, "log10"}, - - {tipb::ScalarFuncSig::Rand, "rand"}, - //{tipb::ScalarFuncSig::RandWithSeed, "cast"}, - - {tipb::ScalarFuncSig::Pow, "pow"}, - //{tipb::ScalarFuncSig::Conv, "cast"}, - //{tipb::ScalarFuncSig::CRC32, "cast"}, - //{tipb::ScalarFuncSig::Sign, "cast"}, - - {tipb::ScalarFuncSig::Sqrt, "sqrt"}, - {tipb::ScalarFuncSig::Acos, "acos"}, - {tipb::ScalarFuncSig::Asin, "asin"}, - {tipb::ScalarFuncSig::Atan1Arg, "atan"}, - //{tipb::ScalarFuncSig::Atan2Args, "cast"}, - {tipb::ScalarFuncSig::Cos, "cos"}, - //{tipb::ScalarFuncSig::Cot, "cast"}, - //{tipb::ScalarFuncSig::Degrees, "cast"}, - {tipb::ScalarFuncSig::Exp, "exp"}, - //{tipb::ScalarFuncSig::PI, "cast"}, - //{tipb::ScalarFuncSig::Radians, "cast"}, - {tipb::ScalarFuncSig::Sin, "sin"}, - {tipb::ScalarFuncSig::Tan, "tan"}, - {tipb::ScalarFuncSig::TruncateInt, "trunc"}, - {tipb::ScalarFuncSig::TruncateReal, "trunc"}, - //{tipb::ScalarFuncSig::TruncateDecimal, "cast"}, - - {tipb::ScalarFuncSig::LogicalAnd, "and"}, - {tipb::ScalarFuncSig::LogicalOr, "or"}, - {tipb::ScalarFuncSig::LogicalXor, "xor"}, - {tipb::ScalarFuncSig::UnaryNot, "not"}, - {tipb::ScalarFuncSig::UnaryMinusInt, "negate"}, - {tipb::ScalarFuncSig::UnaryMinusReal, "negate"}, - {tipb::ScalarFuncSig::UnaryMinusDecimal, "negate"}, - {tipb::ScalarFuncSig::DecimalIsNull, "isNull"}, - {tipb::ScalarFuncSig::DurationIsNull, "isNull"}, - {tipb::ScalarFuncSig::RealIsNull, "isNull"}, - {tipb::ScalarFuncSig::StringIsNull, "isNull"}, - {tipb::ScalarFuncSig::TimeIsNull, "isNull"}, - {tipb::ScalarFuncSig::IntIsNull, "isNull"}, - {tipb::ScalarFuncSig::JsonIsNull, "isNull"}, - - //{tipb::ScalarFuncSig::BitAndSig, "cast"}, - //{tipb::ScalarFuncSig::BitOrSig, "cast"}, - //{tipb::ScalarFuncSig::BitXorSig, "cast"}, - //{tipb::ScalarFuncSig::BitNegSig, "cast"}, - //{tipb::ScalarFuncSig::IntIsTrue, "cast"}, - //{tipb::ScalarFuncSig::RealIsTrue, "cast"}, - //{tipb::ScalarFuncSig::DecimalIsTrue, "cast"}, - //{tipb::ScalarFuncSig::IntIsFalse, "cast"}, - //{tipb::ScalarFuncSig::RealIsFalse, "cast"}, - //{tipb::ScalarFuncSig::DecimalIsFalse, "cast"}, - - //{tipb::ScalarFuncSig::LeftShift, "cast"}, - //{tipb::ScalarFuncSig::RightShift, "cast"}, - - //{tipb::ScalarFuncSig::BitCount, "cast"}, - //{tipb::ScalarFuncSig::GetParamString, "cast"}, - //{tipb::ScalarFuncSig::GetVar, "cast"}, - //{tipb::ScalarFuncSig::RowSig, "cast"}, - //{tipb::ScalarFuncSig::SetVar, "cast"}, - //{tipb::ScalarFuncSig::ValuesDecimal, "cast"}, - //{tipb::ScalarFuncSig::ValuesDuration, "cast"}, - //{tipb::ScalarFuncSig::ValuesInt, "cast"}, - //{tipb::ScalarFuncSig::ValuesJSON, "cast"}, - //{tipb::ScalarFuncSig::ValuesReal, "cast"}, - //{tipb::ScalarFuncSig::ValuesString, "cast"}, - //{tipb::ScalarFuncSig::ValuesTime, "cast"}, - - {tipb::ScalarFuncSig::InInt, "in"}, - {tipb::ScalarFuncSig::InReal, "in"}, - {tipb::ScalarFuncSig::InString, "in"}, - {tipb::ScalarFuncSig::InDecimal, "in"}, - {tipb::ScalarFuncSig::InTime, "in"}, - {tipb::ScalarFuncSig::InDuration, "in"}, - {tipb::ScalarFuncSig::InJson, "in"}, - - {tipb::ScalarFuncSig::IfNullInt, "ifNull"}, - {tipb::ScalarFuncSig::IfNullReal, "ifNull"}, - {tipb::ScalarFuncSig::IfNullString, "ifNull"}, - {tipb::ScalarFuncSig::IfNullDecimal, "ifNull"}, - {tipb::ScalarFuncSig::IfNullTime, "ifNull"}, - {tipb::ScalarFuncSig::IfNullDuration, "ifNull"}, - {tipb::ScalarFuncSig::IfNullJson, "ifNull"}, - - {tipb::ScalarFuncSig::IfInt, "if"}, - {tipb::ScalarFuncSig::IfReal, "if"}, - {tipb::ScalarFuncSig::IfString, "if"}, - {tipb::ScalarFuncSig::IfDecimal, "if"}, - {tipb::ScalarFuncSig::IfTime, "if"}, - {tipb::ScalarFuncSig::IfDuration, "if"}, - {tipb::ScalarFuncSig::IfJson, "if"}, - - //todo need further check for caseWithExpression and multiIf - {tipb::ScalarFuncSig::CaseWhenInt, "caseWithExpression"}, - {tipb::ScalarFuncSig::CaseWhenReal, "caseWithExpression"}, - {tipb::ScalarFuncSig::CaseWhenString, "caseWithExpression"}, - {tipb::ScalarFuncSig::CaseWhenDecimal, "caseWithExpression"}, - {tipb::ScalarFuncSig::CaseWhenTime, "caseWithExpression"}, - {tipb::ScalarFuncSig::CaseWhenDuration, "caseWithExpression"}, - {tipb::ScalarFuncSig::CaseWhenJson, "caseWithExpression"}, - - //{tipb::ScalarFuncSig::AesDecrypt, "cast"}, - //{tipb::ScalarFuncSig::AesEncrypt, "cast"}, - //{tipb::ScalarFuncSig::Compress, "cast"}, - //{tipb::ScalarFuncSig::MD5, "cast"}, - //{tipb::ScalarFuncSig::Password, "cast"}, - //{tipb::ScalarFuncSig::RandomBytes, "cast"}, - //{tipb::ScalarFuncSig::SHA1, "cast"}, - //{tipb::ScalarFuncSig::SHA2, "cast"}, - //{tipb::ScalarFuncSig::Uncompress, "cast"}, - //{tipb::ScalarFuncSig::UncompressedLength, "cast"}, - - //{tipb::ScalarFuncSig::Database, "cast"}, - //{tipb::ScalarFuncSig::FoundRows, "cast"}, - //{tipb::ScalarFuncSig::CurrentUser, "cast"}, - //{tipb::ScalarFuncSig::User, "cast"}, - //{tipb::ScalarFuncSig::ConnectionID, "cast"}, - //{tipb::ScalarFuncSig::LastInsertID, "cast"}, - //{tipb::ScalarFuncSig::LastInsertIDWithID, "cast"}, - //{tipb::ScalarFuncSig::Version, "cast"}, - //{tipb::ScalarFuncSig::TiDBVersion, "cast"}, - //{tipb::ScalarFuncSig::RowCount, "cast"}, - - //{tipb::ScalarFuncSig::Sleep, "cast"}, - //{tipb::ScalarFuncSig::Lock, "cast"}, - //{tipb::ScalarFuncSig::ReleaseLock, "cast"}, - //{tipb::ScalarFuncSig::DecimalAnyValue, "cast"}, - //{tipb::ScalarFuncSig::DurationAnyValue, "cast"}, - //{tipb::ScalarFuncSig::IntAnyValue, "cast"}, - //{tipb::ScalarFuncSig::JSONAnyValue, "cast"}, - //{tipb::ScalarFuncSig::RealAnyValue, "cast"}, - //{tipb::ScalarFuncSig::StringAnyValue, "cast"}, - //{tipb::ScalarFuncSig::TimeAnyValue, "cast"}, - //{tipb::ScalarFuncSig::InetAton, "cast"}, - //{tipb::ScalarFuncSig::InetNtoa, "cast"}, - //{tipb::ScalarFuncSig::Inet6Aton, "cast"}, - //{tipb::ScalarFuncSig::Inet6Ntoa, "cast"}, - //{tipb::ScalarFuncSig::IsIPv4, "cast"}, - //{tipb::ScalarFuncSig::IsIPv4Compat, "cast"}, - //{tipb::ScalarFuncSig::IsIPv4Mapped, "cast"}, - //{tipb::ScalarFuncSig::IsIPv6, "cast"}, - //{tipb::ScalarFuncSig::UUID, "cast"}, - - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, - {tipb::ScalarFuncSig::Uncompress, "cast"}, + throw Exception("not supported"); } - ); + func_name = scalarFunMap.find(expr.sig())->second; + break; + default: + throw Exception("not supported"); + } + // build function expr + if (func_name == "in") + { + // for in, we could not represent the function expr using func_name(param1, param2, ...) + throw Exception("not supported"); + } + else + { + ss << func_name << "("; + bool first = true; + for (const tipb::Expr & child : expr.children()) + { + String s = exprToString(child, input_col); + if (first) + { + first = false; + } + else + { + ss << ", "; + } + ss << s; + } + ss << ") "; + return ss.str(); + } +} + +const String & getTypeName(const tipb::Expr & expr) { return tipb::ExprType_Name(expr.tp()); } + +String getName(const tipb::Expr & expr, const NamesAndTypesList & current_input_columns) +{ + return exprToString(expr, current_input_columns); +} + +bool isAggFunctionExpr(const tipb::Expr & expr) +{ + switch (expr.tp()) + { + case tipb::ExprType::Count: + case tipb::ExprType::Sum: + case tipb::ExprType::Avg: + case tipb::ExprType::Min: + case tipb::ExprType::Max: + case tipb::ExprType::First: + case tipb::ExprType::GroupConcat: + case tipb::ExprType::Agg_BitAnd: + case tipb::ExprType::Agg_BitOr: + case tipb::ExprType::Agg_BitXor: + case tipb::ExprType::Std: + case tipb::ExprType::Stddev: + case tipb::ExprType::StddevPop: + case tipb::ExprType::StddevSamp: + case tipb::ExprType::VarPop: + case tipb::ExprType::VarSamp: + case tipb::ExprType::Variance: + case tipb::ExprType::JsonArrayAgg: + case tipb::ExprType::JsonObjectAgg: + return true; + default: + return false; + } +} + +bool isLiteralExpr(const tipb::Expr & expr) +{ + switch (expr.tp()) + { + case tipb::ExprType::Null: + case tipb::ExprType::Int64: + case tipb::ExprType::Uint64: + case tipb::ExprType::Float32: + case tipb::ExprType::Float64: + case tipb::ExprType::String: + case tipb::ExprType::Bytes: + case tipb::ExprType::MysqlBit: + case tipb::ExprType::MysqlDecimal: + case tipb::ExprType::MysqlDuration: + case tipb::ExprType::MysqlEnum: + case tipb::ExprType::MysqlHex: + case tipb::ExprType::MysqlSet: + case tipb::ExprType::MysqlTime: + case tipb::ExprType::MysqlJson: + case tipb::ExprType::ValueList: + return true; + default: + return false; + } } + +bool isColumnExpr(const tipb::Expr & expr) { return expr.tp() == tipb::ExprType::ColumnRef; } + +Field decodeLiteral(const tipb::Expr & expr) +{ + size_t cursor = 0; + switch (expr.tp()) + { + case tipb::ExprType::MysqlBit: + case tipb::ExprType::MysqlDecimal: + case tipb::ExprType::MysqlDuration: + case tipb::ExprType::MysqlEnum: + case tipb::ExprType::MysqlHex: + case tipb::ExprType::MysqlSet: + case tipb::ExprType::MysqlTime: + case tipb::ExprType::MysqlJson: + case tipb::ExprType::ValueList: + throw Exception("mysql type literal is not supported yet"); + default: + return DecodeDatum(cursor, expr.val()); + } +} + +ColumnID getColumnID(const tipb::Expr & expr) +{ + size_t cursor = 1; + return DecodeInt(cursor, expr.val()); +} + +std::unordered_map aggFunMap({ + {tipb::ExprType::Count, "count"}, {tipb::ExprType::Sum, "sum"}, {tipb::ExprType::Avg, "avg"}, {tipb::ExprType::Min, "min"}, + {tipb::ExprType::Max, "max"}, {tipb::ExprType::First, "any"}, + //{tipb::ExprType::GroupConcat, ""}, + //{tipb::ExprType::Agg_BitAnd, ""}, + //{tipb::ExprType::Agg_BitOr, ""}, + //{tipb::ExprType::Agg_BitXor, ""}, + //{tipb::ExprType::Std, ""}, + //{tipb::ExprType::Stddev, ""}, + //{tipb::ExprType::StddevPop, ""}, + //{tipb::ExprType::StddevSamp, ""}, + //{tipb::ExprType::VarPop, ""}, + //{tipb::ExprType::VarSamp, ""}, + //{tipb::ExprType::Variance, ""}, + //{tipb::ExprType::JsonArrayAgg, ""}, + //{tipb::ExprType::JsonObjectAgg, ""}, +}); + +std::unordered_map scalarFunMap({ + {tipb::ScalarFuncSig::CastIntAsInt, "cast"}, + {tipb::ScalarFuncSig::CastIntAsReal, "cast"}, + {tipb::ScalarFuncSig::CastIntAsString, "cast"}, + {tipb::ScalarFuncSig::CastIntAsDecimal, "cast"}, + {tipb::ScalarFuncSig::CastIntAsTime, "cast"}, + {tipb::ScalarFuncSig::CastIntAsDuration, "cast"}, + {tipb::ScalarFuncSig::CastIntAsJson, "cast"}, + + {tipb::ScalarFuncSig::CastRealAsInt, "cast"}, + {tipb::ScalarFuncSig::CastRealAsReal, "cast"}, + {tipb::ScalarFuncSig::CastRealAsString, "cast"}, + {tipb::ScalarFuncSig::CastRealAsDecimal, "cast"}, + {tipb::ScalarFuncSig::CastRealAsTime, "cast"}, + {tipb::ScalarFuncSig::CastRealAsDuration, "cast"}, + {tipb::ScalarFuncSig::CastRealAsJson, "cast"}, + + {tipb::ScalarFuncSig::CastDecimalAsInt, "cast"}, + {tipb::ScalarFuncSig::CastDecimalAsReal, "cast"}, + {tipb::ScalarFuncSig::CastDecimalAsString, "cast"}, + {tipb::ScalarFuncSig::CastDecimalAsDecimal, "cast"}, + {tipb::ScalarFuncSig::CastDecimalAsTime, "cast"}, + {tipb::ScalarFuncSig::CastDecimalAsDuration, "cast"}, + {tipb::ScalarFuncSig::CastDecimalAsJson, "cast"}, + + {tipb::ScalarFuncSig::CastStringAsInt, "cast"}, + {tipb::ScalarFuncSig::CastStringAsReal, "cast"}, + {tipb::ScalarFuncSig::CastStringAsString, "cast"}, + {tipb::ScalarFuncSig::CastStringAsDecimal, "cast"}, + {tipb::ScalarFuncSig::CastStringAsTime, "cast"}, + {tipb::ScalarFuncSig::CastStringAsDuration, "cast"}, + {tipb::ScalarFuncSig::CastStringAsJson, "cast"}, + + {tipb::ScalarFuncSig::CastTimeAsInt, "cast"}, + {tipb::ScalarFuncSig::CastTimeAsReal, "cast"}, + {tipb::ScalarFuncSig::CastTimeAsString, "cast"}, + {tipb::ScalarFuncSig::CastTimeAsDecimal, "cast"}, + {tipb::ScalarFuncSig::CastTimeAsTime, "cast"}, + {tipb::ScalarFuncSig::CastTimeAsDuration, "cast"}, + {tipb::ScalarFuncSig::CastTimeAsJson, "cast"}, + + {tipb::ScalarFuncSig::CastDurationAsInt, "cast"}, + {tipb::ScalarFuncSig::CastDurationAsReal, "cast"}, + {tipb::ScalarFuncSig::CastDurationAsString, "cast"}, + {tipb::ScalarFuncSig::CastDurationAsDecimal, "cast"}, + {tipb::ScalarFuncSig::CastDurationAsTime, "cast"}, + {tipb::ScalarFuncSig::CastDurationAsDuration, "cast"}, + {tipb::ScalarFuncSig::CastDurationAsJson, "cast"}, + + {tipb::ScalarFuncSig::CastJsonAsInt, "cast"}, + {tipb::ScalarFuncSig::CastJsonAsReal, "cast"}, + {tipb::ScalarFuncSig::CastJsonAsString, "cast"}, + {tipb::ScalarFuncSig::CastJsonAsDecimal, "cast"}, + {tipb::ScalarFuncSig::CastJsonAsTime, "cast"}, + {tipb::ScalarFuncSig::CastJsonAsDuration, "cast"}, + {tipb::ScalarFuncSig::CastJsonAsJson, "cast"}, + + {tipb::ScalarFuncSig::CoalesceInt, "coalesce"}, + {tipb::ScalarFuncSig::CoalesceReal, "coalesce"}, + {tipb::ScalarFuncSig::CoalesceString, "coalesce"}, + {tipb::ScalarFuncSig::CoalesceDecimal, "coalesce"}, + {tipb::ScalarFuncSig::CoalesceTime, "coalesce"}, + {tipb::ScalarFuncSig::CoalesceDuration, "coalesce"}, + {tipb::ScalarFuncSig::CoalesceJson, "coalesce"}, + + {tipb::ScalarFuncSig::LTInt, "less"}, + {tipb::ScalarFuncSig::LTReal, "less"}, + {tipb::ScalarFuncSig::LTString, "less"}, + {tipb::ScalarFuncSig::LTDecimal, "less"}, + {tipb::ScalarFuncSig::LTTime, "less"}, + {tipb::ScalarFuncSig::LTDuration, "less"}, + {tipb::ScalarFuncSig::LTJson, "less"}, + + {tipb::ScalarFuncSig::LEInt, "lessOrEquals"}, + {tipb::ScalarFuncSig::LEReal, "lessOrEquals"}, + {tipb::ScalarFuncSig::LEString, "lessOrEquals"}, + {tipb::ScalarFuncSig::LEDecimal, "lessOrEquals"}, + {tipb::ScalarFuncSig::LETime, "lessOrEquals"}, + {tipb::ScalarFuncSig::LEDuration, "lessOrEquals"}, + {tipb::ScalarFuncSig::LEJson, "lessOrEquals"}, + + {tipb::ScalarFuncSig::GTInt, "greater"}, + {tipb::ScalarFuncSig::GTReal, "greater"}, + {tipb::ScalarFuncSig::GTString, "greater"}, + {tipb::ScalarFuncSig::GTDecimal, "greater"}, + {tipb::ScalarFuncSig::GTTime, "greater"}, + {tipb::ScalarFuncSig::GTDuration, "greater"}, + {tipb::ScalarFuncSig::GTJson, "greater"}, + + {tipb::ScalarFuncSig::GreatestInt, "greatest"}, + {tipb::ScalarFuncSig::GreatestReal, "greatest"}, + {tipb::ScalarFuncSig::GreatestString, "greatest"}, + {tipb::ScalarFuncSig::GreatestDecimal, "greatest"}, + {tipb::ScalarFuncSig::GreatestTime, "greatest"}, + + {tipb::ScalarFuncSig::LeastInt, "least"}, + {tipb::ScalarFuncSig::LeastReal, "least"}, + {tipb::ScalarFuncSig::LeastString, "least"}, + {tipb::ScalarFuncSig::LeastDecimal, "least"}, + {tipb::ScalarFuncSig::LeastTime, "least"}, + + //{tipb::ScalarFuncSig::IntervalInt, "cast"}, + //{tipb::ScalarFuncSig::IntervalReal, "cast"}, + + {tipb::ScalarFuncSig::GEInt, "greaterOrEquals"}, + {tipb::ScalarFuncSig::GEReal, "greaterOrEquals"}, + {tipb::ScalarFuncSig::GEString, "greaterOrEquals"}, + {tipb::ScalarFuncSig::GEDecimal, "greaterOrEquals"}, + {tipb::ScalarFuncSig::GETime, "greaterOrEquals"}, + {tipb::ScalarFuncSig::GEDuration, "greaterOrEquals"}, + {tipb::ScalarFuncSig::GEJson, "greaterOrEquals"}, + + {tipb::ScalarFuncSig::EQInt, "equals"}, + {tipb::ScalarFuncSig::EQReal, "equals"}, + {tipb::ScalarFuncSig::EQString, "equals"}, + {tipb::ScalarFuncSig::EQDecimal, "equals"}, + {tipb::ScalarFuncSig::EQTime, "equals"}, + {tipb::ScalarFuncSig::EQDuration, "equals"}, + {tipb::ScalarFuncSig::EQJson, "equals"}, + + {tipb::ScalarFuncSig::NEInt, "notEquals"}, + {tipb::ScalarFuncSig::NEReal, "notEquals"}, + {tipb::ScalarFuncSig::NEString, "notEquals"}, + {tipb::ScalarFuncSig::NEDecimal, "notEquals"}, + {tipb::ScalarFuncSig::NETime, "notEquals"}, + {tipb::ScalarFuncSig::NEDuration, "notEquals"}, + {tipb::ScalarFuncSig::NEJson, "notEquals"}, + + //{tipb::ScalarFuncSig::NullEQInt, "cast"}, + //{tipb::ScalarFuncSig::NullEQReal, "cast"}, + //{tipb::ScalarFuncSig::NullEQString, "cast"}, + //{tipb::ScalarFuncSig::NullEQDecimal, "cast"}, + //{tipb::ScalarFuncSig::NullEQTime, "cast"}, + //{tipb::ScalarFuncSig::NullEQDuration, "cast"}, + //{tipb::ScalarFuncSig::NullEQJson, "cast"}, + + {tipb::ScalarFuncSig::PlusReal, "plus"}, + {tipb::ScalarFuncSig::PlusDecimal, "plus"}, + {tipb::ScalarFuncSig::PlusInt, "plus"}, + + {tipb::ScalarFuncSig::MinusReal, "minus"}, + {tipb::ScalarFuncSig::MinusDecimal, "minus"}, + {tipb::ScalarFuncSig::MinusInt, "minus"}, + + {tipb::ScalarFuncSig::MultiplyReal, "multiply"}, + {tipb::ScalarFuncSig::MultiplyDecimal, "multiply"}, + {tipb::ScalarFuncSig::MultiplyInt, "multiply"}, + + {tipb::ScalarFuncSig::DivideReal, "divide"}, + {tipb::ScalarFuncSig::DivideDecimal, "divide"}, + {tipb::ScalarFuncSig::IntDivideInt, "intDiv"}, + {tipb::ScalarFuncSig::IntDivideDecimal, "divide"}, + + {tipb::ScalarFuncSig::ModReal, "modulo"}, + {tipb::ScalarFuncSig::ModDecimal, "modulo"}, + {tipb::ScalarFuncSig::ModInt, "modulo"}, + + {tipb::ScalarFuncSig::MultiplyIntUnsigned, "multiply"}, + + {tipb::ScalarFuncSig::AbsInt, "abs"}, + {tipb::ScalarFuncSig::AbsUInt, "abs"}, + {tipb::ScalarFuncSig::AbsReal, "abs"}, + {tipb::ScalarFuncSig::AbsDecimal, "abs"}, + + {tipb::ScalarFuncSig::CeilIntToDec, "ceil"}, + {tipb::ScalarFuncSig::CeilIntToInt, "ceil"}, + {tipb::ScalarFuncSig::CeilDecToInt, "ceil"}, + {tipb::ScalarFuncSig::CeilDecToDec, "ceil"}, + {tipb::ScalarFuncSig::CeilReal, "ceil"}, + + {tipb::ScalarFuncSig::FloorIntToDec, "floor"}, + {tipb::ScalarFuncSig::FloorIntToInt, "floor"}, + {tipb::ScalarFuncSig::FloorDecToInt, "floor"}, + {tipb::ScalarFuncSig::FloorDecToDec, "floor"}, + {tipb::ScalarFuncSig::FloorReal, "floor"}, + + {tipb::ScalarFuncSig::RoundReal, "round"}, + {tipb::ScalarFuncSig::RoundInt, "round"}, + {tipb::ScalarFuncSig::RoundDec, "round"}, + //{tipb::ScalarFuncSig::RoundWithFracReal, "cast"}, + //{tipb::ScalarFuncSig::RoundWithFracInt, "cast"}, + //{tipb::ScalarFuncSig::RoundWithFracDec, "cast"}, + + {tipb::ScalarFuncSig::Log1Arg, "log"}, + //{tipb::ScalarFuncSig::Log2Args, "cast"}, + {tipb::ScalarFuncSig::Log2, "log2"}, + {tipb::ScalarFuncSig::Log10, "log10"}, + + {tipb::ScalarFuncSig::Rand, "rand"}, + //{tipb::ScalarFuncSig::RandWithSeed, "cast"}, + + {tipb::ScalarFuncSig::Pow, "pow"}, + //{tipb::ScalarFuncSig::Conv, "cast"}, + //{tipb::ScalarFuncSig::CRC32, "cast"}, + //{tipb::ScalarFuncSig::Sign, "cast"}, + + {tipb::ScalarFuncSig::Sqrt, "sqrt"}, + {tipb::ScalarFuncSig::Acos, "acos"}, + {tipb::ScalarFuncSig::Asin, "asin"}, + {tipb::ScalarFuncSig::Atan1Arg, "atan"}, + //{tipb::ScalarFuncSig::Atan2Args, "cast"}, + {tipb::ScalarFuncSig::Cos, "cos"}, + //{tipb::ScalarFuncSig::Cot, "cast"}, + //{tipb::ScalarFuncSig::Degrees, "cast"}, + {tipb::ScalarFuncSig::Exp, "exp"}, + //{tipb::ScalarFuncSig::PI, "cast"}, + //{tipb::ScalarFuncSig::Radians, "cast"}, + {tipb::ScalarFuncSig::Sin, "sin"}, + {tipb::ScalarFuncSig::Tan, "tan"}, + {tipb::ScalarFuncSig::TruncateInt, "trunc"}, + {tipb::ScalarFuncSig::TruncateReal, "trunc"}, + //{tipb::ScalarFuncSig::TruncateDecimal, "cast"}, + + {tipb::ScalarFuncSig::LogicalAnd, "and"}, + {tipb::ScalarFuncSig::LogicalOr, "or"}, + {tipb::ScalarFuncSig::LogicalXor, "xor"}, + {tipb::ScalarFuncSig::UnaryNot, "not"}, + {tipb::ScalarFuncSig::UnaryMinusInt, "negate"}, + {tipb::ScalarFuncSig::UnaryMinusReal, "negate"}, + {tipb::ScalarFuncSig::UnaryMinusDecimal, "negate"}, + {tipb::ScalarFuncSig::DecimalIsNull, "isNull"}, + {tipb::ScalarFuncSig::DurationIsNull, "isNull"}, + {tipb::ScalarFuncSig::RealIsNull, "isNull"}, + {tipb::ScalarFuncSig::StringIsNull, "isNull"}, + {tipb::ScalarFuncSig::TimeIsNull, "isNull"}, + {tipb::ScalarFuncSig::IntIsNull, "isNull"}, + {tipb::ScalarFuncSig::JsonIsNull, "isNull"}, + + //{tipb::ScalarFuncSig::BitAndSig, "cast"}, + //{tipb::ScalarFuncSig::BitOrSig, "cast"}, + //{tipb::ScalarFuncSig::BitXorSig, "cast"}, + //{tipb::ScalarFuncSig::BitNegSig, "cast"}, + //{tipb::ScalarFuncSig::IntIsTrue, "cast"}, + //{tipb::ScalarFuncSig::RealIsTrue, "cast"}, + //{tipb::ScalarFuncSig::DecimalIsTrue, "cast"}, + //{tipb::ScalarFuncSig::IntIsFalse, "cast"}, + //{tipb::ScalarFuncSig::RealIsFalse, "cast"}, + //{tipb::ScalarFuncSig::DecimalIsFalse, "cast"}, + + //{tipb::ScalarFuncSig::LeftShift, "cast"}, + //{tipb::ScalarFuncSig::RightShift, "cast"}, + + //{tipb::ScalarFuncSig::BitCount, "cast"}, + //{tipb::ScalarFuncSig::GetParamString, "cast"}, + //{tipb::ScalarFuncSig::GetVar, "cast"}, + //{tipb::ScalarFuncSig::RowSig, "cast"}, + //{tipb::ScalarFuncSig::SetVar, "cast"}, + //{tipb::ScalarFuncSig::ValuesDecimal, "cast"}, + //{tipb::ScalarFuncSig::ValuesDuration, "cast"}, + //{tipb::ScalarFuncSig::ValuesInt, "cast"}, + //{tipb::ScalarFuncSig::ValuesJSON, "cast"}, + //{tipb::ScalarFuncSig::ValuesReal, "cast"}, + //{tipb::ScalarFuncSig::ValuesString, "cast"}, + //{tipb::ScalarFuncSig::ValuesTime, "cast"}, + + {tipb::ScalarFuncSig::InInt, "in"}, + {tipb::ScalarFuncSig::InReal, "in"}, + {tipb::ScalarFuncSig::InString, "in"}, + {tipb::ScalarFuncSig::InDecimal, "in"}, + {tipb::ScalarFuncSig::InTime, "in"}, + {tipb::ScalarFuncSig::InDuration, "in"}, + {tipb::ScalarFuncSig::InJson, "in"}, + + {tipb::ScalarFuncSig::IfNullInt, "ifNull"}, + {tipb::ScalarFuncSig::IfNullReal, "ifNull"}, + {tipb::ScalarFuncSig::IfNullString, "ifNull"}, + {tipb::ScalarFuncSig::IfNullDecimal, "ifNull"}, + {tipb::ScalarFuncSig::IfNullTime, "ifNull"}, + {tipb::ScalarFuncSig::IfNullDuration, "ifNull"}, + {tipb::ScalarFuncSig::IfNullJson, "ifNull"}, + + {tipb::ScalarFuncSig::IfInt, "if"}, + {tipb::ScalarFuncSig::IfReal, "if"}, + {tipb::ScalarFuncSig::IfString, "if"}, + {tipb::ScalarFuncSig::IfDecimal, "if"}, + {tipb::ScalarFuncSig::IfTime, "if"}, + {tipb::ScalarFuncSig::IfDuration, "if"}, + {tipb::ScalarFuncSig::IfJson, "if"}, + + //todo need further check for caseWithExpression and multiIf + {tipb::ScalarFuncSig::CaseWhenInt, "caseWithExpression"}, + {tipb::ScalarFuncSig::CaseWhenReal, "caseWithExpression"}, + {tipb::ScalarFuncSig::CaseWhenString, "caseWithExpression"}, + {tipb::ScalarFuncSig::CaseWhenDecimal, "caseWithExpression"}, + {tipb::ScalarFuncSig::CaseWhenTime, "caseWithExpression"}, + {tipb::ScalarFuncSig::CaseWhenDuration, "caseWithExpression"}, + {tipb::ScalarFuncSig::CaseWhenJson, "caseWithExpression"}, + + //{tipb::ScalarFuncSig::AesDecrypt, "cast"}, + //{tipb::ScalarFuncSig::AesEncrypt, "cast"}, + //{tipb::ScalarFuncSig::Compress, "cast"}, + //{tipb::ScalarFuncSig::MD5, "cast"}, + //{tipb::ScalarFuncSig::Password, "cast"}, + //{tipb::ScalarFuncSig::RandomBytes, "cast"}, + //{tipb::ScalarFuncSig::SHA1, "cast"}, + //{tipb::ScalarFuncSig::SHA2, "cast"}, + //{tipb::ScalarFuncSig::Uncompress, "cast"}, + //{tipb::ScalarFuncSig::UncompressedLength, "cast"}, + + //{tipb::ScalarFuncSig::Database, "cast"}, + //{tipb::ScalarFuncSig::FoundRows, "cast"}, + //{tipb::ScalarFuncSig::CurrentUser, "cast"}, + //{tipb::ScalarFuncSig::User, "cast"}, + //{tipb::ScalarFuncSig::ConnectionID, "cast"}, + //{tipb::ScalarFuncSig::LastInsertID, "cast"}, + //{tipb::ScalarFuncSig::LastInsertIDWithID, "cast"}, + //{tipb::ScalarFuncSig::Version, "cast"}, + //{tipb::ScalarFuncSig::TiDBVersion, "cast"}, + //{tipb::ScalarFuncSig::RowCount, "cast"}, + + //{tipb::ScalarFuncSig::Sleep, "cast"}, + //{tipb::ScalarFuncSig::Lock, "cast"}, + //{tipb::ScalarFuncSig::ReleaseLock, "cast"}, + //{tipb::ScalarFuncSig::DecimalAnyValue, "cast"}, + //{tipb::ScalarFuncSig::DurationAnyValue, "cast"}, + //{tipb::ScalarFuncSig::IntAnyValue, "cast"}, + //{tipb::ScalarFuncSig::JSONAnyValue, "cast"}, + //{tipb::ScalarFuncSig::RealAnyValue, "cast"}, + //{tipb::ScalarFuncSig::StringAnyValue, "cast"}, + //{tipb::ScalarFuncSig::TimeAnyValue, "cast"}, + //{tipb::ScalarFuncSig::InetAton, "cast"}, + //{tipb::ScalarFuncSig::InetNtoa, "cast"}, + //{tipb::ScalarFuncSig::Inet6Aton, "cast"}, + //{tipb::ScalarFuncSig::Inet6Ntoa, "cast"}, + //{tipb::ScalarFuncSig::IsIPv4, "cast"}, + //{tipb::ScalarFuncSig::IsIPv4Compat, "cast"}, + //{tipb::ScalarFuncSig::IsIPv4Mapped, "cast"}, + //{tipb::ScalarFuncSig::IsIPv6, "cast"}, + //{tipb::ScalarFuncSig::UUID, "cast"}, + + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, + {tipb::ScalarFuncSig::Uncompress, "cast"}, +}); +} // namespace DB diff --git a/dbms/src/Interpreters/CoprocessorBuilderUtils.h b/dbms/src/Interpreters/CoprocessorBuilderUtils.h index 908a8638c77..22cf460141b 100644 --- a/dbms/src/Interpreters/CoprocessorBuilderUtils.h +++ b/dbms/src/Interpreters/CoprocessorBuilderUtils.h @@ -2,9 +2,27 @@ #include -namespace DB { +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#include +#pragma GCC diagnostic pop - extern std::unordered_map aggFunMap; - extern std::unordered_map scalarFunMap; +#include -} +namespace DB +{ + +bool isLiteralExpr(const tipb::Expr & expr); +Field decodeLiteral(const tipb::Expr & expr); +bool isFunctionExpr(const tipb::Expr & expr); +bool isAggFunctionExpr(const tipb::Expr & expr); +const String & getFunctionName(const tipb::Expr & expr); +bool isColumnExpr(const tipb::Expr & expr); +ColumnID getColumnID(const tipb::Expr & expr); +String getName(const tipb::Expr & expr, const NamesAndTypesList & current_input_columns); +const String & getTypeName(const tipb::Expr & expr); +String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col); +extern std::unordered_map aggFunMap; +extern std::unordered_map scalarFunMap; + +} // namespace DB diff --git a/dbms/src/Interpreters/DAGExpressionAnalyzer.cpp b/dbms/src/Interpreters/DAGExpressionAnalyzer.cpp new file mode 100644 index 00000000000..2cc8ce0b9c2 --- /dev/null +++ b/dbms/src/Interpreters/DAGExpressionAnalyzer.cpp @@ -0,0 +1,171 @@ + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +DAGExpressionAnalyzer::DAGExpressionAnalyzer(const NamesAndTypesList & source_columns_, const Context & context_) + : source_columns(source_columns_), context(context_) +{ + settings = context.getSettings(); +} + +bool DAGExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain, const tipb::Selection & sel, String & filter_column_name) +{ + if (sel.conditions_size() == 0) + { + return false; + } + tipb::Expr final_condition; + if (sel.conditions_size() > 1) + { + final_condition.set_tp(tipb::ExprType::ScalarFunc); + final_condition.set_sig(tipb::ScalarFuncSig::LogicalAnd); + + for (auto & condition : sel.conditions()) + { + auto c = final_condition.add_children(); + c->ParseFromString(condition.SerializeAsString()); + } + } + + const tipb::Expr & filter = sel.conditions_size() > 1 ? final_condition : sel.conditions(0); + initChain(chain, source_columns); + filter_column_name = getActions(filter, chain.steps.back().actions); + chain.steps.back().required_output.push_back(filter_column_name); + return true; +} + +bool DAGExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names) +{ + if (topN.order_by_size() == 0) + { + return false; + } + initChain(chain, aggregated_columns); + ExpressionActionsChain::Step & step = chain.steps.back(); + for (const tipb::ByItem & byItem : topN.order_by()) + { + String name = getActions(byItem.expr(), step.actions); + step.required_output.push_back(name); + order_column_names.push_back(name); + } + return true; +} + +const NamesAndTypesList & DAGExpressionAnalyzer::getCurrentInputColumns() { return source_columns; } + +String DAGExpressionAnalyzer::getActions(const tipb::Expr & expr, ExpressionActionsPtr & actions) +{ + String expr_name = getName(expr, getCurrentInputColumns()); + if ((isLiteralExpr(expr) || isFunctionExpr(expr)) && actions->getSampleBlock().has(expr_name)) + { + return expr_name; + } + if (isLiteralExpr(expr)) + { + Field value = decodeLiteral(expr); + DataTypePtr type = expr.has_field_type() ? getDataTypeByFieldType(expr.field_type()) : applyVisitor(FieldToDataType(), value); + + ColumnWithTypeAndName column; + column.column = type->createColumnConst(1, convertFieldToType(value, *type)); + column.name = expr_name; + column.type = type; + + actions->add(ExpressionAction::addColumn(column)); + return column.name; + } + else if (isColumnExpr(expr)) + { + ColumnID columnId = getColumnID(expr); + if (columnId < 1 || columnId > (ColumnID)getCurrentInputColumns().size()) + { + throw Exception("column id out of bound"); + } + //todo check if the column type need to be cast to field type + return expr_name; + } + else if (isFunctionExpr(expr)) + { + if (isAggFunctionExpr(expr)) + { + throw Exception("agg function is not supported yet"); + } + const String & func_name = getFunctionName(expr); + if (func_name == "in" || func_name == "notIn" || func_name == "globalIn" || func_name == "globalNotIn") + { + // todo support in + throw Exception(func_name + " is not supported yet"); + } + + const FunctionBuilderPtr & function_builder = FunctionFactory::instance().get(func_name, context); + Names argument_names; + DataTypes argument_types; + for (auto & child : expr.children()) + { + String name = getActions(child, actions); + if (actions->getSampleBlock().has(name)) + { + argument_names.push_back(name); + argument_types.push_back(actions->getSampleBlock().getByName(name).type); + } + else + { + throw Exception("Unknown expr: " + child.DebugString()); + } + } + + const ExpressionAction & applyFunction = ExpressionAction::applyFunction(function_builder, argument_names, expr_name); + actions->add(applyFunction); + // add cast if needed + if (expr.has_field_type()) + { + DataTypePtr expected_type = getDataTypeByFieldType(expr.field_type()); + DataTypePtr actual_type = applyFunction.result_type; + //todo maybe use a more decent compare method + if (expected_type->getName() != actual_type->getName()) + { + // need to add cast function + // first construct the second argument + tipb::Expr type_expr; + type_expr.set_tp(tipb::ExprType::String); + std::stringstream ss; + EncodeCompactBytes(expected_type->getName(), ss); + type_expr.set_val(ss.str()); + auto type_field_type = type_expr.field_type(); + type_field_type.set_tp(0xfe); + type_field_type.set_flag(1); + String name = getActions(type_expr, actions); + String cast_name = "cast"; + const FunctionBuilderPtr & cast_func_builder = FunctionFactory::instance().get(cast_name, context); + String cast_expr_name = cast_name + "_" + expr_name + "_" + getName(type_expr, getCurrentInputColumns()); + Names cast_argument_names; + cast_argument_names.push_back(expr_name); + cast_argument_names.push_back(getName(type_expr, getCurrentInputColumns())); + const ExpressionAction & apply_cast_function + = ExpressionAction::applyFunction(cast_func_builder, argument_names, cast_expr_name); + actions->add(apply_cast_function); + return cast_expr_name; + } + else + { + return expr_name; + } + } + else + { + return expr_name; + } + } + else + { + throw Exception("Unsupported expr type: " + getTypeName(expr)); + } +} +} // namespace DB diff --git a/dbms/src/Interpreters/DAGExpressionAnalyzer.h b/dbms/src/Interpreters/DAGExpressionAnalyzer.h new file mode 100644 index 00000000000..6a63600fb12 --- /dev/null +++ b/dbms/src/Interpreters/DAGExpressionAnalyzer.h @@ -0,0 +1,40 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +/** Transforms an expression from DAG expression into a sequence of actions to execute it. + * + */ +class DAGExpressionAnalyzer : private boost::noncopyable +{ +private: + using ExpressionActionsPtr = std::shared_ptr; + // all columns from table scan + NamesAndTypesList source_columns; + // all columns after aggregation + NamesAndTypesList aggregated_columns; + Settings settings; + const Context & context; + +public: + DAGExpressionAnalyzer(const NamesAndTypesList & source_columns_, const Context & context_); + bool appendWhere(ExpressionActionsChain & chain, const tipb::Selection & sel, String & filter_column_name); + bool appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names); + void initChain(ExpressionActionsChain & chain, const NamesAndTypesList & columns) const + { + if (chain.steps.empty()) + { + chain.settings = settings; + chain.steps.emplace_back(std::make_shared(columns, settings)); + } + } + String getActions(const tipb::Expr & expr, ExpressionActionsPtr & actions); + const NamesAndTypesList & getCurrentInputColumns(); +}; + +} // namespace DB diff --git a/dbms/src/Interpreters/DAGQueryInfo.cpp b/dbms/src/Interpreters/DAGQueryInfo.cpp index 77c6b2daef1..ad03da917d6 100644 --- a/dbms/src/Interpreters/DAGQueryInfo.cpp +++ b/dbms/src/Interpreters/DAGQueryInfo.cpp @@ -1,27 +1,67 @@ -#include #include #include +#include namespace DB { - DAGQueryInfo::DAGQueryInfo(const tipb::DAGRequest & dag_request_, CoprocessorContext & coprocessorContext_) - : dag_request(dag_request_), coprocessorContext(coprocessorContext_) {} +const String DAGQueryInfo::TS_NAME("tablescan"); +const String DAGQueryInfo::SEL_NAME("selection"); +const String DAGQueryInfo::AGG_NAME("aggregation"); +const String DAGQueryInfo::TOPN_NAME("topN"); +const String DAGQueryInfo::LIMIT_NAME("limit"); - std::tuple DAGQueryInfo::parse(size_t ) { - query = String("cop query"); - ast = std::make_shared(); - ((ASTSelectQuery*)ast.get())->is_fake_sel = true; - return std::make_tuple(query, ast); +static void assignOrThrowException(Int32 & index, Int32 value, const String & name) +{ + if (index != -1) + { + throw Exception("Duplicated " + name + " in DAG request"); } + index = value; +} - String DAGQueryInfo::get_query_ignore_error(size_t ) { - return query; +DAGQueryInfo::DAGQueryInfo(const tipb::DAGRequest & dag_request_, CoprocessorContext & coprocessorContext_) + : dag_request(dag_request_), coprocessorContext(coprocessorContext_) +{ + for (int i = 0; i < dag_request.executors_size(); i++) + { + switch (dag_request.executors(i).tp()) + { + case tipb::ExecType::TypeTableScan: + assignOrThrowException(ts_index, i, TS_NAME); + break; + case tipb::ExecType::TypeSelection: + assignOrThrowException(sel_index, i, SEL_NAME); + break; + case tipb::ExecType::TypeStreamAgg: + case tipb::ExecType::TypeAggregation: + assignOrThrowException(agg_index, i, AGG_NAME); + break; + case tipb::ExecType::TypeTopN: + assignOrThrowException(order_index, i, TOPN_NAME); + case tipb::ExecType::TypeLimit: + assignOrThrowException(limit_index, i, LIMIT_NAME); + break; + default: + throw Exception("Unsupported executor in DAG request: " + dag_request.executors(i).DebugString()); + } } +} - std::unique_ptr DAGQueryInfo::getInterpreter(Context & , QueryProcessingStage::Enum ) { - return std::make_unique(coprocessorContext, dag_request); - } +std::tuple DAGQueryInfo::parse(size_t) +{ + query = String("cop query"); + ast = std::make_shared(); + ((ASTSelectQuery *)ast.get())->is_fake_sel = true; + return std::make_tuple(query, ast); +} + +String DAGQueryInfo::get_query_ignore_error(size_t) { return query; } + +std::unique_ptr DAGQueryInfo::getInterpreter(Context &, QueryProcessingStage::Enum) +{ + return std::make_unique(coprocessorContext, *this); } +} // namespace DB diff --git a/dbms/src/Interpreters/DAGQueryInfo.h b/dbms/src/Interpreters/DAGQueryInfo.h index 826a07cfc33..aa2baa833c9 100644 --- a/dbms/src/Interpreters/DAGQueryInfo.h +++ b/dbms/src/Interpreters/DAGQueryInfo.h @@ -1,10 +1,14 @@ #pragma once -#include +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" #include -#include -#include +#pragma GCC diagnostic pop + #include +#include +#include +#include namespace DB @@ -15,18 +19,65 @@ namespace DB class DAGQueryInfo : public IQueryInfo { public: + static const String TS_NAME; + static const String SEL_NAME; + static const String AGG_NAME; + static const String TOPN_NAME; + static const String LIMIT_NAME; DAGQueryInfo(const tipb::DAGRequest & dag_request, CoprocessorContext & coprocessorContext_); - bool isInternalQuery() { return false;}; + bool isInternalQuery() { return false; }; virtual std::tuple parse(size_t max_query_size); virtual String get_query_ignore_error(size_t max_query_size); virtual std::unique_ptr getInterpreter(Context & context, QueryProcessingStage::Enum stage); + void assertValid(Int32 index, const String & name) + { + if (index < 0 || index > dag_request.executors_size()) + { + throw Exception("Access invalid executor: " + name); + } + } + bool has_selection() { return sel_index != -1; }; + bool has_aggregation() { return agg_index != -1; }; + bool has_topN() { return order_index != -1; }; + bool has_limit() { return order_index == -1 && limit_index != -1; }; + const tipb::TableScan & get_ts() + { + assertValid(ts_index, TS_NAME); + return dag_request.executors(ts_index).tbl_scan(); + }; + const tipb::Selection & get_sel() + { + assertValid(sel_index, SEL_NAME); + return dag_request.executors(sel_index).selection(); + }; + const tipb::Aggregation & get_agg() + { + assertValid(agg_index, AGG_NAME); + return dag_request.executors(agg_index).aggregation(); + }; + const tipb::TopN & get_topN() + { + assertValid(order_index, TOPN_NAME); + return dag_request.executors(order_index).topn(); + }; + const tipb::Limit & get_limit() + { + assertValid(limit_index, LIMIT_NAME); + return dag_request.executors(limit_index).limit(); + }; + const tipb::DAGRequest & get_dag_request() { return dag_request; }; private: const tipb::DAGRequest & dag_request; CoprocessorContext & coprocessorContext; String query; ASTPtr ast; + Int32 ts_index = -1; + Int32 sel_index = -1; + Int32 agg_index = -1; + Int32 order_index = -1; + Int32 limit_index = -1; }; -} +} // namespace DB diff --git a/dbms/src/Interpreters/DAGStringConverter.cpp b/dbms/src/Interpreters/DAGStringConverter.cpp index f06f92704e4..aa49500d274 100644 --- a/dbms/src/Interpreters/DAGStringConverter.cpp +++ b/dbms/src/Interpreters/DAGStringConverter.cpp @@ -1,209 +1,159 @@ +#include #include +#include #include -#include #include -#include -#include -#include -#include #include #include +#include +#include +#include -namespace DB { +namespace DB +{ - bool DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringstream & ss) { - TableID id; - if(ts.has_table_id()) { - id = ts.table_id(); - } else { - // do not have table id - return false; - } - auto & tmt_ctx = context.ch_context.getTMTContext(); - auto storage = tmt_ctx.getStorages().get(id); - if(storage == nullptr) { - tmt_ctx.getSchemaSyncer()->syncSchema(id, context.ch_context, false); - storage = tmt_ctx.getStorages().get(id); - } - if(storage == nullptr) { - return false; - } - const auto * merge_tree = dynamic_cast(storage.get()); - if (!merge_tree) { - return false; - } +bool DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringstream & ss) +{ + TableID id; + if (ts.has_table_id()) + { + id = ts.table_id(); + } + else + { + // do not have table id + return false; + } + auto & tmt_ctx = context.ch_context.getTMTContext(); + auto storage = tmt_ctx.getStorages().get(id); + if (storage == nullptr) + { + tmt_ctx.getSchemaSyncer()->syncSchema(id, context.ch_context, false); + storage = tmt_ctx.getStorages().get(id); + } + if (storage == nullptr) + { + return false; + } + const auto * merge_tree = dynamic_cast(storage.get()); + if (!merge_tree) + { + return false; + } - for(const tipb::ColumnInfo &ci : ts.columns()) { - ColumnID cid = ci.column_id(); - String name = merge_tree->getTableInfo().columns[cid-1].name; - column_name_from_ts.emplace(std::make_pair(cid, name)); - } - if(column_name_from_ts.empty()) { - // no column selected, must be something wrong - return false; + if (ts.columns_size() == 0) + { + // no column selected, must be something wrong + return false; + } + columns_from_ts = storage->getColumns().getAllPhysical(); + for (const tipb::ColumnInfo & ci : ts.columns()) + { + ColumnID cid = ci.column_id(); + if (cid <= 0 || cid > (ColumnID)columns_from_ts.size()) + { + throw Exception("column id out of bound"); } - ss << "FROM " << merge_tree->getTableInfo().db_name << "." << merge_tree->getTableInfo().name << " "; - return true; + String name = merge_tree->getTableInfo().columns[cid - 1].name; + output_from_ts.push_back(std::move(name)); } + ss << "FROM " << merge_tree->getTableInfo().db_name << "." << merge_tree->getTableInfo().name << " "; + return true; +} - String DAGStringConverter::exprToString(const tipb::Expr & expr, bool &succ) { - std::stringstream ss; - succ = true; - size_t cursor = 1; - Int64 columnId = 0; - String func_name; - Field f; - switch (expr.tp()) { - case tipb::ExprType::Null: - return "NULL"; - case tipb::ExprType::Int64: - return std::to_string(DecodeInt(cursor, expr.val())); - case tipb::ExprType::Uint64: - return std::to_string(DecodeInt(cursor, expr.val())); - case tipb::ExprType::Float32: - case tipb::ExprType::Float64: - return std::to_string(DecodeFloat64(cursor, expr.val())); - case tipb::ExprType::String: - // - return expr.val(); - case tipb::ExprType::Bytes: - return DecodeBytes(cursor, expr.val()); - case tipb::ExprType::ColumnRef: - columnId = DecodeInt(cursor, expr.val()); - if(getCurrentColumnNames().count(columnId) == 0) { - succ = false; - return ""; - } - return getCurrentColumnNames().find(columnId)->second; - case tipb::ExprType::Count: - case tipb::ExprType::Sum: - case tipb::ExprType::Avg: - case tipb::ExprType::Min: - case tipb::ExprType::Max: - case tipb::ExprType::First: - if(!aggFunMap.count(expr.tp())) { - succ = false; - return ""; - } - func_name = aggFunMap.find(expr.tp())->second; - break; - case tipb::ExprType::ScalarFunc: - if(!scalarFunMap.count(expr.sig())) { - succ = false; - return ""; - } - func_name = scalarFunMap.find(expr.sig())->second; - break; - default: - succ = false; - return ""; +bool DAGStringConverter::buildSelString(const tipb::Selection & sel, std::stringstream & ss) +{ + bool first = true; + for (const tipb::Expr & expr : sel.conditions()) + { + auto s = exprToString(expr, getCurrentColumns()); + if (first) + { + ss << "WHERE "; + first = false; } - // build function expr - if(func_name == "in") { - // for in, we could not represent the function expr using func_name(param1, param2, ...) - succ = false; - return ""; - } else { - ss << func_name << "("; - bool first = true; - bool sub_succ = true; - for(const tipb::Expr &child : expr.children()) { - String s = exprToString(child, sub_succ); - if(!sub_succ) { - succ = false; - return ""; - } - if(first) { - first = false; - } else { - ss << ", "; - } - ss << s; - } - ss << ") "; - return ss.str(); + else + { + ss << "AND "; } + ss << s << " "; } + return true; +} - bool DAGStringConverter::buildSelString(const tipb::Selection & sel, std::stringstream & ss) { - bool first = true; - for(const tipb::Expr & expr : sel.conditions()) { - bool succ = true; - auto s = exprToString(expr, succ); - if(!succ) { - return false; - } - if(first) { - ss << "WHERE "; - first = false; - } else { - ss << "AND "; - } - ss << s << " "; - } - return true; - } +bool DAGStringConverter::buildLimitString(const tipb::Limit & limit, std::stringstream & ss) +{ + ss << "LIMIT " << limit.limit() << " "; + return true; +} - bool DAGStringConverter::buildLimitString(const tipb::Limit & limit, std::stringstream & ss) { - ss << "LIMIT " << limit.limit() << " "; - return true; +//todo return the error message +bool DAGStringConverter::buildString(const tipb::Executor & executor, std::stringstream & ss) +{ + switch (executor.tp()) + { + case tipb::ExecType::TypeTableScan: + return buildTSString(executor.tbl_scan(), ss); + case tipb::ExecType::TypeIndexScan: + // index scan not supported + return false; + case tipb::ExecType::TypeSelection: + return buildSelString(executor.selection(), ss); + case tipb::ExecType::TypeAggregation: + // stream agg is not supported, treated as normal agg + case tipb::ExecType::TypeStreamAgg: + //todo support agg + return false; + case tipb::ExecType::TypeTopN: + // todo support top n + return false; + case tipb::ExecType::TypeLimit: + return buildLimitString(executor.limit(), ss); } +} - //todo return the error message - bool DAGStringConverter::buildString(const tipb::Executor & executor, std::stringstream & ss) { - switch (executor.tp()) { - case tipb::ExecType::TypeTableScan: - return buildTSString(executor.tbl_scan(), ss); - case tipb::ExecType::TypeIndexScan: - // index scan not supported - return false; - case tipb::ExecType::TypeSelection: - return buildSelString(executor.selection(), ss); - case tipb::ExecType::TypeAggregation: - // stream agg is not supported, treated as normal agg - case tipb::ExecType::TypeStreamAgg: - //todo support agg - return false; - case tipb::ExecType::TypeTopN: - // todo support top n - return false; - case tipb::ExecType::TypeLimit: - return buildLimitString(executor.limit(), ss); - } - } +bool isProject(const tipb::Executor &) +{ + // currently, project is not pushed so always return false + return false; +} +DAGStringConverter::DAGStringConverter(CoprocessorContext & context_, tipb::DAGRequest & dag_request_) + : context(context_), dag_request(dag_request_) +{ + afterAgg = false; +} - bool isProject(const tipb::Executor &) { - // currently, project is not pushed so always return false - return false; - } - DAGStringConverter::DAGStringConverter(CoprocessorContext & context_, tipb::DAGRequest & dag_request_) - : context(context_), dag_request(dag_request_) { - afterAgg = false; +String DAGStringConverter::buildSqlString() +{ + std::stringstream query_buf; + std::stringstream project; + for (const tipb::Executor & executor : dag_request.executors()) + { + if (!buildString(executor, query_buf)) + { + return ""; + } } - - String DAGStringConverter::buildSqlString() { - std::stringstream query_buf; - std::stringstream project; - for(const tipb::Executor & executor : dag_request.executors()) { - if(!buildString(executor, query_buf)) { - return ""; + if (!isProject(dag_request.executors(dag_request.executors_size() - 1))) + { + //append final project + project << "SELECT "; + bool first = true; + for (UInt32 index : dag_request.output_offsets()) + { + if (first) + { + first = false; } - } - if(!isProject(dag_request.executors(dag_request.executors_size()-1))) { - //append final project - project << "SELECT "; - bool first = true; - for(UInt32 index : dag_request.output_offsets()) { - if(first) { - first = false; - } else { - project << ", "; - } - project << getCurrentColumnNames()[index+1]; + else + { + project << ", "; } - project << " "; + project << getCurrentOutputColumns()[index]; } - return project.str() + query_buf.str(); + project << " "; } - + return project.str() + query_buf.str(); } + +} // namespace DB diff --git a/dbms/src/Interpreters/DAGStringConverter.h b/dbms/src/Interpreters/DAGStringConverter.h index cae42a54f19..2fa200e0f8e 100644 --- a/dbms/src/Interpreters/DAGStringConverter.h +++ b/dbms/src/Interpreters/DAGStringConverter.h @@ -2,40 +2,56 @@ #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" -#include #include +#include #pragma GCC diagnostic pop -#include #include +#include -namespace DB { +namespace DB +{ -class DAGStringConverter { +class DAGStringConverter +{ public: DAGStringConverter(CoprocessorContext & context_, tipb::DAGRequest & dag_request_); ~DAGStringConverter() = default; String buildSqlString(); + private: bool buildTSString(const tipb::TableScan & ts, std::stringstream & ss); - String exprToString(const tipb::Expr & expr, bool &succ); bool buildSelString(const tipb::Selection & sel, std::stringstream & ss); bool buildLimitString(const tipb::Limit & limit, std::stringstream & ss); bool buildString(const tipb::Executor & executor, std::stringstream & ss); CoprocessorContext & context; tipb::DAGRequest & dag_request; - std::unordered_map column_name_from_ts; - std::unordered_map column_name_from_agg; + // used by columnRef, which starts with 1, and refs column index in the original ts/agg output + NamesAndTypesList columns_from_ts; + NamesAndTypesList columns_from_agg; + // used by output_offset, which starts with 0, and refs the index in the selected output of ts/agg operater + Names output_from_ts; + Names output_from_agg; bool afterAgg; - std::unordered_map & getCurrentColumnNames() { - if(afterAgg) { - return column_name_from_agg; + const NamesAndTypesList & getCurrentColumns() + { + if (afterAgg) + { + return columns_from_agg; } - return column_name_from_ts; + return columns_from_ts; } + const Names & getCurrentOutputColumns() + { + if (afterAgg) + { + return output_from_agg; + } + return output_from_ts; + } }; -} +} // namespace DB diff --git a/dbms/src/Interpreters/InterpreterDAGRequest.cpp b/dbms/src/Interpreters/InterpreterDAGRequest.cpp index 8a8e6fe4698..483ef96fa2e 100644 --- a/dbms/src/Interpreters/InterpreterDAGRequest.cpp +++ b/dbms/src/Interpreters/InterpreterDAGRequest.cpp @@ -1,219 +1,349 @@ #include +#include +#include +#include +#include +#include +#include +#include +#include #include -#include -#include +#include +#include +#include +#include #include +#include #include -#include -#include -#include -#include -#include -#include +#include -namespace DB { +namespace DB +{ + +namespace ErrorCodes +{ +extern const int TOO_MANY_COLUMNS; +} - namespace ErrorCodes +InterpreterDAGRequest::InterpreterDAGRequest(CoprocessorContext & context_, DAGQueryInfo & dag_query_info_) + : context(context_), dag_query_info(dag_query_info_) +{} + +// the flow is the same as executeFetchcolumns +bool InterpreterDAGRequest::executeTS(const tipb::TableScan & ts, Pipeline & pipeline) +{ + if (!ts.has_table_id()) { - extern const int TOO_MANY_COLUMNS; + // do not have table id + return false; } - - static void assignOrThrowException(Int32 & index, Int32 value, String name) { - if(index != -1) { - throw Exception("Duplicated " + name + " in DAG request"); - } - index = value; - } - - InterpreterDAGRequest::InterpreterDAGRequest(CoprocessorContext & context_, const tipb::DAGRequest & dag_request_) - : context(context_), dag_request(dag_request_) { - for(int i = 0; i < dag_request.executors_size(); i++) { - switch (dag_request.executors(i).tp()) { - case tipb::ExecType::TypeTableScan: - assignOrThrowException(ts_index, i, "TableScan"); - break; - case tipb::ExecType::TypeSelection: - assignOrThrowException(sel_index, i, "Selection"); - break; - case tipb::ExecType::TypeStreamAgg: - case tipb::ExecType::TypeAggregation: - assignOrThrowException(agg_index, i, "Aggregation"); - break; - case tipb::ExecType::TypeTopN: - assignOrThrowException(order_index, i, "Order"); - case tipb::ExecType::TypeLimit: - assignOrThrowException(limit_index, i, "Limit"); - break; - default: - throw Exception("Unsupported executor in DAG request: " + dag_request.executors(i).DebugString()); - } - } + TableID id = ts.table_id(); + auto & tmt_ctx = context.ch_context.getTMTContext(); + auto storage = tmt_ctx.getStorages().get(id); + if (storage == nullptr) + { + tmt_ctx.getSchemaSyncer()->syncSchema(id, context.ch_context, false); + storage = tmt_ctx.getStorages().get(id); } - - bool InterpreterDAGRequest::buildSelPlan(const tipb::Selection & , Pipeline & ) { + if (storage == nullptr) + { + return false; + } + auto table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__); + const auto * merge_tree = dynamic_cast(storage.get()); + if (!merge_tree) + { return false; } - // the flow is the same as executeFetchcolumns - bool InterpreterDAGRequest::buildTSPlan(const tipb::TableScan & ts, Pipeline & pipeline) { - if(!ts.has_table_id()) { - // do not have table id - return false; - } - TableID id = ts.table_id(); - auto & tmt_ctx = context.ch_context.getTMTContext(); - auto storage = tmt_ctx.getStorages().get(id); - if(storage == nullptr) { - tmt_ctx.getSchemaSyncer()->syncSchema(id, context.ch_context, false); - storage = tmt_ctx.getStorages().get(id); - } - if(storage == nullptr) { - return false; - } - auto table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__); - const auto * merge_tree = dynamic_cast(storage.get()); - if(!merge_tree) { + Names required_columns; + for (const tipb::ColumnInfo & ci : ts.columns()) + { + ColumnID cid = ci.column_id(); + if (cid < 1 || cid > (Int64)merge_tree->getTableInfo().columns.size()) + { + // cid out of bound return false; } + String name = merge_tree->getTableInfo().columns[cid - 1].name; + //todo handle output_offset + required_columns.push_back(name); + } + if (required_columns.empty()) + { + // no column selected, must be something wrong + return false; + } - Names required_columns; - for(const tipb::ColumnInfo & ci : ts.columns()) { - ColumnID cid = ci.column_id(); - if(cid < 1 || cid > (Int64)merge_tree->getTableInfo().columns.size()) { - // cid out of bound + if (!dag_query_info.has_aggregation()) + { + // if the dag request does not contain agg, then the final output is + // based on the output of table scan + for (auto i : dag_query_info.get_dag_request().output_offsets()) + { + if (i < 0 || i >= required_columns.size()) + { + // array index out of bound return false; } - String name = merge_tree->getTableInfo().columns[cid - 1].name; - //todo handle output_offset - required_columns.push_back(name); - } - if(required_columns.empty()) { - // no column selected, must be something wrong - return false; + // do not have alias + final_project.emplace_back(required_columns[i], ""); } + } + // todo handle alias column + const Settings & settings = context.ch_context.getSettingsRef(); - if(agg_index == -1) { - // if the dag request does not contain agg, then the final output is - // based on the output of table scan - for (auto i : dag_request.output_offsets()) { - if (i < 0 || i >= required_columns.size()) { - // array index out of bound - return false; - } - // do not have alias - final_project.emplace_back(required_columns[i], ""); - } - } - // todo handle alias column - const Settings & settings = context.ch_context.getSettingsRef(); - - if(settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read) { - throw Exception("Limit for number of columns to read exceeded. " - "Requested: " + toString(required_columns.size()) - + ", maximum: " + settings.max_columns_to_read.toString(), - ErrorCodes::TOO_MANY_COLUMNS); - } + if (settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read) + { + throw Exception("Limit for number of columns to read exceeded. " + "Requested: " + + toString(required_columns.size()) + ", maximum: " + settings.max_columns_to_read.toString(), + ErrorCodes::TOO_MANY_COLUMNS); + } - size_t max_block_size = settings.max_block_size; - size_t max_streams = settings.max_threads; - QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; - if(max_streams > 1) { - max_streams *= settings.max_streams_to_max_threads_ratio; - } + size_t max_block_size = settings.max_block_size; + max_streams = settings.max_threads; + QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; + if (max_streams > 1) + { + max_streams *= settings.max_streams_to_max_threads_ratio; + } - //todo support index in - SelectQueryInfo query_info; - query_info.query = std::make_unique(); - ((ASTSelectQuery*)query_info.query.get())->is_fake_sel = true; - query_info.mvcc_query_info = std::make_unique(); - query_info.mvcc_query_info->resolve_locks = true; - query_info.mvcc_query_info->read_tso = settings.read_tso; - RegionQueryInfo info; - info.region_id = context.kv_context.region_id(); - info.conf_version = context.kv_context.region_epoch().conf_ver(); - info.version = context.kv_context.region_epoch().version(); - auto current_region = context.ch_context.getTMTContext().getRegionTable().getRegionById(id, info.region_id); - if(!current_region) { - return false; - } - info.range_in_table = current_region->getHandleRangeByTable(id); - query_info.mvcc_query_info->regions_query_info.push_back(info); - query_info.mvcc_query_info->concurrent = 0.0; - pipeline.streams = storage->read(required_columns, query_info, context.ch_context, from_stage, max_block_size, max_streams); - /// Set the limits and quota for reading data, the speed and time of the query. - { - IProfilingBlockInputStream::LocalLimits limits; - limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL; - limits.size_limits = SizeLimits(settings.max_rows_to_read, settings.max_bytes_to_read, settings.read_overflow_mode); - limits.max_execution_time = settings.max_execution_time; - limits.timeout_overflow_mode = settings.timeout_overflow_mode; + //todo support index in + SelectQueryInfo query_info; + query_info.query = std::make_unique(); + ((ASTSelectQuery *)query_info.query.get())->is_fake_sel = true; + query_info.mvcc_query_info = std::make_unique(); + query_info.mvcc_query_info->resolve_locks = true; + query_info.mvcc_query_info->read_tso = settings.read_tso; + RegionQueryInfo info; + info.region_id = context.kv_context.region_id(); + info.conf_version = context.kv_context.region_epoch().conf_ver(); + info.version = context.kv_context.region_epoch().version(); + auto current_region = context.ch_context.getTMTContext().getRegionTable().getRegionById(id, info.region_id); + if (!current_region) + { + return false; + } + info.range_in_table = current_region->getHandleRangeByTable(id); + query_info.mvcc_query_info->regions_query_info.push_back(info); + query_info.mvcc_query_info->concurrent = 0.0; + pipeline.streams = storage->read(required_columns, query_info, context.ch_context, from_stage, max_block_size, max_streams); + /// Set the limits and quota for reading data, the speed and time of the query. + { + IProfilingBlockInputStream::LocalLimits limits; + limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL; + limits.size_limits = SizeLimits(settings.max_rows_to_read, settings.max_bytes_to_read, settings.read_overflow_mode); + limits.max_execution_time = settings.max_execution_time; + limits.timeout_overflow_mode = settings.timeout_overflow_mode; - /** Quota and minimal speed restrictions are checked on the initiating server of the request, and not on remote servers, + /** Quota and minimal speed restrictions are checked on the initiating server of the request, and not on remote servers, * because the initiating server has a summary of the execution of the request on all servers. * * But limits on data size to read and maximum execution time are reasonable to check both on initiator and * additionally on each remote server, because these limits are checked per block of data processed, * and remote servers may process way more blocks of data than are received by initiator. */ - limits.min_execution_speed = settings.min_execution_speed; - limits.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed; - - QuotaForIntervals & quota = context.ch_context.getQuota(); - - pipeline.transform([&](auto & stream) - { - if (IProfilingBlockInputStream * p_stream = dynamic_cast(stream.get())) - { - p_stream->setLimits(limits); - p_stream->setQuota(quota); - } - }); - } - return true; - } + limits.min_execution_speed = settings.min_execution_speed; + limits.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed; - //todo return the error message - bool InterpreterDAGRequest::buildPlan(Pipeline & pipeline) { - // step 1. build table scan - if(!buildTSPlan(dag_request.executors(ts_index).tbl_scan(), pipeline)) { - return false; - } - // step 2. build selection if needed - if(sel_index != -1) { - if(buildSelPlan(dag_request.executors(sel_index).selection(), pipeline)) { - return false; + QuotaForIntervals & quota = context.ch_context.getQuota(); + + pipeline.transform([&](auto & stream) { + if (IProfilingBlockInputStream * p_stream = dynamic_cast(stream.get())) + { + p_stream->setLimits(limits); + p_stream->setQuota(quota); } + }); + } + ColumnsWithTypeAndName columnsWithTypeAndName = pipeline.firstStream()->getHeader().getColumnsWithTypeAndName(); + source_columns = storage->getColumns().getAllPhysical(); + return true; +} + +InterpreterDAGRequest::AnalysisResult InterpreterDAGRequest::analyzeExpressions() +{ + AnalysisResult res; + ExpressionActionsChain chain; + res.need_aggregate = dag_query_info.has_aggregation(); + DAGExpressionAnalyzer expressionAnalyzer(source_columns, context.ch_context); + if (dag_query_info.has_selection()) + { + if (expressionAnalyzer.appendWhere(chain, dag_query_info.get_sel(), res.filter_column_name)) + { + res.has_where = true; + res.before_where = chain.getLastActions(); + res.filter_column_name = chain.steps.back().required_output[0]; + chain.addStep(); } - // step 3. build agg if needed - if(agg_index != -1) { - return false; - } - // step 3. build order by if needed - if(order_index != -1) { - return false; - } - // step 3. build limit if needed - if(limit_index != -1) { - return false; - } - return true; - } - - BlockIO InterpreterDAGRequest::execute() { - Pipeline pipeline; - buildPlan(pipeline); - // add final project - auto stream_before_project = pipeline.firstStream(); - auto columns = stream_before_project->getHeader(); - NamesAndTypesList input_column; - for(auto column : columns.getColumnsWithTypeAndName()) { - input_column.emplace_back(column.name, column.type); - } - ExpressionActionsPtr project = std::make_shared(input_column, context.ch_context.getSettingsRef()); - project->add(ExpressionAction::project(final_project)); - auto final_stream = std::make_shared(stream_before_project, project); - BlockIO res; - res.in = final_stream; - return res; } + if (res.need_aggregate) + { + throw Exception("agg not supported"); + } + if (dag_query_info.has_topN()) + { + res.has_order_by = expressionAnalyzer.appendOrderBy(chain, dag_query_info.get_topN(), res.order_column_names); + } + // append final project results + for (auto & name : final_project) + { + chain.steps.back().required_output.push_back(name.first); + } + res.before_order_and_select = chain.getLastActions(); + chain.finalize(); + chain.clear(); + //todo need call prependProjectInput?? + return res; +} + +void InterpreterDAGRequest::executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, String & filter_column) +{ + pipeline.transform( + [&](auto & stream) { stream = std::make_shared(stream, expressionActionsPtr, filter_column); }); +} + +void InterpreterDAGRequest::executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr) +{ + if (expressionActionsPtr->getActions().size() > 0) + { + pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, expressionActionsPtr); }); + } +} + +SortDescription InterpreterDAGRequest::getSortDescription(Strings & order_column_names) +{ + // construct SortDescription + SortDescription order_descr; + const tipb::TopN & topN = dag_query_info.get_topN(); + order_descr.reserve(topN.order_by_size()); + for (int i = 0; i < topN.order_by_size(); i++) + { + String name = order_column_names[i]; + int direction = topN.order_by(i).desc() ? -1 : 1; + // todo get this information from DAGRequest + // currently use NULLS LAST + int nulls_direction = direction; + // todo get this information from DAGRequest + // currently use the defalut value + std::shared_ptr collator; + + order_descr.emplace_back(name, direction, nulls_direction, collator); + } + return order_descr; +} + +void InterpreterDAGRequest::executeUnion(Pipeline & pipeline) +{ + if (pipeline.hasMoreThanOneStream()) + { + pipeline.firstStream() = std::make_shared>(pipeline.streams, nullptr, max_streams); + pipeline.streams.resize(1); + } +} + +void InterpreterDAGRequest::executeOrder(Pipeline & pipeline, Strings & order_column_names) +{ + SortDescription order_descr = getSortDescription(order_column_names); + const Settings & settings = context.ch_context.getSettingsRef(); + Int64 limit = dag_query_info.get_topN().limit(); + + pipeline.transform([&](auto & stream) { + auto sorting_stream = std::make_shared(stream, order_descr, limit); + + /// Limits on sorting + IProfilingBlockInputStream::LocalLimits limits; + limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL; + limits.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode); + sorting_stream->setLimits(limits); + + stream = sorting_stream; + }); + + /// If there are several streams, we merge them into one + executeUnion(pipeline); + + /// Merge the sorted blocks. + pipeline.firstStream() = std::make_shared(pipeline.firstStream(), order_descr, settings.max_block_size, + limit, settings.max_bytes_before_external_sort, context.ch_context.getTemporaryPath()); +} + +//todo return the error message +bool InterpreterDAGRequest::executeImpl(Pipeline & pipeline) +{ + if (!executeTS(dag_query_info.get_ts(), pipeline)) + { + return false; + } + + auto res = analyzeExpressions(); + // execute selection + if (res.has_where) + { + executeWhere(pipeline, res.before_where, res.filter_column_name); + } + if (res.need_aggregate) + { + // execute aggregation + throw Exception("agg not supported"); + } + else + { + executeExpression(pipeline, res.before_order_and_select); + } + + if (res.has_order_by) + { + // execute topN + executeOrder(pipeline, res.order_column_names); + } + + // execute projection + executeFinalProject(pipeline); + + // execute limit + if (dag_query_info.has_limit() && !dag_query_info.has_topN()) + { + executeLimit(pipeline); + } + return true; +} + +void InterpreterDAGRequest::executeFinalProject(Pipeline & pipeline) +{ + auto columns = pipeline.firstStream()->getHeader(); + NamesAndTypesList input_column; + for (auto column : columns.getColumnsWithTypeAndName()) + { + input_column.emplace_back(column.name, column.type); + } + ExpressionActionsPtr project = std::make_shared(input_column, context.ch_context.getSettingsRef()); + project->add(ExpressionAction::project(final_project)); + // add final project + pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, project); }); +} + +void InterpreterDAGRequest::executeLimit(Pipeline & pipeline) +{ + pipeline.transform( + [&](auto & stream) { stream = std::make_shared(stream, dag_query_info.get_limit().limit(), 0, false); }); + if (pipeline.hasMoreThanOneStream()) + { + executeUnion(pipeline); + pipeline.transform( + [&](auto & stream) { stream = std::make_shared(stream, dag_query_info.get_limit().limit(), 0, false); }); + } +} + +BlockIO InterpreterDAGRequest::execute() +{ + Pipeline pipeline; + executeImpl(pipeline); + executeUnion(pipeline); + + BlockIO res; + res.in = pipeline.firstStream(); + return res; } +} // namespace DB diff --git a/dbms/src/Interpreters/InterpreterDAGRequest.h b/dbms/src/Interpreters/InterpreterDAGRequest.h index 13a542b597a..7cfe18c9374 100644 --- a/dbms/src/Interpreters/InterpreterDAGRequest.h +++ b/dbms/src/Interpreters/InterpreterDAGRequest.h @@ -2,23 +2,26 @@ #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" -#include #include +#include #pragma GCC diagnostic pop -#include #include +#include #include +#include #include #include -namespace DB { +namespace DB +{ /** build ch plan from dag request: dag executors -> ch plan */ -class InterpreterDAGRequest : public IInterpreter { +class InterpreterDAGRequest : public IInterpreter +{ public: - InterpreterDAGRequest(CoprocessorContext & context_, const tipb::DAGRequest & dag_request); + InterpreterDAGRequest(CoprocessorContext & context_, DAGQueryInfo & dag_query_info); ~InterpreterDAGRequest() = default; @@ -26,13 +29,11 @@ class InterpreterDAGRequest : public IInterpreter { private: CoprocessorContext & context; - const tipb::DAGRequest & dag_request; NamesWithAliases final_project; - Int32 ts_index = -1; - Int32 sel_index = -1; - Int32 agg_index = -1; - Int32 order_index = -1; - Int32 limit_index = -1; + DAGQueryInfo & dag_query_info; + NamesAndTypesList source_columns; + size_t max_streams = 1; + struct Pipeline { BlockInputStreams streams; @@ -46,15 +47,35 @@ class InterpreterDAGRequest : public IInterpreter { transform(stream); } - bool hasMoreThanOneStream() const - { - return streams.size() > 1; - } + bool hasMoreThanOneStream() const { return streams.size() > 1; } }; - bool buildPlan(Pipeline & streams); - bool buildTSPlan(const tipb::TableScan & ts, Pipeline & streams); - bool buildSelPlan(const tipb::Selection & sel, Pipeline & streams); + struct AnalysisResult + { + bool has_where = false; + bool need_aggregate = false; + bool has_order_by = false; + + ExpressionActionsPtr before_where; + ExpressionActionsPtr before_aggregation; + ExpressionActionsPtr before_order_and_select; + ExpressionActionsPtr final_projection; + + String filter_column_name; + Strings order_column_names; + /// Columns from the SELECT list, before renaming them to aliases. + Names selected_columns; + }; + bool executeImpl(Pipeline & pipeline); + bool executeTS(const tipb::TableScan & ts, Pipeline & pipeline); + void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, String & filter_column); + void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr); + void executeOrder(Pipeline & pipeline, Strings & order_column_names); + void executeUnion(Pipeline & pipeline); + void executeLimit(Pipeline & pipeline); + void executeFinalProject(Pipeline & pipeline); + SortDescription getSortDescription(Strings & order_column_names); + AnalysisResult analyzeExpressions(); }; -} +} // namespace DB diff --git a/dbms/src/Server/cop_test.cpp b/dbms/src/Server/cop_test.cpp index 13559193ad0..d039d90465d 100644 --- a/dbms/src/Server/cop_test.cpp +++ b/dbms/src/Server/cop_test.cpp @@ -1,50 +1,57 @@ +#include #include + #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" -#include +#include #include +#include #include -#include #pragma GCC diagnostic pop -#include + #include using ChannelPtr = std::shared_ptr; using SubPtr = std::shared_ptr; static const int DAGREQUEST = 103; -class FlashClient { +class FlashClient +{ private: SubPtr sp; + public: - FlashClient(ChannelPtr cp) : sp(tikvpb::Tikv::NewStub(cp)){ - } - grpc::Status coprocessor(coprocessor::Request* rqst) { + FlashClient(ChannelPtr cp) : sp(tikvpb::Tikv::NewStub(cp)) {} + grpc::Status coprocessor(coprocessor::Request * rqst) + { grpc::ClientContext clientContext; - clientContext.AddMetadata("user_name",""); - clientContext.AddMetadata("builder_version","v2"); + clientContext.AddMetadata("user_name", ""); + clientContext.AddMetadata("builder_version", "v2"); coprocessor::Response response; grpc::Status status = sp->Coprocessor(&clientContext, *rqst, &response); size_t column_num = 3; - if(status.ok()) { + if (status.ok()) + { // if status is ok, try to decode the result tipb::SelectResponse selectResponse; - if(selectResponse.ParseFromString(response.data())) { - for(tipb::Chunk chunk : selectResponse.chunks()) { + if (selectResponse.ParseFromString(response.data())) + { + for (tipb::Chunk chunk : selectResponse.chunks()) + { size_t cursor = 0; std::vector row_result; - const std::string &data = chunk.rows_data(); - while (cursor < data.size()) { + const std::string & data = chunk.rows_data(); + while (cursor < data.size()) + { row_result.push_back(DB::DecodeDatum(cursor, data)); - if(row_result.size() == column_num) { + if (row_result.size() == column_num) + { //print the result - std::cout << row_result[0].get() - << " "<< row_result[1].get() - << " "<< row_result[2].get() << std::endl; + std::cout << row_result[0].get() << " " << row_result[1].get() << " " + << row_result[2].get() << std::endl; row_result.clear(); } } - } } } @@ -53,15 +60,16 @@ class FlashClient { }; using ClientPtr = std::shared_ptr; -grpc::Status rpcTest() { - ChannelPtr cp = grpc::CreateChannel("localhost:9093", grpc::InsecureChannelCredentials()); +grpc::Status rpcTest() +{ + ChannelPtr cp = grpc::CreateChannel("localhost:9093", grpc::InsecureChannelCredentials()); ClientPtr clientPtr = std::make_shared(cp); // construct a dag request tipb::DAGRequest dagRequest; dagRequest.set_start_ts(18446744073709551615uL); - tipb::Executor *executor = dagRequest.add_executors(); + tipb::Executor * executor = dagRequest.add_executors(); executor->set_tp(tipb::ExecType::TypeTableScan); - tipb::TableScan *ts = executor->mutable_tbl_scan(); + tipb::TableScan * ts = executor->mutable_tbl_scan(); ts->set_table_id(41); tipb::ColumnInfo * ci = ts->add_columns(); ci->set_column_id(1); @@ -70,30 +78,48 @@ grpc::Status rpcTest() { dagRequest.add_output_offsets(1); dagRequest.add_output_offsets(0); dagRequest.add_output_offsets(1); - /* executor = dagRequest.add_executors(); executor->set_tp(tipb::ExecType::TypeSelection); - tipb::Selection *selection = executor->mutable_selection(); - tipb::Expr *expr = selection->add_conditions(); + tipb::Selection * selection = executor->mutable_selection(); + tipb::Expr * expr = selection->add_conditions(); expr->set_tp(tipb::ExprType::ScalarFunc); expr->set_sig(tipb::ScalarFuncSig::LTInt); - tipb::Expr *col = expr->add_children(); - tipb::Expr *value = expr->add_children(); + tipb::Expr * col = expr->add_children(); + tipb::Expr * value = expr->add_children(); col->set_tp(tipb::ExprType::ColumnRef); std::stringstream ss; DB::EncodeNumber(2, ss); col->set_val(ss.str()); value->set_tp(tipb::ExprType::Int64); ss.str(""); - DB::EncodeNumber(289,ss); + DB::EncodeNumber(123, ss); value->set_val(std::string(ss.str())); - */ + + // topn + executor = dagRequest.add_executors(); + executor->set_tp(tipb::ExecType::TypeTopN); + tipb::TopN * topN = executor->mutable_topn(); + topN->set_limit(3); + tipb::ByItem * byItem = topN->add_order_by(); + byItem->set_desc(true); + tipb::Expr * expr1 = byItem->mutable_expr(); + expr1->set_tp(tipb::ExprType::ColumnRef); + ss.str(""); + DB::EncodeNumber(2, ss); + expr1->set_val(ss.str()); + // limit + /* + executor = dagRequest.add_executors(); + executor->set_tp(tipb::ExecType::TypeLimit); + tipb::Limit *limit = executor->mutable_limit(); + limit->set_limit(1); + */ // construct a coprocessor request coprocessor::Request request; //todo add context info - kvrpcpb::Context *ctx = request.mutable_context(); + kvrpcpb::Context * ctx = request.mutable_context(); ctx->set_region_id(2); auto region_epoch = ctx->mutable_region_epoch(); region_epoch->set_version(20); @@ -104,7 +130,8 @@ grpc::Status rpcTest() { return clientPtr->coprocessor(&request); } -void codecTest() { +void codecTest() +{ Int64 i = 123; std::stringstream ss; DB::EncodeNumber(i, ss); @@ -116,12 +143,13 @@ void codecTest() { r++; } -int main() { -// std::cout << "Before rpcTest"<< std::endl; +int main() +{ + // std::cout << "Before rpcTest"<< std::endl; grpc::Status ret = rpcTest(); -// codecTest(); -// std::cout << "End rpcTest " << std::endl; -// std::cout << "The ret is " << ret.error_code() << " " << ret.error_details() -// << " " << ret.error_message() << std::endl; + // codecTest(); + // std::cout << "End rpcTest " << std::endl; + // std::cout << "The ret is " << ret.error_code() << " " << ret.error_details() + // << " " << ret.error_message() << std::endl; return 0; } diff --git a/dbms/src/Storages/Transaction/TypeMapping.cpp b/dbms/src/Storages/Transaction/TypeMapping.cpp index 91161b787a4..706f98322f7 100644 --- a/dbms/src/Storages/Transaction/TypeMapping.cpp +++ b/dbms/src/Storages/Transaction/TypeMapping.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include @@ -85,15 +86,27 @@ DataTypePtr TypeMapping::getUnsigned(const ColumnInfo & column_info) return unsigned_type_map[column_info.tp](column_info); } -TiDB::CodecFlag TypeMapping::getCodecFlag(const DB::DataTypePtr & dataTypePtr) { +TiDB::CodecFlag TypeMapping::getCodecFlag(const DB::DataTypePtr & dataTypePtr) +{ // fixme: String's CodecFlag will be CodecFlagCompactBytes, which is wrong for Json type return codec_flag_map[dataTypePtr->getFamilyName()]; } -TiDB::CodecFlag getCodecFlagByDataType(const DataTypePtr & dataTypePtr) { +TiDB::CodecFlag getCodecFlagByDataType(const DataTypePtr & dataTypePtr) +{ return TypeMapping::instance().getCodecFlag(dataTypePtr); } +DataTypePtr getDataTypeByFieldType(const tipb::FieldType & field_type) +{ + ColumnInfo mock_ci; + mock_ci.tp = static_cast(field_type.tp()); + mock_ci.flag = field_type.flag(); + mock_ci.flen = field_type.flen(); + mock_ci.decimal = field_type.decimal(); + return getDataTypeByColumnInfo(mock_ci); +} + DataTypePtr getDataTypeByColumnInfo(const ColumnInfo & column_info) { DataTypePtr base; diff --git a/dbms/src/Storages/Transaction/TypeMapping.h b/dbms/src/Storages/Transaction/TypeMapping.h index d8b2fc32357..db05d27ff84 100644 --- a/dbms/src/Storages/Transaction/TypeMapping.h +++ b/dbms/src/Storages/Transaction/TypeMapping.h @@ -1,5 +1,10 @@ #pragma once +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#include +#pragma GCC diagnostic pop + #include #include @@ -11,6 +16,8 @@ using ColumnInfo = TiDB::ColumnInfo; DataTypePtr getDataTypeByColumnInfo(const ColumnInfo & column_info); +DataTypePtr getDataTypeByFieldType(const tipb::FieldType & field_type); + TiDB::CodecFlag getCodecFlagByDataType(const DataTypePtr & dataTypePtr); }