Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for controlled paging via Gax. #1855

Merged
merged 4 commits into from
Jun 13, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[report]
omit =
*/_generated/*.py
show_missing = True
exclude_lines =
# Re-enable the standard pragma
pragma: NO COVER
Expand Down
36 changes: 30 additions & 6 deletions gcloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

# pylint: disable=import-error
from google.gax import CallOptions
from google.gax import INITIAL_PAGE
from google.gax.errors import GaxError
from google.gax.grpc import exc_to_code
from google.pubsub.v1.pubsub_pb2 import PubsubMessage
Expand All @@ -28,6 +29,14 @@
from gcloud._helpers import _to_bytes


def _build_paging_options(page_token=None):
"""Helper for :meth:'_PublisherAPI.list_topics' et aliae."""
if page_token is None:
page_token = INITIAL_PAGE
options = {'page_token': page_token}
return CallOptions(**options)


class _PublisherAPI(object):
"""Helper mapping publisher-related APIs.

Expand All @@ -37,7 +46,7 @@ class _PublisherAPI(object):
def __init__(self, gax_api):
self._gax_api = gax_api

def list_topics(self, project):
def list_topics(self, project, page_token=None):
"""List topics for the project associated with this API.

See:
Expand All @@ -46,13 +55,18 @@ def list_topics(self, project):
:type project: string
:param project: project ID

:type page_token: string
:param page_token: opaque marker for the next "page" of topics. If not
passed, the API will return the first page of
topics.

:rtype: tuple, (list, str)
:returns: list of ``Topic`` resource dicts, plus a
"next page token" string: if not None, indicates that
more topics can be retrieved with another call (pass that
value as ``page_token``).
"""
options = CallOptions(is_page_streaming=False)
options = _build_paging_options(page_token)
path = 'projects/%s' % (project,)
response = self._gax_api.list_topics(path, options)
topics = [{'name': topic_pb.name} for topic_pb in response.topics]
Expand Down Expand Up @@ -152,7 +166,7 @@ def topic_publish(self, topic_path, messages):
raise
return response.message_ids

def topic_list_subscriptions(self, topic_path):
def topic_list_subscriptions(self, topic_path, page_token=None):
"""API call: list subscriptions bound to a topic

See:
Expand All @@ -162,13 +176,18 @@ def topic_list_subscriptions(self, topic_path):
:param topic_path: fully-qualified path of the topic, in format
``projects/<PROJECT>/topics/<TOPIC_NAME>``.

:type page_token: string
:param page_token: opaque marker for the next "page" of subscriptions.
If not passed, the API will return the first page
of subscriptions.

:rtype: list of strings
:returns: fully-qualified names of subscriptions for the supplied
topic.
:raises: :exc:`gcloud.exceptions.NotFound` if the topic does not
exist
"""
options = CallOptions(is_page_streaming=False)
options = _build_paging_options(page_token)
try:
response = self._gax_api.list_topic_subscriptions(
topic_path, options)
Expand All @@ -190,7 +209,7 @@ class _SubscriberAPI(object):
def __init__(self, gax_api):
self._gax_api = gax_api

def list_subscriptions(self, project):
def list_subscriptions(self, project, page_token=None):
"""List subscriptions for the project associated with this API.

See:
Expand All @@ -199,13 +218,18 @@ def list_subscriptions(self, project):
:type project: string
:param project: project ID

:type page_token: string
:param page_token: opaque marker for the next "page" of subscriptions.
If not passed, the API will return the first page
of subscriptions.

:rtype: tuple, (list, str)
:returns: list of ``Subscription`` resource dicts, plus a
"next page token" string: if not None, indicates that
more topics can be retrieved with another call (pass that
value as ``page_token``).
"""
options = CallOptions(is_page_streaming=False)
options = _build_paging_options(page_token)
path = 'projects/%s' % (project,)
response = self._gax_api.list_subscriptions(path, options)
subscriptions = [_subscription_pb_to_mapping(sub_pb)
Expand Down
87 changes: 80 additions & 7 deletions gcloud/pubsub/test__gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ def test_ctor(self):
self.assertTrue(api._gax_api is gax_api)

def test_list_topics_no_paging(self):
response = _ListTopicsResponsePB([_TopicPB(self.TOPIC_PATH)])
from google.gax import INITIAL_PAGE
TOKEN = 'TOKEN'
response = _ListTopicsResponsePB([_TopicPB(self.TOPIC_PATH)], TOKEN)
gax_api = _GAXPublisherAPI(_list_topics_response=response)
api = self._makeOne(gax_api)

Expand All @@ -62,11 +64,31 @@ def test_list_topics_no_paging(self):
topic = topics[0]
self.assertIsInstance(topic, dict)
self.assertEqual(topic['name'], self.TOPIC_PATH)
self.assertEqual(next_token, None)
self.assertEqual(next_token, TOKEN)

name, options = gax_api._list_topics_called_with
self.assertEqual(name, self.PROJECT_PATH)
self.assertFalse(options.is_page_streaming)
self.assertTrue(options.page_token is INITIAL_PAGE)

def test_list_topics_with_paging(self):
TOKEN = 'TOKEN'
NEW_TOKEN = 'NEW_TOKEN'
response = _ListTopicsResponsePB(
[_TopicPB(self.TOPIC_PATH)], NEW_TOKEN)
gax_api = _GAXPublisherAPI(_list_topics_response=response)
api = self._makeOne(gax_api)

topics, next_token = api.list_topics(self.PROJECT, page_token=TOKEN)

self.assertEqual(len(topics), 1)
topic = topics[0]
self.assertIsInstance(topic, dict)
self.assertEqual(topic['name'], self.TOPIC_PATH)
self.assertEqual(next_token, NEW_TOKEN)

name, options = gax_api._list_topics_called_with
self.assertEqual(name, self.PROJECT_PATH)
self.assertEqual(options.page_token, TOKEN)

def test_topic_create(self):
topic_pb = _TopicPB(self.TOPIC_PATH)
Expand Down Expand Up @@ -233,6 +255,7 @@ def test_topic_publish_error(self):
self.assertEqual(options, None)

def test_topic_list_subscriptions_no_paging(self):
from google.gax import INITIAL_PAGE
response = _ListTopicSubscriptionsResponsePB([self.SUB_PATH])
gax_api = _GAXPublisherAPI(_list_topic_subscriptions_response=response)
api = self._makeOne(gax_api)
Expand All @@ -249,9 +272,32 @@ def test_topic_list_subscriptions_no_paging(self):

topic_path, options = gax_api._list_topic_subscriptions_called_with
self.assertEqual(topic_path, self.TOPIC_PATH)
self.assertFalse(options.is_page_streaming)
self.assertTrue(options.page_token is INITIAL_PAGE)

def test_topic_list_subscriptions_with_paging(self):
TOKEN = 'TOKEN'
NEW_TOKEN = 'NEW_TOKEN'
response = _ListTopicSubscriptionsResponsePB(
[self.SUB_PATH], NEW_TOKEN)
gax_api = _GAXPublisherAPI(_list_topic_subscriptions_response=response)
api = self._makeOne(gax_api)

subscriptions, next_token = api.topic_list_subscriptions(
self.TOPIC_PATH, page_token=TOKEN)

self.assertEqual(len(subscriptions), 1)
subscription = subscriptions[0]
self.assertIsInstance(subscription, dict)
self.assertEqual(subscription['name'], self.SUB_PATH)
self.assertEqual(subscription['topic'], self.TOPIC_PATH)
self.assertEqual(next_token, NEW_TOKEN)

name, options = gax_api._list_topic_subscriptions_called_with
self.assertEqual(name, self.TOPIC_PATH)
self.assertEqual(options.page_token, TOKEN)

def test_topic_list_subscriptions_miss(self):
from google.gax import INITIAL_PAGE
from gcloud.exceptions import NotFound
gax_api = _GAXPublisherAPI()
api = self._makeOne(gax_api)
Expand All @@ -261,9 +307,10 @@ def test_topic_list_subscriptions_miss(self):

topic_path, options = gax_api._list_topic_subscriptions_called_with
self.assertEqual(topic_path, self.TOPIC_PATH)
self.assertFalse(options.is_page_streaming)
self.assertTrue(options.page_token is INITIAL_PAGE)

def test_topic_list_subscriptions_error(self):
from google.gax import INITIAL_PAGE
from google.gax.errors import GaxError
gax_api = _GAXPublisherAPI(_random_gax_error=True)
api = self._makeOne(gax_api)
Expand All @@ -273,7 +320,7 @@ def test_topic_list_subscriptions_error(self):

topic_path, options = gax_api._list_topic_subscriptions_called_with
self.assertEqual(topic_path, self.TOPIC_PATH)
self.assertFalse(options.is_page_streaming)
self.assertTrue(options.page_token is INITIAL_PAGE)


@unittest2.skipUnless(_HAVE_GAX, 'No gax-python')
Expand All @@ -291,6 +338,7 @@ def test_ctor(self):
self.assertTrue(api._gax_api is gax_api)

def test_list_subscriptions_no_paging(self):
from google.gax import INITIAL_PAGE
response = _ListSubscriptionsResponsePB([_SubscriptionPB(
self.SUB_PATH, self.TOPIC_PATH, self.PUSH_ENDPOINT, 0)])
gax_api = _GAXSubscriberAPI(_list_subscriptions_response=response)
Expand All @@ -310,7 +358,32 @@ def test_list_subscriptions_no_paging(self):

name, options = gax_api._list_subscriptions_called_with
self.assertEqual(name, self.PROJECT_PATH)
self.assertFalse(options.is_page_streaming)
self.assertTrue(options.page_token is INITIAL_PAGE)

def test_list_subscriptions_with_paging(self):
TOKEN = 'TOKEN'
NEW_TOKEN = 'NEW_TOKEN'
response = _ListSubscriptionsResponsePB([_SubscriptionPB(
self.SUB_PATH, self.TOPIC_PATH, self.PUSH_ENDPOINT, 0)], NEW_TOKEN)
gax_api = _GAXSubscriberAPI(_list_subscriptions_response=response)
api = self._makeOne(gax_api)

subscriptions, next_token = api.list_subscriptions(
self.PROJECT, page_token=TOKEN)

self.assertEqual(len(subscriptions), 1)
subscription = subscriptions[0]
self.assertIsInstance(subscription, dict)
self.assertEqual(subscription['name'], self.SUB_PATH)
self.assertEqual(subscription['topic'], self.TOPIC_PATH)
self.assertEqual(subscription['pushConfig'],
{'pushEndpoint': self.PUSH_ENDPOINT})
self.assertEqual(subscription['ackDeadlineSeconds'], 0)
self.assertEqual(next_token, NEW_TOKEN)

name, options = gax_api._list_subscriptions_called_with
self.assertEqual(name, self.PROJECT_PATH)
self.assertEqual(options.page_token, TOKEN)

def test_subscription_create(self):
sub_pb = _SubscriptionPB(self.SUB_PATH, self.TOPIC_PATH, '', 0)
Expand Down
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
]

GRPC_EXTRAS = [
'grpcio == 0.13.1',
'grpcio >= 0.14.0',
'google-gax >= 0.11.0',
'gax-google-pubsub-v1',
]

Expand Down
16 changes: 13 additions & 3 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,20 @@ covercmd =
--cover-branches \
--nocapture

[grpc]
deps =
grpcio >= 0.14.0
google-gax >= 0.11.0
gax-google-pubsub-v1

[testenv:py27]
basepython =
python2.7
commands =
pip --quiet install gcloud[grpc]
nosetests
deps =
{[testenv]deps}
{[grpc]deps}
setenv =
PYTHONPATH =

Expand All @@ -45,10 +53,10 @@ deps =
basepython =
python2.7
commands =
pip --quiet install gcloud[grpc]
{[testenv]covercmd} --cover-min-percentage=100
deps =
{[testenv]deps}
{[grpc]deps}
coverage
setenv =
PYTHONPATH =
Expand Down Expand Up @@ -112,10 +120,12 @@ passenv = {[testenv:system-tests]passenv}
basepython =
python2.7
commands =
pip --quiet install gcloud[grpc]
python {toxinidir}/system_tests/attempt_system_tests.py
setenv =
PYTHONPATH =
deps =
{[testenv]deps}
{[grpc]deps}
passenv = GOOGLE_* GCLOUD_* TRAVIS* encrypted_*

[testenv:system-tests3]
Expand Down