Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
4 changes: 2 additions & 2 deletions ydb/core/kqp/opt/kqp_opt_build_txs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
.Add(CreateKqpBuildPhyStagesTransformer(/* allowDependantConsumers */ false, typesCtx, config->BlockChannelsMode), "BuildPhysicalStages")
.Add(CreateKqpBuildWideBlockChannelsTransformer(typesCtx, config->BlockChannelsMode), "BuildWideBlockChannels")
.Add(*BuildTxTransformer, "BuildPhysicalTx")
.Add(CreateKqpTxPeepholeTransformer(TypeAnnTransformer.Get(), typesCtx, config, /* withFinalStageRules */ false), "Peephole")
.Add(CreateKqpTxPeepholeTransformer(TypeAnnTransformer.Get(), typesCtx, config, /* withFinalStageRules */ false, {"KqpPeephole-RewriteCrossJoin"}), "Peephole")
.Build(false);

ScanTxTransformer = TTransformationPipeline(&typesCtx)
Expand All @@ -586,7 +586,7 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
.AddPostTypeAnnotation(/* forSubgraph */ true)
.Add(CreateKqpBuildPhyStagesTransformer(config->SpillingEnabled(), typesCtx, config->BlockChannelsMode), "BuildPhysicalStages")
.Add(*BuildTxTransformer, "BuildPhysicalTx")
.Add(CreateKqpTxPeepholeTransformer(TypeAnnTransformer.Get(), typesCtx, config, /* withFinalStageRules */ false), "Peephole")
.Add(CreateKqpTxPeepholeTransformer(TypeAnnTransformer.Get(), typesCtx, config, /* withFinalStageRules */ false, {"KqpPeephole-RewriteCrossJoin"}), "Peephole")
.Build(false);
}

Expand Down
112 changes: 70 additions & 42 deletions ydb/core/kqp/opt/kqp_query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ class TxPlanSerializer {

writer.WriteKey("Inputs");
writer.BeginList();

for (const auto& input : op.Inputs) {

if (std::holds_alternative<ui32>(input)) {
Expand Down Expand Up @@ -901,16 +901,11 @@ class TxPlanSerializer {
Y_ENSURE(QueryPlanNodes.contains(stageId));
auto& commonNode = QueryPlanNodes[stageId];

auto& parentNode = GetParent(stageId);
parentNode.Plans.erase(stageId);

auto& cteNode = AddPlanNode(QueryPlanNodes.begin()->second);
cteNode.Plans.insert(stageId);
cteNode.TypeName = TStringBuilder() << commonNode.TypeName;
cteNode.CteName = TStringBuilder() << cteNode.TypeName << "_" << stageId;
if (!commonNode.CteName) {
commonNode.CteName = TStringBuilder() << commonNode.TypeName << "_" << stageId;
}

parentNode.CteRefName = *cteNode.CteName;
planNode.CteRefName = *cteNode.CteName;
planNode.CteRefName = *commonNode.CteName;

return;
}
Expand Down Expand Up @@ -1058,6 +1053,8 @@ class TxPlanSerializer {
operatorId = Visit(maybeDelete.Cast(), planNode);
} else if (auto maybeArg = TMaybeNode<TCoArgument>(node)) {
return {CurrentArgContext.AddArg(node.Get())};
} else if (auto maybeCrossJoin = TMaybeNode<TDqPhyCrossJoin>(node)) {
operatorId = Visit(maybeCrossJoin.Cast(), planNode);
}

TVector<std::variant<ui32, TArgContext>> inputIds;
Expand All @@ -1068,17 +1065,18 @@ class TxPlanSerializer {
}
} else {
if (TMaybeNode<TCoFlatMapBase>(node)) {

auto flatMap = TExprBase(node).Cast<TCoFlatMapBase>();
auto flatMapInputs = Visit(flatMap, planNode);

inputIds.insert(inputIds.end(), flatMapInputs.begin(), flatMapInputs.end());

auto flatMapLambdaInputs = Visit(flatMap.Lambda().Body().Ptr(), planNode);
inputIds.insert(inputIds.end(), flatMapLambdaInputs.begin(), flatMapLambdaInputs.end());
} else if (TMaybeNode<TCoMap>(node)) {
auto map = TExprBase(node).Cast<TCoMap>();
auto mapInputs = Visit(map, planNode);

}
else {
auto mapLambdaInputs = Visit(map.Lambda().Body().Ptr(), planNode);
inputIds.insert(inputIds.end(), mapLambdaInputs.begin(), mapLambdaInputs.end());
} else {
for (const auto& child : node->Children()) {
if(!child->IsLambda()) {
auto ids = Visit(child, planNode);
Expand All @@ -1102,10 +1100,30 @@ class TxPlanSerializer {
return inputIds;
}

// Visits the input of a Map callable and returns the ids produced for that input.
//
// When the input yields at least one id and the map lambda declares at least one
// argument, the lambda's first argument is registered in LambdaInputs under a new
// argument context:
//   * if the first input id is a ui32, that value is stored directly;
//   * if it is itself an argument context that already has an entry in
//     LambdaInputs, that entry is copied to the new context;
//   * otherwise nothing is recorded.
// Only the first input id is considered; the returned vector is the unmodified
// result of visiting the map input.
TVector<std::variant<ui32, TArgContext>> Visit(const TCoMap& map, TQueryPlanNode& planNode) {
    auto inputIds = Visit(map.Input().Ptr(), planNode);

    // Nothing to bind when the input produced no ids or the lambda takes no arguments.
    if (inputIds.empty() || map.Lambda().Args().Size() == 0) {
        return inputIds;
    }

    auto firstInput = inputIds[0];
    auto lambdaArgContext = CurrentArgContext.AddArg(map.Lambda().Args().Arg(0).Ptr().Get());

    if (std::holds_alternative<ui32>(firstInput)) {
        LambdaInputs[lambdaArgContext] = std::get<ui32>(firstInput);
        return inputIds;
    }

    // The input is another lambda argument: forward whatever it was bound to, if anything.
    auto sourceContext = std::get<TArgContext>(firstInput);
    if (LambdaInputs.contains(sourceContext)) {
        LambdaInputs[lambdaArgContext] = LambdaInputs.at(sourceContext);
    }

    return inputIds;
}

TVector<std::variant<ui32, TArgContext>> Visit(const TCoFlatMapBase& flatMap, TQueryPlanNode& planNode) {
auto flatMapInputs = Visit(flatMap.Input().Ptr(), planNode);

if (flatMapInputs.size() >= 1) {
if (!flatMapInputs.empty() && flatMap.Lambda().Args().Size() != 0) {
auto input = flatMapInputs[0];
auto newContext = CurrentArgContext.AddArg(flatMap.Lambda().Args().Arg(0).Ptr().Get());

Expand Down Expand Up @@ -1374,6 +1392,15 @@ class TxPlanSerializer {
return operatorId;
}

std::variant<ui32, TArgContext> Visit(const TDqPhyCrossJoin&, TQueryPlanNode& planNode) {
const auto name = "CrossJoin";

TOperator op;
op.Properties["Name"] = name;

return AddOperator(planNode, name, std::move(op));
}

std::variant<ui32, TArgContext> Visit(const TCoMapJoinCore& join, TQueryPlanNode& planNode) {
const auto name = TStringBuilder() << join.JoinKind().Value() << "Join (MapJoin)";

Expand Down Expand Up @@ -1958,10 +1985,6 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
NJson::TJsonValue result;
result["PlanNodeId"] = currentNodeId;

//if (plan.GetMapSafe().contains("PlanNodeId")) {
// YQL_CLOG(TRACE, CoreDq) << "Recursed into " << plan.GetMapSafe().at("PlanNodeId").GetIntegerSafe() << ", constructed: " << currentNodeId;
//}

if (plan.GetMapSafe().contains("PlanNodeType")) {
result["PlanNodeType"] = plan.GetMapSafe().at("PlanNodeType").GetStringSafe();
}
Expand All @@ -1975,7 +1998,15 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,

result["Node Type"] = plan.GetMapSafe().at("Node Type").GetStringSafe();

if (plan.GetMapSafe().contains("CTE Name")) {
auto precompute = plan.GetMapSafe().at("CTE Name").GetStringSafe();
if (precomputes.contains(precompute)) {
planInputs.AppendValue(ReconstructQueryPlanRec(precomputes.at(precompute), 0, planIndex, precomputes, nodeCounter));
}
}

if (!plan.GetMapSafe().contains("Plans")) {
result["Plans"] = planInputs;
return result;
}

Expand Down Expand Up @@ -2008,11 +2039,8 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
if (!p.GetMapSafe().contains("Operators") && p.GetMapSafe().contains("CTE Name")) {
auto precompute = p.GetMapSafe().at("CTE Name").GetStringSafe();
if (precomputes.contains(precompute)) {
//YQL_CLOG(TRACE, CoreDq) << "Following precompute: " << precompute ;
planInputs.AppendValue(ReconstructQueryPlanRec(precomputes.at(precompute), 0, planIndex, precomputes, nodeCounter));
} //else {
// YQL_CLOG(TRACE, CoreDq) << "Didn't find precompute: " << precompute ;
//}
}
} else if (p.GetMapSafe().at("Node Type").GetStringSafe().find("Precompute") == TString::npos) {
planInputs.AppendValue(ReconstructQueryPlanRec(p, 0, planIndex, precomputes, nodeCounter));
}
Expand All @@ -2024,12 +2052,9 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
if (plan.GetMapSafe().contains("CTE Name") && plan.GetMapSafe().at("Node Type").GetStringSafe() == "ConstantExpr") {
auto precompute = plan.GetMapSafe().at("CTE Name").GetStringSafe();
if (!precomputes.contains(precompute)) {
//YQL_CLOG(TRACE, CoreDq) << "Didn't find precompute: " << precompute ;

result["Node Type"] = plan.GetMapSafe().at("Node Type");
return result;
}
//YQL_CLOG(TRACE, CoreDq) << "Following precompute: " << precompute ;

return ReconstructQueryPlanRec(precomputes.at(precompute), 0, planIndex, precomputes, nodeCounter);
}
Expand All @@ -2041,24 +2066,29 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,

auto opName = op.GetMapSafe().at("Name").GetStringSafe();

THashSet<ui32> processedExternalOperators;
THashSet<ui32> processedInternalOperators;
for (auto opInput : op.GetMapSafe().at("Inputs").GetArraySafe()) {
// Sometimes we have inputs for these operators, don't process them
if (opName == "TablePointLookup") {
break;
}


if (opInput.GetMapSafe().contains("ExternalPlanNodeId")) {
auto inputPlanKey = opInput.GetMapSafe().at("ExternalPlanNodeId").GetIntegerSafe();

if (processedExternalOperators.contains(inputPlanKey)) {
continue;
}
processedExternalOperators.insert(inputPlanKey);

auto inputPlan = planIndex.at(inputPlanKey);
planInputs.push_back( ReconstructQueryPlanRec(inputPlan, 0, planIndex, precomputes, nodeCounter));
} else if (opInput.GetMapSafe().contains("InternalOperatorId")) {
auto inputPlanId = opInput.GetMapSafe().at("InternalOperatorId").GetIntegerSafe();
planInputs.push_back( ReconstructQueryPlanRec(plan, inputPlanId, planIndex, precomputes, nodeCounter));
}

// Sometimes we have multiple inputs for these operators, break after the first one
if (opName == "Filter" || opName == "TopSort" || opName == "Aggregate") {
break;
if (processedInternalOperators.contains(inputPlanId)) {
continue;
}
processedInternalOperators.insert(inputPlanId);

planInputs.push_back( ReconstructQueryPlanRec(plan, inputPlanId, planIndex, precomputes, nodeCounter));
}
}

Expand All @@ -2085,12 +2115,9 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
maybePrecompute = op.GetMapSafe().at("Iterator").GetStringSafe();
}

if (precomputes.contains(maybePrecompute)) {
//YQL_CLOG(TRACE, CoreDq) << "Following precompute: " << maybePrecompute ;
if (precomputes.contains(maybePrecompute) && planInputs.empty()) {
planInputs.push_back(ReconstructQueryPlanRec(precomputes.at(maybePrecompute), 0, planIndex, precomputes, nodeCounter));
} //else {
// YQL_CLOG(TRACE, CoreDq) << "Didn't find precompute: " << maybePrecompute ;
//}
}
}

result["Node Type"] = opName;
Expand Down Expand Up @@ -2205,6 +2232,7 @@ NJson::TJsonValue SimplifyQueryPlan(NJson::TJsonValue& plan) {

int nodeCounter = 0;
plan = ReconstructQueryPlanRec(plan, 0, planIndex, precomputes, nodeCounter);

RemoveRedundantNodes(plan, redundantNodes);
ComputeCpuTimes(plan);
ComputeTotalRows(plan);
Expand Down
52 changes: 36 additions & 16 deletions ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ TStatus ReplaceNonDetFunctionsWithParams(TExprNode::TPtr& input, TExprContext& c

class TKqpPeepholeTransformer : public TOptimizeTransformerBase {
public:
TKqpPeepholeTransformer(TTypeAnnotationContext& typesCtx)
: TOptimizeTransformerBase(&typesCtx, NYql::NLog::EComponent::ProviderKqp, {})
TKqpPeepholeTransformer(TTypeAnnotationContext& typesCtx, TSet<TString> disabledOpts)
: TOptimizeTransformerBase(&typesCtx, NYql::NLog::EComponent::ProviderKqp, disabledOpts)
{
#define HNDL(name) "KqpPeephole-"#name, Hndl(&TKqpPeepholeTransformer::name)
AddHandler(0, &TDqReplicate::Match, HNDL(RewriteReplicate));
Expand Down Expand Up @@ -152,8 +152,12 @@ class TKqpPeepholeTransformer : public TOptimizeTransformerBase {
};

struct TKqpPeepholePipelineConfigurator : IPipelineConfigurator {
TKqpPeepholePipelineConfigurator(TKikimrConfiguration::TPtr config)
TKqpPeepholePipelineConfigurator(
TKikimrConfiguration::TPtr config,
TSet<TString> disabledOpts
)
: Config(config)
, DisabledOpts(disabledOpts)
{}

void AfterCreate(TTransformationPipeline*) const override {
Expand All @@ -163,11 +167,12 @@ struct TKqpPeepholePipelineConfigurator : IPipelineConfigurator {
}

void AfterOptimize(TTransformationPipeline* pipeline) const override {
pipeline->Add(new TKqpPeepholeTransformer(*pipeline->GetTypeAnnotationContext()), "KqpPeephole");
pipeline->Add(new TKqpPeepholeTransformer(*pipeline->GetTypeAnnotationContext(), DisabledOpts), "KqpPeephole");
}

private:
TKikimrConfiguration::TPtr Config;
TSet<TString> DisabledOpts;
};

class TKqpPeepholeFinalTransformer : public TOptimizeTransformerBase {
Expand Down Expand Up @@ -210,9 +215,9 @@ struct TKqpPeepholePipelineFinalConfigurator : IPipelineConfigurator {

TStatus PeepHoleOptimize(const TExprBase& program, TExprNode::TPtr& newProgram, TExprContext& ctx,
IGraphTransformer& typeAnnTransformer, TTypeAnnotationContext& typesCtx, TKikimrConfiguration::TPtr config,
bool allowNonDeterministicFunctions, bool withFinalStageRules)
bool allowNonDeterministicFunctions, bool withFinalStageRules, TSet<TString> disabledOpts)
{
TKqpPeepholePipelineConfigurator kqpPeephole(config);
TKqpPeepholePipelineConfigurator kqpPeephole(config, disabledOpts);
TKqpPeepholePipelineFinalConfigurator kqpPeepholeFinal(config);
TPeepholeSettings peepholeSettings;
peepholeSettings.CommonConfig = &kqpPeephole;
Expand All @@ -237,7 +242,7 @@ TStatus PeepHoleOptimize(const TExprBase& program, TExprNode::TPtr& newProgram,

TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprContext& ctx,
IGraphTransformer& typeAnnTransformer, TTypeAnnotationContext& typesCtx, THashSet<ui64>& optimizedStages,
TKikimrConfiguration::TPtr config, bool withFinalStageRules)
TKikimrConfiguration::TPtr config, bool withFinalStageRules, TSet<TString> disabledOpts)
{
TVector<TDqPhyStage> stages;
stages.reserve(tx.Stages().Size());
Expand Down Expand Up @@ -266,7 +271,7 @@ TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprConte

TExprNode::TPtr newProgram;
auto status = PeepHoleOptimize(program, newProgram, ctx, typeAnnTransformer, typesCtx, config,
allowNonDeterministicFunctions, withFinalStageRules);
allowNonDeterministicFunctions, withFinalStageRules, disabledOpts);
if (status != TStatus::Ok) {
ctx.AddError(TIssue(ctx.GetPosition(stage.Pos()), "Peephole optimization failed for KQP transaction"));
return {};
Expand Down Expand Up @@ -311,12 +316,18 @@ TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprConte

class TKqpTxPeepholeTransformer : public TSyncTransformerBase {
public:
TKqpTxPeepholeTransformer(IGraphTransformer* typeAnnTransformer,
TTypeAnnotationContext& typesCtx, TKikimrConfiguration::TPtr config, bool withFinalStageRules)
TKqpTxPeepholeTransformer(
IGraphTransformer* typeAnnTransformer,
TTypeAnnotationContext& typesCtx,
TKikimrConfiguration::TPtr config,
bool withFinalStageRules,
TSet<TString> disabledOpts
)
: TypeAnnTransformer(typeAnnTransformer)
, TypesCtx(typesCtx)
, Config(config)
, WithFinalStageRules(withFinalStageRules)
, DisabledOpts(disabledOpts)
{}

TStatus DoTransform(TExprNode::TPtr inputExpr, TExprNode::TPtr& outputExpr, TExprContext& ctx) final {
Expand All @@ -334,7 +345,7 @@ class TKqpTxPeepholeTransformer : public TSyncTransformerBase {
auto tx = input.Cast<TKqpPhysicalTx>();

THashSet<ui64> optimizedStages;
auto optimizedTx = PeepholeOptimize(tx, ctx, *TypeAnnTransformer, TypesCtx, optimizedStages, Config, WithFinalStageRules);
auto optimizedTx = PeepholeOptimize(tx, ctx, *TypeAnnTransformer, TypesCtx, optimizedStages, Config, WithFinalStageRules, DisabledOpts);

if (!optimizedTx) {
return TStatus::Error;
Expand All @@ -356,6 +367,7 @@ class TKqpTxPeepholeTransformer : public TSyncTransformerBase {
TKikimrConfiguration::TPtr Config;
bool Optimized = false;
bool WithFinalStageRules = true;
TSet<TString> DisabledOpts;
};

class TKqpTxsPeepholeTransformer : public TSyncTransformerBase {
Expand Down Expand Up @@ -430,14 +442,22 @@ class TKqpTxsPeepholeTransformer : public TSyncTransformerBase {

} // anonymous namespace

TAutoPtr<IGraphTransformer> CreateKqpTxPeepholeTransformer(NYql::IGraphTransformer* typeAnnTransformer,
TTypeAnnotationContext& typesCtx, const TKikimrConfiguration::TPtr& config, bool withFinalStageRules)
TAutoPtr<IGraphTransformer> CreateKqpTxPeepholeTransformer(
NYql::IGraphTransformer* typeAnnTransformer,
TTypeAnnotationContext& typesCtx,
const TKikimrConfiguration::TPtr& config,
bool withFinalStageRules,
TSet<TString> disabledOpts
)
{
return new TKqpTxPeepholeTransformer(typeAnnTransformer, typesCtx, config, withFinalStageRules);
return new TKqpTxPeepholeTransformer(typeAnnTransformer, typesCtx, config, withFinalStageRules, disabledOpts);
}

TAutoPtr<IGraphTransformer> CreateKqpTxsPeepholeTransformer(TAutoPtr<NYql::IGraphTransformer> typeAnnTransformer,
TTypeAnnotationContext& typesCtx, const TKikimrConfiguration::TPtr& config)
TAutoPtr<IGraphTransformer> CreateKqpTxsPeepholeTransformer(
TAutoPtr<NYql::IGraphTransformer> typeAnnTransformer,
TTypeAnnotationContext& typesCtx,
const TKikimrConfiguration::TPtr& config
)
{
return new TKqpTxsPeepholeTransformer(std::move(typeAnnTransformer), typesCtx, config);
}
Expand Down
Loading