From a876d208abb5488be0bee6637f6ad4898322519e Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Mon, 20 Jun 2016 11:56:27 -0400 Subject: [PATCH 1/3] Fix docstring copy pasta. --- gcloud/pubsub/connection.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gcloud/pubsub/connection.py b/gcloud/pubsub/connection.py index e3394b3bd888..c41608abf5a1 100644 --- a/gcloud/pubsub/connection.py +++ b/gcloud/pubsub/connection.py @@ -217,8 +217,8 @@ def topic_list_subscriptions(self, topic_path, page_size=None, ``projects//topics/``. :type page_size: int - :param page_size: maximum number of topics to return, If not passed, - defaults to a value set by the API. + :param page_size: maximum number of subscriptions to return, If not + passed, defaults to a value set by the API. :type page_token: string :param page_token: opaque marker for the next "page" of topics. If not From cebd7d2cd338c5d9dddf194000b2d843c615f64a Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Mon, 20 Jun 2016 11:57:48 -0400 Subject: [PATCH 2/3] Fix GAX topic / subscription enumeration / paging. Proper fix for borked attempt in #1855. --- gcloud/pubsub/_gax.py | 45 ++++++++++++----- gcloud/pubsub/test__gax.py | 101 ++++++++++++++++++++----------------- 2 files changed, 86 insertions(+), 60 deletions(-) diff --git a/gcloud/pubsub/_gax.py b/gcloud/pubsub/_gax.py index 90468f80fff9..3a8c48ae6c61 100644 --- a/gcloud/pubsub/_gax.py +++ b/gcloud/pubsub/_gax.py @@ -46,7 +46,7 @@ class _PublisherAPI(object): def __init__(self, gax_api): self._gax_api = gax_api - def list_topics(self, project, page_token=None): + def list_topics(self, project, page_size=0, page_token=None): """List topics for the project associated with this API. See: @@ -55,6 +55,10 @@ def list_topics(self, project, page_token=None): :type project: string :param project: project ID + :type page_size: int + :param page_size: maximum number of topics to return, If not passed, + defaults to a value set by the API. + :type page_token: string :param page_token: opaque marker for the next "page" of topics. If not passed, the API will return the first page of @@ -68,9 +72,11 @@ def list_topics(self, project, page_token=None): """ options = _build_paging_options(page_token) path = 'projects/%s' % (project,) - response = self._gax_api.list_topics(path, options) - topics = [{'name': topic_pb.name} for topic_pb in response.topics] - return topics, response.next_page_token + page_iter = self._gax_api.list_topics( + path, page_size=page_size, options=options) + topics = [{'name': topic_pb.name} for topic_pb in page_iter.next()] + token = page_iter.page_token or None + return topics, token def topic_create(self, topic_path): """API call: create a topic @@ -166,7 +172,8 @@ def topic_publish(self, topic_path, messages): raise return response.message_ids - def topic_list_subscriptions(self, topic_path, page_token=None): + def topic_list_subscriptions(self, topic_path, page_size=0, + page_token=None): """API call: list subscriptions bound to a topic See: @@ -176,6 +183,10 @@ def topic_list_subscriptions(self, topic_path, page_token=None): :param topic_path: fully-qualified path of the topic, in format ``projects//topics/``. + :type page_size: int + :param page_size: maximum number of subscriptions to return, If not + passed, defaults to a value set by the API. + :type page_token: string :param page_token: opaque marker for the next "page" of subscriptions. If not passed, the API will return the first page @@ -189,15 +200,15 @@ def topic_list_subscriptions(self, topic_path, page_token=None): """ options = _build_paging_options(page_token) try: - response = self._gax_api.list_topic_subscriptions( - topic_path, options) + page_iter = self._gax_api.list_topic_subscriptions( + topic_path, page_size=page_size, options=options) except GaxError as exc: if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: raise NotFound(topic_path) raise - subs = [{'topic': topic_path, 'name': subscription} - for subscription in response.subscriptions] - return subs, response.next_page_token + subs = page_iter.next() + token = page_iter.page_token or None + return subs, token class _SubscriberAPI(object): @@ -209,7 +220,7 @@ class _SubscriberAPI(object): def __init__(self, gax_api): self._gax_api = gax_api - def list_subscriptions(self, project, page_token=None): + def list_subscriptions(self, project, page_size=0, page_token=None): """List subscriptions for the project associated with this API. See: @@ -218,6 +229,10 @@ def list_subscriptions(self, project, page_token=None): :type project: string :param project: project ID + :type page_size: int + :param page_size: maximum number of subscriptions to return, If not + passed, defaults to a value set by the API. + :type page_token: string :param page_token: opaque marker for the next "page" of subscriptions. If not passed, the API will return the first page @@ -231,10 +246,12 @@ def list_subscriptions(self, project, page_token=None): """ options = _build_paging_options(page_token) path = 'projects/%s' % (project,) - response = self._gax_api.list_subscriptions(path, options) + page_iter = self._gax_api.list_subscriptions( + path, page_size=page_size, options=options) subscriptions = [_subscription_pb_to_mapping(sub_pb) - for sub_pb in response.subscriptions] - return subscriptions, response.next_page_token + for sub_pb in page_iter.next()] + token = page_iter.page_token or None + return subscriptions, token def subscription_create(self, subscription_path, topic_path, ack_deadline=None, push_endpoint=None): diff --git a/gcloud/pubsub/test__gax.py b/gcloud/pubsub/test__gax.py index e6188daac38b..d8f97470e1ee 100644 --- a/gcloud/pubsub/test__gax.py +++ b/gcloud/pubsub/test__gax.py @@ -54,7 +54,7 @@ def test_ctor(self): def test_list_topics_no_paging(self): from google.gax import INITIAL_PAGE TOKEN = 'TOKEN' - response = _ListTopicsResponsePB([_TopicPB(self.TOPIC_PATH)], TOKEN) + response = _PageIterator([_TopicPB(self.TOPIC_PATH)], TOKEN) gax_api = _GAXPublisherAPI(_list_topics_response=response) api = self._makeOne(gax_api) @@ -66,19 +66,22 @@ def test_list_topics_no_paging(self): self.assertEqual(topic['name'], self.TOPIC_PATH) self.assertEqual(next_token, TOKEN) - name, options = gax_api._list_topics_called_with + name, page_size, options = gax_api._list_topics_called_with self.assertEqual(name, self.PROJECT_PATH) + self.assertEqual(page_size, 0) self.assertTrue(options.page_token is INITIAL_PAGE) def test_list_topics_with_paging(self): + SIZE = 23 TOKEN = 'TOKEN' NEW_TOKEN = 'NEW_TOKEN' - response = _ListTopicsResponsePB( + response = _PageIterator( [_TopicPB(self.TOPIC_PATH)], NEW_TOKEN) gax_api = _GAXPublisherAPI(_list_topics_response=response) api = self._makeOne(gax_api) - topics, next_token = api.list_topics(self.PROJECT, page_token=TOKEN) + topics, next_token = api.list_topics( + self.PROJECT, page_size=SIZE, page_token=TOKEN) self.assertEqual(len(topics), 1) topic = topics[0] @@ -86,8 +89,9 @@ def test_list_topics_with_paging(self): self.assertEqual(topic['name'], self.TOPIC_PATH) self.assertEqual(next_token, NEW_TOKEN) - name, options = gax_api._list_topics_called_with + name, page_size, options = gax_api._list_topics_called_with self.assertEqual(name, self.PROJECT_PATH) + self.assertEqual(page_size, SIZE) self.assertEqual(options.page_token, TOKEN) def test_topic_create(self): @@ -256,7 +260,8 @@ def test_topic_publish_error(self): def test_topic_list_subscriptions_no_paging(self): from google.gax import INITIAL_PAGE - response = _ListTopicSubscriptionsResponsePB([self.SUB_PATH]) + response = _PageIterator([ + {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH}], None) gax_api = _GAXPublisherAPI(_list_topic_subscriptions_response=response) api = self._makeOne(gax_api) @@ -270,20 +275,23 @@ def test_topic_list_subscriptions_no_paging(self): self.assertEqual(subscription['topic'], self.TOPIC_PATH) self.assertEqual(next_token, None) - topic_path, options = gax_api._list_topic_subscriptions_called_with + topic_path, page_size, options = ( + gax_api._list_topic_subscriptions_called_with) self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertEqual(page_size, 0) self.assertTrue(options.page_token is INITIAL_PAGE) def test_topic_list_subscriptions_with_paging(self): + SIZE = 23 TOKEN = 'TOKEN' NEW_TOKEN = 'NEW_TOKEN' - response = _ListTopicSubscriptionsResponsePB( - [self.SUB_PATH], NEW_TOKEN) + response = _PageIterator([ + {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH}], NEW_TOKEN) gax_api = _GAXPublisherAPI(_list_topic_subscriptions_response=response) api = self._makeOne(gax_api) subscriptions, next_token = api.topic_list_subscriptions( - self.TOPIC_PATH, page_token=TOKEN) + self.TOPIC_PATH, page_size=SIZE, page_token=TOKEN) self.assertEqual(len(subscriptions), 1) subscription = subscriptions[0] @@ -292,8 +300,10 @@ def test_topic_list_subscriptions_with_paging(self): self.assertEqual(subscription['topic'], self.TOPIC_PATH) self.assertEqual(next_token, NEW_TOKEN) - name, options = gax_api._list_topic_subscriptions_called_with + name, page_size, options = ( + gax_api._list_topic_subscriptions_called_with) self.assertEqual(name, self.TOPIC_PATH) + self.assertEqual(page_size, SIZE) self.assertEqual(options.page_token, TOKEN) def test_topic_list_subscriptions_miss(self): @@ -305,8 +315,10 @@ def test_topic_list_subscriptions_miss(self): with self.assertRaises(NotFound): api.topic_list_subscriptions(self.TOPIC_PATH) - topic_path, options = gax_api._list_topic_subscriptions_called_with + topic_path, page_size, options = ( + gax_api._list_topic_subscriptions_called_with) self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertEqual(page_size, 0) self.assertTrue(options.page_token is INITIAL_PAGE) def test_topic_list_subscriptions_error(self): @@ -318,8 +330,10 @@ def test_topic_list_subscriptions_error(self): with self.assertRaises(GaxError): api.topic_list_subscriptions(self.TOPIC_PATH) - topic_path, options = gax_api._list_topic_subscriptions_called_with + topic_path, page_size, options = ( + gax_api._list_topic_subscriptions_called_with) self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertEqual(page_size, 0) self.assertTrue(options.page_token is INITIAL_PAGE) @@ -339,8 +353,8 @@ def test_ctor(self): def test_list_subscriptions_no_paging(self): from google.gax import INITIAL_PAGE - response = _ListSubscriptionsResponsePB([_SubscriptionPB( - self.SUB_PATH, self.TOPIC_PATH, self.PUSH_ENDPOINT, 0)]) + response = _PageIterator([_SubscriptionPB( + self.SUB_PATH, self.TOPIC_PATH, self.PUSH_ENDPOINT, 0)], None) gax_api = _GAXSubscriberAPI(_list_subscriptions_response=response) api = self._makeOne(gax_api) @@ -356,20 +370,22 @@ def test_list_subscriptions_no_paging(self): self.assertEqual(subscription['ackDeadlineSeconds'], 0) self.assertEqual(next_token, None) - name, options = gax_api._list_subscriptions_called_with + name, page_size, options = gax_api._list_subscriptions_called_with self.assertEqual(name, self.PROJECT_PATH) + self.assertEqual(page_size, 0) self.assertTrue(options.page_token is INITIAL_PAGE) def test_list_subscriptions_with_paging(self): + SIZE = 23 TOKEN = 'TOKEN' NEW_TOKEN = 'NEW_TOKEN' - response = _ListSubscriptionsResponsePB([_SubscriptionPB( + response = _PageIterator([_SubscriptionPB( self.SUB_PATH, self.TOPIC_PATH, self.PUSH_ENDPOINT, 0)], NEW_TOKEN) gax_api = _GAXSubscriberAPI(_list_subscriptions_response=response) api = self._makeOne(gax_api) subscriptions, next_token = api.list_subscriptions( - self.PROJECT, page_token=TOKEN) + self.PROJECT, page_size=SIZE, page_token=TOKEN) self.assertEqual(len(subscriptions), 1) subscription = subscriptions[0] @@ -381,8 +397,9 @@ def test_list_subscriptions_with_paging(self): self.assertEqual(subscription['ackDeadlineSeconds'], 0) self.assertEqual(next_token, NEW_TOKEN) - name, options = gax_api._list_subscriptions_called_with + name, page_size, options = gax_api._list_subscriptions_called_with self.assertEqual(name, self.PROJECT_PATH) + self.assertEqual(page_size, 23) self.assertEqual(options.page_token, TOKEN) def test_subscription_create(self): @@ -743,8 +760,8 @@ class _GAXPublisherAPI(_GaxAPIBase): _create_topic_conflict = False - def list_topics(self, name, options): - self._list_topics_called_with = name, options + def list_topics(self, name, page_size, options): + self._list_topics_called_with = name, page_size, options return self._list_topics_response def create_topic(self, name, options=None): @@ -784,9 +801,9 @@ def publish(self, topic, messages, options=None): except AttributeError: raise GaxError('miss', self._make_grpc_not_found()) - def list_topic_subscriptions(self, topic, options=None): + def list_topic_subscriptions(self, topic, page_size, options=None): from google.gax.errors import GaxError - self._list_topic_subscriptions_called_with = topic, options + self._list_topic_subscriptions_called_with = topic, page_size, options if self._random_gax_error: raise GaxError('error') try: @@ -802,8 +819,8 @@ class _GAXSubscriberAPI(_GaxAPIBase): _acknowledge_ok = False _modify_ack_deadline_ok = False - def list_subscriptions(self, project, options=None): - self._list_subscriptions_called_with = (project, options) + def list_subscriptions(self, project, page_size, options=None): + self._list_subscriptions_called_with = (project, page_size, options) return self._list_subscriptions_response def create_subscription(self, name, topic, @@ -873,6 +890,19 @@ def modify_ack_deadline(self, name, ack_ids, deadline, options=None): raise GaxError('miss', self._make_grpc_not_found()) +class _PageIterator(object): + + def __init__(self, items, page_token): + self._items = items + self.page_token = page_token + + def next(self): + if self._items is None: + raise StopIteration() + items, self._items = self._items, None + return items + + class _TopicPB(object): def __init__(self, name): @@ -885,20 +915,6 @@ def __init__(self, message_ids): self.message_ids = message_ids -class _ListTopicsResponsePB(object): - - def __init__(self, topic_pbs, next_page_token=None): - self.topics = topic_pbs - self.next_page_token = next_page_token - - -class _ListTopicSubscriptionsResponsePB(object): - - def __init__(self, subscriptions, next_page_token=None): - self.subscriptions = subscriptions - self.next_page_token = next_page_token - - class _PushConfigPB(object): def __init__(self, push_endpoint): @@ -933,10 +949,3 @@ def __init__(self, name, topic, push_endpoint, ack_deadline_seconds): self.topic = topic self.push_config = _PushConfigPB(push_endpoint) self.ack_deadline_seconds = ack_deadline_seconds - - -class _ListSubscriptionsResponsePB(object): - - def __init__(self, subscription_pbs, next_page_token=None): - self.subscriptions = subscription_pbs - self.next_page_token = next_page_token From 701d0ce557b5477faa0a900d2f4dfcec9ed2af21 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Mon, 20 Jun 2016 13:36:06 -0400 Subject: [PATCH 3/3] Attempt to get 'Topic.publish()' working with GAX. See #1869 for remaining issue. --- gcloud/pubsub/_gax.py | 6 ++++-- gcloud/pubsub/test__gax.py | 41 +++++++++++++++++++++++++++++++++++--- 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/gcloud/pubsub/_gax.py b/gcloud/pubsub/_gax.py index 3a8c48ae6c61..0639833feb73 100644 --- a/gcloud/pubsub/_gax.py +++ b/gcloud/pubsub/_gax.py @@ -165,12 +165,14 @@ def topic_publish(self, topic_path, messages): message_pbs = [_message_pb_from_dict(message) for message in messages] try: - response = self._gax_api.publish(topic_path, message_pbs) + event = self._gax_api.publish(topic_path, message_pbs) + if not event.is_set(): + event.wait() except GaxError as exc: if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: raise NotFound(topic_path) raise - return response.message_ids + return event.result.message_ids def topic_list_subscriptions(self, topic_path, page_size=0, page_token=None): diff --git a/gcloud/pubsub/test__gax.py b/gcloud/pubsub/test__gax.py index d8f97470e1ee..bcf636586ab0 100644 --- a/gcloud/pubsub/test__gax.py +++ b/gcloud/pubsub/test__gax.py @@ -207,7 +207,30 @@ def test_topic_publish_hit(self): MSGID = 'DEADBEEF' MESSAGE = {'data': B64, 'attributes': {}} response = _PublishResponsePB([MSGID]) - gax_api = _GAXPublisherAPI(_publish_response=response) + event = _Event(response) + event.wait() # already received result + gax_api = _GAXPublisherAPI(_publish_response=event) + api = self._makeOne(gax_api) + + resource = api.topic_publish(self.TOPIC_PATH, [MESSAGE]) + + self.assertEqual(resource, [MSGID]) + topic_path, message_pbs, options = gax_api._publish_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + message_pb, = message_pbs + self.assertEqual(message_pb.data, B64) + self.assertEqual(message_pb.attributes, {}) + self.assertEqual(options, None) + + def test_topic_publish_hit_with_wait(self): + import base64 + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + MSGID = 'DEADBEEF' + MESSAGE = {'data': B64, 'attributes': {}} + response = _PublishResponsePB([MSGID]) + event = _Event(response) + gax_api = _GAXPublisherAPI(_publish_response=event) api = self._makeOne(gax_api) resource = api.topic_publish(self.TOPIC_PATH, [MESSAGE]) @@ -897,12 +920,24 @@ def __init__(self, items, page_token): self.page_token = page_token def next(self): - if self._items is None: - raise StopIteration() items, self._items = self._items, None return items +class _Event(object): + + result = None + + def __init__(self, result): + self._result = result + + def is_set(self): + return self.result is not None + + def wait(self, *_): + self.result = self._result + + class _TopicPB(object): def __init__(self, name):