Skip to content

Commit 04cc17d

Browse files
Merge e5ff1dc into bf6d776
2 parents bf6d776 + e5ff1dc commit 04cc17d

File tree

5 files changed

+121
-47
lines changed

5 files changed

+121
-47
lines changed

ydb/core/kqp/opt/logical/kqp_opt_log_effects.cpp

Lines changed: 83 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,43 +16,106 @@ TExprBase KqpDeleteOverLookup(const TExprBase& node, TExprContext& ctx, const TK
1616
auto deleteRows = node.Cast<TKqlDeleteRows>();
1717

1818
TMaybeNode<TKqlLookupTableBase> lookup;
19+
TMaybeNode<TKqlReadTable> read;
20+
TMaybeNode<TKqlReadTableRanges> rangeRead;
1921
TMaybeNode<TCoSkipNullMembers> skipNulMembers;
2022

2123
if (deleteRows.Input().Maybe<TKqlLookupTableBase>()) {
2224
lookup = deleteRows.Input().Cast<TKqlLookupTableBase>();
2325
} else if (deleteRows.Input().Maybe<TCoSkipNullMembers>().Input().Maybe<TKqlLookupTableBase>()) {
2426
skipNulMembers = deleteRows.Input().Cast<TCoSkipNullMembers>();
2527
lookup = skipNulMembers.Input().Cast<TKqlLookupTableBase>();
28+
} else if (deleteRows.Input().Maybe<TKqlReadTable>()) {
29+
read = deleteRows.Input().Cast<TKqlReadTable>();
30+
} else if (deleteRows.Input().Maybe<TKqlReadTableRanges>()) {
31+
rangeRead = deleteRows.Input().Cast<TKqlReadTableRanges>();
2632
} else {
2733
return node;
2834
}
2935

30-
YQL_ENSURE(lookup);
31-
if (deleteRows.Table().Raw() != lookup.Cast().Table().Raw()) {
32-
return node;
33-
}
34-
35-
auto lookupKeysType = lookup.Cast().LookupKeys().Ref().GetTypeAnn();
36-
auto lookupKeyType = lookupKeysType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
37-
YQL_ENSURE(lookupKeyType);
38-
39-
// Only consider complete PK lookups
36+
TMaybeNode<TExprBase> deleteInput;
4037
const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, deleteRows.Table().Path());
41-
if (lookupKeyType->GetSize() != tableDesc.Metadata->KeyColumnNames.size()) {
42-
return node;
43-
}
44-
45-
TExprBase deleteInput = lookup.Cast().LookupKeys();
46-
if (skipNulMembers) {
47-
deleteInput = Build<TCoSkipNullMembers>(ctx, skipNulMembers.Cast().Pos())
48-
.Input(deleteInput)
49-
.Members(skipNulMembers.Cast().Members())
38+
if (lookup) {
39+
if (deleteRows.Table().Raw() != lookup.Cast().Table().Raw()) {
40+
return node;
41+
}
42+
43+
auto lookupKeysType = lookup.Cast().LookupKeys().Ref().GetTypeAnn();
44+
auto lookupKeyType = lookupKeysType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
45+
YQL_ENSURE(lookupKeyType);
46+
47+
// Only consider complete PK lookups
48+
if (lookupKeyType->GetSize() != tableDesc.Metadata->KeyColumnNames.size()) {
49+
return node;
50+
}
51+
52+
deleteInput = lookup.Cast().LookupKeys();
53+
if (skipNulMembers) {
54+
deleteInput = Build<TCoSkipNullMembers>(ctx, skipNulMembers.Cast().Pos())
55+
.Input(deleteInput.Cast())
56+
.Members(skipNulMembers.Cast().Members())
57+
.Done();
58+
}
59+
} else if (read) {
60+
if (deleteRows.Table().Raw() != read.Cast().Table().Raw()) {
61+
return node;
62+
}
63+
64+
const auto& rangeFrom = read.Cast().Range().From();
65+
const auto& rangeTo = read.Cast().Range().To();
66+
67+
if (!rangeFrom.Maybe<TKqlKeyInc>() || !rangeTo.Maybe<TKqlKeyInc>()) {
68+
return node;
69+
}
70+
71+
if (rangeFrom.Raw() != rangeTo.Raw()) {
72+
return node;
73+
}
74+
75+
if (rangeFrom.ArgCount() != tableDesc.Metadata->KeyColumnNames.size()) {
76+
return node;
77+
}
78+
79+
TVector<TExprBase> structMembers;
80+
for (ui32 i = 0; i < rangeFrom.ArgCount(); ++i) {
81+
TCoAtom columnNameAtom(ctx.NewAtom(node.Pos(), tableDesc.Metadata->KeyColumnNames[i]));
82+
83+
auto member = Build<TCoNameValueTuple>(ctx, node.Pos())
84+
.Name(columnNameAtom)
85+
.Value(rangeFrom.Arg(i))
86+
.Done();
87+
88+
structMembers.push_back(member);
89+
}
90+
91+
deleteInput = Build<TCoAsList>(ctx, node.Pos())
92+
.Add<TCoAsStruct>()
93+
.Add(structMembers)
94+
.Build()
5095
.Done();
96+
} else if (rangeRead) {
97+
YQL_ENSURE(rangeRead);
98+
if (deleteRows.Table().Raw() != rangeRead.Cast().Table().Raw()) {
99+
return node;
100+
}
101+
102+
if (rangeRead.Cast().Ranges().Maybe<TCoVoid>()) {
103+
return node;
104+
}
105+
106+
auto prompt = TKqpReadTableExplainPrompt::Parse(rangeRead.Cast());
107+
if (prompt.PointPrefixLen != tableDesc.Metadata->KeyColumnNames.size()) {
108+
return node;
109+
}
110+
111+
deleteInput = rangeRead.Cast().PrefixPointsExpr();
51112
}
52113

114+
YQL_ENSURE(deleteInput);
115+
53116
return Build<TKqlDeleteRows>(ctx, deleteRows.Pos())
54117
.Table(deleteRows.Table())
55-
.Input(deleteInput)
118+
.Input(deleteInput.Cast())
56119
.Done();
57120
}
58121

ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,8 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T
233233
// NOTE: Use more efficient full key lookup implementation in datashard.
234234
// Consider using lookup for partial keys as well once better constant folding
235235
// is available, currently it can introduce redundant compute stage.
236-
useDataOrGenericQueryLookup = (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery()) && isFullKey;
236+
useDataOrGenericQueryLookup = (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery()) && isFullKey
237+
&& !kqpCtx.Config->EnableKqpDataQuerySourceRead;
237238
useScanQueryLookup = kqpCtx.IsScanQuery() && isFullKey
238239
&& kqpCtx.Config->EnableKqpScanQueryStreamLookup;
239240
}

ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,29 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx
371371
YQL_CLOG(DEBUG, ProviderKqp) << "Ranges extracted: " << KqpExprToPrettyString(*ranges, ctx);
372372
YQL_CLOG(DEBUG, ProviderKqp) << "Residual lambda: " << KqpExprToPrettyString(*residualLambda, ctx);
373373

374+
auto buildRangeRead = [&](const TMaybeNode<TExprBase>& prefix, TMaybe<TExprBase>& result) {
375+
if (indexName) {
376+
result = Build<TKqlReadTableIndexRanges>(ctx, read.Pos())
377+
.Table(read.Table())
378+
.Ranges(ranges)
379+
.Columns(read.Columns())
380+
.Settings(read.Settings())
381+
.ExplainPrompt(prompt.BuildNode(ctx, read.Pos()))
382+
.Index(indexName.Cast())
383+
.PrefixPointsExpr(prefix)
384+
.Done();
385+
} else {
386+
result = Build<TKqlReadTableRanges>(ctx, read.Pos())
387+
.Table(read.Table())
388+
.Ranges(ranges)
389+
.Columns(read.Columns())
390+
.Settings(read.Settings())
391+
.ExplainPrompt(prompt.BuildNode(ctx, read.Pos()))
392+
.PrefixPointsExpr(prefix)
393+
.Done();
394+
}
395+
};
396+
374397
TMaybe<TExprBase> input;
375398
if (kqpCtx.Config->PredicateExtract20 &&
376399
(tableDesc.Metadata->Kind == EKikimrTableKind::Datashard ||
@@ -417,8 +440,14 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx
417440
};
418441

419442
if (buildResult.LiteralRange) {
420-
bool ispoint = buildResult.PointPrefixLen == tableDesc.Metadata->KeyColumnNames.size();
421-
if (ispoint) {
443+
const bool isFullKey = buildResult.PointPrefixLen == tableDesc.Metadata->KeyColumnNames.size();
444+
const bool useDataOrGenericQueryLookup = isFullKey
445+
&& (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery())
446+
&& !kqpCtx.Config->EnableKqpDataQuerySourceRead;
447+
const bool useScanQueryLookup = isFullKey
448+
&& kqpCtx.IsScanQuery()
449+
&& kqpCtx.Config->EnableKqpScanQueryStreamLookup;
450+
if (useDataOrGenericQueryLookup || useScanQueryLookup) {
422451
TVector<TExprBase> structMembers;
423452
for (size_t i = 0; i < tableDesc.Metadata->KeyColumnNames.size(); ++i) {
424453
auto member = Build<TCoNameValueTuple>(ctx, node.Pos())
@@ -469,7 +498,7 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx
469498
} else if (buildResult.PointPrefixLen == tableDesc.Metadata->KeyColumnNames.size()) {
470499
YQL_ENSURE(prefixPointsExpr);
471500
residualLambda = pointsExtractionResult.PrunedLambda;
472-
buildLookup(prefixPointsExpr, input);
501+
buildRangeRead(prefixPointsExpr, input);
473502
}
474503
}
475504

@@ -479,26 +508,7 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx
479508
prefix = prefixPointsExpr;
480509
}
481510

482-
if (indexName) {
483-
input = Build<TKqlReadTableIndexRanges>(ctx, read.Pos())
484-
.Table(read.Table())
485-
.Ranges(ranges)
486-
.Columns(read.Columns())
487-
.Settings(read.Settings())
488-
.ExplainPrompt(prompt.BuildNode(ctx, read.Pos()))
489-
.Index(indexName.Cast())
490-
.PrefixPointsExpr(prefix)
491-
.Done();
492-
} else {
493-
input = Build<TKqlReadTableRanges>(ctx, read.Pos())
494-
.Table(read.Table())
495-
.Ranges(ranges)
496-
.Columns(read.Columns())
497-
.Settings(read.Settings())
498-
.ExplainPrompt(prompt.BuildNode(ctx, read.Pos()))
499-
.PrefixPointsExpr(prefix)
500-
.Done();
501-
}
511+
buildRangeRead(prefix, input);
502512
}
503513

504514
*input = readMatch->BuildProcessNodes(*input, ctx);

ydb/core/kqp/runtime/kqp_read_actor.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -728,8 +728,8 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
728728
if (intersection == 0) {
729729
newShard->AddPoint(std::move(points[pointIndex]));
730730
CA_LOG_D("Add point to new shardId: " << partition.ShardId);
731-
}
732-
if (intersection < 0) {
731+
} else {
732+
YQL_ENSURE(intersection > 0, "Missed intersection of point and partition ranges.");
733733
break;
734734
}
735735
pointIndex += 1;

ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
171171
NJson::ReadJsonTree(stats.query_plan(), &plan, true);
172172

173173
auto stages = FindPlanStages(plan);
174-
UNIT_ASSERT_VALUES_EQUAL(stages.size(), 2);
174+
UNIT_ASSERT_VALUES_EQUAL(stages.size(), QueryService ? 2 : 3);
175175

176176
i64 totalTasks = 0;
177177
for (const auto& stage : stages) {

0 commit comments

Comments
 (0)