Skip to content

Commit a040614

Browse files
committed
Add manual client for BigQuery Storage (googleapis#474)

* Add manual client for BigQuery Storage. The manual client adds a default project, which is used by the wrapped create_read_session to make the project parameter optional. A future purpose of the manual client is to override the read_rows method to return an iterator that can reconnect with the correct offset, much like Spanner's StreamedResultSet class. This work is not yet complete. I'd like feedback on the approach for the manual client first.
* Use same client name as wrapped gapic class.
* Use subclass for BigQueryStorageClient. Also, change the create_read_session project argument back to 'parent' to better match the superclass.
* Add unit tests.
* Remove default project from manual client.
1 parent c7ef504 commit a040614

File tree

7 files changed

+257
-25
lines changed

7 files changed

+257
-25
lines changed

bigquery_storage/google/cloud/bigquery_storage_v1beta1/__init__.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,27 @@
1616

1717
from __future__ import absolute_import
1818

19+
import pkg_resources
20+
__version__ = pkg_resources.get_distribution('google-cloud-bigquery-storage').version # noqa
21+
1922
from google.cloud.bigquery_storage_v1beta1 import types
20-
from google.cloud.bigquery_storage_v1beta1.gapic import big_query_storage_client # noqa
23+
from google.cloud.bigquery_storage_v1beta1 import client
2124
from google.cloud.bigquery_storage_v1beta1.gapic import enums
2225

2326

24-
class BigQueryStorageClient(big_query_storage_client.BigQueryStorageClient):
25-
__doc__ = big_query_storage_client.BigQueryStorageClient.__doc__
27+
class BigQueryStorageClient(client.BigQueryStorageClient):
28+
__doc__ = client.BigQueryStorageClient.__doc__
2629
enums = enums
2730

2831

2932
__all__ = (
30-
'enums',
33+
# google.cloud.bigquery_storage_v1beta1
34+
'__version__',
3135
'types',
36+
37+
# google.cloud.bigquery_storage_v1beta1.client
3238
'BigQueryStorageClient',
39+
40+
# google.cloud.bigquery_storage_v1beta1.gapic
41+
'enums',
3342
)
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright 2018 Google LLC
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# https://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
"""Parent client for calling the Cloud BigQuery Storage API.
18+
19+
This is the base from which all interactions with the API occur.
20+
"""
21+
22+
from __future__ import absolute_import
23+
24+
import google.api_core.gapic_v1.method
25+
26+
from google.cloud.bigquery_storage_v1beta1.gapic import big_query_storage_client # noqa
27+
28+
29+
_SCOPES = (
30+
'https://www.googleapis.com/auth/bigquery',
31+
'https://www.googleapis.com/auth/cloud-platform',
32+
)
33+
34+
35+
class BigQueryStorageClient(big_query_storage_client.BigQueryStorageClient):
36+
"""Client for interacting with BigQuery Storage API.
37+
38+
The BigQuery storage API can be used to read data stored in BigQuery.
39+
"""
40+
41+
def read_rows(self,
42+
read_position,
43+
retry=google.api_core.gapic_v1.method.DEFAULT,
44+
timeout=google.api_core.gapic_v1.method.DEFAULT,
45+
metadata=None):
46+
"""
47+
Reads rows from the table in the format prescribed by the read
48+
session. Each response contains one or more table rows, up to a
49+
maximum of 10 MiB per response; read requests which attempt to read
50+
individual rows larger than this will fail.
51+
52+
Each request also returns a set of stream statistics reflecting the
53+
estimated total number of rows in the read stream. This number is
54+
computed based on the total table size and the number of active
55+
streams in the read session, and may change as other streams continue
56+
to read data.
57+
58+
Example:
59+
>>> from google.cloud import bigquery_storage_v1beta1
60+
>>>
61+
>>> client = bigquery_storage_v1beta1.BigQueryStorageClient()
62+
>>>
63+
>>> # TODO: Initialize ``read_position``:
64+
>>> read_position = {}
65+
>>>
66+
>>> for element in client.read_rows(read_position):
67+
... # process element
68+
... pass
69+
70+
Args:
71+
read_position (Union[ \
72+
dict, \
73+
~google.cloud.bigquery_storage_v1beta1.types.StreamPosition \
74+
]):
75+
Required. Identifier of the position in the stream to start
76+
reading from. The offset requested must be less than the last
77+
row read from ReadRows. Requesting a larger offset is
78+
undefined. If a dict is provided, it must be of the same form
79+
as the protobuf message
80+
:class:`~google.cloud.bigquery_storage_v1beta1.types.StreamPosition`
81+
retry (Optional[google.api_core.retry.Retry]): A retry object used
82+
to retry requests. If ``None`` is specified, requests will not
83+
be retried.
84+
timeout (Optional[float]): The amount of time, in seconds, to wait
85+
for the request to complete. Note that if ``retry`` is
86+
specified, the timeout applies to each individual attempt.
87+
metadata (Optional[Sequence[Tuple[str, str]]]): Additional metadata
88+
that is provided to the method.
89+
90+
Returns:
91+
Iterable[~google.cloud.bigquery_storage_v1beta1.types.ReadRowsResponse].
92+
93+
Raises:
94+
google.api_core.exceptions.GoogleAPICallError: If the request
95+
failed for any reason.
96+
google.api_core.exceptions.RetryError: If the request failed due
97+
to a retryable error and retry attempts failed.
98+
ValueError: If the parameters are invalid.
99+
"""
100+
# TODO: Accept a stream and start at offset 0.
101+
# TODO: Return a custom iterator which reconnects to the stream
102+
# automatically. See: Spanner's StreamedResultSet.
103+
return super(BigQueryStorageClient, self).read_rows(
104+
read_position,
105+
retry=retry,
106+
timeout=timeout,
107+
metadata=metadata,
108+
)

bigquery_storage/noxfile.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def default(session):
4848
'--cov-append',
4949
'--cov-config=.coveragerc',
5050
'--cov-report=',
51-
os.path.join('tests', 'unit', 'gapic', 'v1beta1'),
51+
os.path.join('tests', 'unit'),
5252
*session.posargs
5353
)
5454

@@ -91,6 +91,7 @@ def cover(session):
9191
session.run('coverage', 'report', '--show-missing', '--fail-under=100')
9292
session.run('coverage', 'erase')
9393

94+
9495
@nox.session(python=['2.7', '3.6'])
9596
def system(session):
9697
"""Run the system test suite."""

bigquery_storage/synth.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,15 @@
3131

3232
s.move(
3333
library,
34-
excludes=['docs/conf.py', 'docs/index.rst', 'README.rst',
35-
'nox*.py', 'setup.py', 'setup.cfg']
34+
excludes=[
35+
'docs/conf.py',
36+
'docs/index.rst',
37+
'google/cloud/bigquery_storage_v1beta1/__init__.py',
38+
'README.rst',
39+
'nox*.py',
40+
'setup.py',
41+
'setup.cfg',
42+
],
3643
)
3744

3845
s.replace(
@@ -55,13 +62,19 @@
5562
'\g<0>import google.api_core.path_template\n'
5663
)
5764

58-
# START: Ignore lint and coverage
5965
s.replace(
60-
['google/cloud/bigquery_storage_v1beta1/__init__.py'],
61-
'import big_query_storage_client',
62-
'import big_query_storage_client # noqa',
66+
['tests/unit/gapic/v1beta1/test_big_query_storage_client_v1beta1.py'],
67+
'from google.cloud import bigquery_storage_v1beta1',
68+
'from google.cloud.bigquery_storage_v1beta1.gapic import big_query_storage_client # noqa',
6369
)
6470

71+
s.replace(
72+
['tests/unit/gapic/v1beta1/test_big_query_storage_client_v1beta1.py'],
73+
'bigquery_storage_v1beta1.BigQueryStorageClient',
74+
'big_query_storage_client.BigQueryStorageClient',
75+
)
76+
77+
# START: Ignore lint and coverage
6578
s.replace(
6679
['google/cloud/bigquery_storage_v1beta1/gapic/big_query_storage_client.py'],
6780
'if transport:',

bigquery_storage/tests/system/test_system.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,21 @@ def client():
3232
return bigquery_storage_v1beta1.BigQueryStorageClient()
3333

3434

35-
def test_read_rows(project_id, client):
35+
@pytest.fixture()
36+
def table_reference():
3637
table_ref = bigquery_storage_v1beta1.types.TableReference()
3738
table_ref.project_id = 'bigquery-public-data'
3839
table_ref.dataset_id = 'usa_names'
3940
table_ref.table_id = 'usa_1910_2013'
41+
return table_ref
42+
4043

44+
def test_read_rows(client, table_reference):
4145
session = client.create_read_session(
42-
table_ref,
46+
table_reference,
4347
requested_streams=1,
44-
parent='projects/{}'.format(project_id))
48+
parent='projects/{}'.format(project_id),
49+
)
4550

4651
stream_pos = bigquery_storage_v1beta1.types.StreamPosition(
4752
stream=session.streams[0])
@@ -50,3 +55,17 @@ def test_read_rows(project_id, client):
5055

5156
assert page.status.estimated_row_count > 0
5257
assert len(page.avro_rows.serialized_binary_rows) > 0
58+
59+
60+
def test_split_read_stream(client, table_reference):
61+
session = client.create_read_session(
62+
table_reference,
63+
requested_streams=1,
64+
parent='projects/{}'.format(project_id),
65+
)
66+
67+
split = client.split_read_stream(session.streams[0])
68+
69+
assert split.primary_stream is not None
70+
assert split.remainder_stream is not None
71+
assert split.primary_stream != split.remainder_stream

bigquery_storage/tests/unit/gapic/v1beta1/test_big_query_storage_client_v1beta1.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import pytest
1919

20-
from google.cloud import bigquery_storage_v1beta1
20+
from google.cloud.bigquery_storage_v1beta1.gapic import big_query_storage_client # noqa
2121
from google.cloud.bigquery_storage_v1beta1.proto import storage_pb2
2222
from google.cloud.bigquery_storage_v1beta1.proto import table_reference_pb2
2323
from google.protobuf import empty_pb2
@@ -77,7 +77,7 @@ def test_create_read_session(self):
7777

7878
# Mock the API response
7979
channel = ChannelStub(responses=[expected_response])
80-
client = bigquery_storage_v1beta1.BigQueryStorageClient(
80+
client = big_query_storage_client.BigQueryStorageClient(
8181
channel=channel)
8282

8383
# Setup Request
@@ -98,7 +98,7 @@ def test_create_read_session(self):
9898
def test_create_read_session_exception(self):
9999
# Mock the API response
100100
channel = ChannelStub(responses=[CustomException()])
101-
client = bigquery_storage_v1beta1.BigQueryStorageClient(
101+
client = big_query_storage_client.BigQueryStorageClient(
102102
channel=channel)
103103

104104
# Setup request
@@ -115,7 +115,7 @@ def test_read_rows(self):
115115

116116
# Mock the API response
117117
channel = ChannelStub(responses=[iter([expected_response])])
118-
client = bigquery_storage_v1beta1.BigQueryStorageClient(
118+
client = big_query_storage_client.BigQueryStorageClient(
119119
channel=channel)
120120

121121
# Setup Request
@@ -135,7 +135,7 @@ def test_read_rows(self):
135135
def test_read_rows_exception(self):
136136
# Mock the API response
137137
channel = ChannelStub(responses=[CustomException()])
138-
client = bigquery_storage_v1beta1.BigQueryStorageClient(
138+
client = big_query_storage_client.BigQueryStorageClient(
139139
channel=channel)
140140

141141
# Setup request
@@ -152,7 +152,7 @@ def test_batch_create_read_session_streams(self):
152152

153153
# Mock the API response
154154
channel = ChannelStub(responses=[expected_response])
155-
client = bigquery_storage_v1beta1.BigQueryStorageClient(
155+
client = big_query_storage_client.BigQueryStorageClient(
156156
channel=channel)
157157

158158
# Setup Request
@@ -172,7 +172,7 @@ def test_batch_create_read_session_streams(self):
172172
def test_batch_create_read_session_streams_exception(self):
173173
# Mock the API response
174174
channel = ChannelStub(responses=[CustomException()])
175-
client = bigquery_storage_v1beta1.BigQueryStorageClient(
175+
client = big_query_storage_client.BigQueryStorageClient(
176176
channel=channel)
177177

178178
# Setup request
@@ -185,7 +185,7 @@ def test_batch_create_read_session_streams_exception(self):
185185

186186
def test_finalize_stream(self):
187187
channel = ChannelStub()
188-
client = bigquery_storage_v1beta1.BigQueryStorageClient(
188+
client = big_query_storage_client.BigQueryStorageClient(
189189
channel=channel)
190190

191191
# Setup Request
@@ -201,7 +201,7 @@ def test_finalize_stream(self):
201201
def test_finalize_stream_exception(self):
202202
# Mock the API response
203203
channel = ChannelStub(responses=[CustomException()])
204-
client = bigquery_storage_v1beta1.BigQueryStorageClient(
204+
client = big_query_storage_client.BigQueryStorageClient(
205205
channel=channel)
206206

207207
# Setup request
@@ -218,7 +218,7 @@ def test_split_read_stream(self):
218218

219219
# Mock the API response
220220
channel = ChannelStub(responses=[expected_response])
221-
client = bigquery_storage_v1beta1.BigQueryStorageClient(
221+
client = big_query_storage_client.BigQueryStorageClient(
222222
channel=channel)
223223

224224
# Setup Request
@@ -236,7 +236,7 @@ def test_split_read_stream(self):
236236
def test_split_read_stream_exception(self):
237237
# Mock the API response
238238
channel = ChannelStub(responses=[CustomException()])
239-
client = bigquery_storage_v1beta1.BigQueryStorageClient(
239+
client = big_query_storage_client.BigQueryStorageClient(
240240
channel=channel)
241241

242242
# Setup request

0 commit comments

Comments (0)