Skip to content

Commit 95ef5ac

Browse files
authored
Merge 8132f3a into a2f8363
2 parents a2f8363 + 8132f3a commit 95ef5ac

File tree

1 file changed

+48
-18
lines changed

1 file changed

+48
-18
lines changed

ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp

Lines changed: 48 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,47 @@ class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
6666
return TStatus::Error;
6767
}
6868

69-
auto source = input->Child(TS3WriteObject::idx_Input);
69+
const auto targetNode = input->Child(TS3WriteObject::idx_Target);
70+
if (!TS3Target::Match(targetNode)) {
71+
ctx.AddError(TIssue(ctx.GetPosition(targetNode->Pos()), "Expected S3 target."));
72+
return TStatus::Error;
73+
}
74+
75+
const TTypeAnnotationNode* targetType = nullptr;
76+
if (const TS3Target target(targetNode); const auto settings = target.Settings()) {
77+
if (const auto userschema = GetSetting(settings.Cast().Ref(), "userschema")) {
78+
targetType = userschema->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
79+
}
80+
}
81+
82+
const auto source = input->ChildPtr(TS3WriteObject::idx_Input);
83+
if (const auto maybeTuple = TMaybeNode<TExprList>(source)) {
84+
const auto tuple = maybeTuple.Cast();
85+
86+
TVector<TExprBase> convertedValues;
87+
convertedValues.reserve(tuple.Size());
88+
for (const auto& value : tuple) {
89+
if (!EnsureStructType(input->Pos(), *value.Ref().GetTypeAnn(), ctx)) {
90+
return TStatus::Error;
91+
}
92+
93+
TExprNode::TPtr node = value.Ptr();
94+
if (targetType && TryConvertTo(node, *targetType, ctx) == TStatus::Error) {
95+
ctx.AddError(TIssue(ctx.GetPosition(source->Pos()), "Failed to convert input columns types to scheme types"));
96+
return TStatus::Error;
97+
}
98+
99+
convertedValues.emplace_back(std::move(node));
100+
}
101+
102+
const auto list = Build<TCoAsList>(ctx, input->Pos())
103+
.Add(std::move(convertedValues))
104+
.Done();
105+
106+
input->ChildRef(TS3WriteObject::idx_Input) = list.Ptr();
107+
return TStatus::Repeat;
108+
}
109+
70110
if (!EnsureListType(*source, ctx)) {
71111
return TStatus::Error;
72112
}
@@ -80,23 +120,13 @@ class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
80120
return TStatus::Error;
81121
}
82122

83-
auto target = input->Child(TS3WriteObject::idx_Target);
84-
if (!TS3Target::Match(target)) {
85-
ctx.AddError(TIssue(ctx.GetPosition(target->Pos()), "Expected S3 target."));
86-
return TStatus::Error;
87-
}
88-
89-
TS3Target tgt(target);
90-
if (auto settings = tgt.Settings()) {
91-
if (auto userschema = GetSetting(settings.Cast().Ref(), "userschema")) {
92-
const TTypeAnnotationNode* targetType = userschema->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
93-
if (!IsSameAnnotation(*targetType, *sourceType)) {
94-
ctx.AddError(TIssue(ctx.GetPosition(source->Pos()),
95-
TStringBuilder() << "Type mismatch between schema type: " << *targetType
96-
<< " and actual data type: " << *sourceType << ", diff is: "
97-
<< GetTypeDiff(*targetType, *sourceType)));
98-
return TStatus::Error;
99-
}
123+
if (targetType) {
124+
const auto status = TryConvertTo(input->ChildRef(TS3WriteObject::idx_Input), *ctx.MakeType<TListExprType>(targetType), ctx);
125+
if (status == TStatus::Error) {
126+
ctx.AddError(TIssue(ctx.GetPosition(source->Pos()), "Row type mismatch for S3 external table"));
127+
return TStatus::Error;
128+
} else if (status != TStatus::Ok) {
129+
return status;
100130
}
101131
}
102132

0 commit comments

Comments
 (0)