Skip to content

Commit 9569967

Browse files
authored
YQ-3011 fix parquet type validation (#3502)
1 parent 5a3921a commit 9569967

File tree

2 files changed

+27
-10
lines changed

2 files changed

+27
-10
lines changed

ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,18 @@ bool ExtractSettingValue(const TExprNode& value, TStringBuf settingName, TString
282282

283283
}
284284

285+
// Checks a single type against the Arrow resolver and reports whether it can be used with the parquet format.
//
// position      - expression position handle; converted via ctx.GetPosition() for both the resolver call and the issue.
// type          - the type annotation to check (passed to the resolver as a one-element list).
// ctx           - expression context; receives the error issue when the type is rejected.
// arrowResolver - resolver whose AreTypesSupported() is queried; dereferenced without a null check here.
//
// Returns true when the resolver answers IArrowResolver::OK; otherwise adds an error issue to ctx and returns false.
// The resolver answering IArrowResolver::ERROR is guarded by YQL_ENSURE rather than reported as a regular issue.
bool EnsureParquetTypeSupported(TPositionHandle position, const TTypeAnnotationNode* type, TExprContext& ctx, const IArrowResolver::TPtr& arrowResolver) {
286+
auto resolveStatus = arrowResolver->AreTypesSupported(ctx.GetPosition(position), { type }, ctx);
287+
YQL_ENSURE(resolveStatus != IArrowResolver::ERROR); // NOTE(review): presumably fails hard (not a user-facing issue) on resolver error - confirm YQL_ENSURE semantics
288+
289+
if (resolveStatus != IArrowResolver::OK) { // any remaining non-OK status is treated as "unsupported type"
290+
ctx.AddError(TIssue(ctx.GetPosition(position), TStringBuilder() << "Type " << *type << " is not supported for parquet"));
291+
return false;
292+
}
293+
294+
return true;
295+
}
296+
285297
class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
286298
public:
287299
TS3DataSourceTypeAnnotationTransformer(TS3State::TPtr state)
@@ -407,7 +419,8 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
407419
return TStatus::Error;
408420
}
409421

410-
if (!TS3Object::Match(input->Child(TS3ReadObject::idx_Object))) {
422+
const auto& objectNode = input->Child(TS3ReadObject::idx_Object);
423+
if (!TS3Object::Match(objectNode)) {
411424
ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3ReadObject::idx_Object)->Pos()), "Expected S3 object."));
412425
return TStatus::Error;
413426
}
@@ -467,6 +480,19 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
467480
return TStatus::Error;
468481
}
469482

483+
if (objectNode->Child(TS3Object::idx_Format)->Content() == "parquet") {
484+
YQL_ENSURE(State_->Types->ArrowResolver);
485+
bool allTypesSupported = true;
486+
for (const auto& item : rowType->Cast<TStructExprType>()->GetItems()) {
487+
if (!EnsureParquetTypeSupported(input->Pos(), item->GetItemType(), ctx, State_->Types->ArrowResolver)) {
488+
allTypesSupported = false;
489+
}
490+
}
491+
if (!allTypesSupported) {
492+
return TStatus::Error;
493+
}
494+
}
495+
470496
input->SetTypeAnn(ctx.MakeType<TTupleExprType>(TTypeAnnotationNode::TListType{
471497
input->Child(TS3ReadObject::idx_World)->GetTypeAnn(),
472498
ctx.MakeType<TListExprType>(rowType)

ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -259,15 +259,6 @@ class TS3DqIntegration: public TDqIntegrationBase {
259259

260260
auto format = s3ReadObject.Object().Format().Ref().Content();
261261
if (const auto useCoro = State_->Configuration->SourceCoroActor.Get(); (!useCoro || *useCoro) && format != "raw" && format != "json_list") {
262-
if (format == "parquet") {
263-
YQL_ENSURE(State_->Types->ArrowResolver);
264-
TVector<const TTypeAnnotationNode*> allTypes;
265-
for (const auto& x : rowType->Cast<TStructExprType>()->GetItems()) {
266-
allTypes.push_back(x->GetItemType());
267-
}
268-
auto resolveStatus = State_->Types->ArrowResolver->AreTypesSupported(ctx.GetPosition(read->Pos()), allTypes, ctx);
269-
YQL_ENSURE(resolveStatus == IArrowResolver::OK);
270-
}
271262
return Build<TDqSourceWrap>(ctx, read->Pos())
272263
.Input<TS3ParseSettings>()
273264
.Paths(s3ReadObject.Object().Paths())

0 commit comments

Comments
 (0)