Skip to content

Commit 079be7a

Browse files
authored
Merge e4100e9 into 1dab30b
2 parents 1dab30b + e4100e9 commit 079be7a

File tree

7 files changed

+131
-20
lines changed

7 files changed

+131
-20
lines changed

ydb/core/fq/libs/control_plane_storage/request_validators.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,4 +172,27 @@ NYql::TIssues ValidateEntityName(const TString& name) {
172172
return issues;
173173
}
174174

175+
std::vector<NYdb::TType> GetStringTypes() {
176+
NYdb::TType stringType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::String).Build();
177+
NYdb::TType utf8Type = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Utf8).Build();
178+
NYdb::TType ysonType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Yson).Build();
179+
NYdb::TType jsonType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Json).Build();
180+
const std::vector<NYdb::TType> result {
181+
stringType,
182+
utf8Type,
183+
ysonType,
184+
jsonType,
185+
NYdb::TTypeBuilder{}.Optional(stringType).Build(),
186+
NYdb::TTypeBuilder{}.Optional(utf8Type).Build(),
187+
NYdb::TTypeBuilder{}.Optional(ysonType).Build(),
188+
NYdb::TTypeBuilder{}.Optional(jsonType).Build()
189+
};
190+
return result;
191+
}
192+
193+
// Returns true when columnType is equal (per NYdb::TypesEqual) to one of the
// types produced by GetStringTypes().
bool ValidateStringType(const NYdb::TType& columnType) {
    // The list of accepted types is built once, on first use.
    static const std::vector<NYdb::TType> availableTypes = GetStringTypes();

    for (const auto& candidate : availableTypes) {
        if (NYdb::TypesEqual(candidate, columnType)) {
            return true;
        }
    }
    return false;
}
197+
175198
} // namespace NFq

ydb/core/fq/libs/control_plane_storage/request_validators.h

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,37 @@
1818

1919
namespace NFq {
2020

21+
// Returns true when columnType is String, Utf8, Yson, Json, or an Optional of
// one of those types; false otherwise.
bool ValidateStringType(const NYdb::TType& columnType);
22+
23+
template<typename TScheme, typename TPartitionedBy>
24+
// Checks that a schema is compatible with the requested input format.
//
// Only the "raw" format is inspected; any other format yields no issues.
// For "raw", columns named in partitionedBy are skipped, every remaining
// column must satisfy ValidateStringType(), and at most one remaining column
// is allowed.
//
// format        - format name to validate against.
// schema        - object whose column() range yields items with name() and type().
// partitionedBy - range of column names that are excluded from the checks.
//
// Returns the collected BAD_REQUEST issues (empty when nothing is wrong).
NYql::TIssues ValidateFormatForInput(const TString& format, const TScheme& schema, const TPartitionedBy& partitionedBy) {
    NYql::TIssues issues;
    if (format == "raw"sv) {
        // Number of columns that are not partitioning columns.
        ui64 realSchemaRowCount = 0;
        const TSet<TString> partitionedBySet{partitionedBy.begin(), partitionedBy.end()};

        for (const auto& column: schema.column()) {
            if (partitionedBySet.contains(column.name())) {
                continue;
            }
            if (!ValidateStringType(column.type())) {
                issues.AddIssue(MakeErrorIssue(
                    TIssuesIds::BAD_REQUEST,
                    TStringBuilder{} << "Only string type field in schema supported in raw format (you have "
                                     << NYdb::TType(column.type()).ToString() << " type)"));
            }
            ++realSchemaRowCount;
        }

        if (realSchemaRowCount > 1) {
            issues.AddIssue(MakeErrorIssue(
                TIssuesIds::BAD_REQUEST,
                TStringBuilder{} << "Only one field in schema supported in raw format (you have "
                                 << realSchemaRowCount << " fields)"));
        }
    }
    return issues;
}
51+
2152
template<class P>
2253
NYql::TIssues ValidateEvent(const P& ev, size_t maxSize)
2354
{
@@ -110,6 +141,7 @@ NYql::TIssues ValidateBinding(const T& ev, size_t maxSize, const TSet<FederatedQ
110141
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "data streams with empty schema is forbidden"));
111142
}
112143
issues.AddIssues(NKikimr::NExternalSource::ValidateDateFormatSetting(dataStreams.format_setting(), true));
144+
issues.AddIssues(ValidateFormatForInput(dataStreams.format(), dataStreams.schema(), TVector<TString>{}));
113145
break;
114146
}
115147
case FederatedQuery::BindingSetting::BINDING_NOT_SET: {
@@ -121,6 +153,7 @@ NYql::TIssues ValidateBinding(const T& ev, size_t maxSize, const TSet<FederatedQ
121153
const FederatedQuery::ObjectStorageBinding objectStorage = setting.object_storage();
122154
for (const auto& subset: objectStorage.subset()) {
123155
issues.AddIssues(NKikimr::NExternalSource::Validate(subset.schema(), subset, pathsLimit));
156+
issues.AddIssues(ValidateFormatForInput(subset.format(), subset.schema(), subset.partitioned_by()));
124157
}
125158
break;
126159
}

ydb/library/yql/providers/common/mkql/parser.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ TRuntimeNode BuildParseCall(
215215
if (parseItemStructType->GetMembersCount() == 0) {
216216
return ctx.ProgramBuilder.NewStruct(parseItemType, {});
217217
}
218+
MKQL_ENSURE(parseItemStructType->GetMembersCount() == 1, "Only one field (in schema) supported in raw format");
218219

219220
bool isOptional;
220221
const auto schemeType = UnpackOptionalData(

ydb/library/yql/providers/common/provider/yql_provider.cpp

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1382,13 +1382,53 @@ bool ValidateCompressionForOutput(std::string_view format, std::string_view comp
13821382
return false;
13831383
}
13841384

1385-
bool ValidateFormatForInput(std::string_view format, TExprContext& ctx) {
1386-
if (format.empty() || IsIn(FormatsForInput, format)) {
1385+
bool ValidateFormatForInput(
1386+
std::string_view format,
1387+
const TStructExprType* schemaStructRowType,
1388+
const std::function<bool(TStringBuf)>& excludeFields,
1389+
TExprContext& ctx) {
1390+
if (format.empty()) {
13871391
return true;
13881392
}
1389-
ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << format
1390-
<< ". Use one of: " << JoinSeq(", ", FormatsForInput)));
1391-
return false;
1393+
1394+
if (!IsIn(FormatsForInput, format)) {
1395+
ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << format
1396+
<< ". Use one of: " << JoinSeq(", ", FormatsForInput)));
1397+
return false;
1398+
}
1399+
1400+
if (schemaStructRowType && format == TStringBuf("raw")) {
1401+
ui64 realSchemaRowCount = 0;
1402+
const TTypeAnnotationNode* rowType= nullptr;
1403+
1404+
for (const TItemExprType* item : schemaStructRowType->GetItems()) {
1405+
if (excludeFields && excludeFields(item->GetName())) {
1406+
continue;
1407+
}
1408+
rowType = item->GetItemType();
1409+
++realSchemaRowCount;
1410+
}
1411+
if (!realSchemaRowCount) {
1412+
return true;
1413+
}
1414+
1415+
if (realSchemaRowCount > 1) {
1416+
ctx.AddError(TIssue(TStringBuilder() << "Only one field in schema supported in raw format (you have "
1417+
<< realSchemaRowCount << " fields)"));
1418+
return false;
1419+
}
1420+
1421+
if (rowType->GetKind() == ETypeAnnotationKind::Optional) {
1422+
rowType = rowType->Cast<TOptionalExprType>()->GetItemType();
1423+
}
1424+
1425+
if (rowType->GetKind() != ETypeAnnotationKind::Data
1426+
|| !IsDataTypeString(rowType->Cast<TDataExprType>()->GetSlot())) {
1427+
ctx.AddError(TIssue(TStringBuilder() << "Only string type field in schema supported in raw format"));
1428+
return false;
1429+
}
1430+
}
1431+
return true;
13921432
}
13931433

13941434
bool ValidateFormatForOutput(std::string_view format, TExprContext& ctx) {

ydb/library/yql/providers/common/provider/yql_provider.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ void WriteStatistics(NYson::TYsonWriter& writer, const TOperationStatistics& sta
187187
bool ValidateCompressionForInput(std::string_view format, std::string_view compression, TExprContext& ctx);
188188
bool ValidateCompressionForOutput(std::string_view format, std::string_view compression, TExprContext& ctx);
189189

190-
bool ValidateFormatForInput(std::string_view format, TExprContext& ctx);
190+
bool ValidateFormatForInput(std::string_view format, const TStructExprType* schemaStructRowType, const std::function<bool(TStringBuf)>& excludeFields, TExprContext& ctx);
191191
bool ValidateFormatForOutput(std::string_view format, TExprContext& ctx);
192192

193193
bool ValidateIntervalUnit(std::string_view unit, TExprContext& ctx);

ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -99,23 +99,27 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
9999
return TStatus::Error;
100100
}
101101

102-
auto format = read.Format().Ref().Content();
103-
if (!NCommon::ValidateFormatForInput(format, ctx)) {
102+
TPqTopic topic = read.Topic();
103+
if (!EnsureCallable(topic.Ref(), ctx)) {
104104
return TStatus::Error;
105105
}
106106

107-
if (!NCommon::ValidateCompressionForInput(format, read.Compression().Ref().Content(), ctx)) {
107+
TVector<TString> columnOrder;
108+
auto schema = GetReadTopicSchema(topic, read.Columns().Maybe<TCoAtomList>(), ctx, columnOrder);
109+
if (!schema) {
108110
return TStatus::Error;
109111
}
110112

111-
TPqTopic topic = read.Topic();
112-
if (!EnsureCallable(topic.Ref(), ctx)) {
113+
auto format = read.Format().Ref().Content();
114+
if (!NCommon::ValidateFormatForInput(
115+
format,
116+
schema->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(),
117+
[](TStringBuf fieldName) {return FindPqMetaFieldDescriptorBySysColumn(TString(fieldName)); },
118+
ctx)) {
113119
return TStatus::Error;
114120
}
115121

116-
TVector<TString> columnOrder;
117-
auto schema = GetReadTopicSchema(topic, read.Columns().Maybe<TCoAtomList>(), ctx, columnOrder);
118-
if (!schema) {
122+
if (!NCommon::ValidateCompressionForInput(format, read.Compression().Ref().Content(), ctx)) {
119123
return TStatus::Error;
120124
}
121125

ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
357357
}
358358

359359
if (!EnsureAtom(*input->Child(TS3ParseSettings::idx_Format), ctx) ||
360-
!NCommon::ValidateFormatForInput(input->Child(TS3ParseSettings::idx_Format)->Content(), ctx))
360+
!NCommon::ValidateFormatForInput(input->Child(TS3ParseSettings::idx_Format)->Content(), nullptr, nullptr, ctx))
361361
{
362362
return TStatus::Error;
363363
}
@@ -438,13 +438,15 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
438438
std::vector<TString> partitionedBy;
439439
TString projection;
440440
{
441-
THashSet<TStringBuf> columns;
441+
TS3Object s3Object(input->Child(TS3ReadObject::idx_Object));
442+
auto format = s3Object.Format().Ref().Content();
442443
const TStructExprType* structRowType = rowType->Cast<TStructExprType>();
444+
445+
THashSet<TStringBuf> columns;
443446
for (const TItemExprType* item : structRowType->GetItems()) {
444447
columns.emplace(item->GetName());
445448
}
446-
447-
TS3Object s3Object(input->Child(TS3ReadObject::idx_Object));
449+
448450
if (TMaybeNode<TExprBase> settings = s3Object.Settings()) {
449451
for (auto& settingNode : settings.Raw()->ChildrenList()) {
450452
const TStringBuf name = settingNode->Head().Content();
@@ -461,13 +463,21 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
461463
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "Table contains no columns except partitioning columns"));
462464
return TStatus::Error;
463465
}
464-
465466
}
466467
if (name == "projection"sv) {
467468
projection = settingNode->Tail().Content();
468469
}
469470
}
470471
}
472+
473+
TSet<TString> partitionedBySet{partitionedBy.begin(), partitionedBy.end()};
474+
if (!NCommon::ValidateFormatForInput(
475+
format,
476+
structRowType,
477+
[partitionedBySet](TStringBuf fieldName) {return partitionedBySet.contains(fieldName); },
478+
ctx)) {
479+
return TStatus::Error;
480+
}
471481
}
472482

473483
if (!ValidateProjectionTypes(
@@ -550,7 +560,7 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
550560
}
551561

552562
const auto format = input->Child(TS3Object::idx_Format)->Content();
553-
if (!EnsureAtom(*input->Child(TS3Object::idx_Format), ctx) || !NCommon::ValidateFormatForInput(format, ctx)) {
563+
if (!EnsureAtom(*input->Child(TS3Object::idx_Format), ctx) || !NCommon::ValidateFormatForInput(format, nullptr, nullptr, ctx)) {
554564
return TStatus::Error;
555565
}
556566

0 commit comments

Comments
 (0)