Skip to content

Commit a3ac0d2

Browse files
authored
YQ-4070 supported insert values for s3 (#14708)
1 parent 39d7abd commit a3ac0d2

File tree

3 files changed

+125
-19
lines changed

3 files changed

+125
-19
lines changed

ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -943,6 +943,82 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
943943
UNIT_ASSERT_EQUAL(GetObjectKeys(writeBucket).size(), 3);
944944
}
945945

946+
Y_UNIT_TEST(InsertIntoBucketValuesCast) {
947+
const TString writeDataSourceName = "/Root/write_data_source";
948+
const TString writeTableName = "/Root/write_binding";
949+
const TString writeBucket = "test_bucket_values_cast";
950+
const TString writeObject = "test_object_write/";
951+
{
952+
Aws::S3::S3Client s3Client = MakeS3Client();
953+
CreateBucket(writeBucket, s3Client);
954+
}
955+
956+
auto kikimr = NTestUtils::MakeKikimrRunner();
957+
958+
auto tc = kikimr->GetTableClient();
959+
auto session = tc.CreateSession().GetValueSync().GetSession();
960+
{
961+
const TString query = fmt::format(R"(
962+
CREATE EXTERNAL DATA SOURCE `{write_source}` WITH (
963+
SOURCE_TYPE="ObjectStorage",
964+
LOCATION="{write_location}",
965+
AUTH_METHOD="NONE"
966+
);
967+
CREATE EXTERNAL TABLE `{write_table}` (
968+
key Uint64 NOT NULL,
969+
value String NOT NULL
970+
) WITH (
971+
DATA_SOURCE="{write_source}",
972+
LOCATION="{write_object}",
973+
FORMAT="tsv_with_names"
974+
);
975+
)",
976+
"write_source"_a = writeDataSourceName,
977+
"write_table"_a = writeTableName,
978+
"write_location"_a = GetBucketLocation(writeBucket),
979+
"write_object"_a = writeObject);
980+
981+
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
982+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToOneLineString());
983+
}
984+
985+
auto db = kikimr->GetQueryClient();
986+
{
987+
const TString query = fmt::format(R"(
988+
INSERT INTO `{write_table}`
989+
(key, value)
990+
VALUES
991+
(1, "#######"),
992+
(4294967295u, "#######");
993+
994+
INSERT INTO `{write_source}`.`{write_object}` WITH (FORMAT = "tsv_with_names")
995+
(key, value)
996+
VALUES
997+
(1, "#######"),
998+
(4294967295u, "#######");
999+
1000+
INSERT INTO `{write_table}` SELECT * FROM AS_TABLE([
1001+
<|key: 1, value: "#####"|>,
1002+
<|key: 4294967295u, value: "#####"|>
1003+
]);
1004+
1005+
INSERT INTO `{write_source}`.`{write_object}` WITH (FORMAT = "tsv_with_names")
1006+
SELECT * FROM AS_TABLE([
1007+
<|key: 1, value: "#####"|>,
1008+
<|key: 4294967295u, value: "#####"|>
1009+
]);
1010+
)",
1011+
"write_source"_a = writeDataSourceName,
1012+
"write_table"_a = writeTableName,
1013+
"write_object"_a = writeObject);
1014+
1015+
const auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
1016+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToOneLineString());
1017+
}
1018+
1019+
UNIT_ASSERT_EQUAL(GetObjectKeys(writeBucket).size(), 4);
1020+
}
1021+
9461022
Y_UNIT_TEST(UpdateExternalTable) {
9471023
const TString readDataSourceName = "/Root/read_data_source";
9481024
const TString readTableName = "/Root/read_binding";

ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp

Lines changed: 48 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,47 @@ class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
6666
return TStatus::Error;
6767
}
6868

69-
auto source = input->Child(TS3WriteObject::idx_Input);
69+
const auto targetNode = input->Child(TS3WriteObject::idx_Target);
70+
if (!TS3Target::Match(targetNode)) {
71+
ctx.AddError(TIssue(ctx.GetPosition(targetNode->Pos()), "Expected S3 target."));
72+
return TStatus::Error;
73+
}
74+
75+
const TTypeAnnotationNode* targetType = nullptr;
76+
if (const TS3Target target(targetNode); const auto settings = target.Settings()) {
77+
if (const auto userschema = GetSetting(settings.Cast().Ref(), "userschema")) {
78+
targetType = userschema->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
79+
}
80+
}
81+
82+
const auto source = input->ChildPtr(TS3WriteObject::idx_Input);
83+
if (const auto maybeTuple = TMaybeNode<TExprList>(source)) {
84+
const auto tuple = maybeTuple.Cast();
85+
86+
TVector<TExprBase> convertedValues;
87+
convertedValues.reserve(tuple.Size());
88+
for (const auto& value : tuple) {
89+
if (!EnsureStructType(input->Pos(), *value.Ref().GetTypeAnn(), ctx)) {
90+
return TStatus::Error;
91+
}
92+
93+
TExprNode::TPtr node = value.Ptr();
94+
if (targetType && TryConvertTo(node, *targetType, ctx) == TStatus::Error) {
95+
ctx.AddError(TIssue(ctx.GetPosition(source->Pos()), "Failed to convert input columns types to scheme types"));
96+
return TStatus::Error;
97+
}
98+
99+
convertedValues.emplace_back(std::move(node));
100+
}
101+
102+
const auto list = Build<TCoAsList>(ctx, input->Pos())
103+
.Add(std::move(convertedValues))
104+
.Done();
105+
106+
input->ChildRef(TS3WriteObject::idx_Input) = list.Ptr();
107+
return TStatus::Repeat;
108+
}
109+
70110
if (!EnsureListType(*source, ctx)) {
71111
return TStatus::Error;
72112
}
@@ -80,23 +120,13 @@ class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
80120
return TStatus::Error;
81121
}
82122

83-
auto target = input->Child(TS3WriteObject::idx_Target);
84-
if (!TS3Target::Match(target)) {
85-
ctx.AddError(TIssue(ctx.GetPosition(target->Pos()), "Expected S3 target."));
86-
return TStatus::Error;
87-
}
88-
89-
TS3Target tgt(target);
90-
if (auto settings = tgt.Settings()) {
91-
if (auto userschema = GetSetting(settings.Cast().Ref(), "userschema")) {
92-
const TTypeAnnotationNode* targetType = userschema->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
93-
if (!IsSameAnnotation(*targetType, *sourceType)) {
94-
ctx.AddError(TIssue(ctx.GetPosition(source->Pos()),
95-
TStringBuilder() << "Type mismatch between schema type: " << *targetType
96-
<< " and actual data type: " << *sourceType << ", diff is: "
97-
<< GetTypeDiff(*targetType, *sourceType)));
98-
return TStatus::Error;
99-
}
123+
if (targetType) {
124+
const auto status = TryConvertTo(input->ChildRef(TS3WriteObject::idx_Input), *ctx.MakeType<TListExprType>(targetType), ctx);
125+
if (status == TStatus::Error) {
126+
ctx.AddError(TIssue(ctx.GetPosition(source->Pos()), "Row type mismatch for S3 external table"));
127+
return TStatus::Error;
128+
} else if (status != TStatus::Ok) {
129+
return status;
100130
}
101131
}
102132

ydb/tests/fq/s3/test_bindings_1.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ def test_s3_format_mismatch(self, kikimr, s3, client, unique_prefix):
113113

114114
describe_result = client.describe_query(query_id).result
115115
describe_string = "{}".format(describe_result)
116-
assert "Type mismatch between schema type" in describe_string, describe_string
116+
assert "Row type mismatch for S3 external table" in describe_string, describe_string
117117

118118
@yq_all
119119
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)

0 commit comments

Comments
 (0)