Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@
{"Index": 1, "Name": "Project", "Type": "TCoAtom"},
{"Index": 2, "Name": "Cluster", "Type": "TCoAtom"},
{"Index": 3, "Name": "Service", "Type": "TCoAtom"},
{"Index": 4, "Name": "Token", "Type": "TCoSecureParam", "Optional": true}
{"Index": 4, "Name": "RowType", "Type": "TExprBase"},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А это можно тестами покрыть?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

так покрыто уже

{"Index": 5, "Name": "Token", "Type": "TCoSecureParam", "Optional": true}
]
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class TSolomonDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase
TStatus HandleSoShard(TExprBase input, TExprContext& ctx) {
YQL_ENSURE(!State_->IsRtmrMode(), "SoShard can't be used in rtmr mode");

if (!EnsureMinArgsCount(input.Ref(), 4, ctx) || !EnsureMaxArgsCount(input.Ref(), 5, ctx)) {
if (!EnsureMinMaxArgsCount(input.Ref(), 5, 6, ctx)) {
return TStatus::Error;
}

Expand All @@ -137,6 +137,10 @@ class TSolomonDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase
return TStatus::Error;
}

if (!EnsureType(shard.RowType().Ref(), ctx)) {
return TStatus::Error;
}

if (shard.Token() && !EnsureCallable(shard.Token().Ref(), ctx)) {
return TStatus::Error;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,6 @@ NSo::NProto::ESolomonClusterType MapClusterType(TSolomonClusterConfig::ESolomonC
}
}

const TTypeAnnotationNode* GetItemType(const TExprNode& node) {
const TTypeAnnotationNode* typeAnn = node.GetTypeAnn();
switch (typeAnn->GetKind()) {
case ETypeAnnotationKind::Flow:
return typeAnn->Cast<TFlowExprType>()->GetItemType();
case ETypeAnnotationKind::Stream:
return typeAnn->Cast<TStreamExprType>()->GetItemType();
default: break;
}
YQL_ENSURE(false, "Invalid solomon sink type " << typeAnn->GetKind());
return nullptr;
}

void FillScheme(const TTypeAnnotationNode& itemType, NSo::NProto::TDqSolomonShardScheme& scheme) {
int index = 0;
for (const TItemExprType* structItem : itemType.Cast<TStructExprType>()->GetItems()) {
Expand Down Expand Up @@ -316,7 +303,7 @@ class TSolomonDqIntegration: public TDqIntegrationBase {
shardDesc.SetClusterType(MapClusterType(clusterDesc->GetClusterType()));
shardDesc.SetUseSsl(clusterDesc->GetUseSsl());

const TTypeAnnotationNode* itemType = GetItemType(node);
const TTypeAnnotationNode* itemType = shard.RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
FillScheme(*itemType, *shardDesc.MutableScheme());

if (auto maybeToken = shard.Token()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,14 @@ class TSoPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
YQL_CLOG(INFO, ProviderSolomon) << "Optimize SoWriteToShard";

const auto solomonCluster = TString(write.DataSink().Cluster().Value());
auto shard = BuildSolomonShard(write.Shard().Cast<TCoAtom>(), ctx, solomonCluster);
auto* typeAnn = write.Input().Ref().GetTypeAnn();
const TTypeAnnotationNode* inputItemType = nullptr;
if (!EnsureNewSeqType<false, true, false>(write.Input().Pos(), *typeAnn, ctx, &inputItemType)) {
return {};
}

auto rowTypeNode = ExpandType(write.Pos(), *inputItemType, ctx);
auto shard = BuildSolomonShard(write.Shard().Cast<TCoAtom>(), TExprBase(rowTypeNode), ctx, solomonCluster);

auto dqSink = Build<TDqSink>(ctx, write.Pos())
.DataSink(write.DataSink())
Expand Down Expand Up @@ -131,7 +138,7 @@ class TSoPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
}

private:
TCallable BuildSolomonShard(TCoAtom shardNode, TExprContext& ctx, TString solomonCluster) const {
TCallable BuildSolomonShard(TCoAtom shardNode, TExprBase rowType, TExprContext& ctx, TString solomonCluster) const {
const auto* clusterDesc = State_->Configuration->ClusterConfigs.FindPtr(solomonCluster);
YQL_ENSURE(clusterDesc, "Unknown cluster " << solomonCluster);

Expand All @@ -148,31 +155,14 @@ class TSoPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
}
YQL_ENSURE(!cluster.empty(), "Cluster is not defined. You can define it inside connection, or inside query.");

auto solomonClusterAtom = Build<TCoAtom>(ctx, shardNode.Pos())
.Value(solomonCluster)
.Done();

auto projectAtom = Build<TCoAtom>(ctx, shardNode.Pos())
.Value(project)
.Done();

auto clusterAtom = Build<TCoAtom>(ctx, shardNode.Pos())
.Value(cluster)
return Build<TSoShard>(ctx, shardNode.Pos())
.SolomonCluster<TCoAtom>().Value(solomonCluster).Build()
.Project<TCoAtom>().Value(project).Build()
.Cluster<TCoAtom>().Value(cluster).Build()
.Service<TCoAtom>().Value(service).Build()
.RowType(rowType)
.Token<TCoSecureParam>().Name().Build("cluster:default_" + solomonCluster).Build()
.Done();

auto serviceAtom = Build<TCoAtom>(ctx, shardNode.Pos())
.Value(service)
.Done();

auto dqSoShardBuilder = Build<TSoShard>(ctx, shardNode.Pos());
dqSoShardBuilder.SolomonCluster(solomonClusterAtom);
dqSoShardBuilder.Project(projectAtom);
dqSoShardBuilder.Cluster(clusterAtom);
dqSoShardBuilder.Service(serviceAtom);

dqSoShardBuilder.Token<TCoSecureParam>().Name().Build("cluster:default_" + solomonCluster).Build();

return dqSoShardBuilder.Done();
}

private:
Expand Down