@@ -100,7 +100,14 @@ class TSoPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
100100 YQL_CLOG (INFO, ProviderSolomon) << " Optimize SoWriteToShard" ;
101101
102102 const auto solomonCluster = TString (write.DataSink ().Cluster ().Value ());
103- auto shard = BuildSolomonShard (write.Shard ().Cast <TCoAtom>(), ctx, solomonCluster);
103+ auto * typeAnn = write.Input ().Ref ().GetTypeAnn ();
104+ const TTypeAnnotationNode* inputItemType = nullptr ;
105+ if (!EnsureNewSeqType<false , true , false >(write.Input ().Pos (), *typeAnn, ctx, &inputItemType)) {
106+ return {};
107+ }
108+
109+ auto rowTypeNode = ExpandType (write.Pos (), *inputItemType, ctx);
110+ auto shard = BuildSolomonShard (write.Shard ().Cast <TCoAtom>(), TExprBase (rowTypeNode), ctx, solomonCluster);
104111
105112 auto dqSink = Build<TDqSink>(ctx, write.Pos ())
106113 .DataSink (write.DataSink ())
@@ -131,7 +138,7 @@ class TSoPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
131138 }
132139
133140private:
134- TCallable BuildSolomonShard (TCoAtom shardNode, TExprContext& ctx, TString solomonCluster) const {
141+ TCallable BuildSolomonShard (TCoAtom shardNode, TExprBase rowType, TExprContext& ctx, TString solomonCluster) const {
135142 const auto * clusterDesc = State_->Configuration ->ClusterConfigs .FindPtr (solomonCluster);
136143 YQL_ENSURE (clusterDesc, " Unknown cluster " << solomonCluster);
137144
@@ -148,31 +155,14 @@ class TSoPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
148155 }
149156 YQL_ENSURE (!cluster.empty (), " Cluster is not defined. You can define it inside connection, or inside query." );
150157
151- auto solomonClusterAtom = Build<TCoAtom>(ctx, shardNode.Pos ())
152- .Value (solomonCluster)
153- .Done ();
154-
155- auto projectAtom = Build<TCoAtom>(ctx, shardNode.Pos ())
156- .Value (project)
157- .Done ();
158-
159- auto clusterAtom = Build<TCoAtom>(ctx, shardNode.Pos ())
160- .Value (cluster)
158+ return Build<TSoShard>(ctx, shardNode.Pos ())
159+ .SolomonCluster <TCoAtom>().Value (solomonCluster).Build ()
160+ .Project <TCoAtom>().Value (project).Build ()
161+ .Cluster <TCoAtom>().Value (cluster).Build ()
162+ .Service <TCoAtom>().Value (service).Build ()
163+ .RowType (rowType)
164+ .Token <TCoSecureParam>().Name ().Build (" cluster:default_" + solomonCluster).Build ()
161165 .Done ();
162-
163- auto serviceAtom = Build<TCoAtom>(ctx, shardNode.Pos ())
164- .Value (service)
165- .Done ();
166-
167- auto dqSoShardBuilder = Build<TSoShard>(ctx, shardNode.Pos ());
168- dqSoShardBuilder.SolomonCluster (solomonClusterAtom);
169- dqSoShardBuilder.Project (projectAtom);
170- dqSoShardBuilder.Cluster (clusterAtom);
171- dqSoShardBuilder.Service (serviceAtom);
172-
173- dqSoShardBuilder.Token <TCoSecureParam>().Name ().Build (" cluster:default_" + solomonCluster).Build ();
174-
175- return dqSoShardBuilder.Done ();
176166 }
177167
178168private:
0 commit comments