Skip to content

Commit 7c13a99

Browse files
committed
Add 'pubsub.subscription.Subscription' class.
Include 'create', 'delete', 'exists', and 'reload' method to manage subscription state. Allow toggling between push / pull w/ 'modify_push_config'. Consume / acknowledge messages in pull-mode w/ 'pull', 'acknowledge', and 'modify_ack_deadline'.
1 parent 703d56b commit 7c13a99

File tree

2 files changed

+480
-0
lines changed

2 files changed

+480
-0
lines changed

gcloud/pubsub/subscription.py

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
# Copyright 2015 Google Inc. All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
""" Define API Subscriptions."""
16+
17+
from gcloud.exceptions import NotFound
18+
19+
20+
class Subscription(object):
21+
"""Subscriptions receive messages published to their topics.
22+
23+
See:
24+
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions
25+
26+
:type name: string
27+
:param name: the name of the subscription
28+
29+
:type topic: :class:`gcloud.pubsub.topic.Topic`
30+
:param topic: the topic to which the subscription belongs..
31+
32+
:type ack_deadline: int
33+
:param ack_deadline: the deadline (in seconds) by which messages pulled
34+
from the back-end must be ACKed.
35+
36+
:type push_endpoint: string
37+
:param push_endpoint: URL to which messages will be pushed by the back-end.
38+
If not set, the application must pull messages.
39+
"""
40+
def __init__(self, name, topic, ack_deadline=None, push_endpoint=None):
41+
self.name = name
42+
self.topic = topic
43+
self.ack_deadline = ack_deadline
44+
self.push_endpoint = push_endpoint
45+
46+
@property
47+
def path(self):
48+
"""URL path for the subscription's APIs"""
49+
project = self.topic.project
50+
return '/projects/%s/subscriptions/%s' % (project, self.name)
51+
52+
def create(self):
53+
"""API call: create the subscription via a PUT request
54+
55+
See:
56+
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/create
57+
"""
58+
data = {'topic': self.topic.path}
59+
60+
if self.ack_deadline is not None:
61+
data['ackDeadline'] = self.ack_deadline
62+
63+
if self.push_endpoint is not None:
64+
data['pushConfig'] = {'pushEndpoint': self.push_endpoint}
65+
66+
conn = self.topic.connection
67+
conn.api_request(method='PUT', path=self.path, data=data)
68+
69+
def exists(self):
70+
"""API call: test existence of the subsription via a GET request
71+
72+
See
73+
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/get
74+
"""
75+
conn = self.topic.connection
76+
try:
77+
conn.api_request(method='GET',
78+
path=self.path,
79+
query_params={'fields': 'name'})
80+
except NotFound:
81+
return False
82+
else:
83+
return True
84+
85+
def reload(self):
86+
"""API call: test existence of the subsription via a GET request
87+
88+
See
89+
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/get
90+
"""
91+
conn = self.topic.connection
92+
data = conn.api_request(method='GET', path=self.path)
93+
self.ack_deadline = data.get('ackDeadline')
94+
push_config = data.get('pushConfig', {})
95+
self.push_endpoint = push_config.get('pushEndpoint')
96+
97+
def modify_push_configuration(self, push_endpoint):
98+
"""API call: update the push endpoint for the subscription.
99+
100+
See:
101+
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/modifyPushConfig
102+
"""
103+
data = {}
104+
config = data['pushConfig'] = {}
105+
if push_endpoint is not None:
106+
config['pushEndpoint'] = push_endpoint
107+
conn = self.topic.connection
108+
conn.api_request(method='POST',
109+
path='%s:modifyPushConfig' % self.path,
110+
data=data)
111+
self.push_endpoint = push_endpoint
112+
113+
def pull(self, return_immediately=False, max_messages=1):
114+
"""API call: retrieve messages for the subscription.
115+
116+
See:
117+
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/pull
118+
119+
:type return_immediately: boolean
120+
:param return_immediately: if True, the back-end returns even if no
121+
messages are available; if False, the API
122+
call blocks until one or more messages are
123+
available.
124+
125+
:type max_messages: int
126+
:param max_messages: the maximum number of messages to return.
127+
128+
:rtype: list of dict
129+
:returns: sequence of mappings, each containing keys ``ackId`` (the
130+
ID to be used in a subsequent call to :meth:`acknowledge`)
131+
and ``message``.
132+
"""
133+
data = {'returnImmediately': return_immediately,
134+
'maxMessages': max_messages}
135+
conn = self.topic.connection
136+
response = conn.api_request(method='POST',
137+
path='%s:pull' % self.path,
138+
data=data)
139+
return response['receivedMessages']
140+
141+
def acknowledge(self, ack_ids):
142+
"""API call: acknowledge retrieved messages for the subscription.
143+
144+
See:
145+
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/acknowledge
146+
147+
:type ack_ids: list of string
148+
:param ack_ids: ack IDs of messages being acknowledged
149+
"""
150+
data = {'ackIds': ack_ids}
151+
conn = self.topic.connection
152+
conn.api_request(method='POST',
153+
path='%s:acknowledge' % self.path,
154+
data=data)
155+
156+
def modify_ack_deadline(self, ack_id, ack_deadline):
157+
"""API call: acknowledge retrieved messages for the subscription.
158+
159+
See:
160+
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/acknowledge
161+
162+
:type ack_id: string
163+
:param ack_id: ack ID of message being updated
164+
165+
:type ack_deadline: int
166+
:param ack_deadline: new deadline for the message, in seconds
167+
"""
168+
data = {'ackId': ack_id, 'ackDeadlineSeconds': ack_deadline}
169+
conn = self.topic.connection
170+
conn.api_request(method='POST',
171+
path='%s:modifyAckDeadline' % self.path,
172+
data=data)
173+
174+
def delete(self):
175+
"""API call: delete the subscription via a DELETE request
176+
177+
See:
178+
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/delete
179+
"""
180+
conn = self.topic.connection
181+
conn.api_request(method='DELETE', path=self.path)

0 commit comments

Comments
 (0)