Skip to content

Commit 53c189a

Browse files
authored
Merge pull request #2633 from dhermes/pubsub-iterators-3
Converting Pub/Sub client->list_subscriptions to iterator.
2 parents 1cbc793 + e1fbb6b commit 53c189a

File tree

7 files changed

+400
-199
lines changed

7 files changed

+400
-199
lines changed

docs/pubsub_snippets.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,8 @@ def do_something_with(sub): # pylint: disable=unused-argument
6060
pass
6161

6262
# [START client_list_subscriptions]
63-
subscriptions, token = client.list_subscriptions() # API request
64-
while True:
65-
for subscription in subscriptions:
66-
do_something_with(subscription)
67-
if token is None:
68-
break
69-
subscriptions, token = client.list_subscriptions(
70-
page_token=token) # API request
63+
for subscription in client.list_subscriptions(): # API request(s)
64+
do_something_with(subscription)
7165
# [END client_list_subscriptions]
7266

7367

pubsub/google/cloud/pubsub/_gax.py

Lines changed: 55 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,15 @@
1414

1515
"""GAX wrapper for Pubsub API requests."""
1616

17+
import functools
18+
1719
from google.cloud.gapic.pubsub.v1.publisher_api import PublisherApi
1820
from google.cloud.gapic.pubsub.v1.subscriber_api import SubscriberApi
1921
from google.gax import CallOptions
2022
from google.gax import INITIAL_PAGE
2123
from google.gax.errors import GaxError
2224
from google.gax.grpc import exc_to_code
25+
from google.protobuf.json_format import MessageToDict
2326
from google.pubsub.v1.pubsub_pb2 import PubsubMessage
2427
from google.pubsub.v1.pubsub_pb2 import PushConfig
2528
from grpc import insecure_channel
@@ -77,12 +80,7 @@ def list_topics(self, project, page_size=0, page_token=None):
7780
path = 'projects/%s' % (project,)
7881
page_iter = self._gax_api.list_topics(
7982
path, page_size=page_size, options=options)
80-
81-
iter_kwargs = {}
82-
if page_size: # page_size can be 0 or explicit None.
83-
iter_kwargs['max_results'] = page_size
84-
return GAXIterator(self._client, page_iter, _item_to_topic,
85-
**iter_kwargs)
83+
return GAXIterator(self._client, page_iter, _item_to_topic)
8684

8785
def topic_create(self, topic_path):
8886
"""API call: create a topic
@@ -214,11 +212,8 @@ def topic_list_subscriptions(self, topic, page_size=0, page_token=None):
214212
raise NotFound(topic_path)
215213
raise
216214

217-
iter_kwargs = {}
218-
if page_size: # page_size can be 0 or explicit None.
219-
iter_kwargs['max_results'] = page_size
220215
iterator = GAXIterator(self._client, page_iter,
221-
_item_to_subscription, **iter_kwargs)
216+
_item_to_subscription_for_topic)
222217
iterator.topic = topic
223218
return iterator
224219

@@ -228,9 +223,13 @@ class _SubscriberAPI(object):
228223
229224
:type gax_api: :class:`google.pubsub.v1.publisher_api.SubscriberApi`
230225
:param gax_api: API object used to make GAX requests.
226+
227+
:type client: :class:`~google.cloud.pubsub.client.Client`
228+
:param client: The client that owns this API object.
231229
"""
232-
def __init__(self, gax_api):
230+
def __init__(self, gax_api, client):
233231
self._gax_api = gax_api
232+
self._client = client
234233

235234
def list_subscriptions(self, project, page_size=0, page_token=None):
236235
"""List subscriptions for the project associated with this API.
@@ -250,22 +249,25 @@ def list_subscriptions(self, project, page_size=0, page_token=None):
250249
If not passed, the API will return the first page
251250
of subscriptions.
252251
253-
:rtype: tuple, (list, str)
254-
:returns: list of ``Subscription`` resource dicts, plus a
255-
"next page token" string: if not None, indicates that
256-
more topics can be retrieved with another call (pass that
257-
value as ``page_token``).
252+
:rtype: :class:`~google.cloud.iterator.Iterator`
253+
:returns: Iterator of
254+
:class:`~google.cloud.pubsub.subscription.Subscription`
255+
accessible to the current API.
258256
"""
259257
if page_token is None:
260258
page_token = INITIAL_PAGE
261259
options = CallOptions(page_token=page_token)
262260
path = 'projects/%s' % (project,)
263261
page_iter = self._gax_api.list_subscriptions(
264262
path, page_size=page_size, options=options)
265-
subscriptions = [_subscription_pb_to_mapping(sub_pb)
266-
for sub_pb in page_iter.next()]
267-
token = page_iter.page_token or None
268-
return subscriptions, token
263+
264+
# We attach a mutable topics dictionary so that as topic
265+
# objects are created by Subscription.from_api_repr, they
266+
# can be re-used by other subscriptions from the same topic.
267+
topics = {}
268+
item_to_value = functools.partial(
269+
_item_to_sub_for_client, topics=topics)
270+
return GAXIterator(self._client, page_iter, item_to_value)
269271

270272
def subscription_create(self, subscription_path, topic_path,
271273
ack_deadline=None, push_endpoint=None):
@@ -313,7 +315,7 @@ def subscription_create(self, subscription_path, topic_path,
313315
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
314316
raise Conflict(topic_path)
315317
raise
316-
return _subscription_pb_to_mapping(sub_pb)
318+
return MessageToDict(sub_pb)
317319

318320
def subscription_get(self, subscription_path):
319321
"""API call: retrieve a subscription
@@ -335,7 +337,7 @@ def subscription_get(self, subscription_path):
335337
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
336338
raise NotFound(subscription_path)
337339
raise
338-
return _subscription_pb_to_mapping(sub_pb)
340+
return MessageToDict(sub_pb)
339341

340342
def subscription_delete(self, subscription_path):
341343
"""API call: delete a subscription
@@ -474,24 +476,6 @@ def _message_pb_from_mapping(message):
474476
attributes=message['attributes'])
475477

476478

477-
def _subscription_pb_to_mapping(sub_pb):
478-
"""Helper for :meth:`list_subscriptions`, et aliae
479-
480-
Performs "impedance matching" between the protobuf attrs and the keys
481-
expected in the JSON API.
482-
"""
483-
mapping = {
484-
'name': sub_pb.name,
485-
'topic': sub_pb.topic,
486-
'ackDeadlineSeconds': sub_pb.ack_deadline_seconds,
487-
}
488-
if sub_pb.push_config.push_endpoint != '':
489-
mapping['pushConfig'] = {
490-
'pushEndpoint': sub_pb.push_config.push_endpoint,
491-
}
492-
return mapping
493-
494-
495479
def _message_pb_to_mapping(message_pb):
496480
"""Helper for :meth:`pull`, et aliae
497481
@@ -576,7 +560,7 @@ def _item_to_topic(iterator, resource):
576560
{'name': resource.name}, iterator.client)
577561

578562

579-
def _item_to_subscription(iterator, subscription_path):
563+
def _item_to_subscription_for_topic(iterator, subscription_path):
580564
"""Convert a subscription name to the native object.
581565
582566
:type iterator: :class:`~google.cloud.iterator.Iterator`
@@ -591,3 +575,33 @@ def _item_to_subscription(iterator, subscription_path):
591575
subscription_name = subscription_name_from_path(
592576
subscription_path, iterator.client.project)
593577
return Subscription(subscription_name, iterator.topic)
578+
579+
580+
def _item_to_sub_for_client(iterator, sub_pb, topics):
581+
"""Convert a subscription protobuf to the native object.
582+
583+
.. note::
584+
585+
This method does not have the correct signature to be used as
586+
the ``item_to_value`` argument to
587+
:class:`~google.cloud.iterator.Iterator`. It is intended to be
588+
patched with a mutable topics argument that can be updated
589+
on subsequent calls. For an example, see how the method is
590+
used above in :meth:`_SubscriberAPI.list_subscriptions`.
591+
592+
:type iterator: :class:`~google.cloud.iterator.Iterator`
593+
:param iterator: The iterator that is currently in use.
594+
595+
:type sub_pb: :class:`~google.pubsub.v1.pubsub_pb2.Subscription`
596+
:param sub_pb: A subscription returned from the API.
597+
598+
:type topics: dict
599+
:param topics: A dictionary of topics to be used (and modified)
600+
as new subscriptions are created bound to topics.
601+
602+
:rtype: :class:`~google.cloud.pubsub.subscription.Subscription`
603+
:returns: The next subscription in the page.
604+
"""
605+
resource = MessageToDict(sub_pb)
606+
return Subscription.from_api_repr(
607+
resource, iterator.client, topics=topics)

pubsub/google/cloud/pubsub/client.py

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
from google.cloud.pubsub.connection import _PublisherAPI as JSONPublisherAPI
2323
from google.cloud.pubsub.connection import _SubscriberAPI as JSONSubscriberAPI
2424
from google.cloud.pubsub.connection import _IAMPolicyAPI
25-
from google.cloud.pubsub.subscription import Subscription
2625
from google.cloud.pubsub.topic import Topic
2726

2827
try:
@@ -98,9 +97,9 @@ def subscriber_api(self):
9897
if self._subscriber_api is None:
9998
if self._use_gax:
10099
generated = make_gax_subscriber_api(self.connection)
101-
self._subscriber_api = GAXSubscriberAPI(generated)
100+
self._subscriber_api = GAXSubscriberAPI(generated, self)
102101
else:
103-
self._subscriber_api = JSONSubscriberAPI(self.connection)
102+
self._subscriber_api = JSONSubscriberAPI(self)
104103
return self._subscriber_api
105104

106105
@property
@@ -160,20 +159,14 @@ def list_subscriptions(self, page_size=None, page_token=None):
160159
passed, the API will return the first page of
161160
topics.
162161
163-
:rtype: tuple, (list, str)
164-
:returns: list of :class:`~.pubsub.subscription.Subscription`,
165-
plus a "next page token" string: if not None, indicates that
166-
more topics can be retrieved with another call (pass that
167-
value as ``page_token``).
162+
:rtype: :class:`~google.cloud.iterator.Iterator`
163+
:returns: Iterator of
164+
:class:`~google.cloud.pubsub.subscription.Subscription`
165+
accessible to the current client.
168166
"""
169167
api = self.subscriber_api
170-
resources, next_token = api.list_subscriptions(
168+
return api.list_subscriptions(
171169
self.project, page_size, page_token)
172-
topics = {}
173-
subscriptions = [Subscription.from_api_repr(resource, self,
174-
topics=topics)
175-
for resource in resources]
176-
return subscriptions, next_token
177170

178171
def topic(self, name, timestamp_messages=False):
179172
"""Creates a topic bound to the current client.

pubsub/google/cloud/pubsub/connection.py

Lines changed: 55 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"""Create / interact with Google Cloud Pub/Sub connections."""
1616

1717
import base64
18+
import functools
1819
import os
1920

2021
from google.cloud import connection as base_connection
@@ -238,7 +239,8 @@ def topic_list_subscriptions(self, topic, page_size=None, page_token=None):
238239

239240
iterator = HTTPIterator(
240241
client=self._client, path=path,
241-
item_to_value=_item_to_subscription, items_key='subscriptions',
242+
item_to_value=_item_to_subscription_for_topic,
243+
items_key='subscriptions',
242244
page_token=page_token, extra_params=extra_params)
243245
iterator.topic = topic
244246
return iterator
@@ -247,12 +249,13 @@ def topic_list_subscriptions(self, topic, page_size=None, page_token=None):
247249
class _SubscriberAPI(object):
248250
"""Helper mapping subscriber-related APIs.
249251
250-
:type connection: :class:`Connection`
251-
:param connection: the connection used to make API requests.
252+
:type client: :class:`~google.cloud.pubsub.client.Client`
253+
:param client: the client used to make API requests.
252254
"""
253255

254-
def __init__(self, connection):
255-
self._connection = connection
256+
def __init__(self, client):
257+
self._client = client
258+
self._connection = client.connection
256259

257260
def list_subscriptions(self, project, page_size=None, page_token=None):
258261
"""API call: list subscriptions for a given project
@@ -272,24 +275,26 @@ def list_subscriptions(self, project, page_size=None, page_token=None):
272275
If not passed, the API will return the first page
273276
of subscriptions.
274277
275-
:rtype: tuple, (list, str)
276-
:returns: list of ``Subscription`` resource dicts, plus a
277-
"next page token" string: if not None, indicates that
278-
more subscriptions can be retrieved with another call (pass
279-
that value as ``page_token``).
278+
:rtype: :class:`~google.cloud.iterator.Iterator`
279+
:returns: Iterator of
280+
:class:`~google.cloud.pubsub.subscription.Subscription`
281+
accessible to the current API.
280282
"""
281-
conn = self._connection
282-
params = {}
283-
283+
extra_params = {}
284284
if page_size is not None:
285-
params['pageSize'] = page_size
286-
287-
if page_token is not None:
288-
params['pageToken'] = page_token
289-
285+
extra_params['pageSize'] = page_size
290286
path = '/projects/%s/subscriptions' % (project,)
291-
resp = conn.api_request(method='GET', path=path, query_params=params)
292-
return resp.get('subscriptions', ()), resp.get('nextPageToken')
287+
288+
# We attach a mutable topics dictionary so that as topic
289+
# objects are created by Subscription.from_api_repr, they
290+
# can be re-used by other subscriptions from the same topic.
291+
topics = {}
292+
item_to_value = functools.partial(
293+
_item_to_sub_for_client, topics=topics)
294+
return HTTPIterator(
295+
client=self._client, path=path, item_to_value=item_to_value,
296+
items_key='subscriptions', page_token=page_token,
297+
extra_params=extra_params)
293298

294299
def subscription_create(self, subscription_path, topic_path,
295300
ack_deadline=None, push_endpoint=None):
@@ -590,7 +595,7 @@ def _item_to_topic(iterator, resource):
590595
return Topic.from_api_repr(resource, iterator.client)
591596

592597

593-
def _item_to_subscription(iterator, subscription_path):
598+
def _item_to_subscription_for_topic(iterator, subscription_path):
594599
"""Convert a subscription name to the native object.
595600
596601
:type iterator: :class:`~google.cloud.iterator.Iterator`
@@ -605,3 +610,32 @@ def _item_to_subscription(iterator, subscription_path):
605610
subscription_name = subscription_name_from_path(
606611
subscription_path, iterator.client.project)
607612
return Subscription(subscription_name, iterator.topic)
613+
614+
615+
def _item_to_sub_for_client(iterator, resource, topics):
616+
"""Convert a subscription to the native object.
617+
618+
.. note::
619+
620+
This method does not have the correct signature to be used as
621+
the ``item_to_value`` argument to
622+
:class:`~google.cloud.iterator.Iterator`. It is intended to be
623+
patched with a mutable topics argument that can be updated
624+
on subsequent calls. For an example, see how the method is
625+
used above in :meth:`_SubscriberAPI.list_subscriptions`.
626+
627+
:type iterator: :class:`~google.cloud.iterator.Iterator`
628+
:param iterator: The iterator that is currently in use.
629+
630+
:type resource: dict
631+
:param resource: A subscription returned from the API.
632+
633+
:type topics: dict
634+
:param topics: A dictionary of topics to be used (and modified)
635+
as new subscriptions are created bound to topics.
636+
637+
:rtype: :class:`~google.cloud.pubsub.subscription.Subscription`
638+
:returns: The next subscription in the page.
639+
"""
640+
return Subscription.from_api_repr(
641+
resource, iterator.client, topics=topics)

0 commit comments

Comments
 (0)