Skip to content

Commit 31e766d

Browse files
committed
YQ-4070 supported insert values for s3 (ydb-platform#14708)
1 parent 3bafa91 commit 31e766d

File tree

3 files changed

+125
-19
lines changed

3 files changed

+125
-19
lines changed

ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -805,6 +805,82 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
805805
UNIT_ASSERT_EQUAL(GetObjectKeys(writeBucket).size(), 3);
806806
}
807807

808+
Y_UNIT_TEST(InsertIntoBucketValuesCast) {
809+
const TString writeDataSourceName = "/Root/write_data_source";
810+
const TString writeTableName = "/Root/write_binding";
811+
const TString writeBucket = "test_bucket_values_cast";
812+
const TString writeObject = "test_object_write/";
813+
{
814+
Aws::S3::S3Client s3Client = MakeS3Client();
815+
CreateBucket(writeBucket, s3Client);
816+
}
817+
818+
auto kikimr = NTestUtils::MakeKikimrRunner();
819+
820+
auto tc = kikimr->GetTableClient();
821+
auto session = tc.CreateSession().GetValueSync().GetSession();
822+
{
823+
const TString query = fmt::format(R"(
824+
CREATE EXTERNAL DATA SOURCE `{write_source}` WITH (
825+
SOURCE_TYPE="ObjectStorage",
826+
LOCATION="{write_location}",
827+
AUTH_METHOD="NONE"
828+
);
829+
CREATE EXTERNAL TABLE `{write_table}` (
830+
key Uint64 NOT NULL,
831+
value String NOT NULL
832+
) WITH (
833+
DATA_SOURCE="{write_source}",
834+
LOCATION="{write_object}",
835+
FORMAT="tsv_with_names"
836+
);
837+
)",
838+
"write_source"_a = writeDataSourceName,
839+
"write_table"_a = writeTableName,
840+
"write_location"_a = GetBucketLocation(writeBucket),
841+
"write_object"_a = writeObject);
842+
843+
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
844+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToOneLineString());
845+
}
846+
847+
auto db = kikimr->GetQueryClient();
848+
{
849+
const TString query = fmt::format(R"(
850+
INSERT INTO `{write_table}`
851+
(key, value)
852+
VALUES
853+
(1, "#######"),
854+
(4294967295u, "#######");
855+
856+
INSERT INTO `{write_source}`.`{write_object}` WITH (FORMAT = "tsv_with_names")
857+
(key, value)
858+
VALUES
859+
(1, "#######"),
860+
(4294967295u, "#######");
861+
862+
INSERT INTO `{write_table}` SELECT * FROM AS_TABLE([
863+
<|key: 1, value: "#####"|>,
864+
<|key: 4294967295u, value: "#####"|>
865+
]);
866+
867+
INSERT INTO `{write_source}`.`{write_object}` WITH (FORMAT = "tsv_with_names")
868+
SELECT * FROM AS_TABLE([
869+
<|key: 1, value: "#####"|>,
870+
<|key: 4294967295u, value: "#####"|>
871+
]);
872+
)",
873+
"write_source"_a = writeDataSourceName,
874+
"write_table"_a = writeTableName,
875+
"write_object"_a = writeObject);
876+
877+
const auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
878+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToOneLineString());
879+
}
880+
881+
UNIT_ASSERT_EQUAL(GetObjectKeys(writeBucket).size(), 4);
882+
}
883+
808884
Y_UNIT_TEST(UpdateExternalTable) {
809885
const TString readDataSourceName = "/Root/read_data_source";
810886
const TString readTableName = "/Root/read_binding";

ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp

Lines changed: 48 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,47 @@ class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
6666
return TStatus::Error;
6767
}
6868

69-
auto source = input->Child(TS3WriteObject::idx_Input);
69+
const auto targetNode = input->Child(TS3WriteObject::idx_Target);
70+
if (!TS3Target::Match(targetNode)) {
71+
ctx.AddError(TIssue(ctx.GetPosition(targetNode->Pos()), "Expected S3 target."));
72+
return TStatus::Error;
73+
}
74+
75+
const TTypeAnnotationNode* targetType = nullptr;
76+
if (const TS3Target target(targetNode); const auto settings = target.Settings()) {
77+
if (const auto userschema = GetSetting(settings.Cast().Ref(), "userschema")) {
78+
targetType = userschema->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
79+
}
80+
}
81+
82+
const auto source = input->ChildPtr(TS3WriteObject::idx_Input);
83+
if (const auto maybeTuple = TMaybeNode<TExprList>(source)) {
84+
const auto tuple = maybeTuple.Cast();
85+
86+
TVector<TExprBase> convertedValues;
87+
convertedValues.reserve(tuple.Size());
88+
for (const auto& value : tuple) {
89+
if (!EnsureStructType(input->Pos(), *value.Ref().GetTypeAnn(), ctx)) {
90+
return TStatus::Error;
91+
}
92+
93+
TExprNode::TPtr node = value.Ptr();
94+
if (targetType && TryConvertTo(node, *targetType, ctx) == TStatus::Error) {
95+
ctx.AddError(TIssue(ctx.GetPosition(source->Pos()), "Failed to convert input columns types to scheme types"));
96+
return TStatus::Error;
97+
}
98+
99+
convertedValues.emplace_back(std::move(node));
100+
}
101+
102+
const auto list = Build<TCoAsList>(ctx, input->Pos())
103+
.Add(std::move(convertedValues))
104+
.Done();
105+
106+
input->ChildRef(TS3WriteObject::idx_Input) = list.Ptr();
107+
return TStatus::Repeat;
108+
}
109+
70110
if (!EnsureListType(*source, ctx)) {
71111
return TStatus::Error;
72112
}
@@ -80,23 +120,13 @@ class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
80120
return TStatus::Error;
81121
}
82122

83-
auto target = input->Child(TS3WriteObject::idx_Target);
84-
if (!TS3Target::Match(target)) {
85-
ctx.AddError(TIssue(ctx.GetPosition(target->Pos()), "Expected S3 target."));
86-
return TStatus::Error;
87-
}
88-
89-
TS3Target tgt(target);
90-
if (auto settings = tgt.Settings()) {
91-
if (auto userschema = GetSetting(settings.Cast().Ref(), "userschema")) {
92-
const TTypeAnnotationNode* targetType = userschema->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
93-
if (!IsSameAnnotation(*targetType, *sourceType)) {
94-
ctx.AddError(TIssue(ctx.GetPosition(source->Pos()),
95-
TStringBuilder() << "Type mismatch between schema type: " << *targetType
96-
<< " and actual data type: " << *sourceType << ", diff is: "
97-
<< GetTypeDiff(*targetType, *sourceType)));
98-
return TStatus::Error;
99-
}
123+
if (targetType) {
124+
const auto status = TryConvertTo(input->ChildRef(TS3WriteObject::idx_Input), *ctx.MakeType<TListExprType>(targetType), ctx);
125+
if (status == TStatus::Error) {
126+
ctx.AddError(TIssue(ctx.GetPosition(source->Pos()), "Row type mismatch for S3 external table"));
127+
return TStatus::Error;
128+
} else if (status != TStatus::Ok) {
129+
return status;
100130
}
101131
}
102132

ydb/tests/fq/s3/test_bindings_1.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ def test_s3_format_mismatch(self, kikimr, s3, client, unique_prefix):
113113

114114
describe_result = client.describe_query(query_id).result
115115
describe_string = "{}".format(describe_result)
116-
assert "Type mismatch between schema type" in describe_string, describe_string
116+
assert "Row type mismatch for S3 external table" in describe_string, describe_string
117117

118118
@yq_all
119119
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)

0 commit comments

Comments
 (0)