Skip to content

Additional validation for queries with "json_list" format and datetime types #10208

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
}
const bool hasPartitioning = objectStorage.projection_size() || objectStorage.partitioned_by_size();
issues.AddIssues(ValidateFormatSetting(objectStorage.format(), objectStorage.format_setting(), location, hasPartitioning));
issues.AddIssues(ValidateJsonListFormat(objectStorage.format(), schema, objectStorage.partitioned_by()));
issues.AddIssues(ValidateRawFormat(objectStorage.format(), schema, objectStorage.partitioned_by()));
if (hasPartitioning) {
if (NYql::NS3::HasWildcards(location)) {
Expand Down Expand Up @@ -263,6 +264,30 @@ struct TObjectStorageExternalSource : public IExternalSource {
return issues;
}

// Checks a schema against the restrictions of the "json_list" format.
//
// For any other format the returned issue list is empty. For "json_list",
// every column that is not listed in `partitionedBy` and whose type is
// recognised by ValidateDateOrTimeType() contributes one BAD_REQUEST issue
// naming the offending column and its type.
//
// format        - value of the "format" setting.
// schema        - any message exposing column() whose items have name()/type().
// partitionedBy - names of partitioning columns; these are exempt from the check.
template<typename TScheme>
static NYql::TIssues ValidateJsonListFormat(const TString& format, const TScheme& schema, const google::protobuf::RepeatedPtrField<TString>& partitionedBy) {
    NYql::TIssues collected;

    if (format == "json_list"sv) {
        // Set lookup so each column is matched against the partitioning list
        // without rescanning the repeated field.
        const TSet<TString> partitionColumns(partitionedBy.begin(), partitionedBy.end());

        for (const auto& field : schema.column()) {
            const bool isPartitionColumn = partitionColumns.contains(field.name());
            if (isPartitionColumn || !ValidateDateOrTimeType(field.type())) {
                continue;
            }

            TStringBuilder message;
            message << "Date, Timestamp and Interval types are not allowed in json_list format (you have '"
                    << field.name() << " " << NYdb::TType(field.type()).ToString() << "' field)";
            collected.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, message));
        }
    }

    return collected;
}

template<typename TScheme>
static NYql::TIssues ValidateRawFormat(const TString& format, const TScheme& schema, const google::protobuf::RepeatedPtrField<TString>& partitionedBy) {
NYql::TIssues issues;
Expand Down Expand Up @@ -800,6 +825,50 @@ struct TObjectStorageExternalSource : public IExternalSource {
return FindIf(availableTypes, [&columnType](const auto& availableType) { return NYdb::TypesEqual(availableType, columnType); }) != availableTypes.end();
}

static std::vector<NYdb::TType> GetDateOrTimeTypes() {
NYdb::TType dateType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Date).Build();
NYdb::TType datetimeType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Datetime).Build();
NYdb::TType timestampType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Timestamp).Build();
NYdb::TType intervalType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Interval).Build();
NYdb::TType date32Type = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Date32).Build();
NYdb::TType datetime64Type = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Datetime64).Build();
NYdb::TType timestamp64Type = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Timestamp64).Build();
NYdb::TType interval64Type = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::Interval64).Build();
NYdb::TType tzdateType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::TzDate).Build();
NYdb::TType tzdatetimeType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::TzDatetime).Build();
NYdb::TType tztimestampType = NYdb::TTypeBuilder{}.Primitive(NYdb::EPrimitiveType::TzTimestamp).Build();
const std::vector<NYdb::TType> result {
dateType,
datetimeType,
timestampType,
intervalType,
date32Type,
datetime64Type,
timestamp64Type,
interval64Type,
tzdateType,
tzdatetimeType,
tztimestampType,
NYdb::TTypeBuilder{}.Optional(dateType).Build(),
NYdb::TTypeBuilder{}.Optional(datetimeType).Build(),
NYdb::TTypeBuilder{}.Optional(timestampType).Build(),
NYdb::TTypeBuilder{}.Optional(intervalType).Build(),
NYdb::TTypeBuilder{}.Optional(date32Type).Build(),
NYdb::TTypeBuilder{}.Optional(datetime64Type).Build(),
NYdb::TTypeBuilder{}.Optional(timestamp64Type).Build(),
NYdb::TTypeBuilder{}.Optional(interval64Type).Build(),
NYdb::TTypeBuilder{}.Optional(tzdateType).Build(),
NYdb::TTypeBuilder{}.Optional(tzdatetimeType).Build(),
NYdb::TTypeBuilder{}.Optional(tztimestampType).Build()
};
return result;
}

// Returns true when `columnType` is equal (per NYdb::TypesEqual) to one of the
// types produced by GetDateOrTimeTypes(), and false otherwise.
//
// Note: despite the "Validate" prefix, a true result means the type IS a
// date/time type; ValidateJsonListFormat() reports an error in that case.
static bool ValidateDateOrTimeType(const NYdb::TType& columnType) {
    // Built on first call and reused by later calls.
    static const std::vector<NYdb::TType> dateOrTimeTypes = GetDateOrTimeTypes();

    for (const NYdb::TType& candidate : dateOrTimeTypes) {
        if (NYdb::TypesEqual(candidate, columnType)) {
            return true;
        }
    }
    return false;
}

private:
const std::vector<TRegExMatch> HostnamePatterns;
const size_t PathsLimit;
Expand Down
25 changes: 25 additions & 0 deletions ydb/core/external_sources/object_storage_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,31 @@ Y_UNIT_TEST_SUITE(ObjectStorageTest) {
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Partition by must always be specified");
}

// Verifies that Pack() rejects a json_list source for EACH date/time-like
// column type individually. Every type gets its own single-column schema, so
// a type that is accidentally accepted cannot hide behind another type that
// is rejected in the same schema.
Y_UNIT_TEST(FailedJsonListValidation) {
    static const auto invalidTypes = {
        Ydb::Type::DATE,
        Ydb::Type::DATETIME,
        Ydb::Type::TIMESTAMP,
        Ydb::Type::INTERVAL,
        Ydb::Type::DATE32,
        Ydb::Type::DATETIME64,
        Ydb::Type::TIMESTAMP64,
        Ydb::Type::INTERVAL64,
        Ydb::Type::TZ_DATE,
        Ydb::Type::TZ_DATETIME,
        Ydb::Type::TZ_TIMESTAMP,
    };
    auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);

    NKikimrExternalSources::TGeneral general;
    general.mutable_attributes()->insert({"format", "json_list"});

    for (const auto typeId : invalidTypes) {
        // Fresh schema per iteration: exactly one column of the type under test.
        NKikimrExternalSources::TSchema schema;
        auto* column = schema.add_column();
        column->set_name("column");
        column->mutable_type()->set_type_id(typeId);

        UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Date, Timestamp and Interval types are not allowed in json_list format");
    }
}

Y_UNIT_TEST(WildcardsValidation) {
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
NKikimrExternalSources::TSchema schema;
Expand Down
24 changes: 24 additions & 0 deletions ydb/library/yql/providers/common/provider/yql_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1652,6 +1652,30 @@ bool ValidateFormatForInput(
return false;
}
}
else if (schemaStructRowType && format == TStringBuf("json_list")) {
bool failedSchemaColumns = false;

for (const TItemExprType* item : schemaStructRowType->GetItems()) {
if (excludeFields && excludeFields(item->GetName())) {
continue;
}
const TTypeAnnotationNode* rowType = item->GetItemType();
if (rowType->GetKind() == ETypeAnnotationKind::Optional) {
rowType = rowType->Cast<TOptionalExprType>()->GetItemType();
}

if (rowType->GetKind() == ETypeAnnotationKind::Data
&& IsDataTypeDateOrTzDateOrInterval(rowType->Cast<TDataExprType>()->GetSlot())) {
ctx.AddError(TIssue(TStringBuilder() << "Date, Timestamp and Interval types are not allowed in json_list format (you have '"
<< item->GetName() << " " << FormatType(rowType) << "' field)"));
failedSchemaColumns = true;
}
}

if (failedSchemaColumns) {
return false;
}
}
return true;
}

Expand Down
63 changes: 63 additions & 0 deletions ydb/tests/fq/s3/test_s3_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,69 @@ def test_inference_unsupported_types(self, kikimr, s3, client, unique_prefix):
assert result_set.rows[2].items[0].int64_value == 30
assert sum(kikimr.control_plane.get_metering(1)) == 10

@yq_v2
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_json_list_formats(self, kikimr, s3, client, unique_prefix):
# Checks that a json_list query whose schema declares date/time-like column
# types finishes in FAILED status and that the reported error mentions the
# json_list restriction and the type names.

# Prepare an empty, publicly readable bucket "fbucket" on the test S3 endpoint.
resource = boto3.resource(
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
)

bucket = resource.Bucket("fbucket")
bucket.create(ACL='public-read')
bucket.objects.all().delete()

s3_client = boto3.client(
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
)

# Fixture: a JSON array of three objects; each has the eleven keys used in the
# schema below, all with empty-string values.
# NOTE(review): the variable name `fruits` does not describe this payload;
# presumably carried over from a neighbouring test - consider renaming.
fruits = '''[
{ "date" : "", "datetime" : "", "timestamp" : "", "interval" : "", "date32" : "", "datetime64" : "", "timestamp64" : "", "interval64" : "", "tzDate" : "", "tzDateTime" : "", "tzTimestamp" : "" },
{ "date" : "", "datetime" : "", "timestamp" : "", "interval" : "", "date32" : "", "datetime64" : "", "timestamp64" : "", "interval64" : "", "tzDate" : "", "tzDateTime" : "", "tzTimestamp" : "" },
{ "date" : "", "datetime" : "", "timestamp" : "", "interval" : "", "date32" : "", "datetime64" : "", "timestamp64" : "", "interval64" : "", "tzDate" : "", "tzDateTime" : "", "tzTimestamp" : "" }
]'''
s3_client.put_object(Body=fruits, Bucket='fbucket', Key='timestamp.json', ContentType='text/plain')

# Register a storage connection (name made unique per test) pointing at the bucket.
kikimr.control_plane.wait_bootstrap(1)
storage_connection_name = unique_prefix + "fruitbucket"
client.create_storage_connection(storage_connection_name, "fbucket")

# Read the uploaded object as json_list with one column per date/time-like type.
sql = f'''
SELECT *
FROM `{storage_connection_name}`.`/timestamp.json`
WITH (
format="json_list",
schema=(
`date` date,
`datetime` datetime,
`timestamp` timestamp,
`interval` interval,
`date32` date32,
`datetime64` datetime64,
`timestamp64` timestamp64,
`interval64` interval64,
`tzDate` tzDate,
`tzDateTime` tzDateTime,
`tzTimestamp` tzTimestamp
));
'''

# The query is expected to end in FAILED status.
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.FAILED)

# All checks below are substring checks on the stringified describe_query result.
error_message = str(client.describe_query(query_id).result)
assert "Date, Timestamp and Interval types are not allowed in json_list format" in error_message
# NOTE(review): "Date", "Timestamp" and "Interval" already occur in the fixed
# message text asserted above, so these three checks cannot fail on their own;
# likewise "Date32"/"Datetime" etc. contain shorter names as substrings.
assert "Date" in error_message
assert "Datetime" in error_message
assert "Timestamp" in error_message
assert "Interval" in error_message
assert "Date32" in error_message
assert "Datetime64" in error_message
assert "Timestamp64" in error_message
assert "Interval64" in error_message
assert "TzDate" in error_message
assert "TzDatetime" in error_message
assert "TzTimestamp" in error_message

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_csv_with_hopping(self, kikimr, s3, client, unique_prefix):
Expand Down
Loading