Skip to content

Commit c076edf

Browse files
Merge to 24.1 (#1772)
Co-authored-by: Iuliia Sidorina <yulia@ydb.tech>
1 parent 81296f9 commit c076edf

File tree

20 files changed

+301
-182
lines changed

20 files changed

+301
-182
lines changed

ydb/core/kqp/opt/logical/kqp_opt_log.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,8 +210,8 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
210210
return output;
211211
}
212212

213-
TMaybeNode<TExprBase> DeleteOverLookup(TExprBase node, TExprContext& ctx) {
214-
TExprBase output = KqpDeleteOverLookup(node, ctx, KqpCtx);
213+
TMaybeNode<TExprBase> DeleteOverLookup(TExprBase node, TExprContext& ctx, const TGetParents& getParents) {
214+
TExprBase output = KqpDeleteOverLookup(node, ctx, KqpCtx, *getParents());
215215
DumpAppliedRule("DeleteOverLookup", node.Ptr(), output.Ptr(), ctx);
216216
return output;
217217
}

ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp

Lines changed: 140 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,57 +2,175 @@
22

33
#include <ydb/core/kqp/opt/kqp_opt_impl.h>
44
#include <ydb/core/kqp/common/kqp_yql.h>
5+
#include <ydb/library/yql/core/yql_opt_utils.h>
6+
7+
namespace {
8+
9+
bool CanPushFlatMap(const NYql::NNodes::TCoFlatMapBase& flatMap, const NYql::TKikimrTableDescription& tableDesc, const NYql::TParentsMap& parentsMap, TVector<TString> & extraColumns) {
10+
auto flatMapLambda = flatMap.Lambda();
11+
if (!NYql::IsFilterFlatMap(flatMapLambda)) {
12+
return false;
13+
}
14+
15+
const auto & flatMapLambdaArgument = flatMapLambda.Args().Arg(0).Ref();
16+
auto flatMapLambdaConditional = flatMapLambda.Body().Cast<NYql::NNodes::TCoConditionalValueBase>();
17+
18+
TSet<TString> lambdaSubset;
19+
auto isSubSet = HaveFieldsSubset(flatMapLambdaConditional.Predicate().Ptr(), flatMapLambdaArgument, lambdaSubset, parentsMap, true);
20+
auto argType = NYql::RemoveOptionalType(flatMapLambdaArgument.GetTypeAnn());
21+
if (argType->GetKind() != NYql::ETypeAnnotationKind::Struct) {
22+
return false;
23+
}
24+
// helper doesn't accept if all columns are used
25+
if (!isSubSet && lambdaSubset.size() != argType->Cast<NYql::TStructExprType>()->GetSize()) {
26+
return false;
27+
}
28+
29+
for (auto & lambdaColumn : lambdaSubset) {
30+
auto columnIndex = tableDesc.GetKeyColumnIndex(lambdaColumn);
31+
if (!columnIndex) {
32+
return false;
33+
}
34+
}
35+
36+
extraColumns.insert(extraColumns.end(), lambdaSubset.begin(), lambdaSubset.end());
37+
return true;
38+
}
39+
40+
}
541

642
namespace NKikimr::NKqp::NOpt {
743

844
using namespace NYql;
945
using namespace NYql::NNodes;
1046

11-
TExprBase KqpDeleteOverLookup(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext &kqpCtx) {
47+
TExprBase KqpDeleteOverLookup(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext &kqpCtx, const NYql::TParentsMap& parentsMap) {
1248
if (!node.Maybe<TKqlDeleteRows>()) {
1349
return node;
1450
}
1551

1652
auto deleteRows = node.Cast<TKqlDeleteRows>();
1753

54+
TMaybeNode<TCoFlatMap> filter;
55+
1856
TMaybeNode<TKqlLookupTableBase> lookup;
57+
TMaybeNode<TKqlReadTable> read;
1958
TMaybeNode<TCoSkipNullMembers> skipNulMembers;
59+
TMaybeNode<TKqlReadTableRanges> readranges;
2060

2161
if (deleteRows.Input().Maybe<TKqlLookupTableBase>()) {
2262
lookup = deleteRows.Input().Cast<TKqlLookupTableBase>();
2363
} else if (deleteRows.Input().Maybe<TCoSkipNullMembers>().Input().Maybe<TKqlLookupTableBase>()) {
2464
skipNulMembers = deleteRows.Input().Cast<TCoSkipNullMembers>();
2565
lookup = skipNulMembers.Input().Cast<TKqlLookupTableBase>();
66+
} else if (deleteRows.Input().Maybe<TKqlReadTable>()) {
67+
read = deleteRows.Input().Cast<TKqlReadTable>();
2668
} else {
27-
return node;
28-
}
29-
30-
YQL_ENSURE(lookup);
31-
if (deleteRows.Table().Raw() != lookup.Cast().Table().Raw()) {
32-
return node;
69+
TMaybeNode<TExprBase> input = deleteRows.Input();
70+
if (input.Maybe<TCoFlatMap>()) {
71+
filter = deleteRows.Input().Cast<TCoFlatMap>();
72+
input = filter.Input();
73+
}
74+
readranges = input.Maybe<TKqlReadTableRanges>();
75+
if (!readranges) {
76+
return node;
77+
}
3378
}
3479

35-
auto lookupKeysType = lookup.Cast().LookupKeys().Ref().GetTypeAnn();
36-
auto lookupKeyType = lookupKeysType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
37-
YQL_ENSURE(lookupKeyType);
38-
39-
// Only consider complete PK lookups
80+
TMaybeNode<TExprBase> deleteInput;
4081
const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, deleteRows.Table().Path());
41-
if (lookupKeyType->GetSize() != tableDesc.Metadata->KeyColumnNames.size()) {
42-
return node;
43-
}
44-
45-
TExprBase deleteInput = lookup.Cast().LookupKeys();
46-
if (skipNulMembers) {
47-
deleteInput = Build<TCoSkipNullMembers>(ctx, skipNulMembers.Cast().Pos())
48-
.Input(deleteInput)
49-
.Members(skipNulMembers.Cast().Members())
82+
if (lookup) {
83+
if (deleteRows.Table().Raw() != lookup.Cast().Table().Raw()) {
84+
return node;
85+
}
86+
87+
auto lookupKeysType = lookup.Cast().LookupKeys().Ref().GetTypeAnn();
88+
auto lookupKeyType = lookupKeysType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
89+
YQL_ENSURE(lookupKeyType);
90+
91+
// Only consider complete PK lookups
92+
if (lookupKeyType->GetSize() != tableDesc.Metadata->KeyColumnNames.size()) {
93+
return node;
94+
}
95+
96+
deleteInput = lookup.Cast().LookupKeys();
97+
if (skipNulMembers) {
98+
deleteInput = Build<TCoSkipNullMembers>(ctx, skipNulMembers.Cast().Pos())
99+
.Input(deleteInput.Cast())
100+
.Members(skipNulMembers.Cast().Members())
101+
.Done();
102+
}
103+
} else if (read) {
104+
if (deleteRows.Table().Raw() != read.Cast().Table().Raw()) {
105+
return node;
106+
}
107+
108+
const auto& rangeFrom = read.Cast().Range().From();
109+
const auto& rangeTo = read.Cast().Range().To();
110+
111+
if (!rangeFrom.Maybe<TKqlKeyInc>() || !rangeTo.Maybe<TKqlKeyInc>()) {
112+
return node;
113+
}
114+
115+
if (rangeFrom.Raw() != rangeTo.Raw()) {
116+
return node;
117+
}
118+
119+
if (rangeFrom.ArgCount() != tableDesc.Metadata->KeyColumnNames.size()) {
120+
return node;
121+
}
122+
123+
TVector<TExprBase> structMembers;
124+
for (ui32 i = 0; i < rangeFrom.ArgCount(); ++i) {
125+
TCoAtom columnNameAtom(ctx.NewAtom(node.Pos(), tableDesc.Metadata->KeyColumnNames[i]));
126+
127+
auto member = Build<TCoNameValueTuple>(ctx, node.Pos())
128+
.Name(columnNameAtom)
129+
.Value(rangeFrom.Arg(i))
130+
.Done();
131+
132+
structMembers.push_back(member);
133+
}
134+
135+
deleteInput = Build<TCoAsList>(ctx, node.Pos())
136+
.Add<TCoAsStruct>()
137+
.Add(structMembers)
138+
.Build()
50139
.Done();
140+
} else if (readranges) {
141+
if (deleteRows.Table().Raw() != readranges.Cast().Table().Raw()) {
142+
return node;
143+
}
144+
145+
if (!readranges.Cast().PrefixPointsExpr()) {
146+
return node;
147+
}
148+
149+
const auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, readranges.Cast().Table().Path().Value());
150+
auto hint = TKqpReadTableExplainPrompt::Parse(readranges.Cast().ExplainPrompt());
151+
if (hint.PointPrefixLen != tableDesc.Metadata->KeyColumnNames.size()) {
152+
return node;
153+
}
154+
155+
if (filter) {
156+
TVector<TString> extraColumns;
157+
if (!CanPushFlatMap(filter.Cast(), tableDesc, parentsMap, extraColumns)) {
158+
return node;
159+
}
160+
deleteInput = Build<TCoFlatMap>(ctx, node.Pos())
161+
.Lambda(filter.Lambda().Cast())
162+
.Input(readranges.PrefixPointsExpr().Cast())
163+
.Done();
164+
} else {
165+
deleteInput = readranges.PrefixPointsExpr();
166+
}
51167
}
52168

169+
YQL_ENSURE(deleteInput);
170+
53171
return Build<TKqlDeleteRows>(ctx, deleteRows.Pos())
54172
.Table(deleteRows.Table())
55-
.Input(deleteInput)
173+
.Input(deleteInput.Cast())
56174
.Done();
57175
}
58176

ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,17 @@ TExprBase KqpApplyExtractMembersToReadTableRanges(TExprBase node, TExprContext&
124124
.Done();
125125
}
126126

127+
if (auto readRange = node.Maybe<TKqlReadTableRanges>()) {
128+
return Build<TKqlReadTableRanges>(ctx, read.Pos())
129+
.Table(read.Table())
130+
.Ranges(read.Ranges())
131+
.Columns(usedColumns.Cast())
132+
.Settings(read.Settings())
133+
.ExplainPrompt(read.ExplainPrompt())
134+
.PrefixPointsExpr(readRange.PrefixPointsExpr())
135+
.Done();
136+
}
137+
127138
return Build<TKqlReadTableRangesBase>(ctx, read.Pos())
128139
.CallableName(read.CallableName())
129140
.Table(read.Table())

ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,59 @@ TMaybeNode<TExprBase> KqpRewriteLiteralLookup(const TExprBase& node, TExprContex
353353
lookupKeys = skipNullMembers.Input();
354354
}
355355

356+
TKqpReadTableSettings settings;
357+
if (skipNullMembers) {
358+
auto skipNullColumns = skipNullMembers.Cast().Members();
359+
360+
if (skipNullColumns) {
361+
for (const auto &column : skipNullColumns.Cast()) {
362+
settings.AddSkipNullKey(TString(column.Value()));
363+
}
364+
}
365+
}
366+
367+
const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, lookup.Table().Path().Value());
368+
if (auto lookupKeysFlatMap = lookupKeys.Maybe<TCoFlatMapBase>()) {
369+
auto flatMapRangeInput = lookupKeysFlatMap.Cast().Input().Maybe<TCoRangeFinalize>();
370+
371+
// This rule should depend on feature flag for safety
372+
if (!flatMapRangeInput || !kqpCtx.Config->EnableKqpDataQueryStreamLookup) {
373+
return {};
374+
}
375+
376+
auto lookupKeysType = lookupKeys.Ref().GetTypeAnn();
377+
YQL_ENSURE(lookupKeysType);
378+
YQL_ENSURE(lookupKeysType->GetKind() == ETypeAnnotationKind::List);
379+
auto itemType = lookupKeysType->Cast<TListExprType>()->GetItemType();
380+
YQL_ENSURE(itemType->GetKind() == ETypeAnnotationKind::Struct);
381+
auto structType = itemType->Cast<TStructExprType>();
382+
383+
TVector<TString> usedColumns;
384+
usedColumns.reserve(structType->GetSize());
385+
for (const auto& keyColumnName : table.Metadata->KeyColumnNames) {
386+
if (!structType->FindItem(keyColumnName)) {
387+
break;
388+
}
389+
390+
usedColumns.emplace_back(keyColumnName);
391+
}
392+
393+
YQL_ENSURE(usedColumns.size() == structType->GetSize());
394+
395+
TKqpReadTableExplainPrompt prompt;
396+
prompt.SetUsedKeyColumns(std::move(usedColumns));
397+
prompt.SetPointPrefixLen(structType->GetSize());
398+
399+
400+
return Build<TKqlReadTableRanges>(ctx, lookup.Pos())
401+
.Table(lookup.Table())
402+
.Ranges(flatMapRangeInput.Cast())
403+
.Columns(lookup.Columns())
404+
.Settings(settings.BuildNode(ctx, lookup.Pos()))
405+
.ExplainPrompt(prompt.BuildNode(ctx, lookup.Pos()))
406+
.Done();
407+
}
408+
356409
auto maybeAsList = lookupKeys.Maybe<TCoAsList>();
357410
if (!maybeAsList) {
358411
return {};
@@ -369,7 +422,6 @@ TMaybeNode<TExprBase> KqpRewriteLiteralLookup(const TExprBase& node, TExprContex
369422
}
370423

371424
// full pk expected
372-
const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, lookup.Table().Path().Value());
373425
if (table.Metadata->KeyColumnNames.size() != maybeStruct.Cast().ArgCount()) {
374426
return {};
375427
}
@@ -380,7 +432,6 @@ TMaybeNode<TExprBase> KqpRewriteLiteralLookup(const TExprBase& node, TExprContex
380432
keyColumnsStruct.insert({TString(tuple.Name().Value()), tuple.Value().Cast()});
381433
}
382434

383-
TKqpReadTableSettings settings;
384435
TVector<TExprBase> keyValues;
385436
keyValues.reserve(maybeStruct.Cast().ArgCount());
386437
for (const auto& name : table.Metadata->KeyColumnNames) {
@@ -396,7 +447,6 @@ TMaybeNode<TExprBase> KqpRewriteLiteralLookup(const TExprBase& node, TExprContex
396447
for (const auto &column : skipNullColumns.Cast()) {
397448
settings.AddSkipNullKey(TString(column.Value()));
398449
}
399-
400450
}
401451
}
402452

ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -466,10 +466,6 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx
466466
.Done();
467467
}
468468
}
469-
} else if (buildResult.PointPrefixLen == tableDesc.Metadata->KeyColumnNames.size()) {
470-
YQL_ENSURE(prefixPointsExpr);
471-
residualLambda = pointsExtractionResult.PrunedLambda;
472-
buildLookup(prefixPointsExpr, input);
473469
}
474470
}
475471

ydb/core/kqp/opt/logical/kqp_opt_log_rules.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ NYql::NNodes::TExprBase KqpRewriteTakeOverIndexRead(const NYql::NNodes::TExprBas
5353
const TKqpOptimizeContext& kqpCtx, const NYql::TParentsMap& parentsMap);
5454

5555
NYql::NNodes::TExprBase KqpDeleteOverLookup(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx,
56-
const TKqpOptimizeContext &kqpCtx);
56+
const TKqpOptimizeContext &kqpCtx, const NYql::TParentsMap& parentsMap);
5757

5858
NYql::NNodes::TExprBase KqpExcessUpsertInputColumns(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx);
5959

ydb/core/kqp/runtime/kqp_read_actor.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -728,8 +728,8 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
728728
if (intersection == 0) {
729729
newShard->AddPoint(std::move(points[pointIndex]));
730730
CA_LOG_D("Add point to new shardId: " << partition.ShardId);
731-
}
732-
if (intersection < 0) {
731+
} else {
732+
YQL_ENSURE(intersection > 0, "Missed intersection of point and partition ranges.");
733733
break;
734734
}
735735
pointIndex += 1;

0 commit comments

Comments
 (0)