Skip to content

Commit cde0bcf

Browse files
fix(kqp): pass column order to KqpIndexLookupJoin (#3654)
1 parent dccc8c7 commit cde0bcf

File tree

4 files changed

+118
-27
lines changed

4 files changed

+118
-27
lines changed

ydb/core/kqp/host/kqp_type_ann.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1718,9 +1718,10 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx)
17181718

17191719
TVector<const TItemExprType*> resultStructItems;
17201720
for (const auto& item : leftRowType->GetItems()) {
1721-
resultStructItems.emplace_back(
1722-
ctx.MakeType<TItemExprType>(TString::Join(leftLabel.Value(), ".", item->GetName()), item->GetItemType())
1723-
);
1721+
TString itemName = leftLabel.Value().empty()
1722+
? TString(item->GetName())
1723+
: TString::Join(leftLabel.Value(), ".", item->GetName());
1724+
resultStructItems.emplace_back(ctx.MakeType<TItemExprType>(itemName, item->GetItemType()));
17241725
}
17251726

17261727
if (RightJoinSideAllowed(joinType.Value())) {
@@ -1731,9 +1732,10 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx)
17311732
? ctx.MakeType<TOptionalExprType>(item->GetItemType())
17321733
: item->GetItemType();
17331734

1734-
resultStructItems.emplace_back(
1735-
ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", item->GetName()), itemType)
1736-
);
1735+
TString itemName = rightLabel.Value().empty()
1736+
? TString(item->GetName())
1737+
: TString::Join(rightLabel.Value(), ".", item->GetName());
1738+
resultStructItems.emplace_back(ctx.MakeType<TItemExprType>(itemName, itemType));
17371739
}
17381740
}
17391741

ydb/core/kqp/runtime/kqp_compute.cpp

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -103,26 +103,24 @@ class TKqpIndexLookupJoinWrapper : public TMutableComputationNode<TKqpIndexLooku
103103
};
104104

105105
NUdf::TUnboxedValue FillResultItems(NUdf::TUnboxedValue leftRow, NUdf::TUnboxedValue rightRow, EOutputMode mode) {
106-
auto resultRowSize = (mode == EOutputMode::OnlyLeftRow) ? Self->LeftColumnsCount
107-
: Self->LeftColumnsCount + Self->RightColumnsCount;
106+
auto resultRowSize = (mode == EOutputMode::OnlyLeftRow) ? Self->LeftColumnsIndices.size()
107+
: Self->LeftColumnsIndices.size() + Self->RightColumnsIndices.size();
108108
auto resultRow = Self->ResultRowCache.NewArray(Ctx, resultRowSize, ResultItems);
109109

110-
size_t resIdx = 0;
111-
112110
if (mode == EOutputMode::OnlyLeftRow || mode == EOutputMode::Both) {
113-
for (size_t i = 0; i < Self->LeftColumnsCount; ++i) {
114-
ResultItems[resIdx++] = std::move(leftRow.GetElement(i));
111+
for (size_t i = 0; i < Self->LeftColumnsIndices.size(); ++i) {
112+
ResultItems[Self->LeftColumnsIndices[i]] = std::move(leftRow.GetElement(i));
115113
}
116114
}
117115

118116
if (mode == EOutputMode::Both) {
119117
if (rightRow.HasValue()) {
120-
for (size_t i = 0; i < Self->RightColumnsCount; ++i) {
121-
ResultItems[resIdx++] = std::move(rightRow.GetElement(i));
118+
for (size_t i = 0; i < Self->RightColumnsIndices.size(); ++i) {
119+
ResultItems[Self->RightColumnsIndices[i]] = std::move(rightRow.GetElement(i));
122120
}
123121
} else {
124-
for (size_t i = 0; i < Self->RightColumnsCount; ++i) {
125-
ResultItems[resIdx++] = NUdf::TUnboxedValuePod();
122+
for (size_t i = 0; i < Self->RightColumnsIndices.size(); ++i) {
123+
ResultItems[Self->RightColumnsIndices[i]] = NUdf::TUnboxedValuePod();
126124
}
127125
}
128126
}
@@ -189,12 +187,12 @@ class TKqpIndexLookupJoinWrapper : public TMutableComputationNode<TKqpIndexLooku
189187

190188
public:
191189
TKqpIndexLookupJoinWrapper(TComputationMutables& mutables, IComputationNode* inputNode,
192-
EJoinKind joinType, ui64 leftColumnsCount, ui64 rightColumnsCount)
190+
EJoinKind joinType, TVector<ui32>&& leftColumnsIndices, TVector<ui32>&& rightColumnsIndices)
193191
: TMutableComputationNode<TKqpIndexLookupJoinWrapper>(mutables)
194192
, InputNode(inputNode)
195193
, JoinType(joinType)
196-
, LeftColumnsCount(leftColumnsCount)
197-
, RightColumnsCount(rightColumnsCount)
194+
, LeftColumnsIndices(std::move(leftColumnsIndices))
195+
, RightColumnsIndices(std::move(rightColumnsIndices))
198196
, ResultRowCache(mutables) {
199197
}
200198

@@ -210,8 +208,8 @@ class TKqpIndexLookupJoinWrapper : public TMutableComputationNode<TKqpIndexLooku
210208
private:
211209
IComputationNode* InputNode;
212210
const EJoinKind JoinType;
213-
const ui64 LeftColumnsCount;
214-
const ui64 RightColumnsCount;
211+
const TVector<ui32> LeftColumnsIndices;
212+
const TVector<ui32> RightColumnsIndices;
215213
const TContainerCacheOnContext ResultRowCache;
216214
};
217215

@@ -236,10 +234,26 @@ IComputationNode* WrapKqpIndexLookupJoin(TCallable& callable, const TComputation
236234

237235
auto inputNode = LocateNode(ctx.NodeLocator, callable, 0);
238236
ui32 joinKind = AS_VALUE(TDataLiteral, callable.GetInput(1))->AsValue().Get<ui32>();
239-
ui64 leftColumnsCount = AS_VALUE(TDataLiteral, callable.GetInput(2))->AsValue().Get<ui64>();
240-
ui64 rightColumnsCount = AS_VALUE(TDataLiteral, callable.GetInput(3))->AsValue().Get<ui64>();
237+
auto leftColumnsIndicesMap = AS_VALUE(TDictLiteral, callable.GetInput(2));
238+
auto rightColumnsIndicesMap = AS_VALUE(TDictLiteral, callable.GetInput(3));
239+
240+
TVector<ui32> leftColumnsIndices(leftColumnsIndicesMap->GetItemsCount());
241+
for (ui32 i = 0; i < leftColumnsIndicesMap->GetItemsCount(); ++i) {
242+
auto item = leftColumnsIndicesMap->GetItem(i);
243+
ui32 leftIndex = AS_VALUE(TDataLiteral, item.first)->AsValue().Get<ui32>();
244+
ui32 resultIndex = AS_VALUE(TDataLiteral, item.second)->AsValue().Get<ui32>();
245+
leftColumnsIndices[leftIndex] = resultIndex;
246+
}
247+
248+
TVector<ui32> rightColumnsIndices(rightColumnsIndicesMap->GetItemsCount());
249+
for (ui32 i = 0; i < rightColumnsIndicesMap->GetItemsCount(); ++i) {
250+
auto item = rightColumnsIndicesMap->GetItem(i);
251+
ui32 rightIndex = AS_VALUE(TDataLiteral, item.first)->AsValue().Get<ui32>();
252+
ui32 resultIndex = AS_VALUE(TDataLiteral, item.second)->AsValue().Get<ui32>();
253+
rightColumnsIndices[rightIndex] = resultIndex;
254+
}
241255

242-
return new TKqpIndexLookupJoinWrapper(ctx.Mutables, inputNode, GetJoinKind(joinKind), leftColumnsCount, rightColumnsCount);
256+
return new TKqpIndexLookupJoinWrapper(ctx.Mutables, inputNode, GetJoinKind(joinKind), std::move(leftColumnsIndices), std::move(rightColumnsIndices));
243257
}
244258

245259
} // namespace NMiniKQL

ydb/core/kqp/runtime/kqp_program_builder.cpp

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -348,12 +348,17 @@ TRuntimeNode TKqpProgramBuilder::KqpIndexLookupJoin(const TRuntimeNode& input, c
348348

349349
TStructTypeBuilder rowTypeBuilder(GetTypeEnvironment());
350350

351+
TVector<TString> leftRowColumns;
352+
leftRowColumns.reserve(leftRowType->GetMembersCount());
351353
for (ui32 i = 0; i < leftRowType->GetMembersCount(); ++i) {
352354
TString newMemberName = leftLabel.empty() ? TString(leftRowType->GetMemberName(i))
353355
: TString::Join(leftLabel, ".", leftRowType->GetMemberName(i));
354356
rowTypeBuilder.Add(newMemberName, leftRowType->GetMemberType(i));
357+
leftRowColumns.push_back(newMemberName);
355358
}
356359

360+
TVector<TString> rightRowColumns;
361+
rightRowColumns.reserve(rightRowType->GetMembersCount());
357362
if (RightJoinSideAllowed(joinType)) {
358363
for (ui32 i = 0; i < rightRowType->GetMembersCount(); ++i) {
359364
TString newMemberName = rightLabel.empty() ? TString(rightRowType->GetMemberName(i))
@@ -368,16 +373,39 @@ TRuntimeNode TKqpProgramBuilder::KqpIndexLookupJoin(const TRuntimeNode& input, c
368373
: rightRowType->GetMemberType(i);
369374

370375
rowTypeBuilder.Add(newMemberName, memberType);
376+
rightRowColumns.push_back(newMemberName);
371377
}
372378
}
373379

374-
auto returnType = NewStreamType(rowTypeBuilder.Build());
380+
auto resultRowStruct = rowTypeBuilder.Build();
381+
382+
TDictLiteralBuilder leftIndicesMap(GetTypeEnvironment(),
383+
TDataType::Create(NUdf::TDataType<ui32>::Id, GetTypeEnvironment()),
384+
TDataType::Create(NUdf::TDataType<ui32>::Id, GetTypeEnvironment())
385+
);
386+
387+
for (ui32 i = 0; i < leftRowColumns.size(); ++i) {
388+
auto resultIndex = resultRowStruct->GetMemberIndex(leftRowColumns[i]);
389+
leftIndicesMap.Add(NewDataLiteral<ui32>(i), NewDataLiteral<ui32>(resultIndex));
390+
}
391+
392+
TDictLiteralBuilder rightIndicesMap(GetTypeEnvironment(),
393+
TDataType::Create(NUdf::TDataType<ui32>::Id, GetTypeEnvironment()),
394+
TDataType::Create(NUdf::TDataType<ui32>::Id, GetTypeEnvironment())
395+
);
396+
397+
for (ui32 i = 0; i < rightRowColumns.size(); ++i) {
398+
auto resultIndex = resultRowStruct->GetMemberIndex(rightRowColumns[i]);
399+
rightIndicesMap.Add(NewDataLiteral<ui32>(i), NewDataLiteral<ui32>(resultIndex));
400+
}
401+
402+
auto returnType = NewStreamType(resultRowStruct);
375403

376404
TCallableBuilder callableBuilder(Env, __func__, returnType);
377405
callableBuilder.Add(input);
378406
callableBuilder.Add(NewDataLiteral<ui32>((ui32)GetIndexLookupJoinKind(joinType)));
379-
callableBuilder.Add(NewDataLiteral<ui64>(leftRowType->GetMembersCount()));
380-
callableBuilder.Add(NewDataLiteral<ui64>(rightRowType->GetMembersCount()));
407+
callableBuilder.Add(TRuntimeNode(leftIndicesMap.Build(), true));
408+
callableBuilder.Add(TRuntimeNode(rightIndicesMap.Build(), true));
381409
return TRuntimeNode(callableBuilder.Build(), false);
382410
}
383411

ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ void PrepareTables(TSession session) {
2424
PRIMARY KEY (Key)
2525
);
2626
27+
CREATE TABLE `/Root/Kv` (
28+
Key Int32,
29+
Value String,
30+
PRIMARY KEY (Key)
31+
);
32+
2733
CREATE TABLE `/Root/LaunchByProcessIdAndPinned` (
2834
idx_processId Utf8,
2935
idx_pinned Bool,
@@ -64,6 +70,12 @@ void PrepareTables(TSession session) {
6470
(NULL, "Value24"),
6571
(104, NULL);
6672
73+
REPLACE INTO `/Root/Kv` (Key, Value) VALUES
74+
(1, "Value1"),
75+
(2, "Value2"),
76+
(3, "Value3"),
77+
(4, "Value4");
78+
6779
REPLACE INTO `/Root/LaunchByProcessIdAndPinned` (idx_processId, idx_pinned, idx_launchNumber) VALUES
6880
("eProcess", false, 4),
6981
("eProcess", true, 5),
@@ -190,6 +202,41 @@ Y_UNIT_TEST_TWIN(Inner, StreamLookup) {
190202
])", 2, StreamLookup);
191203
}
192204

205+
Y_UNIT_TEST_TWIN(JoinWithSubquery, StreamLookup) {
206+
const auto query = R"(
207+
$join = (SELECT l.Key AS lKey, l.Value AS lValue, r.Value AS rValue
208+
FROM `/Root/Left` AS l
209+
INNER JOIN `/Root/Right` AS r
210+
ON l.Fk = r.Key
211+
);
212+
SELECT j.lValue AS Value FROM $join AS j INNER JOIN `/Root/Kv` AS kv
213+
ON j.lKey = kv.Key;
214+
)";
215+
216+
NKikimrConfig::TAppConfig appConfig;
217+
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookup);
218+
219+
auto settings = TKikimrSettings().SetAppConfig(appConfig);
220+
TKikimrRunner kikimr(settings);
221+
auto db = kikimr.GetTableClient();
222+
auto session = db.CreateSession().GetValueSync().GetSession();
223+
224+
PrepareTables(session);
225+
226+
TExecDataQuerySettings execSettings;
227+
execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
228+
229+
auto result = session.ExecuteDataQuery(Q_(query), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync();
230+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
231+
232+
CompareYson(R"([
233+
[["Value1"]];
234+
[["Value1"]];
235+
[["Value2"]];
236+
[["Value2"]]
237+
])", FormatResultSetYson(result.GetResultSet(0)));
238+
}
239+
193240
Y_UNIT_TEST_TWIN(Left, StreamLookup) {
194241
Test(
195242
R"(

0 commit comments

Comments
 (0)