@@ -222,6 +222,115 @@ TStatus KqpDuplicateResults(const TExprNode::TPtr& input, TExprNode::TPtr& outpu
222222 return TStatus::Ok;
223223}
224224
225+ template <typename TExpr>
226+ TVector<TExpr> CollectNodes (const TExprNode::TPtr& input) {
227+ TVector<TExpr> result;
228+
229+ VisitExpr (input, [&result](const TExprNode::TPtr& node) {
230+ if (TExpr::Match (node.Get ())) {
231+ result.emplace_back (TExpr (node));
232+ }
233+ return true ;
234+ });
235+
236+ return result;
237+ }
238+
239+ bool FindPrecomputedOutputs (TDqStageBase stage, const TParentsMap& parentsMap) {
240+ auto outIt = parentsMap.find (stage.Raw ());
241+ if (outIt == parentsMap.end ()) {
242+ return false ;
243+ }
244+
245+ for (auto & output : outIt->second ) {
246+ if (TDqOutput::Match (output)) {
247+ auto connIt = parentsMap.find (output);
248+ if (connIt != parentsMap.end ()) {
249+ for (auto maybeConn : connIt->second ) {
250+ auto parentIt = parentsMap.find (maybeConn);
251+ if (parentIt != parentsMap.end ()) {
252+ for (auto & parent : parentIt->second ) {
253+ if (TDqPrecompute::Match (parent) || TDqPhyPrecompute::Match (parent)) {
254+ return true ;
255+ }
256+ }
257+ }
258+ }
259+ }
260+ }
261+ }
262+
263+ return false ;
264+ }
265+
266+
267+ TExprBase ReplicatePrecompute (TDqStageBase stage, TExprContext& ctx, const TParentsMap& parentsMap) {
268+ for (size_t i = 0 ; i < stage.Inputs ().Size (); ++i) {
269+ auto input = stage.Inputs ().Item (i);
270+ if (auto maybeConn = stage.Inputs ().Item (i).Maybe <TDqConnection>()) {
271+ auto conn = maybeConn.Cast ();
272+ if (conn.Maybe <TDqCnValue>() || conn.Maybe <TDqCnUnionAll>()) {
273+ {
274+ auto sourceStage = conn.Output ().Stage ();
275+ if (!sourceStage.Program ().Body ().Maybe <TDqReplicate>()) {
276+ continue ;
277+ }
278+
279+ if (!FindPrecomputedOutputs (sourceStage, parentsMap)) {
280+ continue ;
281+ }
282+ }
283+
284+ auto arg = stage.Program ().Args ().Arg (i);
285+ auto newArg = Build<TCoArgument>(ctx, stage.Program ().Args ().Arg (i).Pos ())
286+ .Name (" _replaced_arg" )
287+ .Done ();
288+
289+ TVector<TCoArgument> newArgs;
290+ TNodeOnNodeOwnedMap programReplaces;
291+ for (size_t j = 0 ; j < stage.Program ().Args ().Size (); ++j) {
292+ auto oldArg = stage.Program ().Args ().Arg (j);
293+ newArgs.push_back (Build<TCoArgument>(ctx, stage.Program ().Args ().Arg (i).Pos ())
294+ .Name (" _replaced_arg_" + ToString (j))
295+ .Done ());
296+ if (i == j) {
297+ programReplaces[oldArg.Raw ()] = Build<TCoToFlow>(ctx, oldArg.Pos ()).Input (newArgs.back ()).Done ().Ptr ();
298+ } else {
299+ programReplaces[oldArg.Raw ()] = newArgs.back ().Ptr ();
300+ }
301+ }
302+
303+ return
304+ Build<TDqStage>(ctx, stage.Pos ())
305+ .Inputs (ctx.ReplaceNode (stage.Inputs ().Ptr (), input.Ref (), Build<TDqPhyPrecompute>(ctx, input.Pos ()).Connection (conn).Done ().Ptr ()))
306+ .Outputs (stage.Outputs ())
307+ .Settings (stage.Settings ())
308+ .Program ()
309+ .Args (newArgs)
310+ .Body (TExprBase (ctx.ReplaceNodes (stage.Program ().Body ().Ptr (), programReplaces)))
311+ .Build ()
312+ .Done ();
313+ }
314+ }
315+ }
316+ return stage;
317+ }
318+
319+ NYql::IGraphTransformer::TStatus ReplicatePrecomputeRule (const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
320+ TParentsMap parents;
321+ GatherParents (*input, parents, true );
322+ auto stages = CollectNodes<TDqStageBase>(input);
323+ for (auto & stage : stages) {
324+ auto applied = ReplicatePrecompute (stage, ctx, parents);
325+ if (applied.Raw () != stage.Raw ()) {
326+ output = ctx.ReplaceNode (input.Get (), stage.Ref (), applied.Ptr ());
327+ return TStatus::Repeat;
328+ }
329+ }
330+ output = input;
331+ return TStatus::Ok;
332+ }
333+
225334template <typename TFunctor>
226335NYql::IGraphTransformer::TStatus PerformGlobalRule (const TString& ruleName, const NYql::TExprNode::TPtr& input,
227336 NYql::TExprNode::TPtr& output, NYql::TExprContext& ctx, TFunctor func)
@@ -251,6 +360,8 @@ TAutoPtr<IGraphTransformer> CreateKqpFinalizingOptTransformer(const TIntrusivePt
251360 [kqpCtx](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> TStatus {
252361 output = input;
253362
363+ PERFORM_GLOBAL_RULE (" ReplicatePrecompute" , input, output, ctx, ReplicatePrecomputeRule);
364+
254365 PERFORM_GLOBAL_RULE (" ReplicateMultiUsedConnection" , input, output, ctx,
255366 [](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
256367 YQL_ENSURE (TKqlQuery::Match (input.Get ()));
0 commit comments