Skip to content

Commit 2d22584

Browse files
authored
Merge c3eabc0 into 0d9831e
2 parents 0d9831e + c3eabc0 commit 2d22584

File tree

7 files changed

+204
-40
lines changed

7 files changed

+204
-40
lines changed

ydb/core/kqp/opt/logical/kqp_opt_log.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,14 +200,14 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
200200

201201
// Optimizer rule: rewrites `node` through DqRewriteEquiJoin and records the
// transformation with DumpAppliedRule under the name "RewriteEquiJoin".
// The removed ('-') and added ('+') call lines differ only in the final
// argument: the new call additionally forwards KqpCtx.GetOptimizerHints().
TMaybeNode<TExprBase> RewriteEquiJoin(TExprBase node, TExprContext& ctx) {
202202
// CBO counts as enabled when the configured level (or, if unset, the default level) is at least 2.
bool useCBO = Config->CostBasedOptimizationLevel.Get().GetOrElse(Config->DefaultCostBasedOptimizationLevel) >= 2;
203-
TExprBase output = DqRewriteEquiJoin(node, KqpCtx.Config->GetHashJoinMode(), useCBO, ctx, TypesCtx, KqpCtx.JoinsCount);
203+
TExprBase output = DqRewriteEquiJoin(node, KqpCtx.Config->GetHashJoinMode(), useCBO, ctx, TypesCtx, KqpCtx.JoinsCount, KqpCtx.GetOptimizerHints());
204204
DumpAppliedRule("RewriteEquiJoin", node.Ptr(), output.Ptr(), ctx);
205205
return output;
206206
}
207207

208208
// Optimizer rule: passes `node` to KqpJoinToIndexLookup and records the
// transformation with DumpAppliedRule under the name "JoinToIndexLookup".
// The added ('+') call line differs from the removed ('-') one only by
// forwarding KqpCtx.GetOptimizerHints() as an extra trailing argument.
TMaybeNode<TExprBase> JoinToIndexLookup(TExprBase node, TExprContext& ctx) {
209209
// CBO counts as enabled when the configured level (or, if unset, the default level) is at least 2.
bool useCBO = Config->CostBasedOptimizationLevel.Get().GetOrElse(Config->DefaultCostBasedOptimizationLevel) >= 2;
210-
TExprBase output = KqpJoinToIndexLookup(node, ctx, KqpCtx, useCBO);
210+
TExprBase output = KqpJoinToIndexLookup(node, ctx, KqpCtx, useCBO, KqpCtx.GetOptimizerHints());
211211
DumpAppliedRule("JoinToIndexLookup", node.Ptr(), output.Ptr(), ctx);
212212
return output;
213213
}

ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp

Lines changed: 52 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ bool IsParameterToListOfStructsRepack(const TExprBase& expr) {
331331
return true;
332332
}
333333

334-
//#define DBG(...) YQL_CLOG(DEBUG, ProviderKqp) << __VA_ARGS__
334+
// #define DBG(...) YQL_CLOG(DEBUG, ProviderKqp) << __VA_ARGS__
335335
#define DBG(...)
336336

337337
TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
@@ -935,7 +935,38 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
935935

936936
} // anonymous namespace
937937

938-
TExprBase KqpJoinToIndexLookup(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx, bool useCBO)
938+
// Collects the relation labels that take part in the join tree rooted at `node`.
//
// - TDqPrecompute: looks through the node and collects from its input.
// - TDqJoin: for each side, takes the side's label directly when it is a
//   TCoAtom; otherwise recurses into that side's input. Left-side labels are
//   appended before right-side labels.
// - Any other node: returns an empty vector.
TVector<TString> CollectLabels(const TExprBase& node) {
939+
TVector<TString> rels;
940+
941+
if (node.Maybe<TDqPrecompute>()) {
942+
auto precompute = node.Cast<TDqPrecompute>();
943+
return CollectLabels(precompute.Input());
944+
}
945+
946+
if (node.Maybe<TDqJoin>()) {
947+
auto join = node.Cast<TDqJoin>();
948+
949+
// Left side: an atom label names a single relation; anything else is expanded recursively.
if (join.LeftLabel().Maybe<TCoAtom>()) {
950+
rels.push_back(join.LeftLabel().Cast<TCoAtom>().StringValue());
951+
} else {
952+
auto lhs = CollectLabels(join.LeftInput());
953+
rels.insert(rels.end(), std::make_move_iterator(lhs.begin()), std::make_move_iterator(lhs.end()));
954+
}
955+
956+
// Right side: same treatment as the left side.
if (join.RightLabel().Maybe<TCoAtom>()) {
957+
rels.push_back(join.RightLabel().Cast<TCoAtom>().StringValue());
958+
} else {
959+
auto rhs = CollectLabels(join.RightInput());
960+
rels.insert(rels.end(), std::make_move_iterator(rhs.begin()), std::make_move_iterator(rhs.end()));
961+
}
962+
963+
return rels;
964+
}
965+
966+
// Neither a precompute nor a join: contributes no labels.
return {};
967+
}
968+
969+
TExprBase KqpJoinToIndexLookup(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx, bool useCBO, const TOptimizerHints& hints)
939970
{
940971
if (!node.Maybe<TDqJoin>()) {
941972
return node;
@@ -952,20 +983,33 @@ TExprBase KqpJoinToIndexLookup(const TExprBase& node, TExprContext& ctx, const T
952983
return node;
953984
}
954985

955-
if (useCBO){
956-
957-
if (algo != EJoinAlgoType::LookupJoin && algo != EJoinAlgoType::LookupJoinReverse) {
986+
if (useCBO && algo != EJoinAlgoType::LookupJoin && algo != EJoinAlgoType::LookupJoinReverse){
987+
return node;
988+
}
989+
990+
/*
991+
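* This loop looks for an already applied join algorithm hint whose label set matches the labels of this join. If one is found, the node is returned unchanged,
992+
* unless the hinted algorithm is LookupJoin or LookupJoinReverse: in that case we break out of the loop and continue, because a lookup join still has to be rewritten by KqpJoinToIndexLookupImpl.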
993+
*/
994+
auto joinLabels = CollectLabels(node);
995+
for (const auto& hint: hints.JoinAlgoHints->Hints) {
996+
if (
997+
std::unordered_set<TString>(hint.JoinLabels.begin(), hint.JoinLabels.end()) ==
998+
std::unordered_set<TString>(joinLabels.begin(), joinLabels.end()) && hint.Applied
999+
) {
1000+
if (hint.Algo == EJoinAlgoType::LookupJoin || hint.Algo == EJoinAlgoType::LookupJoinReverse) {
1001+
break;
1002+
}
1003+
9581004
return node;
959-
}
1005+
}
9601006
}
9611007

9621008
DBG("-- Join: " << KqpExprToPrettyString(join, ctx));
9631009

9641010
// SqlIn support (preferred lookup direction)
9651011
if (join.JoinType().Value() == "LeftSemi") {
9661012
auto flipJoin = FlipLeftSemiJoin(join, ctx);
967-
DBG("-- Flip join");
968-
9691013
if (auto indexLookupJoin = KqpJoinToIndexLookupImpl(flipJoin, ctx, kqpCtx)) {
9701014
return indexLookupJoin.Cast();
9711015
}

ydb/core/kqp/opt/logical/kqp_opt_log_rules.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ NYql::NNodes::TExprBase KqpPushExtractedPredicateToReadTable(NYql::NNodes::TExpr
2525
const TKqpOptimizeContext& kqpCtx, NYql::TTypeAnnotationContext& typesCtx, const NYql::TParentsMap& parentsMap);
2626

2727
NYql::NNodes::TExprBase KqpJoinToIndexLookup(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx,
28-
const TKqpOptimizeContext& kqpCtx, bool useCBO);
28+
const TKqpOptimizeContext& kqpCtx, bool useCBO, const NYql::TOptimizerHints& hints);
2929

3030
NYql::NNodes::TExprBase KqpRewriteSqlInToEquiJoin(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx,
3131
const TKqpOptimizeContext& kqpCtx, const NYql::TKikimrConfiguration::TPtr& config);
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
PRAGMA TablePathPrefix='/Root';
2+
PRAGMA ydb.OptimizerHints =
3+
'
4+
JoinType(R S Shuffle)
5+
JoinType(R S T Broadcast)
6+
JoinType(R S T U Shuffle)
7+
JoinType(R S T U V Broadcast)
8+
';
9+
10+
SELECT * FROM
11+
R INNER JOIN S on R.id = S.id
12+
INNER JOIN T on R.id = T.id
13+
INNER JOIN U on T.id = U.id
14+
INNER JOIN V on U.id = V.id;

ydb/core/kqp/ut/join/kqp_join_order_ut.cpp

Lines changed: 71 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ static void CreateSampleTable(TSession session, bool useColumnStore) {
8181
CreateTables(session, "schema/lookupbug.sql", useColumnStore);
8282
}
8383

84-
static TKikimrRunner GetKikimrWithJoinSettings(bool useStreamLookupJoin = false, TString stats = ""){
84+
static TKikimrRunner GetKikimrWithJoinSettings(bool useStreamLookupJoin = false, TString stats = "", bool useCBO = true){
8585
TVector<NKikimrKqp::TKqpSetting> settings;
8686

8787
NKikimrKqp::TKqpSetting setting;
@@ -100,6 +100,9 @@ static TKikimrRunner GetKikimrWithJoinSettings(bool useStreamLookupJoin = false,
100100
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(useStreamLookupJoin);
101101
appConfig.MutableTableServiceConfig()->SetEnableConstantFolding(true);
102102
appConfig.MutableTableServiceConfig()->SetCompileTimeoutMs(TDuration::Minutes(10).MilliSeconds());
103+
if (!useCBO) {
104+
appConfig.MutableTableServiceConfig()->SetDefaultCostBasedOptimizationLevel(0);
105+
}
103106

104107
auto serverSettings = TKikimrSettings().SetAppConfig(appConfig);
105108
serverSettings.SetKqpSettings(settings);
@@ -197,8 +200,8 @@ class TChainTester {
197200
size_t ChainSize;
198201
};
199202

200-
void ExplainJoinOrderTestDataQueryWithStats(const TString& queryPath, const TString& statsPath, bool useStreamLookupJoin, bool useColumnStore) {
201-
auto kikimr = GetKikimrWithJoinSettings(useStreamLookupJoin, GetStatic(statsPath));
203+
void ExplainJoinOrderTestDataQueryWithStats(const TString& queryPath, const TString& statsPath, bool useStreamLookupJoin, bool useColumnStore, bool useCBO = true) {
204+
auto kikimr = GetKikimrWithJoinSettings(useStreamLookupJoin, GetStatic(statsPath), useCBO);
202205
auto db = kikimr.GetTableClient();
203206
auto session = db.CreateSession().GetValueSync().GetSession();
204207

@@ -333,8 +336,8 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) {
333336
// TChainTester(65).Test();
334337
//}
335338

336-
TString ExecuteJoinOrderTestDataQueryWithStats(const TString& queryPath, const TString& statsPath, bool useStreamLookupJoin, bool useColumnStore) {
337-
auto kikimr = GetKikimrWithJoinSettings(useStreamLookupJoin, GetStatic(statsPath));
339+
TString ExecuteJoinOrderTestDataQueryWithStats(const TString& queryPath, const TString& statsPath, bool useStreamLookupJoin, bool useColumnStore, bool useCBO = true) {
340+
auto kikimr = GetKikimrWithJoinSettings(useStreamLookupJoin, GetStatic(statsPath), useCBO);
338341
auto db = kikimr.GetTableClient();
339342
auto session = db.CreateSession().GetValueSync().GetSession();
340343

@@ -518,6 +521,69 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) {
518521
CheckJoinCardinality("queries/test_join_hint2.sql", "stats/basic.json", "InnerJoin (MapJoin)", 1, StreamLookupJoin, ColumnStore);
519522
}
520523

524+
525+
// Test helper: searches a JSON join plan for the operator whose subtree covers
// exactly a requested collection of table labels and reports that operator's name.
//
// As read by FindImpl, a plan node is a JSON map that either has a "table" key
// (a leaf naming one table) or has "op_name" and "args" keys (an operator with
// an array of child nodes).
class TFindJoinWithLabels {
526+
public:
527+
// Stores its own copy of `plan`.
TFindJoinWithLabels(
528+
const NJson::TJsonValue& plan
529+
)
530+
: Plan(plan)
531+
{}
532+
533+
// Returns the "op_name" of the operator whose leaf tables are exactly `labels`
// (order-insensitive), or an empty string when no such operator is found.
TString Find(const TVector<TString>& labels) {
534+
Labels = labels;
535+
// Keep the requested labels sorted so AreRequestedLabels can compare sorted vectors.
std::sort(Labels.begin(), Labels.end());
536+
TVector<TString> dummy;
537+
auto res = FindImpl(Plan, dummy);
538+
return res;
539+
}
540+
541+
private:
542+
// Depth-first search over `plan`. On return, `subtreeLabels` holds the table
// labels gathered from the visited subtree, unless a match was found in a child,
// in which case the search stops early and the labels are incomplete.
TString FindImpl(const NJson::TJsonValue& plan, TVector<TString>& subtreeLabels) {
543+
auto planMap = plan.GetMapSafe();
544+
if (!planMap.contains("table")) {
545+
TString opName = planMap.at("op_name").GetStringSafe();
546+
547+
auto inputs = planMap.at("args").GetArraySafe();
548+
for (size_t i = 0; i < inputs.size(); ++i) {
549+
TVector<TString> childLabels;
550+
// NOTE(review): relies on TString's boolean conversion, presumably true for a non-empty string — confirm.
if (auto maybeOpName = FindImpl(inputs[i], childLabels) ) {
551+
return maybeOpName;
552+
}
553+
subtreeLabels.insert(subtreeLabels.end(), childLabels.begin(), childLabels.end());
554+
}
555+
556+
// No child matched; check whether this operator itself covers the requested labels.
if (AreRequestedLabels(subtreeLabels)) {
557+
return opName;
558+
}
559+
560+
return "";
561+
}
562+
563+
// Leaf node: it contributes exactly one label and can never be the answer itself.
subtreeLabels = {planMap.at("table").GetStringSafe()};
564+
return "";
565+
}
566+
567+
// True when `labels`, once sorted, equals the sorted requested labels.
// Takes the vector by value so the caller's ordering is left untouched.
bool AreRequestedLabels(TVector<TString> labels) {
568+
std::sort(labels.begin(), labels.end());
569+
return Labels == labels;
570+
}
571+
572+
// Copy of the plan passed to the constructor.
NJson::TJsonValue Plan;
573+
// Requested labels, sorted by Find.
TVector<TString> Labels;
574+
};
575+
576+
// Runs the hinted query with useStreamLookupJoin, useColumnStore and useCBO all
// set to false, then checks the operator name reported for each nested join.
// NOTE(review): presumably the .sql file carries JoinType hints (Shuffle / Broadcast)
// that are expected to surface as Grace / MapJoin respectively — confirm against the query file.
Y_UNIT_TEST(OltpJoinTypeHintCBOTurnOFF) {
577+
auto plan = ExecuteJoinOrderTestDataQueryWithStats("queries/oltp_join_type_hint_cbo_turnoff.sql", "stats/basic.json", false, false, false);
578+
auto detailedPlan = GetDetailedJoinOrder(plan);
579+
580+
auto joinFinder = TFindJoinWithLabels(detailedPlan);
581+
// Each assertion looks up the operator whose subtree covers exactly the listed tables.
UNIT_ASSERT(joinFinder.Find({"R", "S"}) == "InnerJoin (Grace)");
582+
UNIT_ASSERT(joinFinder.Find({"R", "S", "T"}) == "InnerJoin (MapJoin)");
583+
UNIT_ASSERT(joinFinder.Find({"R", "S", "T", "U"}) == "InnerJoin (Grace)");
584+
UNIT_ASSERT(joinFinder.Find({"R", "S", "T", "U", "V"}) == "InnerJoin (MapJoin)");
585+
}
586+
521587
Y_UNIT_TEST_XOR_OR_BOTH_FALSE(TestJoinOrderHintsSimple, StreamLookupJoin, ColumnStore) {
522588
auto plan = ExecuteJoinOrderTestDataQueryWithStats("queries/join_order_hints_simple.sql", "stats/basic.json", StreamLookupJoin, ColumnStore);
523589
UNIT_ASSERT_VALUES_EQUAL(GetJoinOrder(plan).GetStringRobust(), R"(["T",["R","S"]])") ;

ydb/library/yql/dq/opt/dq_opt_join.cpp

Lines changed: 61 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -116,44 +116,67 @@ TExprBase BuildDqJoinInput(TExprContext& ctx, TPositionHandle pos, const TExprBa
116116
return partition;
117117
}
118118

119-
TMaybe<TJoinInputDesc> BuildDqJoin(const TCoEquiJoinTuple& joinTuple,
120-
const THashMap<TStringBuf, TJoinInputDesc>& inputs, EHashJoinMode mode, bool useCBO, TExprContext& ctx, const TTypeAnnotationContext& typeCtx)
119+
TMaybe<TJoinInputDesc> BuildDqJoin(
120+
const TCoEquiJoinTuple& joinTuple,
121+
const THashMap<TStringBuf, TJoinInputDesc>& inputs,
122+
EHashJoinMode mode,
123+
TExprContext& ctx,
124+
const TTypeAnnotationContext& typeCtx,
125+
TVector<TString>& subtreeLabels,
126+
const NYql::TOptimizerHints& hints
127+
)
121128
{
122-
auto options = joinTuple.Options();
123-
auto linkSettings = GetEquiJoinLinkSettings(options.Ref());
124-
YQL_ENSURE(linkSettings.JoinAlgo != EJoinAlgoType::StreamLookupJoin || typeCtx.StreamLookupJoin, "Unsupported join strategy: streamlookup");
125-
126-
if (linkSettings.JoinAlgo == EJoinAlgoType::MapJoin) {
127-
mode = EHashJoinMode::Map;
128-
} else if (linkSettings.JoinAlgo == EJoinAlgoType::GraceJoin) {
129-
mode = EHashJoinMode::GraceAndSelf;
130-
}
131-
132-
bool leftAny = linkSettings.LeftHints.contains("any");
133-
bool rightAny = linkSettings.RightHints.contains("any");
134-
135129
TMaybe<TJoinInputDesc> left;
130+
TVector<TString> lhsLabels;
136131
if (joinTuple.LeftScope().Maybe<TCoAtom>()) {
132+
lhsLabels.push_back(joinTuple.LeftScope().Cast<TCoAtom>().StringValue());
137133
left = inputs.at(joinTuple.LeftScope().Cast<TCoAtom>().Value());
138134
YQL_ENSURE(left, "unknown scope " << joinTuple.LeftScope().Cast<TCoAtom>().Value());
139135
} else {
140-
left = BuildDqJoin(joinTuple.LeftScope().Cast<TCoEquiJoinTuple>(), inputs, mode, useCBO, ctx, typeCtx);
136+
left = BuildDqJoin(joinTuple.LeftScope().Cast<TCoEquiJoinTuple>(), inputs, mode, ctx, typeCtx, lhsLabels, hints);
141137
if (!left) {
142138
return {};
143139
}
144140
}
145141

146142
TMaybe<TJoinInputDesc> right;
143+
TVector<TString> rhsLabels;
147144
if (joinTuple.RightScope().Maybe<TCoAtom>()) {
145+
rhsLabels.push_back(joinTuple.RightScope().Cast<TCoAtom>().StringValue());
148146
right = inputs.at(joinTuple.RightScope().Cast<TCoAtom>().Value());
149147
YQL_ENSURE(right, "unknown scope " << joinTuple.RightScope().Cast<TCoAtom>().Value());
150148
} else {
151-
right = BuildDqJoin(joinTuple.RightScope().Cast<TCoEquiJoinTuple>(), inputs, mode, useCBO, ctx, typeCtx);
149+
right = BuildDqJoin(joinTuple.RightScope().Cast<TCoEquiJoinTuple>(), inputs, mode, ctx, typeCtx, rhsLabels, hints);
152150
if (!right) {
153151
return {};
154152
}
155153
}
156154

155+
subtreeLabels.insert(subtreeLabels.end(), std::make_move_iterator(lhsLabels.begin()), std::make_move_iterator(lhsLabels.end()));
156+
subtreeLabels.insert(subtreeLabels.end(), std::make_move_iterator(rhsLabels.begin()), std::make_move_iterator(rhsLabels.end()));
157+
158+
auto options = joinTuple.Options();
159+
auto linkSettings = GetEquiJoinLinkSettings(options.Ref());
160+
for (auto& hint: hints.JoinAlgoHints->Hints) {
161+
if (
162+
std::unordered_set<std::string>(hint.JoinLabels.begin(), hint.JoinLabels.end()) ==
163+
std::unordered_set<std::string>(subtreeLabels.begin(), subtreeLabels.end())
164+
) {
165+
linkSettings.JoinAlgo = hint.Algo;
166+
hint.Applied = true;
167+
}
168+
}
169+
YQL_ENSURE(linkSettings.JoinAlgo != EJoinAlgoType::StreamLookupJoin || typeCtx.StreamLookupJoin, "Unsupported join strategy: streamlookup");
170+
171+
if (linkSettings.JoinAlgo == EJoinAlgoType::MapJoin) {
172+
mode = EHashJoinMode::Map;
173+
} else if (linkSettings.JoinAlgo == EJoinAlgoType::GraceJoin) {
174+
mode = EHashJoinMode::GraceAndSelf;
175+
}
176+
177+
bool leftAny = linkSettings.LeftHints.contains("any");
178+
bool rightAny = linkSettings.RightHints.contains("any");
179+
157180
TStringBuf joinType = joinTuple.Type().Value();
158181
TSet<std::pair<TStringBuf, TStringBuf>> resultKeys;
159182
if (joinType != TStringBuf("RightOnly") && joinType != TStringBuf("RightSemi")) {
@@ -379,17 +402,32 @@ bool CheckJoinColumns(const TExprBase& node) {
379402
}
380403
}
381404

382-
TExprBase DqRewriteEquiJoin(const TExprBase& node, EHashJoinMode mode, bool useCBO, TExprContext& ctx, const TTypeAnnotationContext& typeCtx) {
383-
int dummyJoinCounter;
384-
return DqRewriteEquiJoin(node, mode, useCBO, ctx, typeCtx, dummyJoinCounter);
405+
// Convenience overload for callers that do not track the number of joins:
// supplies a local, zero-initialised counter and forwards every other argument,
// including `hints`, to the overload that takes `int& joinCounter`.
// The counter value is discarded when this function returns.
TExprBase DqRewriteEquiJoin(
406+
const TExprBase& node,
407+
EHashJoinMode mode,
408+
bool useCBO,
409+
TExprContext& ctx,
410+
const TTypeAnnotationContext& typeCtx,
411+
const TOptimizerHints& hints
412+
) {
413+
int dummyJoinCounter = 0;
414+
return DqRewriteEquiJoin(node, mode, useCBO, ctx, typeCtx, dummyJoinCounter, hints);
385415
}
386416

387417
/**
388418
* Rewrite `EquiJoin` to a number of `DqJoin` callables. This is done to simplify next step of building
389419
* physical stages with join operators.
390420
* Potentially this optimizer can also perform joins reorder given cardinality information.
391421
*/
392-
TExprBase DqRewriteEquiJoin(const TExprBase& node, EHashJoinMode mode, bool useCBO, TExprContext& ctx, const TTypeAnnotationContext& typeCtx, int& joinCounter) {
422+
TExprBase DqRewriteEquiJoin(
423+
const TExprBase& node,
424+
EHashJoinMode mode,
425+
bool /* useCBO */,
426+
TExprContext& ctx,
427+
const TTypeAnnotationContext& typeCtx,
428+
int& joinCounter,
429+
const TOptimizerHints& hints
430+
) {
393431
if (!node.Maybe<TCoEquiJoin>()) {
394432
return node;
395433
}
@@ -406,7 +444,8 @@ TExprBase DqRewriteEquiJoin(const TExprBase& node, EHashJoinMode mode, bool useC
406444
}
407445

408446
auto joinTuple = equiJoin.Arg(equiJoin.ArgCount() - 2).Cast<TCoEquiJoinTuple>();
409-
auto result = BuildDqJoin(joinTuple, inputs, mode, useCBO, ctx, typeCtx);
447+
TVector<TString> dummy;
448+
auto result = BuildDqJoin(joinTuple, inputs, mode, ctx, typeCtx, dummy, hints);
410449
if (!result) {
411450
return node;
412451
}

ydb/library/yql/dq/opt/dq_opt_join.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#include <ydb/library/yql/dq/common/dq_common.h>
66
#include <ydb/library/yql/core/yql_expr_optimize.h>
7+
#include <ydb/library/yql/core/cbo/cbo_optimizer_new.h>
78

89
namespace NYql {
910

@@ -12,9 +13,9 @@ struct TRelOptimizerNode;
1213

1314
namespace NDq {
1415

15-
NNodes::TExprBase DqRewriteEquiJoin(const NNodes::TExprBase& node, EHashJoinMode mode, bool useCBO, TExprContext& ctx, const TTypeAnnotationContext& typeCtx);
16+
NNodes::TExprBase DqRewriteEquiJoin(const NNodes::TExprBase& node, EHashJoinMode mode, bool useCBO, TExprContext& ctx, const TTypeAnnotationContext& typeCtx, const TOptimizerHints& hints = {});
1617

17-
NNodes::TExprBase DqRewriteEquiJoin(const NNodes::TExprBase& node, EHashJoinMode mode, bool useCBO, TExprContext& ctx, const TTypeAnnotationContext& typeCtx, int& joinCounter);
18+
NNodes::TExprBase DqRewriteEquiJoin(const NNodes::TExprBase& node, EHashJoinMode mode, bool useCBO, TExprContext& ctx, const TTypeAnnotationContext& typeCtx, int& joinCounter, const TOptimizerHints& hints = {});
1819

1920
NNodes::TExprBase DqBuildPhyJoin(const NNodes::TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx);
2021

0 commit comments

Comments
 (0)