Skip to content

fix(kqp): pass column order to KqpIndexLookupJoin #3654

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions ydb/core/kqp/host/kqp_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1718,9 +1718,10 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx)

TVector<const TItemExprType*> resultStructItems;
for (const auto& item : leftRowType->GetItems()) {
resultStructItems.emplace_back(
ctx.MakeType<TItemExprType>(TString::Join(leftLabel.Value(), ".", item->GetName()), item->GetItemType())
);
TString itemName = leftLabel.Value().empty()
? TString(item->GetName())
: TString::Join(leftLabel.Value(), ".", item->GetName());
resultStructItems.emplace_back(ctx.MakeType<TItemExprType>(itemName, item->GetItemType()));
}

if (RightJoinSideAllowed(joinType.Value())) {
Expand All @@ -1731,9 +1732,10 @@ TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx)
? ctx.MakeType<TOptionalExprType>(item->GetItemType())
: item->GetItemType();

resultStructItems.emplace_back(
ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", item->GetName()), itemType)
);
TString itemName = rightLabel.Value().empty()
? TString(item->GetName())
: TString::Join(rightLabel.Value(), ".", item->GetName());
resultStructItems.emplace_back(ctx.MakeType<TItemExprType>(itemName, itemType));
}
}

Expand Down
50 changes: 32 additions & 18 deletions ydb/core/kqp/runtime/kqp_compute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,26 +103,24 @@ class TKqpIndexLookupJoinWrapper : public TMutableComputationNode<TKqpIndexLooku
};

NUdf::TUnboxedValue FillResultItems(NUdf::TUnboxedValue leftRow, NUdf::TUnboxedValue rightRow, EOutputMode mode) {
auto resultRowSize = (mode == EOutputMode::OnlyLeftRow) ? Self->LeftColumnsCount
: Self->LeftColumnsCount + Self->RightColumnsCount;
auto resultRowSize = (mode == EOutputMode::OnlyLeftRow) ? Self->LeftColumnsIndices.size()
: Self->LeftColumnsIndices.size() + Self->RightColumnsIndices.size();
auto resultRow = Self->ResultRowCache.NewArray(Ctx, resultRowSize, ResultItems);

size_t resIdx = 0;

if (mode == EOutputMode::OnlyLeftRow || mode == EOutputMode::Both) {
for (size_t i = 0; i < Self->LeftColumnsCount; ++i) {
ResultItems[resIdx++] = std::move(leftRow.GetElement(i));
for (size_t i = 0; i < Self->LeftColumnsIndices.size(); ++i) {
ResultItems[Self->LeftColumnsIndices[i]] = std::move(leftRow.GetElement(i));
}
}

if (mode == EOutputMode::Both) {
if (rightRow.HasValue()) {
for (size_t i = 0; i < Self->RightColumnsCount; ++i) {
ResultItems[resIdx++] = std::move(rightRow.GetElement(i));
for (size_t i = 0; i < Self->RightColumnsIndices.size(); ++i) {
ResultItems[Self->RightColumnsIndices[i]] = std::move(rightRow.GetElement(i));
}
} else {
for (size_t i = 0; i < Self->RightColumnsCount; ++i) {
ResultItems[resIdx++] = NUdf::TUnboxedValuePod();
for (size_t i = 0; i < Self->RightColumnsIndices.size(); ++i) {
ResultItems[Self->RightColumnsIndices[i]] = NUdf::TUnboxedValuePod();
}
}
}
Expand Down Expand Up @@ -189,12 +187,12 @@ class TKqpIndexLookupJoinWrapper : public TMutableComputationNode<TKqpIndexLooku

public:
TKqpIndexLookupJoinWrapper(TComputationMutables& mutables, IComputationNode* inputNode,
EJoinKind joinType, ui64 leftColumnsCount, ui64 rightColumnsCount)
EJoinKind joinType, TVector<ui32>&& leftColumnsIndices, TVector<ui32>&& rightColumnsIndices)
: TMutableComputationNode<TKqpIndexLookupJoinWrapper>(mutables)
, InputNode(inputNode)
, JoinType(joinType)
, LeftColumnsCount(leftColumnsCount)
, RightColumnsCount(rightColumnsCount)
, LeftColumnsIndices(std::move(leftColumnsIndices))
, RightColumnsIndices(std::move(rightColumnsIndices))
, ResultRowCache(mutables) {
}

Expand All @@ -210,8 +208,8 @@ class TKqpIndexLookupJoinWrapper : public TMutableComputationNode<TKqpIndexLooku
private:
IComputationNode* InputNode;
const EJoinKind JoinType;
const ui64 LeftColumnsCount;
const ui64 RightColumnsCount;
const TVector<ui32> LeftColumnsIndices;
const TVector<ui32> RightColumnsIndices;
const TContainerCacheOnContext ResultRowCache;
};

Expand All @@ -236,10 +234,26 @@ IComputationNode* WrapKqpIndexLookupJoin(TCallable& callable, const TComputation

auto inputNode = LocateNode(ctx.NodeLocator, callable, 0);
ui32 joinKind = AS_VALUE(TDataLiteral, callable.GetInput(1))->AsValue().Get<ui32>();
ui64 leftColumnsCount = AS_VALUE(TDataLiteral, callable.GetInput(2))->AsValue().Get<ui64>();
ui64 rightColumnsCount = AS_VALUE(TDataLiteral, callable.GetInput(3))->AsValue().Get<ui64>();
auto leftColumnsIndicesMap = AS_VALUE(TDictLiteral, callable.GetInput(2));
auto rightColumnsIndicesMap = AS_VALUE(TDictLiteral, callable.GetInput(3));

TVector<ui32> leftColumnsIndices(leftColumnsIndicesMap->GetItemsCount());
for (ui32 i = 0; i < leftColumnsIndicesMap->GetItemsCount(); ++i) {
auto item = leftColumnsIndicesMap->GetItem(i);
ui32 leftIndex = AS_VALUE(TDataLiteral, item.first)->AsValue().Get<ui32>();
ui32 resultIndex = AS_VALUE(TDataLiteral, item.second)->AsValue().Get<ui32>();
leftColumnsIndices[leftIndex] = resultIndex;
}

TVector<ui32> rightColumnsIndices(rightColumnsIndicesMap->GetItemsCount());
for (ui32 i = 0; i < rightColumnsIndicesMap->GetItemsCount(); ++i) {
auto item = rightColumnsIndicesMap->GetItem(i);
ui32 rightIndex = AS_VALUE(TDataLiteral, item.first)->AsValue().Get<ui32>();
ui32 resultIndex = AS_VALUE(TDataLiteral, item.second)->AsValue().Get<ui32>();
rightColumnsIndices[rightIndex] = resultIndex;
}

return new TKqpIndexLookupJoinWrapper(ctx.Mutables, inputNode, GetJoinKind(joinKind), leftColumnsCount, rightColumnsCount);
return new TKqpIndexLookupJoinWrapper(ctx.Mutables, inputNode, GetJoinKind(joinKind), std::move(leftColumnsIndices), std::move(rightColumnsIndices));
}

} // namespace NMiniKQL
Expand Down
34 changes: 31 additions & 3 deletions ydb/core/kqp/runtime/kqp_program_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,12 +348,17 @@ TRuntimeNode TKqpProgramBuilder::KqpIndexLookupJoin(const TRuntimeNode& input, c

TStructTypeBuilder rowTypeBuilder(GetTypeEnvironment());

TVector<TString> leftRowColumns;
leftRowColumns.reserve(leftRowType->GetMembersCount());
for (ui32 i = 0; i < leftRowType->GetMembersCount(); ++i) {
TString newMemberName = leftLabel.empty() ? TString(leftRowType->GetMemberName(i))
: TString::Join(leftLabel, ".", leftRowType->GetMemberName(i));
rowTypeBuilder.Add(newMemberName, leftRowType->GetMemberType(i));
leftRowColumns.push_back(newMemberName);
}

TVector<TString> rightRowColumns;
rightRowColumns.reserve(rightRowType->GetMembersCount());
if (RightJoinSideAllowed(joinType)) {
for (ui32 i = 0; i < rightRowType->GetMembersCount(); ++i) {
TString newMemberName = rightLabel.empty() ? TString(rightRowType->GetMemberName(i))
Expand All @@ -368,16 +373,39 @@ TRuntimeNode TKqpProgramBuilder::KqpIndexLookupJoin(const TRuntimeNode& input, c
: rightRowType->GetMemberType(i);

rowTypeBuilder.Add(newMemberName, memberType);
rightRowColumns.push_back(newMemberName);
}
}

auto returnType = NewStreamType(rowTypeBuilder.Build());
auto resultRowStruct = rowTypeBuilder.Build();

TDictLiteralBuilder leftIndicesMap(GetTypeEnvironment(),
TDataType::Create(NUdf::TDataType<ui32>::Id, GetTypeEnvironment()),
TDataType::Create(NUdf::TDataType<ui32>::Id, GetTypeEnvironment())
);

for (ui32 i = 0; i < leftRowColumns.size(); ++i) {
auto resultIndex = resultRowStruct->GetMemberIndex(leftRowColumns[i]);
leftIndicesMap.Add(NewDataLiteral<ui32>(i), NewDataLiteral<ui32>(resultIndex));
}

TDictLiteralBuilder rightIndicesMap(GetTypeEnvironment(),
TDataType::Create(NUdf::TDataType<ui32>::Id, GetTypeEnvironment()),
TDataType::Create(NUdf::TDataType<ui32>::Id, GetTypeEnvironment())
);

for (ui32 i = 0; i < rightRowColumns.size(); ++i) {
auto resultIndex = resultRowStruct->GetMemberIndex(rightRowColumns[i]);
rightIndicesMap.Add(NewDataLiteral<ui32>(i), NewDataLiteral<ui32>(resultIndex));
}

auto returnType = NewStreamType(resultRowStruct);

TCallableBuilder callableBuilder(Env, __func__, returnType);
callableBuilder.Add(input);
callableBuilder.Add(NewDataLiteral<ui32>((ui32)GetIndexLookupJoinKind(joinType)));
callableBuilder.Add(NewDataLiteral<ui64>(leftRowType->GetMembersCount()));
callableBuilder.Add(NewDataLiteral<ui64>(rightRowType->GetMembersCount()));
callableBuilder.Add(TRuntimeNode(leftIndicesMap.Build(), true));
callableBuilder.Add(TRuntimeNode(rightIndicesMap.Build(), true));
return TRuntimeNode(callableBuilder.Build(), false);
}

Expand Down
47 changes: 47 additions & 0 deletions ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ void PrepareTables(TSession session) {
PRIMARY KEY (Key)
);

CREATE TABLE `/Root/Kv` (
Key Int32,
Value String,
PRIMARY KEY (Key)
);

CREATE TABLE `/Root/LaunchByProcessIdAndPinned` (
idx_processId Utf8,
idx_pinned Bool,
Expand Down Expand Up @@ -64,6 +70,12 @@ void PrepareTables(TSession session) {
(NULL, "Value24"),
(104, NULL);

REPLACE INTO `/Root/Kv` (Key, Value) VALUES
(1, "Value1"),
(2, "Value2"),
(3, "Value3"),
(4, "Value4");

REPLACE INTO `/Root/LaunchByProcessIdAndPinned` (idx_processId, idx_pinned, idx_launchNumber) VALUES
("eProcess", false, 4),
("eProcess", true, 5),
Expand Down Expand Up @@ -190,6 +202,41 @@ Y_UNIT_TEST_TWIN(Inner, StreamLookup) {
])", 2, StreamLookup);
}

// Runs an index-lookup join whose left side is itself a join subquery with
// renamed columns (lKey / lValue / rValue), joined against `/Root/Kv`, and
// checks the projected values. The twin parameter StreamLookup toggles the
// stream index-lookup-join setting in the table service config.
Y_UNIT_TEST_TWIN(JoinWithSubquery, StreamLookup) {
    // Outer query joins the aliased subquery result with Kv on the renamed key.
    const auto joinOverSubquery = R"(
$join = (SELECT l.Key AS lKey, l.Value AS lValue, r.Value AS rValue
FROM `/Root/Left` AS l
INNER JOIN `/Root/Right` AS r
ON l.Fk = r.Key
);
SELECT j.lValue AS Value FROM $join AS j INNER JOIN `/Root/Kv` AS kv
ON j.lKey = kv.Key;
)";

    // Configure the cluster according to the twin parameter.
    NKikimrConfig::TAppConfig config;
    auto* tableServiceConfig = config.MutableTableServiceConfig();
    tableServiceConfig->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookup);

    auto runnerSettings = TKikimrSettings().SetAppConfig(config);
    TKikimrRunner kikimr(runnerSettings);

    auto tableClient = kikimr.GetTableClient();
    auto session = tableClient.CreateSession().GetValueSync().GetSession();

    // Create and fill the tables the query reads from.
    PrepareTables(session);

    // Profile-level stats are requested for the execution; they are not
    // inspected by the assertions below.
    TExecDataQuerySettings querySettings;
    querySettings.CollectQueryStats(ECollectQueryStatsMode::Profile);

    auto pendingResult = session.ExecuteDataQuery(
        Q_(joinOverSubquery), TTxControl::BeginTx().CommitTx(), querySettings);
    auto queryResult = pendingResult.ExtractValueSync();
    UNIT_ASSERT_C(queryResult.IsSuccess(), queryResult.GetIssues().ToString());

    // NOTE(review): the query has no ORDER BY, so this comparison presumably
    // relies on the rows arriving in key order - confirm this is guaranteed.
    const auto actualYson = FormatResultSetYson(queryResult.GetResultSet(0));
    CompareYson(R"([
[["Value1"]];
[["Value1"]];
[["Value2"]];
[["Value2"]]
])", actualYson);
}

Y_UNIT_TEST_TWIN(Left, StreamLookup) {
Test(
R"(
Expand Down