Skip to content

Commit 57b3c20

Browse files
authored
YQ-3894 RD supported pushdown for SafeCast, ToBytes, FlatMap (#12191)
1 parent 91ebb71 commit 57b3c20

File tree

12 files changed

+837
-657
lines changed

12 files changed

+837
-657
lines changed

ydb/library/yql/providers/common/pushdown/collection.cpp

Lines changed: 508 additions & 561 deletions
Large diffs are not rendered by default.

ydb/library/yql/providers/common/pushdown/physical_opt.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ TPredicateNode SplitForPartialPushdown(const NPushdown::TPredicateNode& predicat
3939

4040
}
4141

42-
TMaybeNode<TCoLambda> MakePushdownPredicate(const TCoLambda& lambda, TExprContext& ctx, const TPositionHandle& pos, const TSettings& settings) {
42+
NPushdown::TPredicateNode MakePushdownNode(const NNodes::TCoLambda& lambda, TExprContext& ctx, const TPositionHandle& pos, const TSettings& settings) {
4343
auto lambdaArg = lambda.Args().Arg(0).Ptr();
4444

4545
YQL_LOG(TRACE) << "Push filter. Initial filter lambda: " << NCommon::ExprToPrettyString(ctx, lambda.Ref());
@@ -54,7 +54,11 @@ TMaybeNode<TCoLambda> MakePushdownPredicate(const TCoLambda& lambda, TExprContex
5454
NPushdown::CollectPredicates(optionalIf.Predicate(), predicateTree, TExprBase(lambdaArg), TExprBase(lambdaArg), settings);
5555
YQL_ENSURE(predicateTree.IsValid(), "Collected filter predicates are invalid");
5656

57-
NPushdown::TPredicateNode predicateToPush = SplitForPartialPushdown(predicateTree, ctx, pos, settings);
57+
return SplitForPartialPushdown(predicateTree, ctx, pos, settings);
58+
}
59+
60+
TMaybeNode<TCoLambda> MakePushdownPredicate(const TCoLambda& lambda, TExprContext& ctx, const TPositionHandle& pos, const TSettings& settings) {
61+
NPushdown::TPredicateNode predicateToPush = MakePushdownNode(lambda, ctx, pos, settings);
5862
if (!predicateToPush.IsValid()) {
5963
return {};
6064
}
@@ -64,7 +68,7 @@ TMaybeNode<TCoLambda> MakePushdownPredicate(const TCoLambda& lambda, TExprContex
6468
.Args({"filter_row"})
6569
.Body<TExprApplier>()
6670
.Apply(predicateToPush.ExprNode.Cast())
67-
.With(TExprBase(lambdaArg), "filter_row")
71+
.With(lambda.Args().Arg(0), "filter_row")
6872
.Build()
6973
.Done();
7074
// clang-format on
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
#pragma once
22

3+
#include "predicate_node.h"
4+
35
#include <ydb/library/yql/ast/yql_expr.h>
46
#include <ydb/library/yql/ast/yql_pos_handle.h>
7+
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
58
#include <ydb/library/yql/core/expr_nodes_gen/yql_expr_nodes_gen.h>
69

710
namespace NYql::NPushdown {
811

12+
NPushdown::TPredicateNode MakePushdownNode(const NNodes::TCoLambda& lambda, TExprContext& ctx, const TPositionHandle& pos, const TSettings& settings);
913
NNodes::TMaybeNode<NNodes::TCoLambda> MakePushdownPredicate(const NNodes::TCoLambda& lambda, TExprContext& ctx, const TPositionHandle& pos, const TSettings& settings);
1014

1115
} // namespace NYql::NPushdown

ydb/library/yql/providers/common/pushdown/predicate_node.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@ bool TPredicateNode::IsValid() const {
3030
return res && ExprNode.IsValid();
3131
}
3232

33+
bool TPredicateNode::IsEmpty() const {
34+
if (!ExprNode || !IsValid()) {
35+
return true;
36+
}
37+
if (const auto maybeBool = ExprNode.Maybe<NNodes::TCoBool>()) {
38+
return TStringBuf(maybeBool.Cast().Literal()) == "true"sv;
39+
}
40+
return false;
41+
}
42+
3343
void TPredicateNode::SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos, EBoolOp op) {
3444
auto predicatesSize = predicates.size();
3545
if (predicatesSize == 0) {

ydb/library/yql/providers/common/pushdown/predicate_node.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ struct TPredicateNode {
2323
~TPredicateNode();
2424

2525
bool IsValid() const;
26+
bool IsEmpty() const;
2627
void SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos, EBoolOp op);
2728

2829
NNodes::TMaybeNode<NNodes::TExprBase> ExprNode;

ydb/library/yql/providers/common/pushdown/settings.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ struct TSettings {
3838
// May be partially pushdowned as:
3939
// $A OR $C
4040
// In case of unsupported / complicated expressions $B and $D
41-
SplitOrOperator = 1 << 22
41+
SplitOrOperator = 1 << 22,
42+
ToBytesFromStringExpressions = 1 << 23, // ToBytes(string like)
43+
FlatMapOverOptionals = 1 << 24 // FlatMap(Optional<T>, Lmabda (T) -> Optional<U>)
4244
};
4345

4446
explicit TSettings(NLog::EComponent logComponent)

ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,12 @@ message TExpression {
352352
TExpression else_expression = 3;
353353
}
354354

355+
// CAST($value AS $type)
356+
message TCast {
357+
TExpression value = 1;
358+
Ydb.Type type = 2;
359+
}
360+
355361
message TNull {
356362
}
357363

@@ -368,6 +374,8 @@ message TExpression {
368374
TCoalesce coalesce = 5;
369375

370376
TIf if = 6;
377+
378+
TCast cast = 7;
371379
}
372380
}
373381

ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp

Lines changed: 273 additions & 77 deletions
Large diffs are not rendered by default.

ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ namespace NYql::NConnector::NApi {
99
namespace NYql {
1010

1111
bool IsEmptyFilterPredicate(const NNodes::TCoLambda& lambda);
12+
bool SerializeFilterPredicate(const NNodes::TExprBase& predicateBody, const NNodes::TCoArgument& predicateArgument, NConnector::NApi::TPredicate* proto, TStringBuilder& err);
1213
bool SerializeFilterPredicate(const NNodes::TCoLambda& predicate, NConnector::NApi::TPredicate* proto, TStringBuilder& err);
1314
TString FormatWhere(const NConnector::NApi::TPredicate& predicate);
1415
} // namespace NYql

ydb/library/yql/providers/generic/pushdown/yql_generic_match_predicate.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ namespace NYql::NGenericPushDown {
5555
case NYql::NConnector::NApi::TExpression::kNull:
5656
case NYql::NConnector::NApi::TExpression::kCoalesce:
5757
case NYql::NConnector::NApi::TExpression::kIf:
58+
case NYql::NConnector::NApi::TExpression::kCast:
5859
case NYql::NConnector::NApi::TExpression::PAYLOAD_NOT_SET:
5960
return false;
6061
}
@@ -70,6 +71,7 @@ namespace NYql::NGenericPushDown {
7071
case NYql::NConnector::NApi::TExpression::kNull:
7172
case NYql::NConnector::NApi::TExpression::kCoalesce:
7273
case NYql::NConnector::NApi::TExpression::kIf:
74+
case NYql::NConnector::NApi::TExpression::kCast:
7375
case NYql::NConnector::NApi::TExpression::PAYLOAD_NOT_SET:
7476
return false;
7577
}
@@ -281,6 +283,7 @@ namespace NYql::NGenericPushDown {
281283
case NYql::NConnector::NApi::TExpression::kNull:
282284
case NYql::NConnector::NApi::TExpression::kCoalesce:
283285
case NYql::NConnector::NApi::TExpression::kIf:
286+
case NYql::NConnector::NApi::TExpression::kCast:
284287
case NYql::NConnector::NApi::TExpression::PAYLOAD_NOT_SET:
285288
return Triple::Unknown;
286289
}

0 commit comments

Comments
 (0)