Skip to content
1 change: 1 addition & 0 deletions ydb/core/kqp/opt/kqp_opt_build_txs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,7 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
.Add(*TypeAnnTransformer, "TypeAnnotation")
.AddPostTypeAnnotation(/* forSubgraph */ true)
.Add(CreateKqpBuildPhyStagesTransformer(enableSpillingGenericQuery, typesCtx, config->BlockChannelsMode), "BuildPhysicalStages")
// TODO(ilezhankin): "BuildWideBlockChannels" transformer is required only for BLOCK_CHANNELS_FORCE mode.
.Add(CreateKqpBuildWideBlockChannelsTransformer(typesCtx, config->BlockChannelsMode), "BuildWideBlockChannels")
.Add(*BuildTxTransformer, "BuildPhysicalTx")
.Add(CreateKqpTxPeepholeTransformer(
Expand Down
233 changes: 207 additions & 26 deletions ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
#include <ydb/core/kqp/gateway/kqp_gateway.h>
#include <ydb/core/kqp/host/kqp_transform.h>
#include <ydb/core/kqp/opt/kqp_opt_impl.h>
#include <ydb/core/protos/table_service_config.pb.h>
#include <ydb/library/naming_conventions/naming_conventions.h>

#include <ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/core/yql_join.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/dq/opt/dq_opt_peephole.h>
#include <ydb/library/yql/dq/type_ann/dq_type_ann.h>
#include <ydb/library/yql/core/services/yql_transform_pipeline.h>
#include <ydb/library/yql/providers/common/transform/yql_optimize.h>

Expand Down Expand Up @@ -153,7 +155,7 @@ class TKqpPeepholeTransformer : public TOptimizeTransformerBase {

struct TKqpPeepholePipelineConfigurator : IPipelineConfigurator {
TKqpPeepholePipelineConfigurator(
TKikimrConfiguration::TPtr config,
TKikimrConfiguration::TPtr config,
TSet<TString> disabledOpts
)
: Config(config)
Expand Down Expand Up @@ -213,6 +215,93 @@ struct TKqpPeepholePipelineFinalConfigurator : IPipelineConfigurator {
const TKikimrConfiguration::TPtr Config;
};

// Sort stages in topological order by their inputs, so that we optimize the ones without inputs first.
TVector<TDqPhyStage> TopSortStages(const TDqPhyStageList& stages) {
TVector<TDqPhyStage> topSortedStages;
topSortedStages.reserve(stages.Size());
std::function<void(const TDqPhyStage&)> topSort;
THashSet<ui64 /*uniqueId*/> visitedStages;

// Assume there is no cycles.
topSort = [&](const TDqPhyStage& stage) {
if (visitedStages.contains(stage.Ref().UniqueId())) {
return;
}

for (const auto& input : stage.Inputs()) {
if (auto connection = input.Maybe<TDqConnection>()) {
// NOTE: somehow `Output()` is actually an input.
if (auto phyStage = connection.Cast().Output().Stage().Maybe<TDqPhyStage>()) {
topSort(phyStage.Cast());
}
}
}

visitedStages.insert(stage.Ref().UniqueId());
topSortedStages.push_back(stage);
};

for (const auto& stage : stages) {
topSort(stage);
}

return topSortedStages;
}

// TODO: copy-paste from https://github.com/ydb-platform/ydb/blob/122f053354c5df4fc559bf06fe0302f92d813032/ydb/library/yql/dq/opt/dq_opt_build.cpp#L444
bool IsCompatibleWithBlocks(TPositionHandle pos, const TStructExprType& type, TExprContext& ctx, TTypeAnnotationContext& typesCtx) {
TVector<const TTypeAnnotationNode*> types;
for (auto& item : type.GetItems()) {
types.emplace_back(item->GetItemType());
}

auto resolveStatus = typesCtx.ArrowResolver->AreTypesSupported(ctx.GetPosition(pos), types, ctx);
YQL_ENSURE(resolveStatus != IArrowResolver::ERROR);
return resolveStatus == IArrowResolver::OK;
}

// TODO: composite copy-paste from https://github.com/ydb-platform/ydb/blob/122f053354c5df4fc559bf06fe0302f92d813032/ydb/library/yql/dq/opt/dq_opt_build.cpp#L388
bool CanPropagateWideBlockThroughChannel(
const TDqOutput& output,
const THashMap<ui64, TKqpProgram>& programs,
const TDqStageSettings& stageSettings,
TExprContext& ctx,
TTypeAnnotationContext& typesCtx)
{
const auto& program = programs.at(output.Stage().Ref().UniqueId());

ui32 index = FromString<ui32>(output.Index().Value());
if (index != 0) {
// stage has multiple outputs
return false;
}

auto outputItemType = program.Lambda().Ref().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType();
if (IsWideBlockType(*outputItemType)) {
// output is already wide block
return false;
}

if (!stageSettings.WideChannels) {
return false;
}

YQL_ENSURE(stageSettings.OutputNarrowType);

if (!IsCompatibleWithBlocks(program.Pos(), *stageSettings.OutputNarrowType, ctx, typesCtx)) {
return false;
}

// Ensure that stage has blocks on top level (i.e. FromFlow(WideFromBlocks(...)))
if (!program.Lambda().Body().Maybe<TCoFromFlow>() ||
!program.Lambda().Body().Cast<TCoFromFlow>().Input().Maybe<TCoWideFromBlocks>())
{
return false;
}

return true;
}

TStatus PeepHoleOptimize(const TExprBase& program, TExprNode::TPtr& newProgram, TExprContext& ctx,
IGraphTransformer& typeAnnTransformer, TTypeAnnotationContext& typesCtx, TKikimrConfiguration::TPtr config,
bool allowNonDeterministicFunctions, bool withFinalStageRules, TSet<TString> disabledOpts)
Expand Down Expand Up @@ -244,30 +333,114 @@ TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprConte
IGraphTransformer& typeAnnTransformer, TTypeAnnotationContext& typesCtx, THashSet<ui64>& optimizedStages,
TKikimrConfiguration::TPtr config, bool withFinalStageRules, TSet<TString> disabledOpts)
{
TVector<TDqPhyStage> stages;
stages.reserve(tx.Stages().Size());
TNodeOnNodeOwnedMap stagesMap;
TVector<TKqpParamBinding> bindings(tx.ParamBindings().begin(), tx.ParamBindings().end());
THashMap<ui64 /*stage uid*/, TKqpProgram> programs;
THashMap<TString, TKqpParamBinding> nonDetParamBindings;

for (const auto& stage : tx.Stages()) {
const auto topSortedStages = TopSortStages(tx.Stages());
for (const auto& stage : topSortedStages) {
YQL_ENSURE(!optimizedStages.contains(stage.Ref().UniqueId()));

TCoLambda lambda = stage.Program();
TVector<TCoArgument> newArgs;
newArgs.reserve(stage.Inputs().Size());

// Propagate "WideFromBlock" through connections.
// TODO(ilezhankin): this peephole optimization should be implemented instead as
// the original whole-graph transformer |CreateDqBuildWideBlockChannelsTransformer|.
if (config->BlockChannelsMode == NKikimrConfig::TTableServiceConfig_EBlockChannelsMode_BLOCK_CHANNELS_AUTO) {
TNodeOnNodeOwnedMap argsMap;

YQL_ENSURE(stage.Inputs().Size() == stage.Program().Args().Size());

for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
auto oldArg = stage.Program().Args().Arg(i);
auto newArg = TCoArgument(ctx.NewArgument(oldArg.Pos(), oldArg.Name()));
newArg.MutableRef().SetTypeAnn(oldArg.Ref().GetTypeAnn());
newArgs.emplace_back(newArg);

if (auto connection = stage.Inputs().Item(i).Maybe<TDqConnection>(); connection &&
CanPropagateWideBlockThroughChannel(connection.Cast().Output(), programs, TDqStageSettings::Parse(stage), ctx, typesCtx))
{
TExprNode::TPtr newArgNode = ctx.Builder(oldArg.Pos())
.Callable("FromFlow")
.Callable(0, "WideFromBlocks")
.Callable(0, "ToFlow")
.Add(0, newArg.Ptr())
.Seal()
.Seal()
.Seal()
.Build();
argsMap.emplace(oldArg.Raw(), newArgNode);

auto stageUid = connection.Cast().Output().Stage().Ref().UniqueId();

// Update input program with: FromFlow(WideFromBlocks($1)) → FromFlow($1)
if (const auto& inputProgram = programs.at(stageUid); inputProgram.Lambda().Body().Maybe<TCoFromFlow>() &&
inputProgram.Lambda().Body().Cast<TCoFromFlow>().Input().Maybe<TCoWideFromBlocks>())
{
auto newBody = Build<TCoFromFlow>(ctx, inputProgram.Lambda().Body().Cast<TCoFromFlow>().Pos())
.Input(inputProgram.Lambda().Body().Cast<TCoFromFlow>().Input().Cast<TCoWideFromBlocks>().Input())
.Done();

auto newInputProgram = Build<TKqpProgram>(ctx, inputProgram.Pos())
.Lambda()
.Args(inputProgram.Lambda().Args())
.Body(newBody)
.Build()
.ArgsType(inputProgram.ArgsType())
.Done();

// Run the peephole optimization on new program again to update type annotations.
// TODO(ilezhankin): refactor to run only the update of type annotations - not the whole optimization.
bool allowNonDeterministicFunctions = !newInputProgram.Lambda().Body().Maybe<TKqpEffects>();
TExprNode::TPtr newInputProgramNode;

auto status = PeepHoleOptimize(newInputProgram, newInputProgramNode, ctx, typeAnnTransformer, typesCtx, config,
allowNonDeterministicFunctions, withFinalStageRules, disabledOpts);
if (status != TStatus::Ok) {
ctx.AddError(TIssue(ctx.GetPosition(stage.Pos()), "Peephole optimization failed for KQP transaction"));
return {};
}

programs.at(stageUid) = TKqpProgram(newInputProgramNode);
}

// Update the type annotation for an argument with return value of the input program.
newArg.MutableRef().SetTypeAnn(programs.at(stageUid).Lambda().Body().Ref().GetTypeAnn());
} else {
argsMap.emplace(oldArg.Raw(), newArg.Ptr());
}
}

// Rebuild lambda with new arguments.
lambda = Build<TCoLambda>(ctx, lambda.Pos())
.Args(newArgs)
.Body(ctx.ReplaceNodes(stage.Program().Body().Ptr(), argsMap))
.Done();
} else {
for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
auto oldArg = stage.Program().Args().Arg(i);
auto newArg = TCoArgument(ctx.NewArgument(oldArg.Pos(), oldArg.Name()));
newArg.MutableRef().SetTypeAnn(oldArg.Ref().GetTypeAnn());
newArgs.emplace_back(newArg);
}
}

TVector<const TTypeAnnotationNode*> argTypes;
for (const auto& arg : stage.Program().Args()) {
for (const auto& arg : newArgs) {
YQL_ENSURE(arg.Ref().GetTypeAnn());
argTypes.push_back(arg.Ref().GetTypeAnn());
}

// TODO: get rid of TKqpProgram-callable (YQL-10078)
TNodeOnNodeOwnedMap tmp;
auto program = Build<TKqpProgram>(ctx, stage.Pos())
//.Lambda(ctx.DeepCopy(stage.Program().Ref(), ctx, tmp, true /* internStrings */, false /* copyTypes */))
.Lambda(stage.Program())
.Lambda(lambda)
.ArgsType(ExpandType(stage.Pos(), *ctx.MakeType<TTupleExprType>(argTypes), ctx))
.Done();

bool allowNonDeterministicFunctions = !stage.Program().Body().Maybe<TKqpEffects>();
YQL_ENSURE(programs.emplace(stage.Ref().UniqueId(), program).second);

const bool allowNonDeterministicFunctions = !program.Lambda().Body().Maybe<TKqpEffects>();

TExprNode::TPtr newProgram;
auto status = PeepHoleOptimize(program, newProgram, ctx, typeAnnTransformer, typesCtx, config,
Expand All @@ -287,26 +460,34 @@ TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprConte
}
}

auto newStage = Build<TDqPhyStage>(ctx, stage.Pos())
.Inputs(ctx.ReplaceNodes(stage.Inputs().Ptr(), stagesMap))
.Program(ctx.DeepCopyLambda(TKqpProgram(newProgram).Lambda().Ref()))
.Settings(stage.Settings())
.Outputs(stage.Outputs())
.Done();

stages.emplace_back(newStage);
stagesMap.emplace(stage.Raw(), newStage.Ptr());

optimizedStages.emplace(stage.Ref().UniqueId());
programs.at(stage.Ref().UniqueId()) = TKqpProgram(newProgram);
}

TVector<TKqpParamBinding> bindings(tx.ParamBindings().begin(), tx.ParamBindings().end());

for (const auto& [_, binding] : nonDetParamBindings) {
bindings.emplace_back(std::move(binding));
}

TVector<TDqPhyStage> newStages;
TNodeOnNodeOwnedMap stagesMap;

// Rebuild stages only after all new programs are ready.
for (const auto& stage : topSortedStages) {
auto newStage = Build<TDqPhyStage>(ctx, stage.Pos())
.InitFrom(stage)
.Inputs(ctx.ReplaceNodes(stage.Inputs().Ptr(), stagesMap))
.Program(ctx.DeepCopyLambda(programs.at(stage.Ref().UniqueId()).Lambda().Ref()))
.Done();

newStages.emplace_back(newStage);
stagesMap.emplace(stage.Raw(), newStage.Ptr());
}

return Build<TKqpPhysicalTx>(ctx, tx.Pos())
.Stages()
.Add(stages)
.Add(newStages)
.Build()
.Results(ctx.ReplaceNodes(tx.Results().Ptr(), stagesMap))
.ParamBindings().Add(bindings).Build()
Expand All @@ -318,7 +499,7 @@ class TKqpTxPeepholeTransformer : public TSyncTransformerBase {
public:
TKqpTxPeepholeTransformer(
IGraphTransformer* typeAnnTransformer,
TTypeAnnotationContext& typesCtx,
TTypeAnnotationContext& typesCtx,
TKikimrConfiguration::TPtr config,
bool withFinalStageRules,
TSet<TString> disabledOpts
Expand Down Expand Up @@ -444,8 +625,8 @@ class TKqpTxsPeepholeTransformer : public TSyncTransformerBase {

TAutoPtr<IGraphTransformer> CreateKqpTxPeepholeTransformer(
NYql::IGraphTransformer* typeAnnTransformer,
TTypeAnnotationContext& typesCtx,
const TKikimrConfiguration::TPtr& config,
TTypeAnnotationContext& typesCtx,
const TKikimrConfiguration::TPtr& config,
bool withFinalStageRules,
TSet<TString> disabledOpts
)
Expand All @@ -455,7 +636,7 @@ TAutoPtr<IGraphTransformer> CreateKqpTxPeepholeTransformer(

TAutoPtr<IGraphTransformer> CreateKqpTxsPeepholeTransformer(
TAutoPtr<NYql::IGraphTransformer> typeAnnTransformer,
TTypeAnnotationContext& typesCtx,
TTypeAnnotationContext& typesCtx,
const TKikimrConfiguration::TPtr& config
)
{
Expand Down
Loading