Skip to content

Commit bb73563

Browse files
pashandor789Pavel Ivanov
andauthored
[] Removed cross join transform before saving explain plan (#4384)
Co-authored-by: Pavel Ivanov <pudge1000-7@mr-nvme-testing-11.search.yandex.net>
1 parent 6569863 commit bb73563

File tree

2,258 files changed

+77380
-2169
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

2,258 files changed

+77380
-2169
lines changed

ydb/core/kqp/opt/kqp_opt_build_txs.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,7 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
576576
.Add(CreateKqpBuildPhyStagesTransformer(/* allowDependantConsumers */ false, typesCtx, config->BlockChannelsMode), "BuildPhysicalStages")
577577
.Add(CreateKqpBuildWideBlockChannelsTransformer(typesCtx, config->BlockChannelsMode), "BuildWideBlockChannels")
578578
.Add(*BuildTxTransformer, "BuildPhysicalTx")
579-
.Add(CreateKqpTxPeepholeTransformer(TypeAnnTransformer.Get(), typesCtx, config, /* withFinalStageRules */ false), "Peephole")
579+
.Add(CreateKqpTxPeepholeTransformer(TypeAnnTransformer.Get(), typesCtx, config, /* withFinalStageRules */ false, {"KqpPeephole-RewriteCrossJoin"}), "Peephole")
580580
.Build(false);
581581

582582
ScanTxTransformer = TTransformationPipeline(&typesCtx)
@@ -586,7 +586,7 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
586586
.AddPostTypeAnnotation(/* forSubgraph */ true)
587587
.Add(CreateKqpBuildPhyStagesTransformer(config->SpillingEnabled(), typesCtx, config->BlockChannelsMode), "BuildPhysicalStages")
588588
.Add(*BuildTxTransformer, "BuildPhysicalTx")
589-
.Add(CreateKqpTxPeepholeTransformer(TypeAnnTransformer.Get(), typesCtx, config, /* withFinalStageRules */ false), "Peephole")
589+
.Add(CreateKqpTxPeepholeTransformer(TypeAnnTransformer.Get(), typesCtx, config, /* withFinalStageRules */ false, {"KqpPeephole-RewriteCrossJoin"}), "Peephole")
590590
.Build(false);
591591
}
592592

ydb/core/kqp/opt/kqp_query_plan.cpp

Lines changed: 70 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ class TxPlanSerializer {
344344

345345
writer.WriteKey("Inputs");
346346
writer.BeginList();
347-
347+
348348
for (const auto& input : op.Inputs) {
349349

350350
if (std::holds_alternative<ui32>(input)) {
@@ -901,16 +901,11 @@ class TxPlanSerializer {
901901
Y_ENSURE(QueryPlanNodes.contains(stageId));
902902
auto& commonNode = QueryPlanNodes[stageId];
903903

904-
auto& parentNode = GetParent(stageId);
905-
parentNode.Plans.erase(stageId);
906-
907-
auto& cteNode = AddPlanNode(QueryPlanNodes.begin()->second);
908-
cteNode.Plans.insert(stageId);
909-
cteNode.TypeName = TStringBuilder() << commonNode.TypeName;
910-
cteNode.CteName = TStringBuilder() << cteNode.TypeName << "_" << stageId;
904+
if (!commonNode.CteName) {
905+
commonNode.CteName = TStringBuilder() << commonNode.TypeName << "_" << stageId;
906+
}
911907

912-
parentNode.CteRefName = *cteNode.CteName;
913-
planNode.CteRefName = *cteNode.CteName;
908+
planNode.CteRefName = *commonNode.CteName;
914909

915910
return;
916911
}
@@ -1058,6 +1053,8 @@ class TxPlanSerializer {
10581053
operatorId = Visit(maybeDelete.Cast(), planNode);
10591054
} else if (auto maybeArg = TMaybeNode<TCoArgument>(node)) {
10601055
return {CurrentArgContext.AddArg(node.Get())};
1056+
} else if (auto maybeCrossJoin = TMaybeNode<TDqPhyCrossJoin>(node)) {
1057+
operatorId = Visit(maybeCrossJoin.Cast(), planNode);
10611058
}
10621059

10631060
TVector<std::variant<ui32, TArgContext>> inputIds;
@@ -1068,17 +1065,18 @@ class TxPlanSerializer {
10681065
}
10691066
} else {
10701067
if (TMaybeNode<TCoFlatMapBase>(node)) {
1071-
10721068
auto flatMap = TExprBase(node).Cast<TCoFlatMapBase>();
10731069
auto flatMapInputs = Visit(flatMap, planNode);
10741070

1075-
inputIds.insert(inputIds.end(), flatMapInputs.begin(), flatMapInputs.end());
1076-
10771071
auto flatMapLambdaInputs = Visit(flatMap.Lambda().Body().Ptr(), planNode);
10781072
inputIds.insert(inputIds.end(), flatMapLambdaInputs.begin(), flatMapLambdaInputs.end());
1073+
} else if (TMaybeNode<TCoMap>(node)) {
1074+
auto map = TExprBase(node).Cast<TCoMap>();
1075+
auto mapInputs = Visit(map, planNode);
10791076

1080-
}
1081-
else {
1077+
auto mapLambdaInputs = Visit(map.Lambda().Body().Ptr(), planNode);
1078+
inputIds.insert(inputIds.end(), mapLambdaInputs.begin(), mapLambdaInputs.end());
1079+
} else {
10821080
for (const auto& child : node->Children()) {
10831081
if(!child->IsLambda()) {
10841082
auto ids = Visit(child, planNode);
@@ -1102,10 +1100,30 @@ class TxPlanSerializer {
11021100
return inputIds;
11031101
}
11041102

1103+
TVector<std::variant<ui32, TArgContext>> Visit(const TCoMap& map, TQueryPlanNode& planNode) {
1104+
auto mapInputs = Visit(map.Input().Ptr(), planNode);
1105+
1106+
if (!mapInputs.empty() && map.Lambda().Args().Size() != 0) {
1107+
auto input = mapInputs[0];
1108+
auto newContext = CurrentArgContext.AddArg(map.Lambda().Args().Arg(0).Ptr().Get());
1109+
1110+
if (std::holds_alternative<ui32>(input)) {
1111+
LambdaInputs[newContext] = std::get<ui32>(input);
1112+
} else {
1113+
auto context = std::get<TArgContext>(input);
1114+
if (LambdaInputs.contains(context)){
1115+
LambdaInputs[newContext] = LambdaInputs.at(context);
1116+
}
1117+
}
1118+
}
1119+
1120+
return mapInputs;
1121+
}
1122+
11051123
TVector<std::variant<ui32, TArgContext>> Visit(const TCoFlatMapBase& flatMap, TQueryPlanNode& planNode) {
11061124
auto flatMapInputs = Visit(flatMap.Input().Ptr(), planNode);
11071125

1108-
if (flatMapInputs.size() >= 1) {
1126+
if (!flatMapInputs.empty() && flatMap.Lambda().Args().Size() != 0) {
11091127
auto input = flatMapInputs[0];
11101128
auto newContext = CurrentArgContext.AddArg(flatMap.Lambda().Args().Arg(0).Ptr().Get());
11111129

@@ -1374,6 +1392,15 @@ class TxPlanSerializer {
13741392
return operatorId;
13751393
}
13761394

1395+
std::variant<ui32, TArgContext> Visit(const TDqPhyCrossJoin&, TQueryPlanNode& planNode) {
1396+
const auto name = "CrossJoin";
1397+
1398+
TOperator op;
1399+
op.Properties["Name"] = name;
1400+
1401+
return AddOperator(planNode, name, std::move(op));
1402+
}
1403+
13771404
std::variant<ui32, TArgContext> Visit(const TCoMapJoinCore& join, TQueryPlanNode& planNode) {
13781405
const auto name = TStringBuilder() << join.JoinKind().Value() << "Join (MapJoin)";
13791406

@@ -1958,10 +1985,6 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
19581985
NJson::TJsonValue result;
19591986
result["PlanNodeId"] = currentNodeId;
19601987

1961-
//if (plan.GetMapSafe().contains("PlanNodeId")) {
1962-
// YQL_CLOG(TRACE, CoreDq) << "Recursed into " << plan.GetMapSafe().at("PlanNodeId").GetIntegerSafe() << ", constructed: " << currentNodeId;
1963-
//}
1964-
19651988
if (plan.GetMapSafe().contains("PlanNodeType")) {
19661989
result["PlanNodeType"] = plan.GetMapSafe().at("PlanNodeType").GetStringSafe();
19671990
}
@@ -1975,7 +1998,15 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
19751998

19761999
result["Node Type"] = plan.GetMapSafe().at("Node Type").GetStringSafe();
19772000

2001+
if (plan.GetMapSafe().contains("CTE Name")) {
2002+
auto precompute = plan.GetMapSafe().at("CTE Name").GetStringSafe();
2003+
if (precomputes.contains(precompute)) {
2004+
planInputs.AppendValue(ReconstructQueryPlanRec(precomputes.at(precompute), 0, planIndex, precomputes, nodeCounter));
2005+
}
2006+
}
2007+
19782008
if (!plan.GetMapSafe().contains("Plans")) {
2009+
result["Plans"] = planInputs;
19792010
return result;
19802011
}
19812012

@@ -2008,11 +2039,8 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
20082039
if (!p.GetMapSafe().contains("Operators") && p.GetMapSafe().contains("CTE Name")) {
20092040
auto precompute = p.GetMapSafe().at("CTE Name").GetStringSafe();
20102041
if (precomputes.contains(precompute)) {
2011-
//YQL_CLOG(TRACE, CoreDq) << "Following precompute: " << precompute ;
20122042
planInputs.AppendValue(ReconstructQueryPlanRec(precomputes.at(precompute), 0, planIndex, precomputes, nodeCounter));
2013-
} //else {
2014-
// YQL_CLOG(TRACE, CoreDq) << "Didn't find precompute: " << precompute ;
2015-
//}
2043+
}
20162044
} else if (p.GetMapSafe().at("Node Type").GetStringSafe().find("Precompute") == TString::npos) {
20172045
planInputs.AppendValue(ReconstructQueryPlanRec(p, 0, planIndex, precomputes, nodeCounter));
20182046
}
@@ -2024,12 +2052,9 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
20242052
if (plan.GetMapSafe().contains("CTE Name") && plan.GetMapSafe().at("Node Type").GetStringSafe() == "ConstantExpr") {
20252053
auto precompute = plan.GetMapSafe().at("CTE Name").GetStringSafe();
20262054
if (!precomputes.contains(precompute)) {
2027-
//YQL_CLOG(TRACE, CoreDq) << "Didn't find precompute: " << precompute ;
2028-
20292055
result["Node Type"] = plan.GetMapSafe().at("Node Type");
20302056
return result;
20312057
}
2032-
//YQL_CLOG(TRACE, CoreDq) << "Following precompute: " << precompute ;
20332058

20342059
return ReconstructQueryPlanRec(precomputes.at(precompute), 0, planIndex, precomputes, nodeCounter);
20352060
}
@@ -2041,24 +2066,29 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
20412066

20422067
auto opName = op.GetMapSafe().at("Name").GetStringSafe();
20432068

2069+
THashSet<ui32> processedExternalOperators;
2070+
THashSet<ui32> processedInternalOperators;
20442071
for (auto opInput : op.GetMapSafe().at("Inputs").GetArraySafe()) {
2045-
// Sometimes we have inputs for these operators, don't process them
2046-
if (opName == "TablePointLookup") {
2047-
break;
2048-
}
2049-
2072+
20502073
if (opInput.GetMapSafe().contains("ExternalPlanNodeId")) {
20512074
auto inputPlanKey = opInput.GetMapSafe().at("ExternalPlanNodeId").GetIntegerSafe();
2075+
2076+
if (processedExternalOperators.contains(inputPlanKey)) {
2077+
continue;
2078+
}
2079+
processedExternalOperators.insert(inputPlanKey);
2080+
20522081
auto inputPlan = planIndex.at(inputPlanKey);
20532082
planInputs.push_back( ReconstructQueryPlanRec(inputPlan, 0, planIndex, precomputes, nodeCounter));
20542083
} else if (opInput.GetMapSafe().contains("InternalOperatorId")) {
20552084
auto inputPlanId = opInput.GetMapSafe().at("InternalOperatorId").GetIntegerSafe();
2056-
planInputs.push_back( ReconstructQueryPlanRec(plan, inputPlanId, planIndex, precomputes, nodeCounter));
2057-
}
20582085

2059-
// Sometimes we have multiple inputs for these operators, break after the first one
2060-
if (opName == "Filter" || opName == "TopSort" || opName == "Aggregate") {
2061-
break;
2086+
if (processedInternalOperators.contains(inputPlanId)) {
2087+
continue;
2088+
}
2089+
processedInternalOperators.insert(inputPlanId);
2090+
2091+
planInputs.push_back( ReconstructQueryPlanRec(plan, inputPlanId, planIndex, precomputes, nodeCounter));
20622092
}
20632093
}
20642094

@@ -2085,12 +2115,9 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
20852115
maybePrecompute = op.GetMapSafe().at("Iterator").GetStringSafe();
20862116
}
20872117

2088-
if (precomputes.contains(maybePrecompute)) {
2089-
//YQL_CLOG(TRACE, CoreDq) << "Following precompute: " << maybePrecompute ;
2118+
if (precomputes.contains(maybePrecompute) && planInputs.empty()) {
20902119
planInputs.push_back(ReconstructQueryPlanRec(precomputes.at(maybePrecompute), 0, planIndex, precomputes, nodeCounter));
2091-
} //else {
2092-
// YQL_CLOG(TRACE, CoreDq) << "Didn't find precompute: " << maybePrecompute ;
2093-
//}
2120+
}
20942121
}
20952122

20962123
result["Node Type"] = opName;
@@ -2205,6 +2232,7 @@ NJson::TJsonValue SimplifyQueryPlan(NJson::TJsonValue& plan) {
22052232

22062233
int nodeCounter = 0;
22072234
plan = ReconstructQueryPlanRec(plan, 0, planIndex, precomputes, nodeCounter);
2235+
22082236
RemoveRedundantNodes(plan, redundantNodes);
22092237
ComputeCpuTimes(plan);
22102238
ComputeTotalRows(plan);

ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,8 @@ TStatus ReplaceNonDetFunctionsWithParams(TExprNode::TPtr& input, TExprContext& c
8686

8787
class TKqpPeepholeTransformer : public TOptimizeTransformerBase {
8888
public:
89-
TKqpPeepholeTransformer(TTypeAnnotationContext& typesCtx)
90-
: TOptimizeTransformerBase(&typesCtx, NYql::NLog::EComponent::ProviderKqp, {})
89+
TKqpPeepholeTransformer(TTypeAnnotationContext& typesCtx, TSet<TString> disabledOpts)
90+
: TOptimizeTransformerBase(&typesCtx, NYql::NLog::EComponent::ProviderKqp, disabledOpts)
9191
{
9292
#define HNDL(name) "KqpPeephole-"#name, Hndl(&TKqpPeepholeTransformer::name)
9393
AddHandler(0, &TDqReplicate::Match, HNDL(RewriteReplicate));
@@ -152,8 +152,12 @@ class TKqpPeepholeTransformer : public TOptimizeTransformerBase {
152152
};
153153

154154
struct TKqpPeepholePipelineConfigurator : IPipelineConfigurator {
155-
TKqpPeepholePipelineConfigurator(TKikimrConfiguration::TPtr config)
155+
TKqpPeepholePipelineConfigurator(
156+
TKikimrConfiguration::TPtr config,
157+
TSet<TString> disabledOpts
158+
)
156159
: Config(config)
160+
, DisabledOpts(disabledOpts)
157161
{}
158162

159163
void AfterCreate(TTransformationPipeline*) const override {
@@ -163,11 +167,12 @@ struct TKqpPeepholePipelineConfigurator : IPipelineConfigurator {
163167
}
164168

165169
void AfterOptimize(TTransformationPipeline* pipeline) const override {
166-
pipeline->Add(new TKqpPeepholeTransformer(*pipeline->GetTypeAnnotationContext()), "KqpPeephole");
170+
pipeline->Add(new TKqpPeepholeTransformer(*pipeline->GetTypeAnnotationContext(), DisabledOpts), "KqpPeephole");
167171
}
168172

169173
private:
170174
TKikimrConfiguration::TPtr Config;
175+
TSet<TString> DisabledOpts;
171176
};
172177

173178
class TKqpPeepholeFinalTransformer : public TOptimizeTransformerBase {
@@ -210,9 +215,9 @@ struct TKqpPeepholePipelineFinalConfigurator : IPipelineConfigurator {
210215

211216
TStatus PeepHoleOptimize(const TExprBase& program, TExprNode::TPtr& newProgram, TExprContext& ctx,
212217
IGraphTransformer& typeAnnTransformer, TTypeAnnotationContext& typesCtx, TKikimrConfiguration::TPtr config,
213-
bool allowNonDeterministicFunctions, bool withFinalStageRules)
218+
bool allowNonDeterministicFunctions, bool withFinalStageRules, TSet<TString> disabledOpts)
214219
{
215-
TKqpPeepholePipelineConfigurator kqpPeephole(config);
220+
TKqpPeepholePipelineConfigurator kqpPeephole(config, disabledOpts);
216221
TKqpPeepholePipelineFinalConfigurator kqpPeepholeFinal(config);
217222
TPeepholeSettings peepholeSettings;
218223
peepholeSettings.CommonConfig = &kqpPeephole;
@@ -237,7 +242,7 @@ TStatus PeepHoleOptimize(const TExprBase& program, TExprNode::TPtr& newProgram,
237242

238243
TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprContext& ctx,
239244
IGraphTransformer& typeAnnTransformer, TTypeAnnotationContext& typesCtx, THashSet<ui64>& optimizedStages,
240-
TKikimrConfiguration::TPtr config, bool withFinalStageRules)
245+
TKikimrConfiguration::TPtr config, bool withFinalStageRules, TSet<TString> disabledOpts)
241246
{
242247
TVector<TDqPhyStage> stages;
243248
stages.reserve(tx.Stages().Size());
@@ -266,7 +271,7 @@ TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprConte
266271

267272
TExprNode::TPtr newProgram;
268273
auto status = PeepHoleOptimize(program, newProgram, ctx, typeAnnTransformer, typesCtx, config,
269-
allowNonDeterministicFunctions, withFinalStageRules);
274+
allowNonDeterministicFunctions, withFinalStageRules, disabledOpts);
270275
if (status != TStatus::Ok) {
271276
ctx.AddError(TIssue(ctx.GetPosition(stage.Pos()), "Peephole optimization failed for KQP transaction"));
272277
return {};
@@ -311,12 +316,18 @@ TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprConte
311316

312317
class TKqpTxPeepholeTransformer : public TSyncTransformerBase {
313318
public:
314-
TKqpTxPeepholeTransformer(IGraphTransformer* typeAnnTransformer,
315-
TTypeAnnotationContext& typesCtx, TKikimrConfiguration::TPtr config, bool withFinalStageRules)
319+
TKqpTxPeepholeTransformer(
320+
IGraphTransformer* typeAnnTransformer,
321+
TTypeAnnotationContext& typesCtx,
322+
TKikimrConfiguration::TPtr config,
323+
bool withFinalStageRules,
324+
TSet<TString> disabledOpts
325+
)
316326
: TypeAnnTransformer(typeAnnTransformer)
317327
, TypesCtx(typesCtx)
318328
, Config(config)
319329
, WithFinalStageRules(withFinalStageRules)
330+
, DisabledOpts(disabledOpts)
320331
{}
321332

322333
TStatus DoTransform(TExprNode::TPtr inputExpr, TExprNode::TPtr& outputExpr, TExprContext& ctx) final {
@@ -334,7 +345,7 @@ class TKqpTxPeepholeTransformer : public TSyncTransformerBase {
334345
auto tx = input.Cast<TKqpPhysicalTx>();
335346

336347
THashSet<ui64> optimizedStages;
337-
auto optimizedTx = PeepholeOptimize(tx, ctx, *TypeAnnTransformer, TypesCtx, optimizedStages, Config, WithFinalStageRules);
348+
auto optimizedTx = PeepholeOptimize(tx, ctx, *TypeAnnTransformer, TypesCtx, optimizedStages, Config, WithFinalStageRules, DisabledOpts);
338349

339350
if (!optimizedTx) {
340351
return TStatus::Error;
@@ -356,6 +367,7 @@ class TKqpTxPeepholeTransformer : public TSyncTransformerBase {
356367
TKikimrConfiguration::TPtr Config;
357368
bool Optimized = false;
358369
bool WithFinalStageRules = true;
370+
TSet<TString> DisabledOpts;
359371
};
360372

361373
class TKqpTxsPeepholeTransformer : public TSyncTransformerBase {
@@ -430,14 +442,22 @@ class TKqpTxsPeepholeTransformer : public TSyncTransformerBase {
430442

431443
} // anonymous namespace
432444

433-
TAutoPtr<IGraphTransformer> CreateKqpTxPeepholeTransformer(NYql::IGraphTransformer* typeAnnTransformer,
434-
TTypeAnnotationContext& typesCtx, const TKikimrConfiguration::TPtr& config, bool withFinalStageRules)
445+
TAutoPtr<IGraphTransformer> CreateKqpTxPeepholeTransformer(
446+
NYql::IGraphTransformer* typeAnnTransformer,
447+
TTypeAnnotationContext& typesCtx,
448+
const TKikimrConfiguration::TPtr& config,
449+
bool withFinalStageRules,
450+
TSet<TString> disabledOpts
451+
)
435452
{
436-
return new TKqpTxPeepholeTransformer(typeAnnTransformer, typesCtx, config, withFinalStageRules);
453+
return new TKqpTxPeepholeTransformer(typeAnnTransformer, typesCtx, config, withFinalStageRules, disabledOpts);
437454
}
438455

439-
TAutoPtr<IGraphTransformer> CreateKqpTxsPeepholeTransformer(TAutoPtr<NYql::IGraphTransformer> typeAnnTransformer,
440-
TTypeAnnotationContext& typesCtx, const TKikimrConfiguration::TPtr& config)
456+
TAutoPtr<IGraphTransformer> CreateKqpTxsPeepholeTransformer(
457+
TAutoPtr<NYql::IGraphTransformer> typeAnnTransformer,
458+
TTypeAnnotationContext& typesCtx,
459+
const TKikimrConfiguration::TPtr& config
460+
)
441461
{
442462
return new TKqpTxsPeepholeTransformer(std::move(typeAnnTransformer), typesCtx, config);
443463
}

0 commit comments

Comments
 (0)