Skip to content

Commit 6fa89d8

Browse files
authored
Merge pull request #1855 from tseaver/pubsub-add_gax_paging
Add support for controlled paging via Gax.
2 parents c252631 + eec245d commit 6fa89d8

File tree

5 files changed

+126
-17
lines changed

5 files changed

+126
-17
lines changed

.coveragerc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
[report]
22
omit =
33
*/_generated/*.py
4+
show_missing = True
45
exclude_lines =
56
# Re-enable the standard pragma
67
pragma: NO COVER

gcloud/pubsub/_gax.py

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
# pylint: disable=import-error
1818
from google.gax import CallOptions
19+
from google.gax import INITIAL_PAGE
1920
from google.gax.errors import GaxError
2021
from google.gax.grpc import exc_to_code
2122
from google.pubsub.v1.pubsub_pb2 import PubsubMessage
@@ -28,6 +29,14 @@
2829
from gcloud._helpers import _to_bytes
2930

3031

32+
def _build_paging_options(page_token=None):
33+
"""Helper for :meth:'_PublisherAPI.list_topics' et aliae."""
34+
if page_token is None:
35+
page_token = INITIAL_PAGE
36+
options = {'page_token': page_token}
37+
return CallOptions(**options)
38+
39+
3140
class _PublisherAPI(object):
3241
"""Helper mapping publisher-related APIs.
3342
@@ -37,7 +46,7 @@ class _PublisherAPI(object):
3746
def __init__(self, gax_api):
3847
self._gax_api = gax_api
3948

40-
def list_topics(self, project):
49+
def list_topics(self, project, page_token=None):
4150
"""List topics for the project associated with this API.
4251
4352
See:
@@ -46,13 +55,18 @@ def list_topics(self, project):
4655
:type project: string
4756
:param project: project ID
4857
58+
:type page_token: string
59+
:param page_token: opaque marker for the next "page" of topics. If not
60+
passed, the API will return the first page of
61+
topics.
62+
4963
:rtype: tuple, (list, str)
5064
:returns: list of ``Topic`` resource dicts, plus a
5165
"next page token" string: if not None, indicates that
5266
more topics can be retrieved with another call (pass that
5367
value as ``page_token``).
5468
"""
55-
options = CallOptions(is_page_streaming=False)
69+
options = _build_paging_options(page_token)
5670
path = 'projects/%s' % (project,)
5771
response = self._gax_api.list_topics(path, options)
5872
topics = [{'name': topic_pb.name} for topic_pb in response.topics]
@@ -152,7 +166,7 @@ def topic_publish(self, topic_path, messages):
152166
raise
153167
return response.message_ids
154168

155-
def topic_list_subscriptions(self, topic_path):
169+
def topic_list_subscriptions(self, topic_path, page_token=None):
156170
"""API call: list subscriptions bound to a topic
157171
158172
See:
@@ -162,13 +176,18 @@ def topic_list_subscriptions(self, topic_path):
162176
:param topic_path: fully-qualified path of the topic, in format
163177
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
164178
179+
:type page_token: string
180+
:param page_token: opaque marker for the next "page" of subscriptions.
181+
If not passed, the API will return the first page
182+
of subscriptions.
183+
165184
:rtype: list of strings
166185
:returns: fully-qualified names of subscriptions for the supplied
167186
topic.
168187
:raises: :exc:`gcloud.exceptions.NotFound` if the topic does not
169188
exist
170189
"""
171-
options = CallOptions(is_page_streaming=False)
190+
options = _build_paging_options(page_token)
172191
try:
173192
response = self._gax_api.list_topic_subscriptions(
174193
topic_path, options)
@@ -190,7 +209,7 @@ class _SubscriberAPI(object):
190209
def __init__(self, gax_api):
191210
self._gax_api = gax_api
192211

193-
def list_subscriptions(self, project):
212+
def list_subscriptions(self, project, page_token=None):
194213
"""List subscriptions for the project associated with this API.
195214
196215
See:
@@ -199,13 +218,18 @@ def list_subscriptions(self, project):
199218
:type project: string
200219
:param project: project ID
201220
221+
:type page_token: string
222+
:param page_token: opaque marker for the next "page" of subscriptions.
223+
If not passed, the API will return the first page
224+
of subscriptions.
225+
202226
:rtype: tuple, (list, str)
203227
:returns: list of ``Subscription`` resource dicts, plus a
204228
"next page token" string: if not None, indicates that
205229
more topics can be retrieved with another call (pass that
206230
value as ``page_token``).
207231
"""
208-
options = CallOptions(is_page_streaming=False)
232+
options = _build_paging_options(page_token)
209233
path = 'projects/%s' % (project,)
210234
response = self._gax_api.list_subscriptions(path, options)
211235
subscriptions = [_subscription_pb_to_mapping(sub_pb)

gcloud/pubsub/test__gax.py

Lines changed: 80 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ def test_ctor(self):
5252
self.assertTrue(api._gax_api is gax_api)
5353

5454
def test_list_topics_no_paging(self):
55-
response = _ListTopicsResponsePB([_TopicPB(self.TOPIC_PATH)])
55+
from google.gax import INITIAL_PAGE
56+
TOKEN = 'TOKEN'
57+
response = _ListTopicsResponsePB([_TopicPB(self.TOPIC_PATH)], TOKEN)
5658
gax_api = _GAXPublisherAPI(_list_topics_response=response)
5759
api = self._makeOne(gax_api)
5860

@@ -62,11 +64,31 @@ def test_list_topics_no_paging(self):
6264
topic = topics[0]
6365
self.assertIsInstance(topic, dict)
6466
self.assertEqual(topic['name'], self.TOPIC_PATH)
65-
self.assertEqual(next_token, None)
67+
self.assertEqual(next_token, TOKEN)
6668

6769
name, options = gax_api._list_topics_called_with
6870
self.assertEqual(name, self.PROJECT_PATH)
69-
self.assertFalse(options.is_page_streaming)
71+
self.assertTrue(options.page_token is INITIAL_PAGE)
72+
73+
def test_list_topics_with_paging(self):
74+
TOKEN = 'TOKEN'
75+
NEW_TOKEN = 'NEW_TOKEN'
76+
response = _ListTopicsResponsePB(
77+
[_TopicPB(self.TOPIC_PATH)], NEW_TOKEN)
78+
gax_api = _GAXPublisherAPI(_list_topics_response=response)
79+
api = self._makeOne(gax_api)
80+
81+
topics, next_token = api.list_topics(self.PROJECT, page_token=TOKEN)
82+
83+
self.assertEqual(len(topics), 1)
84+
topic = topics[0]
85+
self.assertIsInstance(topic, dict)
86+
self.assertEqual(topic['name'], self.TOPIC_PATH)
87+
self.assertEqual(next_token, NEW_TOKEN)
88+
89+
name, options = gax_api._list_topics_called_with
90+
self.assertEqual(name, self.PROJECT_PATH)
91+
self.assertEqual(options.page_token, TOKEN)
7092

7193
def test_topic_create(self):
7294
topic_pb = _TopicPB(self.TOPIC_PATH)
@@ -233,6 +255,7 @@ def test_topic_publish_error(self):
233255
self.assertEqual(options, None)
234256

235257
def test_topic_list_subscriptions_no_paging(self):
258+
from google.gax import INITIAL_PAGE
236259
response = _ListTopicSubscriptionsResponsePB([self.SUB_PATH])
237260
gax_api = _GAXPublisherAPI(_list_topic_subscriptions_response=response)
238261
api = self._makeOne(gax_api)
@@ -249,9 +272,32 @@ def test_topic_list_subscriptions_no_paging(self):
249272

250273
topic_path, options = gax_api._list_topic_subscriptions_called_with
251274
self.assertEqual(topic_path, self.TOPIC_PATH)
252-
self.assertFalse(options.is_page_streaming)
275+
self.assertTrue(options.page_token is INITIAL_PAGE)
276+
277+
def test_topic_list_subscriptions_with_paging(self):
278+
TOKEN = 'TOKEN'
279+
NEW_TOKEN = 'NEW_TOKEN'
280+
response = _ListTopicSubscriptionsResponsePB(
281+
[self.SUB_PATH], NEW_TOKEN)
282+
gax_api = _GAXPublisherAPI(_list_topic_subscriptions_response=response)
283+
api = self._makeOne(gax_api)
284+
285+
subscriptions, next_token = api.topic_list_subscriptions(
286+
self.TOPIC_PATH, page_token=TOKEN)
287+
288+
self.assertEqual(len(subscriptions), 1)
289+
subscription = subscriptions[0]
290+
self.assertIsInstance(subscription, dict)
291+
self.assertEqual(subscription['name'], self.SUB_PATH)
292+
self.assertEqual(subscription['topic'], self.TOPIC_PATH)
293+
self.assertEqual(next_token, NEW_TOKEN)
294+
295+
name, options = gax_api._list_topic_subscriptions_called_with
296+
self.assertEqual(name, self.TOPIC_PATH)
297+
self.assertEqual(options.page_token, TOKEN)
253298

254299
def test_topic_list_subscriptions_miss(self):
300+
from google.gax import INITIAL_PAGE
255301
from gcloud.exceptions import NotFound
256302
gax_api = _GAXPublisherAPI()
257303
api = self._makeOne(gax_api)
@@ -261,9 +307,10 @@ def test_topic_list_subscriptions_miss(self):
261307

262308
topic_path, options = gax_api._list_topic_subscriptions_called_with
263309
self.assertEqual(topic_path, self.TOPIC_PATH)
264-
self.assertFalse(options.is_page_streaming)
310+
self.assertTrue(options.page_token is INITIAL_PAGE)
265311

266312
def test_topic_list_subscriptions_error(self):
313+
from google.gax import INITIAL_PAGE
267314
from google.gax.errors import GaxError
268315
gax_api = _GAXPublisherAPI(_random_gax_error=True)
269316
api = self._makeOne(gax_api)
@@ -273,7 +320,7 @@ def test_topic_list_subscriptions_error(self):
273320

274321
topic_path, options = gax_api._list_topic_subscriptions_called_with
275322
self.assertEqual(topic_path, self.TOPIC_PATH)
276-
self.assertFalse(options.is_page_streaming)
323+
self.assertTrue(options.page_token is INITIAL_PAGE)
277324

278325

279326
@unittest2.skipUnless(_HAVE_GAX, 'No gax-python')
@@ -291,6 +338,7 @@ def test_ctor(self):
291338
self.assertTrue(api._gax_api is gax_api)
292339

293340
def test_list_subscriptions_no_paging(self):
341+
from google.gax import INITIAL_PAGE
294342
response = _ListSubscriptionsResponsePB([_SubscriptionPB(
295343
self.SUB_PATH, self.TOPIC_PATH, self.PUSH_ENDPOINT, 0)])
296344
gax_api = _GAXSubscriberAPI(_list_subscriptions_response=response)
@@ -310,7 +358,32 @@ def test_list_subscriptions_no_paging(self):
310358

311359
name, options = gax_api._list_subscriptions_called_with
312360
self.assertEqual(name, self.PROJECT_PATH)
313-
self.assertFalse(options.is_page_streaming)
361+
self.assertTrue(options.page_token is INITIAL_PAGE)
362+
363+
def test_list_subscriptions_with_paging(self):
364+
TOKEN = 'TOKEN'
365+
NEW_TOKEN = 'NEW_TOKEN'
366+
response = _ListSubscriptionsResponsePB([_SubscriptionPB(
367+
self.SUB_PATH, self.TOPIC_PATH, self.PUSH_ENDPOINT, 0)], NEW_TOKEN)
368+
gax_api = _GAXSubscriberAPI(_list_subscriptions_response=response)
369+
api = self._makeOne(gax_api)
370+
371+
subscriptions, next_token = api.list_subscriptions(
372+
self.PROJECT, page_token=TOKEN)
373+
374+
self.assertEqual(len(subscriptions), 1)
375+
subscription = subscriptions[0]
376+
self.assertIsInstance(subscription, dict)
377+
self.assertEqual(subscription['name'], self.SUB_PATH)
378+
self.assertEqual(subscription['topic'], self.TOPIC_PATH)
379+
self.assertEqual(subscription['pushConfig'],
380+
{'pushEndpoint': self.PUSH_ENDPOINT})
381+
self.assertEqual(subscription['ackDeadlineSeconds'], 0)
382+
self.assertEqual(next_token, NEW_TOKEN)
383+
384+
name, options = gax_api._list_subscriptions_called_with
385+
self.assertEqual(name, self.PROJECT_PATH)
386+
self.assertEqual(options.page_token, TOKEN)
314387

315388
def test_subscription_create(self):
316389
sub_pb = _SubscriptionPB(self.SUB_PATH, self.TOPIC_PATH, '', 0)

setup.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
]
2020

2121
GRPC_EXTRAS = [
22-
'grpcio == 0.13.1',
22+
'grpcio >= 0.14.0',
23+
'google-gax >= 0.11.0',
2324
'gax-google-pubsub-v1',
2425
]
2526

tox.ini

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,20 @@ covercmd =
2121
--cover-branches \
2222
--nocapture
2323

24+
[grpc]
25+
deps =
26+
grpcio >= 0.14.0
27+
google-gax >= 0.11.0
28+
gax-google-pubsub-v1
29+
2430
[testenv:py27]
2531
basepython =
2632
python2.7
2733
commands =
28-
pip --quiet install gcloud[grpc]
2934
nosetests
35+
deps =
36+
{[testenv]deps}
37+
{[grpc]deps}
3038
setenv =
3139
PYTHONPATH =
3240

@@ -45,10 +53,10 @@ deps =
4553
basepython =
4654
python2.7
4755
commands =
48-
pip --quiet install gcloud[grpc]
4956
{[testenv]covercmd} --cover-min-percentage=100
5057
deps =
5158
{[testenv]deps}
59+
{[grpc]deps}
5260
coverage
5361
setenv =
5462
PYTHONPATH =
@@ -112,10 +120,12 @@ passenv = {[testenv:system-tests]passenv}
112120
basepython =
113121
python2.7
114122
commands =
115-
pip --quiet install gcloud[grpc]
116123
python {toxinidir}/system_tests/attempt_system_tests.py
117124
setenv =
118125
PYTHONPATH =
126+
deps =
127+
{[testenv]deps}
128+
{[grpc]deps}
119129
passenv = GOOGLE_* GCLOUD_* TRAVIS* encrypted_*
120130

121131
[testenv:system-tests3]

0 commit comments

Comments
 (0)