Skip to content

Commit 85dc64b

Browse files
committed
Merge pull request #743 from tseaver/691-flesh_out_pubsub_subscription
Flesh out pubsub subscriptions
2 parents 01d697e + 7821d88 commit 85dc64b

File tree

4 files changed

+620
-0
lines changed

4 files changed

+620
-0
lines changed

gcloud/pubsub/api.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ def list_topics(page_size=None, page_token=None,
1919
project=None, connection=None):
2020
"""List topics for a given project.
2121
22+
See:
23+
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/list
24+
2225
:type page_size: int
2326
:param page_size: maximum number of topics to return, If not passed,
2427
defaults to a value set by the API.
@@ -52,3 +55,55 @@ def list_topics(page_size=None, page_token=None,
5255

5356
path = '/projects/%s/topics' % project
5457
return connection.api_request(method='GET', path=path, query_params=params)
58+
59+
60+
def list_subscriptions(page_size=None, page_token=None, topic_name=None,
61+
project=None, connection=None):
62+
"""List subscriptions for a given project.
63+
64+
See:
65+
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/list
66+
67+
and (where ``topic_name`` is passed):
68+
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/subscriptions/list
69+
70+
:type page_size: int
71+
:param page_size: maximum number of topics to return, If not passed,
72+
defaults to a value set by the API.
73+
74+
:type page_token: string
75+
:param page_token: opaque marker for the next "page" of topics. If not
76+
passed, the API will return the first page of topics.
77+
78+
:type topic_name: string
79+
:param topic_name: limit results to subscriptions bound to the given topic.
80+
81+
:type project: string
82+
:param project: project ID to query. If not passed, defaults to the
83+
project ID inferred from the environment.
84+
85+
:type connection: :class:`gcloud.pubsub.connection.Connection`
86+
:param connection: connection to use for the query. If not passed,
87+
defaults to the connection inferred from the
88+
environment.
89+
90+
:rtype: dict
91+
:returns: keys include ``subscriptions`` (a list of subscription mappings)
92+
and ``nextPageToken`` (a string: if non-empty, indicates that
93+
more topics can be retrieved with another call (pass that
94+
value as ``page_token``).
95+
"""
96+
params = {}
97+
98+
if page_size is not None:
99+
params['pageSize'] = page_size
100+
101+
if page_token is not None:
102+
params['pageToken'] = page_token
103+
104+
if topic_name is None:
105+
path = '/projects/%s/subscriptions' % project
106+
else:
107+
path = '/projects/%s/topics/%s/subscriptions' % (project, topic_name)
108+
109+
return connection.api_request(method='GET', path=path, query_params=params)

gcloud/pubsub/subscription.py

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
# Copyright 2015 Google Inc. All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
""" Define API Subscriptions."""
16+
17+
from gcloud.exceptions import NotFound
18+
19+
20+
class Subscription(object):
21+
"""Subscriptions receive messages published to their topics.
22+
23+
See:
24+
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions
25+
26+
:type name: string
27+
:param name: the name of the subscription
28+
29+
:type topic: :class:`gcloud.pubsub.topic.Topic`
30+
:param topic: the topic to which the subscription belongs..
31+
32+
:type ack_deadline: int
33+
:param ack_deadline: the deadline (in seconds) by which messages pulled
34+
from the back-end must be acknowledged.
35+
36+
:type push_endpoint: string
37+
:param push_endpoint: URL to which messages will be pushed by the back-end.
38+
If not set, the application must pull messages.
39+
"""
40+
def __init__(self, name, topic, ack_deadline=None, push_endpoint=None):
41+
self.name = name
42+
self.topic = topic
43+
self.ack_deadline = ack_deadline
44+
self.push_endpoint = push_endpoint
45+
46+
@property
47+
def path(self):
48+
"""URL path for the subscription's APIs"""
49+
project = self.topic.project
50+
return '/projects/%s/subscriptions/%s' % (project, self.name)
51+
52+
def create(self):
53+
"""API call: create the subscription via a PUT request
54+
55+
See:
56+
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/create
57+
"""
58+
data = {'topic': self.topic.path}
59+
60+
if self.ack_deadline is not None:
61+
data['ackDeadline'] = self.ack_deadline
62+
63+
if self.push_endpoint is not None:
64+
data['pushConfig'] = {'pushEndpoint': self.push_endpoint}
65+
66+
conn = self.topic.connection
67+
conn.api_request(method='PUT', path=self.path, data=data)
68+
69+
def exists(self):
70+
"""API call: test existence of the subscription via a GET request
71+
72+
See
73+
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/get
74+
"""
75+
conn = self.topic.connection
76+
try:
77+
conn.api_request(method='GET',
78+
path=self.path,
79+
query_params={'fields': 'name'})
80+
except NotFound:
81+
return False
82+
else:
83+
return True
84+
85+
def reload(self):
86+
"""API call: sync local subscription configuration via a GET request
87+
88+
See
89+
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/get
90+
"""
91+
conn = self.topic.connection
92+
data = conn.api_request(method='GET', path=self.path)
93+
self.ack_deadline = data.get('ackDeadline')
94+
push_config = data.get('pushConfig', {})
95+
self.push_endpoint = push_config.get('pushEndpoint')
96+
97+
def modify_push_configuration(self, push_endpoint):
98+
"""API call: update the push endpoint for the subscription.
99+
100+
See:
101+
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/modifyPushConfig
102+
103+
:type push_endpoint: string
104+
:param push_endpoint: URL to which messages will be pushed by the
105+
back-end. If None, the application must pull
106+
messages.
107+
"""
108+
data = {}
109+
config = data['pushConfig'] = {}
110+
if push_endpoint is not None:
111+
config['pushEndpoint'] = push_endpoint
112+
conn = self.topic.connection
113+
conn.api_request(method='POST',
114+
path='%s:modifyPushConfig' % self.path,
115+
data=data)
116+
self.push_endpoint = push_endpoint
117+
118+
def pull(self, return_immediately=False, max_messages=1):
119+
"""API call: retrieve messages for the subscription.
120+
121+
See:
122+
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/pull
123+
124+
:type return_immediately: boolean
125+
:param return_immediately: if True, the back-end returns even if no
126+
messages are available; if False, the API
127+
call blocks until one or more messages are
128+
available.
129+
130+
:type max_messages: int
131+
:param max_messages: the maximum number of messages to return.
132+
133+
:rtype: list of dict
134+
:returns: sequence of mappings, each containing keys ``ackId`` (the
135+
ID to be used in a subsequent call to :meth:`acknowledge`)
136+
and ``message``.
137+
"""
138+
data = {'returnImmediately': return_immediately,
139+
'maxMessages': max_messages}
140+
conn = self.topic.connection
141+
response = conn.api_request(method='POST',
142+
path='%s:pull' % self.path,
143+
data=data)
144+
return response['receivedMessages']
145+
146+
def acknowledge(self, ack_ids):
147+
"""API call: acknowledge retrieved messages for the subscription.
148+
149+
See:
150+
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/acknowledge
151+
152+
:type ack_ids: list of string
153+
:param ack_ids: ack IDs of messages being acknowledged
154+
"""
155+
data = {'ackIds': ack_ids}
156+
conn = self.topic.connection
157+
conn.api_request(method='POST',
158+
path='%s:acknowledge' % self.path,
159+
data=data)
160+
161+
def modify_ack_deadline(self, ack_id, ack_deadline):
162+
"""API call: update acknowledgement deadline for a retrieved message.
163+
164+
See:
165+
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/acknowledge
166+
167+
:type ack_id: string
168+
:param ack_id: ack ID of message being updated
169+
170+
:type ack_deadline: int
171+
:param ack_deadline: new deadline for the message, in seconds
172+
"""
173+
data = {'ackId': ack_id, 'ackDeadlineSeconds': ack_deadline}
174+
conn = self.topic.connection
175+
conn.api_request(method='POST',
176+
path='%s:modifyAckDeadline' % self.path,
177+
data=data)
178+
179+
def delete(self):
180+
"""API call: delete the subscription via a DELETE request.
181+
182+
See:
183+
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/delete
184+
"""
185+
conn = self.topic.connection
186+
conn.api_request(method='DELETE', path=self.path)

gcloud/pubsub/test_api.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,86 @@ def test_w_explicit_connection_w_paging(self):
6161
{'pageSize': SIZE, 'pageToken': TOKEN1})
6262

6363

64+
class Test_list_subscriptions(unittest2.TestCase):
65+
66+
def _callFUT(self, *args, **kw):
67+
from gcloud.pubsub.api import list_subscriptions
68+
return list_subscriptions(*args, **kw)
69+
70+
def test_w_explicit_connection_no_paging(self):
71+
PROJECT = 'PROJECT'
72+
SUB_NAME = 'topic_name'
73+
SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
74+
TOPIC_NAME = 'topic_name'
75+
TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
76+
TOKEN = 'TOKEN'
77+
returned = {'subscriptions': [{'name': SUB_PATH, 'topic': TOPIC_PATH}],
78+
'nextPageToken': TOKEN}
79+
conn = _Connection(returned)
80+
response = self._callFUT(project=PROJECT, connection=conn)
81+
subscriptions = response['subscriptions']
82+
self.assertEqual(len(subscriptions), 1)
83+
self.assertEqual(subscriptions[0],
84+
{'name': SUB_PATH, 'topic': TOPIC_PATH})
85+
self.assertEqual(response['nextPageToken'], TOKEN)
86+
self.assertEqual(len(conn._requested), 1)
87+
req = conn._requested[0]
88+
self.assertEqual(req['method'], 'GET')
89+
self.assertEqual(req['path'], '/projects/%s/subscriptions' % PROJECT)
90+
self.assertEqual(req['query_params'], {})
91+
92+
def test_w_explicit_connection_w_paging(self):
93+
PROJECT = 'PROJECT'
94+
SUB_NAME = 'topic_name'
95+
SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
96+
TOPIC_NAME = 'topic_name'
97+
TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
98+
TOKEN1 = 'TOKEN1'
99+
TOKEN2 = 'TOKEN2'
100+
SIZE = 1
101+
returned = {'subscriptions': [{'name': SUB_PATH, 'topic': TOPIC_PATH}],
102+
'nextPageToken': TOKEN2}
103+
conn = _Connection(returned)
104+
response = self._callFUT(SIZE, TOKEN1,
105+
project=PROJECT, connection=conn)
106+
subscriptions = response['subscriptions']
107+
self.assertEqual(len(subscriptions), 1)
108+
self.assertEqual(subscriptions[0],
109+
{'name': SUB_PATH, 'topic': TOPIC_PATH})
110+
self.assertEqual(response['nextPageToken'], TOKEN2)
111+
self.assertEqual(len(conn._requested), 1)
112+
req = conn._requested[0]
113+
self.assertEqual(req['method'], 'GET')
114+
self.assertEqual(req['path'], '/projects/%s/subscriptions' % PROJECT)
115+
self.assertEqual(req['query_params'],
116+
{'pageSize': SIZE, 'pageToken': TOKEN1})
117+
118+
def test_w_topic_name(self):
119+
PROJECT = 'PROJECT'
120+
SUB_NAME = 'topic_name'
121+
SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
122+
TOPIC_NAME = 'topic_name'
123+
TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
124+
TOKEN = 'TOKEN'
125+
returned = {'subscriptions': [{'name': SUB_PATH, 'topic': TOPIC_PATH}],
126+
'nextPageToken': TOKEN}
127+
conn = _Connection(returned)
128+
response = self._callFUT(topic_name=TOPIC_NAME,
129+
project=PROJECT, connection=conn)
130+
subscriptions = response['subscriptions']
131+
self.assertEqual(len(subscriptions), 1)
132+
self.assertEqual(subscriptions[0],
133+
{'name': SUB_PATH, 'topic': TOPIC_PATH})
134+
self.assertEqual(response['nextPageToken'], TOKEN)
135+
self.assertEqual(len(conn._requested), 1)
136+
req = conn._requested[0]
137+
self.assertEqual(req['method'], 'GET')
138+
self.assertEqual(req['path'],
139+
'/projects/%s/topics/%s/subscriptions'
140+
% (PROJECT, TOPIC_NAME))
141+
self.assertEqual(req['query_params'], {})
142+
143+
64144
class _Connection(object):
65145

66146
def __init__(self, *responses):

0 commit comments

Comments
 (0)