Skip to content

Commit a990d04

Browse files
authored
Merge 6106b8e into 465e724
2 parents 465e724 + 6106b8e commit a990d04

File tree

15 files changed

+214
-28
lines changed

15 files changed

+214
-28
lines changed

ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ class TKqpPeepholeTransformer : public TOptimizeTransformerBase {
9393
{
9494
#define HNDL(name) "KqpPeephole-"#name, Hndl(&TKqpPeepholeTransformer::name)
9595
AddHandler(0, &TDqReplicate::Match, HNDL(RewriteReplicate));
96-
AddHandler(0, &TDqPhyMapJoin::Match, HNDL(RewriteMapJoin));
96+
AddHandler(0, &TDqPhyGraceJoin::Match, HNDL(RewriteMapJoinWithGraceCore));
97+
AddHandler(0, &TDqPhyMapJoin::Match, HNDL(RewriteMapJoinWithMapCore));
9798
AddHandler(0, &TDqPhyCrossJoin::Match, HNDL(RewriteCrossJoin));
9899
AddHandler(0, &TDqPhyJoinDict::Match, HNDL(RewriteDictJoin));
99100
AddHandler(0, &TDqJoin::Match, HNDL(RewritePureJoin));
@@ -110,9 +111,15 @@ class TKqpPeepholeTransformer : public TOptimizeTransformerBase {
110111
return output;
111112
}
112113

113-
TMaybeNode<TExprBase> RewriteMapJoin(TExprBase node, TExprContext& ctx) {
114-
TExprBase output = DqPeepholeRewriteMapJoin(node, ctx);
115-
DumpAppliedRule("RewriteMapJoin", node.Ptr(), output.Ptr(), ctx);
114+
// Peephole handler registered for `TDqPhyGraceJoin` nodes: delegates the rewrite to
// `DqPeepholeRewriteMapJoinWithGraceCore` and reports the applied rule via `DumpAppliedRule`.
TMaybeNode<TExprBase> RewriteMapJoinWithGraceCore(TExprBase node, TExprContext& ctx) {
    TExprBase rewritten = DqPeepholeRewriteMapJoinWithGraceCore(node, ctx);

    DumpAppliedRule("RewriteMapJoinWithGraceCore", node.Ptr(), rewritten.Ptr(), ctx);

    return rewritten;
}
119+
120+
// Peephole handler registered for `TDqPhyMapJoin` nodes: delegates the rewrite to
// `DqPeepholeRewriteMapJoinWithMapCore` and reports the applied rule via `DumpAppliedRule`.
TMaybeNode<TExprBase> RewriteMapJoinWithMapCore(TExprBase node, TExprContext& ctx) {
    TExprBase rewritten = DqPeepholeRewriteMapJoinWithMapCore(node, ctx);

    DumpAppliedRule("RewriteMapJoinWithMapCore", node.Ptr(), rewritten.Ptr(), ctx);

    return rewritten;
}
118125

ydb/core/kqp/opt/physical/kqp_opt_phy.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
421421
// It is now possible as we don't use datashard transactions for reads in data queries.
422422
bool pushLeftStage = (KqpCtx.IsScanQuery() || KqpCtx.Config->EnableKqpDataQueryStreamLookup) && AllowFuseJoinInputs(node);
423423
TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal,
424-
pushLeftStage, KqpCtx.Config->GetHashJoinMode(), false
424+
pushLeftStage, KqpCtx.Config->GetHashJoinMode(), false, KqpCtx.Config->UseGraceJoinCoreForMap.Get().GetOrElse(false)
425425
);
426426
DumpAppliedRule("BuildJoin", node.Ptr(), output.Ptr(), ctx);
427427
return output;

ydb/core/kqp/provider/yql_kikimr_settings.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ TKikimrConfiguration::TKikimrConfiguration() {
8383
REGISTER_SETTING(*this, OptEnableOlapProvideComputeSharding);
8484
REGISTER_SETTING(*this, OverrideStatistics);
8585
REGISTER_SETTING(*this, OverridePlanner);
86-
86+
REGISTER_SETTING(*this, UseGraceJoinCoreForMap);
8787

8888
REGISTER_SETTING(*this, OptUseFinalizeByKey);
8989
REGISTER_SETTING(*this, CostBasedOptimizationLevel);

ydb/core/kqp/provider/yql_kikimr_settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ struct TKikimrSettings {
5353
NCommon::TConfSetting<TString, false> OverrideStatistics;
5454
NCommon::TConfSetting<ui64, false> EnableSpillingNodes;
5555
NCommon::TConfSetting<TString, false> OverridePlanner;
56+
NCommon::TConfSetting<bool, false> UseGraceJoinCoreForMap;
5657

5758
/* Disable optimizer rules */
5859
NCommon::TConfSetting<bool, false> OptDisableTopSort;

ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@
4747
{"Index": 9, "Name": "Flags", "Type": "TCoAtomList", "Optional": true}
4848
]
4949
},
50+
{
51+
"Name": "TDqPhyGraceJoin",
52+
"Base": "TDqJoinBase",
53+
"Match": {"Type": "Callable", "Name": "DqPhyGraceJoin"}
54+
},
5055
{
5156
"Name": "TDqPhyMapJoin",
5257
"Base": "TDqJoinBase",

ydb/library/yql/dq/opt/dq_opt_join.cpp

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -323,8 +323,8 @@ std::pair<TVector<TCoAtom>, TVector<TCoAtom>> GetJoinKeys(const TDqJoin& join, T
323323
}
324324

325325

326-
TDqPhyMapJoin DqMakePhyMapJoin(const TDqJoin& join, const TExprBase& leftInput, const TExprBase& rightInput,
327-
TExprContext& ctx)
326+
TDqJoinBase DqMakePhyMapJoin(const TDqJoin& join, const TExprBase& leftInput, const TExprBase& rightInput,
327+
TExprContext& ctx, bool useGraceCore)
328328
{
329329
static const std::set<std::string_view> supportedTypes = {"Inner"sv, "Left"sv, "LeftOnly"sv, "LeftSemi"sv};
330330
auto joinType = join.JoinType().Value();
@@ -349,16 +349,29 @@ TDqPhyMapJoin DqMakePhyMapJoin(const TDqJoin& join, const TExprBase& leftInput,
349349
auto leftFilteredInput = BuildSkipNullKeys(ctx, join.Pos(), leftInput, leftFilterKeys);
350350
auto rightFilteredInput = BuildSkipNullKeys(ctx, join.Pos(), rightInput, rightFilterKeys);
351351

352-
return Build<TDqPhyMapJoin>(ctx, join.Pos())
353-
.LeftInput(leftFilteredInput)
354-
.LeftLabel(join.LeftLabel())
355-
.RightInput(rightFilteredInput)
356-
.RightLabel(join.RightLabel())
357-
.JoinType(join.JoinType())
358-
.JoinKeys(join.JoinKeys())
359-
.LeftJoinKeyNames(join.LeftJoinKeyNames())
360-
.RightJoinKeyNames(join.RightJoinKeyNames())
361-
.Done();
352+
if (useGraceCore) {
353+
return Build<TDqPhyGraceJoin>(ctx, join.Pos())
354+
.LeftInput(leftFilteredInput)
355+
.LeftLabel(join.LeftLabel())
356+
.RightInput(rightFilteredInput)
357+
.RightLabel(join.RightLabel())
358+
.JoinType(join.JoinType())
359+
.JoinKeys(join.JoinKeys())
360+
.LeftJoinKeyNames(join.LeftJoinKeyNames())
361+
.RightJoinKeyNames(join.RightJoinKeyNames())
362+
.Done();
363+
} else {
364+
return Build<TDqPhyMapJoin>(ctx, join.Pos())
365+
.LeftInput(leftFilteredInput)
366+
.LeftLabel(join.LeftLabel())
367+
.RightInput(rightFilteredInput)
368+
.RightLabel(join.RightLabel())
369+
.JoinType(join.JoinType())
370+
.JoinKeys(join.JoinKeys())
371+
.LeftJoinKeyNames(join.LeftJoinKeyNames())
372+
.RightJoinKeyNames(join.RightJoinKeyNames())
373+
.Done();
374+
}
362375
}
363376

364377
} // namespace
@@ -609,7 +622,7 @@ TExprBase DqRewriteLeftPureJoin(const TExprBase node, TExprContext& ctx, const T
609622
.Done();
610623
}
611624

612-
TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx) {
625+
TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx, bool useGraceCoreForMap) {
613626
static const std::set<std::string_view> supportedTypes = {
614627
"Inner"sv,
615628
"Left"sv,
@@ -760,7 +773,7 @@ TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext&
760773

761774
TMaybeNode<TExprBase> phyJoin;
762775
if (join.JoinType().Value() != "Cross"sv) {
763-
phyJoin = DqMakePhyMapJoin(join, leftInputArg, joinRightInput, ctx);
776+
phyJoin = DqMakePhyMapJoin(join, leftInputArg, joinRightInput, ctx, useGraceCoreForMap);
764777
} else {
765778
YQL_ENSURE(join.JoinKeys().Empty());
766779

ydb/library/yql/dq/opt/dq_opt_join.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ NNodes::TExprBase DqRewriteEquiJoin(const NNodes::TExprBase& node, EHashJoinMode
1616

1717
NNodes::TExprBase DqRewriteEquiJoin(const NNodes::TExprBase& node, EHashJoinMode mode, bool useCBO, TExprContext& ctx, const TTypeAnnotationContext& typeCtx, int& joinCounter);
1818

19-
NNodes::TExprBase DqBuildPhyJoin(const NNodes::TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx);
19+
NNodes::TExprBase DqBuildPhyJoin(const NNodes::TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx, bool useGraceCoreForMap);
2020

2121
NNodes::TExprBase DqBuildJoin(const NNodes::TExprBase& node, TExprContext& ctx,
22-
IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin = EHashJoinMode::Off, bool shuffleMapJoin = true);
22+
IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin = EHashJoinMode::Off, bool shuffleMapJoin = true, bool useGraceCoreForMap = false);
2323

2424
NNodes::TExprBase DqBuildHashJoin(const NNodes::TDqJoin& join, EHashJoinMode mode, TExprContext& ctx, IOptimizationContext& optCtx);
2525

ydb/library/yql/dq/opt/dq_opt_peephole.cpp

Lines changed: 153 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <ydb/library/yql/core/yql_opt_utils.h>
55
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
66
#include <ydb/library/yql/core/yql_expr_optimize.h>
7+
#include <ydb/library/yql/core/yql_type_helpers.h>
78

89
#include <ydb/library/yql/utils/log/log.h>
910

@@ -130,8 +131,158 @@ TExprNode::TListType OriginalJoinOutputMembers(const TDqPhyMapJoin& mapJoin, TEx
130131
}
131132
return structMembers;
132133
}
134+
135+
// Wraps `arg` into an `ExpandMap` whose lambda emits one `Member(item, <name>)` per member of
// `type`, in the order returned by `type.GetItems()`. The position of each emitted callable in
// the lambda body equals the member's position in that list.
TExprNode::TPtr ExpandJoinInput(const TStructExprType& type, TExprNode::TPtr&& arg, TExprContext& ctx) {
    const auto position = arg->Pos();

    // Emits the per-member `Member` callables into the lambda body.
    const auto emitMembers = [&type](TExprNodeBuilder& lambda) -> TExprNodeBuilder& {
        auto column = 0U;
        for (const auto& member : type.GetItems()) {
            lambda.Callable(column, "Member")
                .Arg(0, "item")
                .Atom(1, member->GetName())
            .Seal();
            ++column;
        }
        return lambda;
    };

    return ctx.Builder(position)
        .Callable("ExpandMap")
            .Add(0, std::move(arg))
            .Lambda(1)
                .Param("item")
                .Do(emitMembers)
            .Seal()
        .Seal().Build();
}
154+
133155
} // anonymous namespace end
134156

157+
/**
 * Rewrites a `DqPhyGraceJoin` to the `GraceJoinCore` working on wide flows.
 *
 * Steps:
 *  - both inputs are converted with `ToFlow` and expanded with `ExpandMap`
 *    (one wide column per struct member, in struct member order);
 *  - join keys are passed to `GraceJoinCore` twice: as wide-column indexes
 *    and as the original key atoms returned by `JoinKeysToAtoms`;
 *  - renames map every passed input column to consecutive output positions,
 *    left columns first, then right columns;
 *  - `NarrowMap` packs the wide output back into a struct whose members are
 *    named `<label>.<column>` (or just `<column>` when the label is empty).
 *
 * For `LeftOnly` / `LeftSemi` joins no right renames are passed, so the output
 * struct contains left columns only. The `NarrowMap` lambda arity always matches
 * the number of renamed columns.
 *
 * Returns `node` unchanged if it is not a `TDqPhyGraceJoin`. Fails with
 * `YQL_ENSURE` if there are no join keys, a key column is missing from its
 * input struct, or `JoinDryKeyType` reports no common type for a key pair.
 */
TExprBase DqPeepholeRewriteMapJoinWithGraceCore(const TExprBase& node, TExprContext& ctx) {
    if (!node.Maybe<TDqPhyGraceJoin>()) {
        return node;
    }
    const auto graceJoin = node.Cast<TDqPhyGraceJoin>();
    const auto pos = graceJoin.Pos();

    const TString leftTableLabel(GetTableLabel(graceJoin.LeftLabel()));
    const TString rightTableLabel(GetTableLabel(graceJoin.RightLabel()));

    auto [leftKeyColumnNames, rightKeyColumnNames] = JoinKeysToAtoms(ctx, graceJoin, leftTableLabel, rightTableLabel);
    const auto keyWidth = leftKeyColumnNames.size();
    YQL_ENSURE(keyWidth > 0, "DqPhyGraceJoin must have at least one join key");
    YQL_ENSURE(keyWidth == rightKeyColumnNames.size(), "Left and right join key counts differ: "
        << keyWidth << " vs " << rightKeyColumnNames.size());

    const auto itemTypeLeft = GetSequenceItemType(graceJoin.LeftInput(), false, ctx)->Cast<TStructExprType>();
    const auto itemTypeRight = GetSequenceItemType(graceJoin.RightInput(), false, ctx)->Cast<TStructExprType>();

    // Translates key column names to positions of the corresponding wide columns.
    // Also guarantees that every key column exists in the input struct.
    const auto toKeyIndexes = [&ctx](const TStructExprType& type, const auto& keyNames) {
        TExprNode::TListType indexes;
        indexes.reserve(keyNames.size());
        for (const auto& keyName : keyNames) {
            const auto index = type.FindItem(keyName->Content());
            YQL_ENSURE(index, "Join key column not found in input: " << keyName->Content());
            indexes.emplace_back(ctx.NewAtom(keyName->Pos(), ctx.GetIndexAsString(*index)));
        }
        return indexes;
    };

    TExprNode::TListType leftKeyColumnIndexes = toKeyIndexes(*itemTypeLeft, leftKeyColumnNames);
    TExprNode::TListType rightKeyColumnIndexes = toKeyIndexes(*itemTypeRight, rightKeyColumnNames);

    // Key types on both sides must have a common type, otherwise the join can't be built.
    for (size_t i = 0; i < keyWidth; ++i) {
        const auto keyTypeLeft = itemTypeLeft->FindItemType(leftKeyColumnNames[i]->Content());
        const auto keyTypeRight = itemTypeRight->FindItemType(rightKeyColumnNames[i]->Content());
        bool optKey = false;
        YQL_ENSURE(JoinDryKeyType(keyTypeLeft, keyTypeRight, optKey, ctx),
            "Incompatible join key types for columns " << leftKeyColumnNames[i]->Content()
            << " and " << rightKeyColumnNames[i]->Content());
    }

    // LeftOnly / LeftSemi joins don't pass right columns to the output.
    const bool withRightSide = graceJoin.JoinType().Value() != "LeftOnly" && graceJoin.JoinType().Value() != "LeftSemi";

    std::vector<TString> fullColNames;
    fullColNames.reserve(itemTypeLeft->GetSize() + (withRightSide ? itemTypeRight->GetSize() : 0));

    // Registers all columns of one join side: appends their full output names to `fullColNames`
    // and returns (input index, output index) rename pairs. Names and renames are produced
    // together so that the NarrowMap lambda arity always equals the join output width.
    ui32 outputIndex = 0;
    const auto addSideColumns = [&](const TString& label, const TStructExprType& type) {
        TExprNode::TListType renames;
        renames.reserve(2 * type.GetSize());
        for (ui32 i = 0; i < type.GetSize(); ++i) {
            TString name(type.GetItems()[i]->GetName());
            if (label) {
                name = label + "." + name;
            }
            fullColNames.push_back(std::move(name));
            renames.emplace_back(ctx.NewAtom(pos, ctx.GetIndexAsString(i)));
            renames.emplace_back(ctx.NewAtom(pos, ctx.GetIndexAsString(outputIndex++)));
        }
        return renames;
    };

    TExprNode::TListType leftRenames = addSideColumns(leftTableLabel, *itemTypeLeft);
    TExprNode::TListType rightRenames;
    if (withRightSide) {
        rightRenames = addSideColumns(rightTableLabel, *itemTypeRight);
    }

    auto leftInput = ExpandJoinInput(*itemTypeLeft, ctx.NewCallable(graceJoin.LeftInput().Pos(), "ToFlow", {graceJoin.LeftInput().Ptr()}), ctx);
    auto rightInput = ExpandJoinInput(*itemTypeRight, ctx.NewCallable(graceJoin.RightInput().Pos(), "ToFlow", {graceJoin.RightInput().Ptr()}), ctx);

    auto graceJoinCore = Build<TCoGraceJoinCore>(ctx, pos)
        .LeftInput(std::move(leftInput))
        .RightInput(std::move(rightInput))
        .JoinKind(graceJoin.JoinType())
        .LeftKeysColumns(ctx.NewList(pos, std::move(leftKeyColumnIndexes)))
        .RightKeysColumns(ctx.NewList(pos, std::move(rightKeyColumnIndexes)))
        .LeftRenames(ctx.NewList(pos, std::move(leftRenames)))
        .RightRenames(ctx.NewList(pos, std::move(rightRenames)))
        .LeftKeysColumnNames(ctx.NewList(pos, std::move(leftKeyColumnNames)))
        .RightKeysColumnNames(ctx.NewList(pos, std::move(rightKeyColumnNames)))
        .Flags()
            .Build()
        .Done();

    // Pack the wide join output back into a struct with full column names.
    auto graceNode = ctx.Builder(pos)
        .Callable("NarrowMap")
            .Add(0, graceJoinCore.Ptr())
            .Lambda(1)
                .Params("output", fullColNames.size())
                .Callable("AsStruct")
                    .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
                        ui32 i = 0U;
                        for (const auto& colName : fullColNames) {
                            parent.List(i)
                                .Atom(0, colName)
                                .Arg(1, "output", i)
                            .Seal();
                            i++;
                        }
                        return parent;
                    })
                .Seal()
            .Seal()
        .Seal()
        .Build();

    return TExprBase(graceNode);
}
285+
135286
/**
136287
* Rewrites a `KqpMapJoin` to the `MapJoinCore`.
137288
*
@@ -142,10 +293,11 @@ TExprNode::TListType OriginalJoinOutputMembers(const TDqPhyMapJoin& mapJoin, TEx
142293
* (rely on the fact that there will be only one element in the `FlatMap`-stream)
143294
* - Align key types using `StrictCast`, use internal columns to store converted left keys
144295
*/
145-
TExprBase DqPeepholeRewriteMapJoin(const TExprBase& node, TExprContext& ctx) {
296+
TExprBase DqPeepholeRewriteMapJoinWithMapCore(const TExprBase& node, TExprContext& ctx) {
146297
if (!node.Maybe<TDqPhyMapJoin>()) {
147298
return node;
148299
}
300+
149301
const auto mapJoin = node.Cast<TDqPhyMapJoin>();
150302
const auto pos = mapJoin.Pos();
151303

ydb/library/yql/dq/opt/dq_opt_peephole.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ namespace NYql::NDq {
1010

1111
NNodes::TExprBase DqPeepholeRewriteCrossJoin(const NNodes::TExprBase& node, TExprContext& ctx);
1212
NNodes::TExprBase DqPeepholeRewriteJoinDict(const NNodes::TExprBase& node, TExprContext& ctx);
13-
NNodes::TExprBase DqPeepholeRewriteMapJoin(const NNodes::TExprBase& node, TExprContext& ctx);
13+
NNodes::TExprBase DqPeepholeRewriteMapJoinWithGraceCore(const NNodes::TExprBase& node, TExprContext& ctx);
14+
NNodes::TExprBase DqPeepholeRewriteMapJoinWithMapCore(const NNodes::TExprBase& node, TExprContext& ctx);
1415
NNodes::TExprBase DqPeepholeRewriteReplicate(const NNodes::TExprBase& node, TExprContext& ctx);
1516
NNodes::TExprBase DqPeepholeRewritePureJoin(const NNodes::TExprBase& node, TExprContext& ctx);
1617
NNodes::TExprBase DqPeepholeDropUnusedInputs(const NNodes::TExprBase& node, TExprContext& ctx);

ydb/library/yql/dq/opt/dq_opt_phy.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2610,7 +2610,7 @@ TMaybeNode<TDqJoin> DqFlipJoin(const TDqJoin& join, TExprContext& ctx) {
26102610

26112611

26122612
TExprBase DqBuildJoin(const TExprBase& node, TExprContext& ctx, IOptimizationContext& optCtx,
2613-
const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin, bool shuffleMapJoin)
2613+
const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin, bool shuffleMapJoin, bool useGraceCoreForMap)
26142614
{
26152615
if (!node.Maybe<TDqJoin>()) {
26162616
return node;
@@ -2660,7 +2660,7 @@ TExprBase DqBuildJoin(const TExprBase& node, TExprContext& ctx, IOptimizationCon
26602660
// separate stage to receive data from both sides of join.
26612661
// TODO: We can push MapJoin to existing stage for data query, if it doesn't have table reads. This
26622662
// requires some additional knowledge, probably with use of constraints.
2663-
return DqBuildPhyJoin(join, pushLeftStage, ctx, optCtx);
2663+
return DqBuildPhyJoin(join, pushLeftStage, ctx, optCtx, useGraceCoreForMap);
26642664
}
26652665

26662666
TExprBase DqPrecomputeToInput(const TExprBase& node, TExprContext& ctx) {

0 commit comments

Comments
 (0)