Skip to content

Commit 4b4023f

Browse files
authored
Merge pull request #2565 from dhermes/bigquery-iterators-2
Move BigQuery list_ methods to use iterators
2 parents 4c4d056 + 76fa05c commit 4b4023f

File tree

10 files changed

+216
-111
lines changed

10 files changed

+216
-111
lines changed

bigquery/google/cloud/bigquery/_helpers.py

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -86,20 +86,34 @@ def _string_from_json(value, _):
8686
}
8787

8888

89+
def _row_from_json(row, schema):
    """Convert JSON row data to row with appropriate types.

    :type row: dict
    :param row: A JSON response row to be converted.

    :type schema: tuple
    :param schema: A tuple of
                   :class:`~google.cloud.bigquery.schema.SchemaField`.

    :rtype: tuple
    :returns: A tuple of data converted to native types.
    """
    row_data = []
    for field, cell in zip(schema, row['f']):
        # Each field type has a dedicated JSON -> native converter.
        converter = _CELLDATA_FROM_JSON[field.field_type]
        if field.mode == 'REPEATED':
            # Repeated cells wrap each item in its own {'v': ...} envelope.
            row_data.append([converter(item, field)
                             for item in cell['v']])
        else:
            row_data.append(converter(cell['v'], field))

    return tuple(row_data)
112+
113+
89114
def _rows_from_json(rows, schema):
    """Convert JSON row data to rows with appropriate types.

    :type rows: list of dict
    :param rows: JSON response rows to be converted.

    :type schema: tuple
    :param schema: A tuple of
                   :class:`~google.cloud.bigquery.schema.SchemaField`.

    :rtype: list of tuple
    :returns: A list of row tuples with data converted to native types.
    """
    # Delegate per-row conversion to the shared helper.
    return [_row_from_json(row, schema) for row in rows]
103117

104118

105119
class _ConfigurationProperty(object):

bigquery/google/cloud/bigquery/dataset.py

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from google.cloud._helpers import _datetime_from_microseconds
1919
from google.cloud.exceptions import NotFound
2020
from google.cloud.bigquery.table import Table
21+
from google.cloud.iterator import HTTPIterator
2122

2223

2324
class AccessGrant(object):
@@ -542,35 +543,24 @@ def list_tables(self, max_results=None, page_token=None):
542543
https://cloud.google.com/bigquery/docs/reference/v2/tables/list
543544
544545
:type max_results: int
545-
:param max_results: maximum number of tables to return, If not
546-
passed, defaults to a value set by the API.
546+
:param max_results: (Optional) Maximum number of tables to return.
547+
If not passed, defaults to a value set by the API.
547548
548549
:type page_token: str
549-
:param page_token: opaque marker for the next "page" of datasets. If
550-
not passed, the API will return the first page of
551-
datasets.
552-
553-
:rtype: tuple, (list, str)
554-
:returns: list of :class:`google.cloud.bigquery.table.Table`, plus a
555-
"next page token" string: if not ``None``, indicates that
556-
more tables can be retrieved with another call (pass that
557-
value as ``page_token``).
558-
"""
559-
params = {}
560-
561-
if max_results is not None:
562-
params['maxResults'] = max_results
563-
564-
if page_token is not None:
565-
params['pageToken'] = page_token
550+
:param page_token: (Optional) Opaque marker for the next "page" of
551+
datasets. If not passed, the API will return the
552+
first page of datasets.
566553
554+
:rtype: :class:`~google.cloud.iterator.Iterator`
555+
:returns: Iterator of :class:`~google.cloud.bigquery.table.Table`
556+
contained within the current dataset.
557+
"""
567558
path = '/projects/%s/datasets/%s/tables' % (self.project, self.name)
568-
connection = self._client.connection
569-
resp = connection.api_request(method='GET', path=path,
570-
query_params=params)
571-
tables = [Table.from_api_repr(resource, self)
572-
for resource in resp.get('tables', ())]
573-
return tables, resp.get('nextPageToken')
559+
result = HTTPIterator(client=self._client, path=path,
560+
item_to_value=_item_to_table, items_key='tables',
561+
page_token=page_token, max_results=max_results)
562+
result.dataset = self
563+
return result
574564

575565
def table(self, name, schema=()):
576566
"""Construct a table bound to this dataset.
@@ -585,3 +575,18 @@ def table(self, name, schema=()):
585575
:returns: a new ``Table`` instance
586576
"""
587577
return Table(name, dataset=self, schema=schema)
578+
579+
580+
def _item_to_table(iterator, resource):
    """Convert a JSON table to the native object.

    .. note::

        This assumes that the ``dataset`` attribute has been
        added to the iterator after being created, which
        should be done by ``Dataset.list_tables``.

    :type iterator: :class:`~google.cloud.iterator.Iterator`
    :param iterator: The iterator that is currently in use.

    :type resource: dict
    :param resource: An item to be converted to a table.

    :rtype: :class:`~google.cloud.bigquery.table.Table`
    :returns: The next table in the page.
    """
    return Table.from_api_repr(resource, iterator.dataset)

bigquery/google/cloud/bigquery/table.py

Lines changed: 67 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
from google.cloud.streaming.transfer import RESUMABLE_UPLOAD
3333
from google.cloud.streaming.transfer import Upload
3434
from google.cloud.bigquery.schema import SchemaField
35-
from google.cloud.bigquery._helpers import _rows_from_json
35+
from google.cloud.bigquery._helpers import _row_from_json
36+
from google.cloud.iterator import HTTPIterator
3637

3738

3839
_TABLE_HAS_NO_SCHEMA = "Table has no schema: call 'table.reload()'"
@@ -653,47 +654,36 @@ def fetch_data(self, max_results=None, page_token=None, client=None):
653654
up-to-date with the schema as defined on the back-end: if the
654655
two schemas are not identical, the values returned may be
655656
incomplete. To ensure that the local copy of the schema is
656-
up-to-date, call the table's ``reload`` method.
657+
up-to-date, call :meth:`reload`.
657658
658659
:type max_results: int
659-
:param max_results: (Optional) maximum number of rows to return.
660+
:param max_results: (Optional) Maximum number of rows to return.
660661
661662
:type page_token: str
662-
:param page_token:
663-
(Optional) token representing a cursor into the table's rows.
664-
665-
:type client: :class:`~google.cloud.bigquery.client.Client` or
666-
``NoneType``
667-
:param client: the client to use. If not passed, falls back to the
668-
``client`` stored on the current dataset.
669-
670-
:rtype: tuple
671-
:returns: ``(row_data, total_rows, page_token)``, where ``row_data``
672-
is a list of tuples, one per result row, containing only
673-
the values; ``total_rows`` is a count of the total number
674-
of rows in the table; and ``page_token`` is an opaque
675-
string which can be used to fetch the next batch of rows
676-
(``None`` if no further batches can be fetched).
663+
:param page_token: (Optional) Token representing a cursor into the
664+
table's rows.
665+
666+
:type client: :class:`~google.cloud.bigquery.client.Client`
667+
:param client: (Optional) The client to use. If not passed, falls
668+
back to the ``client`` stored on the current dataset.
669+
670+
:rtype: :class:`~google.cloud.iterator.Iterator`
671+
:returns: Iterator of row data :class:`tuple`s. During each page, the
672+
iterator will have the ``total_rows`` attribute set,
673+
which counts the total number of rows **in the table**
674+
(this is distinct from the total number of rows in the
675+
current page: ``iterator.page.num_items``).
677676
"""
678677
client = self._require_client(client)
679-
params = {}
680-
681-
if max_results is not None:
682-
params['maxResults'] = max_results
683-
684-
if page_token is not None:
685-
params['pageToken'] = page_token
686-
687-
response = client.connection.api_request(method='GET',
688-
path='%s/data' % self.path,
689-
query_params=params)
690-
total_rows = response.get('totalRows')
691-
if total_rows is not None:
692-
total_rows = int(total_rows)
693-
page_token = response.get('pageToken')
694-
rows_data = _rows_from_json(response.get('rows', ()), self._schema)
695-
696-
return rows_data, total_rows, page_token
678+
path = '%s/data' % (self.path,)
679+
iterator = HTTPIterator(client=client, path=path,
680+
item_to_value=_item_to_row, items_key='rows',
681+
page_token=page_token, max_results=max_results,
682+
page_start=_rows_page_start)
683+
iterator.schema = self._schema
684+
# Over-ride the key used to retrieve the next page token.
685+
iterator._NEXT_TOKEN = 'pageToken'
686+
return iterator
697687

698688
def insert_data(self,
699689
rows,
@@ -1083,6 +1073,47 @@ def _build_schema_resource(fields):
10831073
return infos
10841074

10851075

1076+
def _item_to_row(iterator, resource):
    """Convert a JSON row to the native object.

    .. note::

        This assumes that the ``schema`` attribute has been
        added to the iterator after being created, which
        should be done by the caller.

    :type iterator: :class:`~google.cloud.iterator.Iterator`
    :param iterator: The iterator that is currently in use.

    :type resource: dict
    :param resource: An item to be converted to a row.

    :rtype: tuple
    :returns: The next row in the page.
    """
    return _row_from_json(resource, iterator.schema)
1095+
1096+
1097+
# pylint: disable=unused-argument
1098+
def _rows_page_start(iterator, page, response):
1099+
"""Grab total rows after a :class:`~google.cloud.iterator.Page` started.
1100+
1101+
:type iterator: :class:`~google.cloud.iterator.Iterator`
1102+
:param iterator: The iterator that is currently in use.
1103+
1104+
:type page: :class:`~google.cloud.iterator.Page`
1105+
:param page: The page that was just created.
1106+
1107+
:type response: dict
1108+
:param response: The JSON API response for a page of rows in a table.
1109+
"""
1110+
total_rows = response.get('totalRows')
1111+
if total_rows is not None:
1112+
total_rows = int(total_rows)
1113+
iterator.total_rows = total_rows
1114+
# pylint: enable=unused-argument
1115+
1116+
10861117
class _UploadConfig(object):
10871118
"""Faux message FBO apitools' 'configure_request'."""
10881119
accept = ['*/*']

bigquery/unit_tests/test_dataset.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -636,10 +636,18 @@ def test_delete_w_alternate_client(self):
636636
self.assertEqual(req['path'], '/%s' % PATH)
637637

638638
def test_list_tables_empty(self):
639+
import six
640+
639641
conn = _Connection({})
640642
client = _Client(project=self.PROJECT, connection=conn)
641643
dataset = self._makeOne(self.DS_NAME, client=client)
642-
tables, token = dataset.list_tables()
644+
645+
iterator = dataset.list_tables()
646+
self.assertIs(iterator.dataset, dataset)
647+
page = six.next(iterator.pages)
648+
tables = list(page)
649+
token = iterator.next_page_token
650+
643651
self.assertEqual(tables, [])
644652
self.assertIsNone(token)
645653
self.assertEqual(len(conn._requested), 1)
@@ -649,6 +657,7 @@ def test_list_tables_empty(self):
649657
self.assertEqual(req['path'], '/%s' % PATH)
650658

651659
def test_list_tables_defaults(self):
660+
import six
652661
from google.cloud.bigquery.table import Table
653662

654663
TABLE_1 = 'table_one'
@@ -677,7 +686,11 @@ def test_list_tables_defaults(self):
677686
client = _Client(project=self.PROJECT, connection=conn)
678687
dataset = self._makeOne(self.DS_NAME, client=client)
679688

680-
tables, token = dataset.list_tables()
689+
iterator = dataset.list_tables()
690+
self.assertIs(iterator.dataset, dataset)
691+
page = six.next(iterator.pages)
692+
tables = list(page)
693+
token = iterator.next_page_token
681694

682695
self.assertEqual(len(tables), len(DATA['tables']))
683696
for found, expected in zip(tables, DATA['tables']):
@@ -692,6 +705,7 @@ def test_list_tables_defaults(self):
692705
self.assertEqual(req['path'], '/%s' % PATH)
693706

694707
def test_list_tables_explicit(self):
708+
import six
695709
from google.cloud.bigquery.table import Table
696710

697711
TABLE_1 = 'table_one'
@@ -719,7 +733,11 @@ def test_list_tables_explicit(self):
719733
client = _Client(project=self.PROJECT, connection=conn)
720734
dataset = self._makeOne(self.DS_NAME, client=client)
721735

722-
tables, token = dataset.list_tables(max_results=3, page_token=TOKEN)
736+
iterator = dataset.list_tables(max_results=3, page_token=TOKEN)
737+
self.assertIs(iterator.dataset, dataset)
738+
page = six.next(iterator.pages)
739+
tables = list(page)
740+
token = iterator.next_page_token
723741

724742
self.assertEqual(len(tables), len(DATA['tables']))
725743
for found, expected in zip(tables, DATA['tables']):

0 commit comments

Comments (0)