Skip to content

Commit 7fb005f

Browse files
merge to ydb stable YQ-4070 supported insert values for s3 (#14915)
Co-authored-by: Ivan Sukhov <evanevannnn@ydb.tech>
1 parent 03a4bae commit 7fb005f

File tree

10 files changed

+232
-16
lines changed

10 files changed

+232
-16
lines changed

ydb/core/external_sources/object_storage.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
150150
}
151151
const bool hasPartitioning = objectStorage.projection_size() || objectStorage.partitioned_by_size();
152152
issues.AddIssues(ValidateFormatSetting(objectStorage.format(), objectStorage.format_setting(), location, hasPartitioning));
153+
issues.AddIssues(ValidateSchema(schema));
153154
issues.AddIssues(ValidateJsonListFormat(objectStorage.format(), schema, objectStorage.partitioned_by()));
154155
issues.AddIssues(ValidateRawFormat(objectStorage.format(), schema, objectStorage.partitioned_by()));
155156
if (hasPartitioning) {
@@ -268,6 +269,22 @@ struct TObjectStorageExternalSource : public IExternalSource {
268269
return issues;
269270
}
270271

272+
template<typename TScheme>
273+
static NYql::TIssues ValidateSchema(const TScheme& schema) {
274+
NYql::TIssues issues;
275+
for (const auto& column: schema.column()) {
276+
const auto type = column.type();
277+
if (type.has_optional_type() && type.optional_type().item().has_optional_type()) {
278+
issues.AddIssue(MakeErrorIssue(
279+
Ydb::StatusIds::BAD_REQUEST,
280+
TStringBuilder{} << "Double optional types are not supported (you have '"
281+
<< column.name() << " " << NYdb::TType(column.type()).ToString() << "' field)"));
282+
}
283+
}
284+
285+
return issues;
286+
}
287+
271288
template<typename TScheme>
272289
static NYql::TIssues ValidateJsonListFormat(const TString& format, const TScheme& schema, const google::protobuf::RepeatedPtrField<TString>& partitionedBy) {
273290
NYql::TIssues issues;

ydb/core/external_sources/object_storage_ut.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,15 @@ Y_UNIT_TEST_SUITE(ObjectStorageTest) {
5555
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Date, Timestamp and Interval types are not allowed in json_list format");
5656
}
5757

58+
// Pack() must throw TExternalSourceException for a schema whose only column is
// declared as an optional of an optional INT32.
Y_UNIT_TEST(FailedOptionalTypeValidation) {
    NKikimrExternalSources::TSchema schema;

    // Build the nested type step by step: optional -> optional -> INT32.
    auto* outerOptional = schema.add_column()->mutable_type()->mutable_optional_type();
    auto* innerOptional = outerOptional->mutable_item()->mutable_optional_type();
    innerOptional->mutable_item()->set_type_id(Ydb::Type::INT32);

    NKikimrExternalSources::TGeneral general;
    auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
    UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Double optional types are not supported");
}
66+
5867
Y_UNIT_TEST(WildcardsValidation) {
5968
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
6069
NKikimrExternalSources::TSchema schema;

ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -805,6 +805,82 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
805805
UNIT_ASSERT_EQUAL(GetObjectKeys(writeBucket).size(), 3);
806806
}
807807

808+
// Creates an external data source and an external table (key Uint64, value String)
// over a fresh bucket, then runs four INSERTs in one query: VALUES and
// SELECT ... FROM AS_TABLE, each through the external table and through the data
// source path directly. The query must succeed and the bucket is expected to
// contain 4 object keys afterwards.
// NOTE(review): judging by the test name, the point is that the integer literals
// are cast to the declared column type -- confirm against the type annotation code.
Y_UNIT_TEST(InsertIntoBucketValuesCast) {
    const TString writeDataSourceName = "/Root/write_data_source";
    const TString writeTableName = "/Root/write_binding";
    const TString writeBucket = "test_bucket_values_cast";
    const TString writeObject = "test_object_write/";

    // The S3 client is only needed to create the bucket, so keep it scoped.
    {
        Aws::S3::S3Client bucketClient = MakeS3Client();
        CreateBucket(writeBucket, bucketClient);
    }

    auto kikimr = NTestUtils::MakeKikimrRunner();

    // Step 1: scheme objects (data source + external table).
    auto tableClient = kikimr->GetTableClient();
    auto session = tableClient.CreateSession().GetValueSync().GetSession();
    {
        const TString schemeQuery = fmt::format(R"(
            CREATE EXTERNAL DATA SOURCE `{write_source}` WITH (
                SOURCE_TYPE="ObjectStorage",
                LOCATION="{write_location}",
                AUTH_METHOD="NONE"
            );
            CREATE EXTERNAL TABLE `{write_table}` (
                key Uint64 NOT NULL,
                value String NOT NULL
            ) WITH (
                DATA_SOURCE="{write_source}",
                LOCATION="{write_object}",
                FORMAT="tsv_with_names"
            );
            )",
            "write_source"_a = writeDataSourceName,
            "write_table"_a = writeTableName,
            "write_location"_a = GetBucketLocation(writeBucket),
            "write_object"_a = writeObject);

        const auto schemeResult = session.ExecuteSchemeQuery(schemeQuery).GetValueSync();
        UNIT_ASSERT_VALUES_EQUAL_C(schemeResult.GetStatus(), NYdb::EStatus::SUCCESS, schemeResult.GetIssues().ToOneLineString());
    }

    // Step 2: four inserts in a single query.
    auto queryClient = kikimr->GetQueryClient();
    {
        const TString insertQuery = fmt::format(R"(
            INSERT INTO `{write_table}`
                (key, value)
            VALUES
                (1, "#######"),
                (4294967295u, "#######");

            INSERT INTO `{write_source}`.`{write_object}` WITH (FORMAT = "tsv_with_names")
                (key, value)
            VALUES
                (1, "#######"),
                (4294967295u, "#######");

            INSERT INTO `{write_table}` SELECT * FROM AS_TABLE([
                <|key: 1, value: "#####"|>,
                <|key: 4294967295u, value: "#####"|>
            ]);

            INSERT INTO `{write_source}`.`{write_object}` WITH (FORMAT = "tsv_with_names")
            SELECT * FROM AS_TABLE([
                <|key: 1, value: "#####"|>,
                <|key: 4294967295u, value: "#####"|>
            ]);
            )",
            "write_source"_a = writeDataSourceName,
            "write_table"_a = writeTableName,
            "write_object"_a = writeObject);

        const auto insertResult = queryClient.ExecuteQuery(insertQuery, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
        UNIT_ASSERT_VALUES_EQUAL_C(insertResult.GetStatus(), NYdb::EStatus::SUCCESS, insertResult.GetIssues().ToOneLineString());
    }

    UNIT_ASSERT_EQUAL(GetObjectKeys(writeBucket).size(), 4);
}
883+
808884
Y_UNIT_TEST(UpdateExternalTable) {
809885
const TString readDataSourceName = "/Root/read_data_source";
810886
const TString readTableName = "/Root/read_binding";

ydb/library/yql/providers/s3/common/util.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,20 @@ TString TUrlBuilder::Build() const {
7878
return std::move(result);
7979
}
8080

81+
bool ValidateS3ReadWriteSchema(const TStructExprType* schemaStructRowType, TExprContext& ctx) {
82+
for (const TItemExprType* item : schemaStructRowType->GetItems()) {
83+
const TTypeAnnotationNode* rowType = item->GetItemType();
84+
if (rowType->GetKind() == ETypeAnnotationKind::Optional) {
85+
rowType = rowType->Cast<TOptionalExprType>()->GetItemType();
86+
}
87+
88+
if (rowType->GetKind() == ETypeAnnotationKind::Optional) {
89+
ctx.AddError(TIssue(TStringBuilder() << "Double optional types are not supported (you have '"
90+
<< item->GetName() << " " << FormatType(item->GetItemType()) << "' field)"));
91+
return false;
92+
}
93+
}
94+
return true;
95+
}
96+
8197
}

ydb/library/yql/providers/s3/common/util.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <util/string/builder.h>
44
#include <ydb/library/yql/public/issue/yql_issue.h>
5+
#include <ydb/library/yql/ast/yql_expr.h>
56

67
namespace NYql::NS3Util {
78

@@ -30,4 +31,6 @@ class TUrlBuilder {
3031
TString MainUri;
3132
};
3233

34+
// Returns true if no field of the struct row type is a double optional (an optional
// of an optional); otherwise adds an error to ctx for the first such field and
// returns false.
bool ValidateS3ReadWriteSchema(const TStructExprType* schemaStructRowType, TExprContext& ctx);
35+
3336
}

ydb/library/yql/providers/s3/common/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ PEERDIR(
1919
ydb/library/yql/providers/s3/events
2020
ydb/library/yql/public/issue
2121
ydb/library/yql/public/issue/protos
22+
ydb/library/yql/ast
2223
)
2324

2425
IF (CLANG AND NOT WITH_VALGRIND)

ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
44
#include <ydb/library/yql/core/yql_opt_utils.h>
5+
#include <ydb/library/yql/providers/s3/common/util.h>
56
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
67

78
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
@@ -26,6 +27,7 @@ TExprNode::TListType GetPartitionKeys(const TExprNode::TPtr& partBy) {
2627

2728
return {};
2829
}
30+
2931
}
3032

3133
namespace {
@@ -64,7 +66,47 @@ class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
6466
return TStatus::Error;
6567
}
6668

67-
auto source = input->Child(TS3WriteObject::idx_Input);
69+
const auto targetNode = input->Child(TS3WriteObject::idx_Target);
70+
if (!TS3Target::Match(targetNode)) {
71+
ctx.AddError(TIssue(ctx.GetPosition(targetNode->Pos()), "Expected S3 target."));
72+
return TStatus::Error;
73+
}
74+
75+
const TTypeAnnotationNode* targetType = nullptr;
76+
if (const TS3Target target(targetNode); const auto settings = target.Settings()) {
77+
if (const auto userschema = GetSetting(settings.Cast().Ref(), "userschema")) {
78+
targetType = userschema->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
79+
}
80+
}
81+
82+
const auto source = input->ChildPtr(TS3WriteObject::idx_Input);
83+
if (const auto maybeTuple = TMaybeNode<TExprList>(source)) {
84+
const auto tuple = maybeTuple.Cast();
85+
86+
TVector<TExprBase> convertedValues;
87+
convertedValues.reserve(tuple.Size());
88+
for (const auto& value : tuple) {
89+
if (!EnsureStructType(input->Pos(), *value.Ref().GetTypeAnn(), ctx)) {
90+
return TStatus::Error;
91+
}
92+
93+
TExprNode::TPtr node = value.Ptr();
94+
if (targetType && TryConvertTo(node, *targetType, ctx) == TStatus::Error) {
95+
ctx.AddError(TIssue(ctx.GetPosition(source->Pos()), "Failed to convert input columns types to scheme types"));
96+
return TStatus::Error;
97+
}
98+
99+
convertedValues.emplace_back(std::move(node));
100+
}
101+
102+
const auto list = Build<TCoAsList>(ctx, input->Pos())
103+
.Add(std::move(convertedValues))
104+
.Done();
105+
106+
input->ChildRef(TS3WriteObject::idx_Input) = list.Ptr();
107+
return TStatus::Repeat;
108+
}
109+
68110
if (!EnsureListType(*source, ctx)) {
69111
return TStatus::Error;
70112
}
@@ -74,23 +116,17 @@ class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
74116
return TStatus::Error;
75117
}
76118

77-
auto target = input->Child(TS3WriteObject::idx_Target);
78-
if (!TS3Target::Match(target)) {
79-
ctx.AddError(TIssue(ctx.GetPosition(target->Pos()), "Expected S3 target."));
119+
if (!NS3Util::ValidateS3ReadWriteSchema(sourceType->Cast<TStructExprType>(), ctx)) {
80120
return TStatus::Error;
81121
}
82122

83-
TS3Target tgt(target);
84-
if (auto settings = tgt.Settings()) {
85-
if (auto userschema = GetSetting(settings.Cast().Ref(), "userschema")) {
86-
const TTypeAnnotationNode* targetType = userschema->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
87-
if (!IsSameAnnotation(*targetType, *sourceType)) {
88-
ctx.AddError(TIssue(ctx.GetPosition(source->Pos()),
89-
TStringBuilder() << "Type mismatch between schema type: " << *targetType
90-
<< " and actual data type: " << *sourceType << ", diff is: "
91-
<< GetTypeDiff(*targetType, *sourceType)));
92-
return TStatus::Error;
93-
}
123+
if (targetType) {
124+
const auto status = TryConvertTo(input->ChildRef(TS3WriteObject::idx_Input), *ctx.MakeType<TListExprType>(targetType), ctx);
125+
if (status == TStatus::Error) {
126+
ctx.AddError(TIssue(ctx.GetPosition(source->Pos()), "Row type mismatch for S3 external table"));
127+
return TStatus::Error;
128+
} else if (status != TStatus::Ok) {
129+
return status;
94130
}
95131
}
96132

ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "yql_s3_provider_impl.h"
22

33
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
4+
#include <ydb/library/yql/providers/s3/common/util.h>
45
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
56
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
67
#include <ydb/library/yql/providers/s3/range_helpers/path_list_reader.h>
@@ -491,6 +492,10 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
491492
auto format = s3Object.Format().Ref().Content();
492493
const TStructExprType* structRowType = rowType->Cast<TStructExprType>();
493494

495+
if (!NS3Util::ValidateS3ReadWriteSchema(structRowType, ctx)) {
496+
return TStatus::Error;
497+
}
498+
494499
THashSet<TStringBuf> columns;
495500
for (const TItemExprType* item : structRowType->GetItems()) {
496501
columns.emplace(item->GetName());

ydb/tests/fq/s3/test_bindings_1.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ def test_s3_format_mismatch(self, kikimr, s3, client, unique_prefix):
113113

114114
describe_result = client.describe_query(query_id).result
115115
describe_string = "{}".format(describe_result)
116-
assert "Type mismatch between schema type" in describe_string, describe_string
116+
assert "Row type mismatch for S3 external table" in describe_string, describe_string
117117

118118
@yq_all
119119
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)

ydb/tests/fq/s3/test_s3_0.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1114,3 +1114,56 @@ def wait_checkpoints(require_query_is_on=False):
11141114

11151115
client.abort_query(query_id)
11161116
client.wait_query(query_id)
1117+
1118+
@yq_v2
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_double_optional_types_validation(self, kikimr, s3, client, unique_prefix):
    """Queries that use a double optional type (Int32??) must fail validation.

    Uploads a small CSV to the "fbucket" bucket, then submits a SELECT with a
    double optional column in its SCHEMA and an INSERT whose source column is a
    double optional. Both queries are expected to end in FAILED status with the
    "Double optional types are not supported" issue.
    """
    s3_credentials = dict(endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key")

    # Start from an empty, publicly readable bucket.
    bucket = boto3.resource("s3", **s3_credentials).Bucket("fbucket")
    bucket.create(ACL='public-read')
    bucket.objects.all().delete()

    uploader = boto3.client("s3", **s3_credentials)

    fruits = '''Fruit,Price,Weight
Banana,3,100
Apple,2,22
Pear,15,33'''
    uploader.put_object(Body=fruits, Bucket='fbucket', Key='fruits.csv', ContentType='text/plain')

    kikimr.control_plane.wait_bootstrap(1)
    storage_connection_name = unique_prefix + "fruitbucket"
    client.create_storage_connection(storage_connection_name, "fbucket")

    select_sql = f'''
        SELECT *
        FROM `{storage_connection_name}`.`fruits.csv`
        WITH (format='csv_with_names', SCHEMA (
            Name Int32??,
        ));
        '''

    insert_sql = f'''
        INSERT INTO `{storage_connection_name}`.`insert/`
        WITH
        (
            FORMAT="csv_with_names"
        )
        SELECT CAST(42 AS Int32??) as Weight;'''

    # Read path first, then write path: each must be rejected with the same issue.
    for sql in (select_sql, insert_sql):
        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
        client.wait_query_status(query_id, fq.QueryMeta.FAILED)
        issues = str(client.describe_query(query_id).result.query.issue)

        assert "Double optional types are not supported" in issues, "Incorrect issues: " + issues

0 commit comments

Comments
 (0)