@@ -124,13 +124,15 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase {
124124 auto keys = GetPartitionKeys (partBy);
125125
126126 auto sinkSettingsBuilder = Build<TExprList>(ctx, target.Pos ());
127- if (partBy)
127+ if (partBy) {
128128 sinkSettingsBuilder.Add (std::move (partBy));
129+ }
129130
130131 auto compression = GetCompression (settings);
131132 const auto & extension = GetExtension (target.Format ().Value (), compression ? compression->Tail ().Content () : " " sv);
132- if (compression)
133+ if (compression) {
133134 sinkSettingsBuilder.Add (std::move (compression));
135+ }
134136
135137 auto sinkOutputSettingsBuilder = Build<TExprList>(ctx, target.Pos ());
136138 if (auto csvDelimiter = GetCsvDelimiter (settings)) {
@@ -199,31 +201,17 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase {
199201 }
200202 }
201203
202- if (! FindNode (input. Ptr (), [] ( const TExprNode::TPtr& node) { return node-> IsCallable ( TCoDataSource::CallableName ()); } )) {
204+ if (IsDqPureExpr (input)) {
203205 YQL_CLOG (INFO, ProviderS3) << " Rewrite pure S3WriteObject `" << cluster << " `.`" << target.Path ().StringValue () << " ` as stage with sink." ;
204- auto shouldBePassedAsInput = FindNode (input.Ptr (), [] (const TExprNode::TPtr& node) { return node->IsCallable (TDqStage::CallableName ()); });
205-
206- auto stageInputs = Build<TExprList>(ctx, writePos);
207- auto toFlow = Build<TCoToFlow>(ctx, writePos);
208- TVector<TCoArgument> args;
209-
210- if (shouldBePassedAsInput) {
211- auto arg = Build<TCoArgument>(ctx, writePos).Name (" in" ).Done ();
212- stageInputs.Add (input);
213- args.push_back (arg);
214- toFlow.Input (arg);
215- }
216- else {
217- toFlow.Input (input);
218- }
219-
220206 return keys.empty () ?
221207 Build<TDqStage>(ctx, writePos)
222- .Inputs (stageInputs. Done () )
208+ .Inputs (). Build ( )
223209 .Program <TCoLambda>()
224- .Args (args )
210+ .Args ({} )
225211 .Body <TS3SinkOutput>()
226- .Input (toFlow.Done ())
212+ .Input <TCoToFlow>()
213+ .Input (input)
214+ .Build ()
227215 .Format (target.Format ())
228216 .KeyColumns ().Build ()
229217 .Settings (sinkOutputSettingsBuilder.Done ())
@@ -251,10 +239,12 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase {
251239 .Add <TDqCnHashShuffle>()
252240 .Output <TDqOutput>()
253241 .Stage <TDqStage>()
254- .Inputs (stageInputs. Done () )
242+ .Inputs (). Build ( )
255243 .Program <TCoLambda>()
256- .Args (args)
257- .Body (toFlow.Done ())
244+ .Args ({})
245+ .Body <TCoToFlow>()
246+ .Input (input)
247+ .Build ()
258248 .Build ()
259249 .Settings ().Build ()
260250 .Build ()
@@ -317,23 +307,26 @@ class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase {
317307 .Build ()
318308 .Done ();
319309
320- auto outputsBuilder = Build<TDqStageOutputsList>(ctx, target.Pos ());
321- if (inputStage.Outputs () && keys.empty ()) {
322- outputsBuilder.InitFrom (inputStage.Outputs ().Cast ());
323- }
324- outputsBuilder.Add (sink);
310+ auto outputsBuilder = Build<TDqStageOutputsList>(ctx, target.Pos ())
311+ .Add (sink);
325312
326313 if (keys.empty ()) {
327- const auto outputBuilder = Build<TS3SinkOutput>(ctx, target.Pos ())
328- .Input (inputStage.Program ().Body ().Ptr ())
329- .Format (target.Format ())
330- .KeyColumns ().Add (std::move (keys)).Build ()
331- .Settings (sinkOutputSettingsBuilder.Done ())
332- .Done ();
333-
334314 return Build<TDqStage>(ctx, writePos)
335- .InitFrom (inputStage)
336- .Program (ctx.DeepCopyLambda (inputStage.Program ().Ref (), outputBuilder.Ptr ()))
315+ .Inputs ()
316+ .Add <TDqCnMap>()
317+ .Output (dqUnion.Output ())
318+ .Build ()
319+ .Build ()
320+ .Program <TCoLambda>()
321+ .Args ({" in" })
322+ .Body <TS3SinkOutput>()
323+ .Input (" in" )
324+ .Format (target.Format ())
325+ .KeyColumns ().Add (std::move (keys)).Build ()
326+ .Settings (sinkOutputSettingsBuilder.Done ())
327+ .Build ()
328+ .Build ()
329+ .Settings ().Build ()
337330 .Outputs (outputsBuilder.Done ())
338331 .Done ();
339332 } else {
0 commit comments