Skip to content

Commit 3b98ee4

Browse files
authored
Optimize multiple Filter consumers. Expand SkipNull{Members,Elements} early (#6052)
1 parent b163e85 commit 3b98ee4

File tree

7 files changed

+244
-55
lines changed

7 files changed

+244
-55
lines changed

ydb/library/yql/core/common_opt/yql_co_finalizers.cpp

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "yql_flatmap_over_join.h"
33
#include "yql_co.h"
44

5+
#include <ydb/library/yql/core/yql_expr_csee.h>
56
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
67
#include <ydb/library/yql/core/yql_opt_utils.h>
78

@@ -46,6 +47,155 @@ IGraphTransformer::TStatus MultiUsageFlatMapOverJoin(const TExprNode::TPtr& node
4647
return IGraphTransformer::TStatus::Repeat;
4748
}
4849

50+
bool IsFilterMultiusageEnabled(const TOptimizeContext& optCtx) {
51+
YQL_ENSURE(optCtx.Types);
52+
static const TString multiUsageFlags = to_lower(TString("FilterPushdownEnableMultiusage"));
53+
return optCtx.Types->OptimizerFlags.contains(multiUsageFlags);
54+
}
55+
56+
// Pushes the predicates of several filtering consumers down to their shared input.
//
// Applies to a callable `node` of type List<Struct> that has a parent and is not a
// single-usage node, and only when the FilterPushdownEnableMultiusage flag is on.
// Every direct parent must lead - optionally through a chain of ExtractMembers /
// Unordered / AssumeColumnOrder nodes for which GetParentIfSingle returns a parent -
// to a FlatMap-family node whose lambda body is a conditional value
// (TCoConditionalValueBase). If any parent does not fit, nothing is changed.
//
// The rewrite:
//   1. OrderedMap over `node` adds one member "_yql_filter_pushdown<i>" per consumer,
//      holding that consumer's predicate evaluated on the row;
//   2. OrderedFilter keeps the rows for which the Or of those members holds;
//   3. each consumer is re-created over the new input (with its pass-through chain),
//      tests Likely(Member(row, "_yql_filter_pushdown<i>")) instead of the original
//      predicate, and evaluates its original value on the row cast back (CastStruct)
//      to the item type of its original input.
//
// Replacements are registered in `toOptimize`, keyed by the original consumer node.
// The function returns early when all predicates are already wrapped in Likely;
// the consumers it produces are of exactly that form.
void FilterPushdownWithMultiusage(const TExprNode::TPtr& node, TNodeOnNodeOwnedMap& toOptimize, TExprContext& ctx, TOptimizeContext& optCtx) {
    if (!node->IsCallable()) {
        return;
    }
    if (!IsFilterMultiusageEnabled(optCtx)) {
        return;
    }
    if (!optCtx.HasParent(*node) || optCtx.IsSingleUsage(*node)) {
        return;
    }

    // Predicate members are added to each row, so the input has to be a list of structs.
    const auto nodeType = node->GetTypeAnn();
    const bool isListOfStructs = nodeType->GetKind() == ETypeAnnotationKind::List &&
        nodeType->Cast<TListExprType>()->GetItemType()->GetKind() == ETypeAnnotationKind::Struct;
    if (!isListOfStructs) {
        return;
    }

    static const THashSet<TStringBuf> passThroughNames = {"ExtractMembers", "Unordered", "AssumeColumnOrder"};

    YQL_ENSURE(optCtx.ParentsMap);
    TVector<const TExprNode*> directParents;
    if (const auto found = optCtx.ParentsMap->find(node.Get()); found != optCtx.ParentsMap->end()) {
        directParents.assign(found->second.begin(), found->second.end());
        // normalize parent order (presumably to keep the numbering of the generated columns stable)
        Sort(directParents, [](const TExprNode* lhs, const TExprNode* rhs) { return CompareNodes(*lhs, *rhs) < 0; });
    }

    // filterNodes[i] is the FlatMap-family consumer reached from directParents[i];
    // predicateLambdas[i] / valueLambdas[i] are its predicate and value as standalone lambdas.
    TVector<const TExprNode*> filterNodes;
    TExprNodeList predicateLambdas;
    TExprNodeList valueLambdas;
    size_t alreadyLikely = 0;
    for (const TExprNode* consumer : directParents) {
        // Climb over pass-through callables for as long as GetParentIfSingle yields a parent.
        while (passThroughNames.contains(consumer->Content())) {
            const auto next = optCtx.GetParentIfSingle(*consumer);
            if (!next) {
                break;
            }
            consumer = next;
        }
        if (!TCoFlatMapBase::Match(consumer)) {
            return;
        }

        TCoFlatMapBase consumerFlatMap(consumer);
        auto maybeCond = consumerFlatMap.Lambda().Body().Maybe<TCoConditionalValueBase>();
        if (!maybeCond) {
            return;
        }

        auto condValue = maybeCond.Cast();
        if (condValue.Predicate().Maybe<TCoLikely>()) {
            ++alreadyLikely;
        }
        auto pos = condValue.Predicate().Pos();
        predicateLambdas.push_back(ctx.NewLambda(pos,
            ctx.NewArguments(pos, { consumerFlatMap.Lambda().Args().Arg(0).Ptr() }),
            condValue.Predicate().Ptr()));
        valueLambdas.push_back(ctx.NewLambda(pos,
            ctx.NewArguments(pos, { consumerFlatMap.Lambda().Args().Arg(0).Ptr() }),
            condValue.Value().Ptr()));
        filterNodes.push_back(consumer);
    }

    YQL_ENSURE(predicateLambdas.size() > 1);
    // All predicates are already Likely(...): nothing left to push down.
    if (alreadyLikely == filterNodes.size()) {
        return;
    }

    YQL_CLOG(DEBUG, Core) << "Pushdown " << filterNodes.size() << " filters to common parent " << node->Content();
    const TStringBuf columnPrefix = "_yql_filter_pushdown";
    // Name of the member that carries the predicate of consumer number `index`.
    const auto predicateColumn = [&columnPrefix](size_t index) -> TString {
        return TStringBuilder() << columnPrefix << index;
    };

    // mapBody accumulates AddMember(...) calls over the map argument;
    // filterPreds collects Member(filterArg, column) for the final Or.
    TExprNode::TPtr mapArg = ctx.NewArgument(node->Pos(), "row");
    TExprNode::TPtr mapBody = mapArg;
    TExprNode::TPtr filterArg = ctx.NewArgument(node->Pos(), "row");
    TExprNodeList filterPreds;
    for (size_t i = 0; i < predicateLambdas.size(); ++i) {
        const TString column = predicateColumn(i);
        mapBody = ctx.Builder(mapBody->Pos())
            .Callable("AddMember")
                .Add(0, mapBody)
                .Atom(1, column)
                .Apply(2, predicateLambdas[i])
                    .With(0, mapArg)
                .Seal()
            .Seal()
            .Build();
        filterPreds.push_back(ctx.Builder(node->Pos())
            .Callable("Member")
                .Add(0, filterArg)
                .Atom(1, column)
            .Seal()
            .Build());
    }

    auto sharedInput = ctx.Builder(node->Pos())
        .Callable("OrderedFilter")
            .Callable(0, "OrderedMap")
                .Add(0, node)
                .Add(1, ctx.NewLambda(node->Pos(), ctx.NewArguments(node->Pos(), { mapArg }), std::move(mapBody)))
            .Seal()
            .Add(1, ctx.NewLambda(node->Pos(), ctx.NewArguments(node->Pos(), { filterArg }), ctx.NewCallable(node->Pos(), "Or", std::move(filterPreds))))
        .Seal()
        .Build();

    for (size_t i = 0; i < directParents.size(); ++i) {
        const TExprNode* cursor = directParents[i];
        TExprNode::TPtr rebuilt = sharedInput;
        // Re-create the pass-through chain between the shared input and the consumer.
        while (cursor != filterNodes[i]) {
            if (cursor->IsCallable("AssumeColumnOrder")) {
                // NOTE(review): renamed to the "Partial" variant, presumably because the
                // row now has an extra member not listed in the column order - confirm.
                rebuilt = ctx.ChangeChild(*ctx.RenameNode(*cursor, "AssumeColumnOrderPartial"), 0, std::move(rebuilt));
            } else if (cursor->IsCallable("ExtractMembers")) {
                // Keep this consumer's predicate member alive through the projection.
                TExprNodeList keptColumns = cursor->Child(1)->ChildrenList();
                keptColumns.push_back(ctx.NewAtom(cursor->Child(1)->Pos(), predicateColumn(i)));
                rebuilt = ctx.ChangeChildren(*cursor, { rebuilt, ctx.NewList(cursor->Child(1)->Pos(), std::move(keptColumns)) });
            } else {
                rebuilt = ctx.ChangeChild(*cursor, 0, std::move(rebuilt));
            }
            cursor = optCtx.GetParentIfSingle(*cursor);
            YQL_ENSURE(cursor);
        }

        TCoFlatMapBase flatMap(cursor);
        TCoConditionalValueBase cond = flatMap.Lambda().Body().Cast<TCoConditionalValueBase>();
        TExprNode::TPtr originalInput = flatMap.Input().Ptr();
        // Item type the original value lambda was written against; the row is cast back to it.
        const TTypeAnnotationNode* originalType = originalInput->GetTypeAnn()->Cast<TListExprType>()->GetItemType();
        toOptimize[filterNodes[i]] = ctx.Builder(cursor->Pos())
            .Callable(flatMap.CallableName())
                .Add(0, rebuilt)
                .Lambda(1)
                    .Param("row")
                    .Callable(cond.CallableName())
                        .Callable(0, "Likely")
                            .Callable(0, "Member")
                                .Arg(0, "row")
                                .Atom(1, predicateColumn(i))
                            .Seal()
                        .Seal()
                        .Apply(1, valueLambdas[i])
                            .With(0)
                                .Callable("CastStruct")
                                    .Arg(0, "row")
                                    .Add(1, ExpandType(cursor->Pos(), *originalType, ctx))
                                .Seal()
                            .Done()
                        .Seal()
                    .Seal()
                .Seal()
            .Seal()
            .Build();
    }
}
198+
49199
}
50200

51201
void RegisterCoFinalizers(TFinalizingOptimizerMap& map) {
@@ -243,6 +393,11 @@ void RegisterCoFinalizers(TFinalizingOptimizerMap& map) {
243393

244394
return true;
245395
};
396+
397+
map[""] = [](const TExprNode::TPtr& node, TNodeOnNodeOwnedMap& toOptimize, TExprContext& ctx, TOptimizeContext& optCtx) {
398+
FilterPushdownWithMultiusage(node, toOptimize, ctx, optCtx);
399+
return true;
400+
};
246401
}
247402

248403
} // NYql

ydb/library/yql/core/common_opt/yql_co_simple1.cpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3546,6 +3546,12 @@ TExprNode::TPtr OptimizeMerge(const TExprNode::TPtr& node, TExprContext& ctx, TO
35463546
return OptimizeExtend<true>(node, ctx, optCtx);
35473547
}
35483548

3549+
bool IsEarlyExpandOfSkipNullAllowed(const TOptimizeContext& optCtx) {
3550+
YQL_ENSURE(optCtx.Types);
3551+
static const TString skipNullFlags = to_lower(TString("EarlyExpandSkipNull"));
3552+
return optCtx.Types->OptimizerFlags.contains(skipNullFlags);
3553+
}
3554+
35493555
} // namespace
35503556

35513557
void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
@@ -3700,7 +3706,10 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
37003706
return ConvertSqlInPredicatesToJoins(self, ctx);
37013707
};
37023708

3703-
map["SkipNullMembers"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& /*optCtx*/) {
3709+
map["SkipNullMembers"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
3710+
if (IsEarlyExpandOfSkipNullAllowed(optCtx)) {
3711+
return ExpandSkipNullFields(node, ctx);
3712+
}
37043713
const auto skipNullMembers = TCoSkipNullMembers(node);
37053714
if (!skipNullMembers.Members()) {
37063715
return node;
@@ -3741,7 +3750,10 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
37413750
return node;
37423751
};
37433752

3744-
map["SkipNullElements"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& /*optCtx*/) {
3753+
map["SkipNullElements"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
3754+
if (IsEarlyExpandOfSkipNullAllowed(optCtx)) {
3755+
return ExpandSkipNullFields(node, ctx);
3756+
}
37453757
const auto skipNullElements = TCoSkipNullElements(node);
37463758
if (!skipNullElements.Elements()) {
37473759
return node;

ydb/library/yql/core/common_opt/yql_co_transformer.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,16 @@ IGraphTransformer::TStatus TCommonOptTransformer::DoTransform(const TExprNode::T
142142

143143
TNodeOnNodeOwnedMap toOptimize;
144144
bool isError = false;
145+
TFinalizingOptimizerExt defaultOpt;
146+
auto defaultIt = callables.find("");
147+
if (defaultIt != callables.end()) {
148+
defaultOpt = defaultIt->second;
149+
}
145150
VisitExpr(input,
146151
[&toOptimize, &isError](const TExprNode::TPtr&) {
147152
return toOptimize.empty() && !isError;
148153
},
149-
[&callables, &toOptimize, &ctx, &optCtx, &isError](const TExprNode::TPtr& node) {
154+
[&callables, &defaultOpt, &toOptimize, &ctx, &optCtx, &isError](const TExprNode::TPtr& node) {
150155
if (isError) {
151156
return false;
152157
}
@@ -156,6 +161,9 @@ IGraphTransformer::TStatus TCommonOptTransformer::DoTransform(const TExprNode::T
156161
if (callables.cend() != rule) {
157162
isError = isError || !(rule->second)(node, toOptimize, ctx, optCtx);
158163
}
164+
if (defaultOpt && toOptimize.empty() && !isError) {
165+
isError = isError || !defaultOpt(node, toOptimize, ctx, optCtx);
166+
}
159167
}
160168
return toOptimize.empty() && !isError;
161169
}

ydb/library/yql/core/expr_nodes/yql_expr_nodes.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2561,6 +2561,14 @@
25612561
{
25622562
"Name": "TCoReplicationTargetList",
25632563
"ListBase": "TCoReplicationTarget"
2564+
},
2565+
{
2566+
"Name" : "TCoLikely",
2567+
"Base" : "TCallable",
2568+
"Match": {"Type": "Callable", "Name": "Likely"},
2569+
"Children": [
2570+
{"Index": 0, "Name": "Predicate", "Type": "TExprBase"}
2571+
]
25642572
}
25652573
]
25662574
}

ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp

Lines changed: 2 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -6560,56 +6560,6 @@ TExprNode::TPtr OptimizeSqueezeToDict(const TExprNode::TPtr& node, TExprContext
65606560
return node;
65616561
}
65626562

6563-
TExprNode::TListType GetOptionals(const TPositionHandle& pos, const TStructExprType& type, TExprContext& ctx) {
6564-
TExprNode::TListType result;
6565-
for (const auto& item : type.GetItems())
6566-
if (ETypeAnnotationKind::Optional == item->GetItemType()->GetKind())
6567-
result.emplace_back(ctx.NewAtom(pos, item->GetName()));
6568-
return result;
6569-
}
6570-
6571-
TExprNode::TListType GetOptionals(const TPositionHandle& pos, const TTupleExprType& type, TExprContext& ctx) {
6572-
TExprNode::TListType result;
6573-
if (const auto& items = type.GetItems(); !items.empty())
6574-
for (ui32 i = 0U; i < items.size(); ++i)
6575-
if (ETypeAnnotationKind::Optional == items[i]->GetKind())
6576-
result.emplace_back(ctx.NewAtom(pos, i));
6577-
return result;
6578-
}
6579-
6580-
template <bool TupleOrStruct>
6581-
TExprNode::TPtr ExpandSkipNullFields(const TExprNode::TPtr& node, TExprContext& ctx) {
6582-
YQL_CLOG(DEBUG, CorePeepHole) << "Expand " << node->Content();
6583-
if (auto fields = node->ChildrenSize() > 1U ? node->Tail().ChildrenList() :
6584-
GetOptionals(node->Pos(), *GetSeqItemType(node->Head().GetTypeAnn())->Cast<std::conditional_t<TupleOrStruct, TTupleExprType, TStructExprType>>(), ctx);
6585-
fields.empty()) {
6586-
return node->HeadPtr();
6587-
} else {
6588-
return ctx.Builder(node->Pos())
6589-
.Callable("OrderedFilter")
6590-
.Add(0, node->HeadPtr())
6591-
.Lambda(1)
6592-
.Param("item")
6593-
.Callable("And")
6594-
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
6595-
for (ui32 i = 0U; i < fields.size(); ++i) {
6596-
parent
6597-
.Callable(i, "Exists")
6598-
.Callable(0, TupleOrStruct ? "Nth" : "Member")
6599-
.Arg(0, "item")
6600-
.Add(1, std::move(fields[i]))
6601-
.Seal()
6602-
.Seal();
6603-
}
6604-
return parent;
6605-
})
6606-
.Seal()
6607-
.Seal()
6608-
.Seal().Build();
6609-
}
6610-
return node;
6611-
}
6612-
66136563
TExprNode::TPtr ExpandConstraintsOf(const TExprNode::TPtr& node, TExprContext& ctx) {
66146564
YQL_CLOG(DEBUG, CorePeepHole) << "Expand " << node->Content();
66156565

@@ -8256,8 +8206,8 @@ struct TPeepHoleRules {
82568206
{"Or", &OptimizeLogicalDups<false>},
82578207
{"CombineByKey", &ExpandCombineByKey},
82588208
{"FinalizeByKey", &ExpandFinalizeByKey},
8259-
{"SkipNullMembers", &ExpandSkipNullFields<false>},
8260-
{"SkipNullElements", &ExpandSkipNullFields<true>},
8209+
{"SkipNullMembers", &ExpandSkipNullFields},
8210+
{"SkipNullElements", &ExpandSkipNullFields},
82618211
{"ConstraintsOf", &ExpandConstraintsOf},
82628212
{"==", &ExpandSqlEqual<true, false>},
82638213
{"!=", &ExpandSqlEqual<false, false>},

ydb/library/yql/core/yql_opt_utils.cpp

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1203,6 +1203,61 @@ TExprNode::TPtr ExpandCastStruct(const TExprNode::TPtr& node, TExprContext& ctx)
12031203
return ctx.NewCallable(node->Pos(), "AsStruct", std::move(items));
12041204
}
12051205

1206+
// Returns one atom (created at `pos`) per struct member whose item type is Optional,
// carrying the member name, in the order of type.GetItems().
TExprNode::TListType GetOptionals(const TPositionHandle& pos, const TStructExprType& type, TExprContext& ctx) {
    TExprNode::TListType optionalNames;
    for (const auto& member : type.GetItems()) {
        if (member->GetItemType()->GetKind() != ETypeAnnotationKind::Optional) {
            continue;
        }
        optionalNames.emplace_back(ctx.NewAtom(pos, member->GetName()));
    }
    return optionalNames;
}
1213+
1214+
// Returns one atom (created at `pos`) per tuple element whose type is Optional,
// carrying the element index, in ascending index order.
TExprNode::TListType GetOptionals(const TPositionHandle& pos, const TTupleExprType& type, TExprContext& ctx) {
    TExprNode::TListType optionalIndexes;
    const auto& elements = type.GetItems();
    // An empty tuple simply produces no iterations.
    for (ui32 index = 0U; index < elements.size(); ++index) {
        if (elements[index]->GetKind() != ETypeAnnotationKind::Optional) {
            continue;
        }
        optionalIndexes.emplace_back(ctx.NewAtom(pos, index));
    }
    return optionalIndexes;
}
1222+
1223+
// Expands SkipNullMembers / SkipNullElements into an explicit filter.
//
// The fields to check are taken from the second child when it is present; otherwise
// they are all Optional members (SkipNullMembers) or Optional tuple elements
// (SkipNullElements) of the input's sequence item type.
//
// Returns the input itself when there is nothing to check, otherwise
// OrderedFilter(input, item -> And(Exists(Member|Nth(item, field))...)).
// Fails via YQL_ENSURE when `node` is not one of the two callables.
TExprNode::TPtr ExpandSkipNullFields(const TExprNode::TPtr& node, TExprContext& ctx) {
    YQL_ENSURE(node->IsCallable({"SkipNullMembers", "SkipNullElements"}));
    YQL_CLOG(DEBUG, Core) << "Expand " << node->Content();

    const bool overTuples = node->IsCallable("SkipNullElements");

    TExprNode::TListType checkedFields;
    if (node->ChildrenSize() > 1) {
        // Explicit field list supplied by the caller.
        checkedFields = node->Child(1)->ChildrenList();
    } else {
        const auto itemType = GetSeqItemType(node->Head().GetTypeAnn());
        if (overTuples) {
            checkedFields = GetOptionals(node->Pos(), *itemType->Cast<TTupleExprType>(), ctx);
        } else {
            checkedFields = GetOptionals(node->Pos(), *itemType->Cast<TStructExprType>(), ctx);
        }
    }

    if (checkedFields.empty()) {
        return node->HeadPtr();
    }

    // Tuple elements are read with Nth, struct members with Member.
    const char* const accessor = overTuples ? "Nth" : "Member";
    return ctx.Builder(node->Pos())
        .Callable("OrderedFilter")
            .Add(0, node->HeadPtr())
            .Lambda(1)
                .Param("item")
                .Callable("And")
                    .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
                        ui32 position = 0U;
                        for (auto& field : checkedFields) {
                            parent
                                .Callable(position, "Exists")
                                    .Callable(0, accessor)
                                        .Arg(0, "item")
                                        .Add(1, std::move(field))
                                    .Seal()
                                .Seal();
                            ++position;
                        }
                        return parent;
                    })
                .Seal()
            .Seal()
        .Seal().Build();
}
1260+
12061261
void ExtractSimpleKeys(const TExprNode* keySelectorBody, const TExprNode* keySelectorArg, TVector<TStringBuf>& columns) {
12071262
if (keySelectorBody->IsList()) {
12081263
for (auto& child: keySelectorBody->Children()) {

ydb/library/yql/core/yql_opt_utils.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ TExprNode::TPtr ExpandAddMember(const TExprNode::TPtr& node, TExprContext& ctx);
8585
TExprNode::TPtr ExpandReplaceMember(const TExprNode::TPtr& node, TExprContext& ctx);
8686
TExprNode::TPtr ExpandFlattenByColumns(const TExprNode::TPtr& node, TExprContext& ctx);
8787
TExprNode::TPtr ExpandCastStruct(const TExprNode::TPtr& node, TExprContext& ctx);
88+
TExprNode::TPtr ExpandSkipNullFields(const TExprNode::TPtr& node, TExprContext& ctx);
8889

8990
void ExtractSimpleKeys(const TExprNode* keySelectorBody, const TExprNode* keySelectorArg, TVector<TStringBuf>& columns);
9091
inline void ExtractSimpleKeys(const TExprNode& keySelectorLambda, TVector<TStringBuf>& columns) {

0 commit comments

Comments
 (0)