|
4 | 4 | import boto3 |
5 | 5 | import json |
6 | 6 | import logging |
| 7 | +import os |
7 | 8 | import pytest |
8 | 9 | import typing |
9 | 10 | import ydb |
@@ -44,9 +45,13 @@ def read_scan_rows(it) -> typing.List[ydb_pb.Value]: |
44 | 45 | return scanned_rows |
45 | 46 |
|
46 | 47 |
|
| 48 | +def make_columns(columns: typing.List[typing.Tuple[str, str]]) -> typing.List[ydb_pb.Column]: |
| 49 | + return [ydb_pb.Column(name=name, type=ydb_pb.Type(type_id=ydb_pb.Type.PrimitiveTypeId.Value(type))) for name, type in columns] |
| 50 | + |
| 51 | + |
47 | 52 | class TestYdbOverFq(TestYdsBase): |
48 | 53 | def make_binding(self, client: FederatedQueryClient, name: str, path: str, connection_id: str, columns: typing.List[typing.Tuple[str, str]]): |
49 | | - columns = [ydb_pb.Column(name=name, type=ydb_pb.Type(type_id=ydb_pb.Type.PrimitiveTypeId.Value(type))) for name, type in columns] |
| 54 | + columns = make_columns(columns) |
50 | 55 | client.create_object_storage_binding(name, path, "csv_with_names", connection_id, columns=columns) |
51 | 56 |
|
52 | 57 | def make_yq_driver(self, endpoint: str, folder_id: str, token: str) -> ydb.Driver: |
@@ -130,6 +135,32 @@ def test_list_directory_v2(self, kikimr, s3, client): |
130 | 135 | def test_list_directory_v1(self, kikimr, s3, client): |
131 | 136 | self.list_directory_test_body(kikimr, s3, client) |
132 | 137 |
|
| 138 | + @yq_all |
| 139 | + @pytest.mark.parametrize("client", [{"folder_id": "list_without_streams"}], indirect=True) |
| 140 | + def test_list_without_streams(self, kikimr, s3, client, yq_version): |
| 141 | + self.init_topics(f"topic_to_not_list_{yq_version}") |
| 142 | + |
| 143 | + connection_response = client.create_yds_connection("yds_conn", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")) |
| 144 | + |
| 145 | + logging.debug("connection_response: " + str(connection_response.result)) |
| 146 | + assert not connection_response.issues, str(connection_response.issues) |
| 147 | + |
| 148 | + binding_response = client.create_yds_binding(name="yds_bind", |
| 149 | + stream=self.input_topic, |
| 150 | + format="json_each_row", |
| 151 | + connection_id=connection_response.result.connection_id, |
| 152 | + columns=make_columns([("Data", "STRING")])) |
| 153 | + |
| 154 | + logging.debug("binding_response: " + str(binding_response.result)) |
| 155 | + assert not binding_response.issues, str(binding_response.issues) |
| 156 | + |
| 157 | + driver = self.make_yq_driver(kikimr.endpoint(), client.folder_id, "root@builtin") |
| 158 | + ls_res = driver.scheme_client.list_directory("/") |
| 159 | + assert ls_res.is_directory() |
| 160 | + # as long as ANALYTICS requests can't process streams, don't list them in ydb_over_fq |
| 161 | + # can't check len(children), because other tests' interference |
| 162 | + assert list(map(lambda ch: ch.name, ls_res.children)).count("yds_bind") == 0 |
| 163 | + |
133 | 164 | @yq_all |
134 | 165 | @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) |
135 | 166 | def test_execute_data_query(self, kikimr, s3, client, unique_prefix, yq_version): |
|
0 commit comments