Skip to content

Commit 00a33c4

Browse files
authored
Merge pull request googleapis#2617 from dhermes/pubsub-iterators-refactor
Refactor iterator to separate into HTTP/GAX iterators
2 parents fb05733 + 3c1be23 commit 00a33c4

File tree

21 files changed

+542
-274
lines changed

21 files changed

+542
-274
lines changed

bigquery/google/cloud/bigquery/client.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from google.cloud.bigquery.job import LoadTableFromStorageJob
2424
from google.cloud.bigquery.job import QueryJob
2525
from google.cloud.bigquery.query import QueryResults
26-
from google.cloud.iterator import Iterator
26+
from google.cloud.iterator import HTTPIterator
2727

2828

2929
class Project(object):
@@ -92,9 +92,10 @@ def list_projects(self, max_results=None, page_token=None):
9292
:returns: Iterator of :class:`~google.cloud.bigquery.client.Project`
9393
accessible to the current client.
9494
"""
95-
return Iterator(client=self, path='/projects',
96-
items_key='projects', item_to_value=_item_to_project,
97-
page_token=page_token, max_results=max_results)
95+
return HTTPIterator(
96+
client=self, path='/projects', item_to_value=_item_to_project,
97+
items_key='projects', page_token=page_token,
98+
max_results=max_results)
9899

99100
def list_datasets(self, include_all=False, max_results=None,
100101
page_token=None):
@@ -123,9 +124,9 @@ def list_datasets(self, include_all=False, max_results=None,
123124
if include_all:
124125
extra_params['all'] = True
125126
path = '/projects/%s/datasets' % (self.project,)
126-
return Iterator(
127-
client=self, path=path, items_key='datasets',
128-
item_to_value=_item_to_dataset, page_token=page_token,
127+
return HTTPIterator(
128+
client=self, path=path, item_to_value=_item_to_dataset,
129+
items_key='datasets', page_token=page_token,
129130
max_results=max_results, extra_params=extra_params)
130131

131132
def dataset(self, dataset_name):
@@ -204,9 +205,9 @@ def list_jobs(self, max_results=None, page_token=None, all_users=None,
204205
extra_params['stateFilter'] = state_filter
205206

206207
path = '/projects/%s/jobs' % (self.project,)
207-
return Iterator(
208-
client=self, path=path, items_key='jobs',
209-
item_to_value=_item_to_job, page_token=page_token,
208+
return HTTPIterator(
209+
client=self, path=path, item_to_value=_item_to_job,
210+
items_key='jobs', page_token=page_token,
210211
max_results=max_results, extra_params=extra_params)
211212

212213
def load_table_from_storage(self, job_name, destination, *source_uris):

core/google/cloud/_testing.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,15 +76,13 @@ def _make_grpc_failed_precondition(self):
7676

7777
class _GAXPageIterator(object):
7878

79-
def __init__(self, items, page_token):
80-
self._items = items
81-
self.page_token = page_token
79+
def __init__(self, *pages, **kwargs):
80+
self._pages = iter(pages)
81+
self.page_token = kwargs.get('page_token')
8282

8383
def next(self):
84-
if self._items is None:
85-
raise StopIteration
86-
items, self._items = self._items, None
87-
return items
84+
import six
85+
return six.next(self._pages)
8886

8987
__next__ = next
9088

core/google/cloud/iterator.py

Lines changed: 167 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -109,23 +109,19 @@ class Page(object):
109109
:type parent: :class:`Iterator`
110110
:param parent: The iterator that owns the current page.
111111
112-
:type response: dict
113-
:param response: The JSON API response for a page.
114-
115-
:type items_key: str
116-
:param items_key: The dictionary key used to retrieve items
117-
from the response.
112+
:type items: iterable
113+
:param items: An iterable (that also defines __len__) of items
114+
from a raw API response.
118115
119116
:type item_to_value: callable
120-
:param item_to_value: Callable to convert an item from JSON
121-
into the native object. Assumed signature
122-
takes an :class:`Iterator` and a dictionary
123-
holding a single item.
117+
:param item_to_value: Callable to convert an item from the type in the
118+
raw API response into the native object.
119+
Assumed signature takes an :class:`Iterator` and a
120+
raw API response with a single item.
124121
"""
125122

126-
def __init__(self, parent, response, items_key, item_to_value):
123+
def __init__(self, parent, items, item_to_value):
127124
self._parent = parent
128-
items = response.get(items_key, ())
129125
self._num_items = len(items)
130126
self._remaining = self._num_items
131127
self._item_iter = iter(items)
@@ -167,14 +163,107 @@ def next(self):
167163

168164

169165
class Iterator(object):
166+
"""A generic class for iterating through API list responses.
167+
168+
:type client: :class:`~google.cloud.client.Client`
169+
:param client: The client used to identify the application.
170+
171+
:type item_to_value: callable
172+
:param item_to_value: Callable to convert an item from the type in the
173+
raw API response into the native object.
174+
Assumed signature takes an :class:`Iterator` and a
175+
raw API response with a single item.
176+
177+
:type page_token: str
178+
:param page_token: (Optional) A token identifying a page in a result set.
179+
180+
:type max_results: int
181+
:param max_results: (Optional) The maximum number of results to fetch.
182+
"""
183+
184+
def __init__(self, client, item_to_value,
185+
page_token=None, max_results=None):
186+
self._started = False
187+
self.client = client
188+
self._item_to_value = item_to_value
189+
self.max_results = max_results
190+
# The attributes below will change over the life of the iterator.
191+
self.page_number = 0
192+
self.next_page_token = page_token
193+
self.num_results = 0
194+
195+
@property
196+
def pages(self):
197+
"""Iterator of pages in the response.
198+
199+
:rtype: :class:`~types.GeneratorType`
200+
:returns: A generator of :class:`Page` instances.
201+
:raises ValueError: If the iterator has already been started.
202+
"""
203+
if self._started:
204+
raise ValueError('Iterator has already started', self)
205+
self._started = True
206+
return self._page_iter(increment=True)
207+
208+
def _items_iter(self):
209+
"""Iterator for each item returned."""
210+
for page in self._page_iter(increment=False):
211+
for item in page:
212+
self.num_results += 1
213+
yield item
214+
215+
def __iter__(self):
216+
"""Iterator for each item returned.
217+
218+
:rtype: :class:`~types.GeneratorType`
219+
:returns: A generator of items from the API.
220+
:raises ValueError: If the iterator has already been started.
221+
"""
222+
if self._started:
223+
raise ValueError('Iterator has already started', self)
224+
self._started = True
225+
return self._items_iter()
226+
227+
def _page_iter(self, increment):
228+
"""Generator of pages of API responses.
229+
230+
:type increment: bool
231+
:param increment: Flag indicating if the total number of results
232+
should be incremented on each page. This is useful
233+
since a page iterator will want to increment by
234+
results per page while an items iterator will want
235+
to increment per item.
236+
237+
Yields :class:`Page` instances.
238+
"""
239+
page = self._next_page()
240+
while page is not None:
241+
self.page_number += 1
242+
if increment:
243+
self.num_results += page.num_items
244+
yield page
245+
page = self._next_page()
246+
247+
@staticmethod
248+
def _next_page():
249+
"""Get the next page in the iterator.
250+
251+
This does nothing and is intended to be over-ridden by subclasses
252+
to return the next :class:`Page`.
253+
254+
:raises NotImplementedError: Always.
255+
"""
256+
raise NotImplementedError
257+
258+
259+
class HTTPIterator(Iterator):
170260
"""A generic class for iterating through Cloud JSON APIs list responses.
171261
172262
:type client: :class:`~google.cloud.client.Client`
173-
:param client: The client, which owns a connection to make requests.
263+
:param client: The client used to identify the application.
174264
175265
:type path: str
176-
:param path: The path to query for the list of items. Defaults
177-
to :attr:`PATH` on the current iterator class.
266+
:param path: The path to query for the list of items.
178267
179268
:type item_to_value: callable
180269
:param item_to_value: Callable to convert an item from JSON
@@ -203,13 +292,7 @@ class Iterator(object):
203292
the :class:`Page` that was started and the dictionary
204293
containing the page response.
205294
206-
:type page_iter: callable
207-
:param page_iter: (Optional) Callable to produce a pages iterator from the
208-
current iterator. Assumed signature takes the
209-
:class:`Iterator` that started the page. By default uses
210-
the HTTP pages iterator. Meant to provide a custom
211-
way to create pages (potentially with a custom
212-
transport such as gRPC).
295+
.. autoattribute:: pages
213296
"""
214297

215298
_PAGE_TOKEN = 'pageToken'
@@ -219,28 +302,18 @@ class Iterator(object):
219302
def __init__(self, client, path, item_to_value,
220303
items_key=DEFAULT_ITEMS_KEY,
221304
page_token=None, max_results=None, extra_params=None,
222-
page_start=_do_nothing_page_start, page_iter=None):
223-
self._started = False
224-
self.client = client
305+
page_start=_do_nothing_page_start):
306+
super(HTTPIterator, self).__init__(
307+
client, item_to_value, page_token=page_token,
308+
max_results=max_results)
225309
self.path = path
226-
self._item_to_value = item_to_value
227310
self._items_key = items_key
228-
self.max_results = max_results
229311
self.extra_params = extra_params
230312
self._page_start = page_start
231-
self._page_iter = None
232313
# Verify inputs / provide defaults.
233314
if self.extra_params is None:
234315
self.extra_params = {}
235-
if page_iter is None:
236-
self._page_iter = self._default_page_iter()
237-
else:
238-
self._page_iter = page_iter(self)
239316
self._verify_params()
240-
# The attributes below will change over the life of the iterator.
241-
self.page_number = 0
242-
self.next_page_token = page_token
243-
self.num_results = 0
244317

245318
def _verify_params(self):
246319
"""Verifies the parameters don't use any reserved parameter.
@@ -253,53 +326,22 @@ def _verify_params(self):
253326
raise ValueError('Using a reserved parameter',
254327
reserved_in_use)
255328

256-
def _default_page_iter(self):
257-
"""Generator of pages of API responses.
329+
def _next_page(self):
330+
"""Get the next page in the iterator.
258331
259-
Yields :class:`Page` instances.
332+
:rtype: :class:`Page`
333+
:returns: The next page in the iterator (or :data:`None` if
334+
there are no pages left).
260335
"""
261-
while self._has_next_page():
336+
if self._has_next_page():
262337
response = self._get_next_page_response()
263-
page = Page(self, response, self._items_key,
264-
self._item_to_value)
338+
items = response.get(self._items_key, ())
339+
page = Page(self, items, self._item_to_value)
265340
self._page_start(self, page, response)
266-
self.num_results += page.num_items
267-
yield page
268-
269-
@property
270-
def pages(self):
271-
"""Iterator of pages in the response.
272-
273-
:rtype: :class:`~types.GeneratorType`
274-
:returns: A generator of :class:`Page` instances.
275-
:raises ValueError: If the iterator has already been started.
276-
"""
277-
if self._started:
278-
raise ValueError('Iterator has already started', self)
279-
self._started = True
280-
return self._page_iter
281-
282-
def _items_iter(self):
283-
"""Iterator for each item returned."""
284-
for page in self._page_iter:
285-
# Decrement the total results since the pages iterator adds
286-
# to it when each page is encountered.
287-
self.num_results -= page.num_items
288-
for item in page:
289-
self.num_results += 1
290-
yield item
291-
292-
def __iter__(self):
293-
"""Iterator for each item returned.
294-
295-
:rtype: :class:`~types.GeneratorType`
296-
:returns: A generator of items from the API.
297-
:raises ValueError: If the iterator has already been started.
298-
"""
299-
if self._started:
300-
raise ValueError('Iterator has already started', self)
301-
self._started = True
302-
return self._items_iter()
341+
self.next_page_token = response.get('nextPageToken')
342+
return page
343+
else:
344+
return None
303345

304346
def _has_next_page(self):
305347
"""Determines whether or not there are more pages with results.
@@ -336,11 +378,53 @@ def _get_next_page_response(self):
336378
:rtype: dict
337379
:returns: The parsed JSON response of the next page's contents.
338380
"""
339-
response = self.client.connection.api_request(
381+
return self.client.connection.api_request(
340382
method='GET', path=self.path,
341383
query_params=self._get_query_params())
342384

343-
self.page_number += 1
344-
self.next_page_token = response.get('nextPageToken')
345385

346-
return response
386+
class GAXIterator(Iterator):
387+
"""A generic class for iterating through Cloud gRPC APIs list responses.
388+
389+
:type client: :class:`~google.cloud.client.Client`
390+
:param client: The client used to identify the application.
391+
392+
:type page_iter: :class:`~google.gax.PageIterator`
393+
:param page_iter: A GAX page iterator to be wrapped and conform to the
394+
:class:`~google.cloud.iterator.Iterator` surface.
395+
396+
:type item_to_value: callable
397+
:param item_to_value: Callable to convert an item from a protobuf
398+
into the native object. Assumed signature
399+
takes an :class:`Iterator` and a single item
400+
from the API response as a protobuf.
401+
402+
:type max_results: int
403+
:param max_results: (Optional) The maximum number of results to fetch.
404+
405+
.. autoattribute:: pages
406+
"""
407+
408+
def __init__(self, client, page_iter, item_to_value, max_results=None):
409+
super(GAXIterator, self).__init__(
410+
client, item_to_value, page_token=page_iter.page_token,
411+
max_results=max_results)
412+
self._gax_page_iter = page_iter
413+
414+
def _next_page(self):
415+
"""Get the next page in the iterator.
416+
417+
Wraps the response from the :class:`~google.gax.PageIterator` in a
418+
:class:`Page` instance and captures some state at each page.
419+
420+
:rtype: :class:`Page`
421+
:returns: The next page in the iterator (or :data:`None` if
422+
there are no pages left).
423+
"""
424+
try:
425+
items = six.next(self._gax_page_iter)
426+
page = Page(self, items, self._item_to_value)
427+
self.next_page_token = self._gax_page_iter.page_token or None
428+
return page
429+
except StopIteration:
430+
return None

0 commit comments

Comments
 (0)