Skip to content

Commit

Permalink
Merge pull request #930 from dhermes/pubsub-subscription-factory
Browse files Browse the repository at this point in the history
Implementing `topic.subscription` factory.
  • Loading branch information
dhermes committed Jul 2, 2015
2 parents 6f4cf96 + c03e7e0 commit 384c525
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 27 deletions.
25 changes: 12 additions & 13 deletions docs/pubsub-usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ Create a new pull subscription for a topic:
>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = pubsub.Subscription('subscription_name', topic)
>>> subscription = topic.subscription('subscription_name')
>>> subscription.create() # API request

Create a new pull subscription for a topic with a non-default ACK deadline:
Expand All @@ -131,8 +131,7 @@ Create a new pull subscription for a topic with a non-default ACK deadline:
>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = pubsub.Subscription('subscription_name', topic,
... ack_deadline=90)
>>> subscription = topic.subscription('subscription_name', ack_deadline=90)
>>> subscription.create() # API request

Create a new push subscription for a topic:
Expand All @@ -143,8 +142,8 @@ Create a new push subscription for a topic:
>>> ENDPOINT = 'https://example.com/hook'
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = pubsub.Subscription('subscription_name', topic,
... push_endpoint=ENDPOINT)
>>> subscription = topic.subscription('subscription_name',
... push_endpoint=ENDPOINT)
>>> subscription.create() # API request

Check for the existence of a subscription:
Expand All @@ -154,7 +153,7 @@ Check for the existence of a subscription:
>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = pubsub.Subscription('subscription_name', topic)
>>> subscription = topic.subscription('subscription_name')
>>> subscription.exists() # API request
True

Expand All @@ -166,7 +165,7 @@ Convert a pull subscription to push:
>>> ENDPOINT = 'https://example.com/hook'
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = pubsub.Subscription('subscription_name', topic)
>>> subscription = topic.subscription('subscription_name')
>>> subscription.modify_push_configuration(push_endpoint=ENDPOINT) # API request

Convert a push subscription to pull:
Expand All @@ -177,8 +176,8 @@ Convert a push subscription to pull:
>>> ENDPOINT = 'https://example.com/hook'
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = pubusb.Subscription('subscription_name', topic,
... push_endpoint=ENDPOINT)
>>> subscription = topic.subscription('subscription_name',
... push_endpoint=ENDPOINT)
>>> subscription.modify_push_configuration(push_endpoint=None) # API request

List subscriptions for a topic:
Expand Down Expand Up @@ -209,7 +208,7 @@ Delete a subscription:
>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = pubsub.Subscription('subscription_name', topic)
>>> subscription = topic.subscription('subscription_name')
>>> subscription.delete() # API request


Expand All @@ -223,7 +222,7 @@ Fetch pending messages for a pull subscription:
>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = pubsub.Subscription('subscription_name', topic)
>>> subscription = topic.subscription('subscription_name')
>>> with topic.batch() as batch:
... batch.publish('this is the first message_payload')
... batch.publish('this is the second message_payload',
Expand Down Expand Up @@ -252,7 +251,7 @@ Fetch a limited number of pending messages for a pull subscription:
>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = pubsub.Subscription('subscription_name', topic)
>>> subscription = topic.subscription('subscription_name')
>>> with topic.batch() as batch:
... batch.publish('this is the first message_payload')
... batch.publish('this is the second message_payload',
Expand All @@ -268,7 +267,7 @@ Fetch messages for a pull subscription without blocking (none pending):
>>> from gcloud import pubsub
>>> client = pubsub.Client()
>>> topic = client.topic('topic_name')
>>> subscription = pubsub.Subscription('subscription_name', topic)
>>> subscription = topic.subscription('subscription_name')
>>> received = subscription.pull(max_messages=1) # API request
>>> messages = [recv[1] for recv in received]
>>> [message.id for message in messages]
Expand Down
45 changes: 45 additions & 0 deletions gcloud/pubsub/_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helper functions for shared behavior."""


def topic_name_from_path(path, project):
"""Validate a topic URI path and get the topic name.
:type path: string
:param path: URI path for a topic API request.
:type project: string
:param project: The project associated with the request. It is
included for validation purposes.
:rtype: string
:returns: Topic name parsed from ``path``.
:raises: :class:`ValueError` if the ``path`` is ill-formed or if
the project from the ``path`` does not agree with the
``project`` passed in.
"""
# PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
path_parts = path.split('/')
if (len(path_parts) != 4 or path_parts[0] != 'projects' or
path_parts[2] != 'topics'):
raise ValueError('Expected path to be of the form '
'projects/{project}/topics/{topic_name}')
if (len(path_parts) != 4 or path_parts[0] != 'projects' or
path_parts[2] != 'topics' or path_parts[1] != project):
raise ValueError('Project from client should agree with '
'project from resource.')

return path_parts[3]
12 changes: 7 additions & 5 deletions gcloud/pubsub/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
"""Define API Subscriptions."""

from gcloud.exceptions import NotFound
from gcloud.pubsub._helpers import topic_name_from_path
from gcloud.pubsub.message import Message
from gcloud.pubsub.topic import Topic


class Subscription(object):
Expand Down Expand Up @@ -65,11 +65,13 @@ def from_api_repr(cls, resource, client, topics=None):
"""
if topics is None:
topics = {}
t_name = resource['topic']
topic = topics.get(t_name)
topic_path = resource['topic']
topic = topics.get(topic_path)
if topic is None:
topic = topics[t_name] = Topic.from_api_repr({'name': t_name},
client)
# NOTE: This duplicates behavior from Topic.from_api_repr to avoid
# an import cycle.
topic_name = topic_name_from_path(topic_path, client.project)
topic = topics[topic_path] = client.topic(topic_name)
_, _, _, name = resource['name'].split('/')
ack_deadline = resource.get('ackDeadlineSeconds')
push_config = resource.get('pushConfig', {})
Expand Down
47 changes: 47 additions & 0 deletions gcloud/pubsub/test__helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest2


class Test_topic_name_from_path(unittest2.TestCase):

def _callFUT(self, path, project):
from gcloud.pubsub._helpers import topic_name_from_path
return topic_name_from_path(path, project)

def test_invalid_path_length(self):
PATH = 'projects/foo'
PROJECT = None
self.assertRaises(ValueError, self._callFUT, PATH, PROJECT)

def test_invalid_path_format(self):
TOPIC_NAME = 'TOPIC_NAME'
PROJECT = 'PROJECT'
PATH = 'foo/%s/bar/%s' % (PROJECT, TOPIC_NAME)
self.assertRaises(ValueError, self._callFUT, PATH, PROJECT)

def test_invalid_project(self):
TOPIC_NAME = 'TOPIC_NAME'
PROJECT1 = 'PROJECT1'
PROJECT2 = 'PROJECT2'
PATH = 'projects/%s/topics/%s' % (PROJECT1, TOPIC_NAME)
self.assertRaises(ValueError, self._callFUT, PATH, PROJECT2)

def test_valid_data(self):
TOPIC_NAME = 'TOPIC_NAME'
PROJECT = 'PROJECT'
PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
topic_name = self._callFUT(PATH, PROJECT)
self.assertEqual(topic_name, TOPIC_NAME)
4 changes: 4 additions & 0 deletions gcloud/pubsub/test_subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,3 +518,7 @@ class _Client(object):
def __init__(self, project, connection=None):
self.project = project
self.connection = connection

def topic(self, name, timestamp_messages=False):
from gcloud.pubsub.topic import Topic
return Topic(name, client=self, timestamp_messages=timestamp_messages)
14 changes: 14 additions & 0 deletions gcloud/pubsub/test_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,20 @@ def test_delete_w_alternate_client(self):
self.assertEqual(req['method'], 'DELETE')
self.assertEqual(req['path'], '/%s' % PATH)

def test_subscription(self):
from gcloud.pubsub.subscription import Subscription
TOPIC_NAME = 'topic_name'
PROJECT = 'PROJECT'
CLIENT = _Client(project=PROJECT)
topic = self._makeOne(TOPIC_NAME,
client=CLIENT)

SUBSCRIPTION_NAME = 'subscription_name'
subscription = topic.subscription(SUBSCRIPTION_NAME)
self.assertTrue(isinstance(subscription, Subscription))
self.assertEqual(subscription.name, SUBSCRIPTION_NAME)
self.assertTrue(subscription.topic is topic)


class TestBatch(unittest2.TestCase):

Expand Down
27 changes: 22 additions & 5 deletions gcloud/pubsub/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

from gcloud._helpers import _RFC3339_MICROS
from gcloud.exceptions import NotFound
from gcloud.pubsub._helpers import topic_name_from_path
from gcloud.pubsub.subscription import Subscription

_NOW = datetime.datetime.utcnow

Expand Down Expand Up @@ -48,6 +50,24 @@ def __init__(self, name, client, timestamp_messages=False):
self._client = client
self.timestamp_messages = timestamp_messages

def subscription(self, name, ack_deadline=None, push_endpoint=None):
"""Creates a subscription bound to the current topic.
:type name: string
:param name: the name of the subscription
:type ack_deadline: int
:param ack_deadline: the deadline (in seconds) by which messages pulled
from the back-end must be acknowledged.
:type push_endpoint: string
:param push_endpoint: URL to which messages will be pushed by the
back-end. If not set, the application must pull
messages.
"""
return Subscription(name, self, ack_deadline=ack_deadline,
push_endpoint=push_endpoint)

@classmethod
def from_api_repr(cls, resource, client):
"""Factory: construct a topic given its API representation
Expand All @@ -65,11 +85,8 @@ def from_api_repr(cls, resource, client):
project from the resource does not agree with the project
from the client.
"""
_, project, _, name = resource['name'].split('/')
if client.project != project:
raise ValueError('Project from clientshould agree with '
'project from resource.')
return cls(name, client=client)
topic_name = topic_name_from_path(resource['name'], client.project)
return cls(topic_name, client=client)

@property
def project(self):
Expand Down
7 changes: 3 additions & 4 deletions system_tests/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

from gcloud import _helpers
from gcloud import pubsub
from gcloud.pubsub.subscription import Subscription


_helpers._PROJECT_ENV_VAR_NAME = 'GCLOUD_TESTS_PROJECT_ID'
Expand Down Expand Up @@ -68,7 +67,7 @@ def test_create_subscription(self):
topic.create()
self.to_delete.append(topic)
SUBSCRIPTION_NAME = 'subscribing-now'
subscription = Subscription(SUBSCRIPTION_NAME, topic)
subscription = topic.subscription(SUBSCRIPTION_NAME)
self.assertFalse(subscription.exists())
subscription.create()
self.to_delete.append(subscription)
Expand All @@ -88,7 +87,7 @@ def test_list_subscriptions(self):
'newest%d' % (1000 * time.time(),),
]
for subscription_name in subscriptions_to_create:
subscription = Subscription(subscription_name, topic)
subscription = topic.subscription(subscription_name)
subscription.create()
self.to_delete.append(subscription)

Expand All @@ -106,7 +105,7 @@ def test_message_pull_mode_e2e(self):
topic.create()
self.to_delete.append(topic)
SUBSCRIPTION_NAME = 'subscribing-now'
subscription = Subscription(SUBSCRIPTION_NAME, topic)
subscription = topic.subscription(SUBSCRIPTION_NAME)
self.assertFalse(subscription.exists())
subscription.create()
self.to_delete.append(subscription)
Expand Down

0 comments on commit 384c525

Please sign in to comment.