Skip to content

Commit 6a5f480

Browse files
committed
Add 'pubsub.message.Message' class.
Maps 'pubsub.v1beta2.PubsubMessage', handles base64-decode of payload. Return 'Message' instances from 'Subscription.pull()'. Update docs accordingly.
1 parent 1052aad commit 6a5f480

File tree

6 files changed

+171
-28
lines changed

6 files changed

+171
-28
lines changed

docs/pubsub-surface.rst

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -199,11 +199,7 @@ Delete a subscription:
199199
Pull messages from a subscription
200200
---------------------------------
201201

202-
Fetch pending messages for a pull subscription
203-
204-
.. note::
205-
206-
The messages will have been ACKed already.
202+
Fetch pending messages for a pull subscription:
207203

208204
.. doctest::
209205

@@ -215,14 +211,23 @@ Fetch pending messages for a pull subscription
215211
... topic.publish('this is the first message_payload')
216212
... topic.publish('this is the second message_payload',
217213
... attr1='value1', attr2='value2')
218-
>>> messages = subscription.pull() # API request
214+
>>> received = subscription.pull() # API request
215+
>>> messages = [recv[1] for recv in received]
219216
>>> [message.id for message in messages]
220217
[<message_id1>, <message_id2>]
221218
>>> [message.data for message in messages]
222219
['this is the first message_payload', 'this is the second message_payload']
223-
>>> [message.attrs for message in messages]
220+
>>> [message.attributes for message in messages]
224221
[{}, {'attr1': 'value1', 'attr2': 'value2'}]
225222

223+
Note that received messages must be acknowledged, or else the back-end
224+
will re-send them later:
225+
226+
.. doctest::
227+
228+
>>> ack_ids = [recv[0] for recv in received]
229+
>>> subscription.acknowledge(ack_ids)
230+
226231
Fetch a limited number of pending messages for a pull subscription:
227232

228233
.. doctest::
@@ -235,8 +240,9 @@ Fetch a limited number of pending messages for a pull subscription:
235240
... topic.publish('this is the first message_payload')
236241
... topic.publish('this is the second message_payload',
237242
... attr1='value1', attr2='value2')
238-
>>> [message.id for message in subscription.pull(max_messages=1)]
239-
[<message_id1>]
243+
>>> received = subscription.pull(max_messages=1) # API request
244+
>>> messages = [recv[1] for recv in received]
245+
>>> [message.id for message in messages]
240246

241247
Fetch messages for a pull subscription without blocking (none pending):
242248

@@ -246,6 +252,7 @@ Fetch messages for a pull subscription without blocking (none pending):
246252
>>> from gcloud.pubsub.subscription import Subscription
247253
>>> topic = Topic('topic_name')
248254
>>> subscription = Subscription('subscription_name', topic)
249-
>>> [message.id for message in subscription.pull(return_immediately=True)]
255+
>>> received = subscription.pull(max_messages=1) # API request
256+
>>> messages = [recv[1] for recv in received]
257+
>>> [message.id for message in messages]
250258
[]
251-

gcloud/pubsub/message.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Copyright 2015 Google Inc. All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Define API Topics."""
16+
17+
import base64
18+
19+
20+
class Message(object):
21+
"""Messages can be published to a topic and received by subscribers.
22+
23+
See:
24+
https://cloud.google.com/pubsub/reference/rest/google/pubsub/v1beta2/PubsubMessage
25+
26+
:type name: bytes
27+
:param name: the payload of the message
28+
29+
:type message_id: string
30+
:param message_id: An ID assigned to the message by the API.
31+
32+
:type attrs: dict or None
33+
:param attrs: Extra metadata associated by the publisher with the message.
34+
"""
35+
def __init__(self, data, message_id, attributes=None):
36+
self.data = data
37+
self.message_id = message_id
38+
self._attrs = attributes
39+
40+
@property
41+
def attrs(self):
42+
"""Lazily-constructed attribute dictionary"""
43+
if self._attrs is None:
44+
self._attrs = {}
45+
return self._attrs
46+
47+
@classmethod
48+
def from_api_repr(cls, api_repr):
49+
"""Factory: construct message from API representation.
50+
51+
:type api_repr: dict or None
52+
:param api_repr: The API representation of the message
53+
"""
54+
data = base64.b64decode(api_repr['data'])
55+
return cls(data=data, message_id=api_repr['messageId'],
56+
attributes=api_repr.get('attributes'))

gcloud/pubsub/subscription.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"""Define API Subscriptions."""
1616

1717
from gcloud.exceptions import NotFound
18+
from gcloud.pubsub.message import Message
1819
from gcloud.pubsub.topic import Topic
1920

2021

@@ -162,18 +163,19 @@ def pull(self, return_immediately=False, max_messages=1):
162163
:type max_messages: int
163164
:param max_messages: the maximum number of messages to return.
164165
165-
:rtype: list of dict
166-
:returns: sequence of mappings, each containing keys ``ackId`` (the
167-
ID to be used in a subsequent call to :meth:`acknowledge`)
168-
and ``message``.
166+
:rtype: list of (ack_id, message) tuples
167+
:returns: sequence of tuples: ``ack_id`` is the ID to be used in a
168+
subsequent call to :meth:`acknowledge`, and ``message``
169+
is an instance of :class:`gcloud.pubsub.message.Message`.
169170
"""
170171
data = {'returnImmediately': return_immediately,
171172
'maxMessages': max_messages}
172173
conn = self.topic.connection
173174
response = conn.api_request(method='POST',
174175
path='%s:pull' % self.path,
175176
data=data)
176-
return response['receivedMessages']
177+
return [(info['ackId'], Message.from_api_repr(info['message']))
178+
for info in response['receivedMessages']]
177179

178180
def acknowledge(self, ack_ids):
179181
"""API call: acknowledge retrieved messages for the subscription.

gcloud/pubsub/test_message.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# Copyright 2015 Google Inc. All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import unittest2
16+
17+
18+
class TestMessage(unittest2.TestCase):
19+
20+
def _getTargetClass(self):
21+
from gcloud.pubsub.message import Message
22+
return Message
23+
24+
def _makeOne(self, *args, **kw):
25+
return self._getTargetClass()(*args, **kw)
26+
27+
def test_ctor_no_attrs(self):
28+
DATA = b'DEADBEEF'
29+
MESSAGE_ID = b'12345'
30+
message = self._makeOne(data=DATA, message_id=MESSAGE_ID)
31+
self.assertEqual(message.data, DATA)
32+
self.assertEqual(message.message_id, MESSAGE_ID)
33+
self.assertEqual(message.attrs, {})
34+
35+
def test_ctor_w_attrs(self):
36+
DATA = b'DEADBEEF'
37+
MESSAGE_ID = b'12345'
38+
ATTRS = {'a': 'b'}
39+
message = self._makeOne(data=DATA, message_id=MESSAGE_ID,
40+
attributes=ATTRS)
41+
self.assertEqual(message.data, DATA)
42+
self.assertEqual(message.message_id, MESSAGE_ID)
43+
self.assertEqual(message.attrs, ATTRS)
44+
45+
def test_from_api_repr_no_attrs(self):
46+
from base64 import b64encode as b64
47+
DATA = b'DEADBEEF'
48+
B64_DATA = b64(DATA)
49+
MESSAGE_ID = '12345'
50+
api_repr = {'data': B64_DATA, 'messageId': MESSAGE_ID}
51+
message = self._getTargetClass().from_api_repr(api_repr)
52+
self.assertEqual(message.data, DATA)
53+
self.assertEqual(message.message_id, MESSAGE_ID)
54+
self.assertEqual(message.attrs, {})
55+
56+
def test_from_api_repr_w_attrs(self):
57+
from base64 import b64encode as b64
58+
DATA = b'DEADBEEF'
59+
B64_DATA = b64(DATA)
60+
MESSAGE_ID = '12345'
61+
ATTRS = {'a': 'b'}
62+
api_repr = {'data': B64_DATA,
63+
'messageId': MESSAGE_ID,
64+
'attributes': ATTRS}
65+
message = self._getTargetClass().from_api_repr(api_repr)
66+
self.assertEqual(message.data, DATA)
67+
self.assertEqual(message.message_id, MESSAGE_ID)
68+
self.assertEqual(message.attrs, ATTRS)

gcloud/pubsub/test_subscription.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ def test_modify_push_config_wo_endpoint(self):
248248

249249
def test_pull_wo_return_immediately_wo_max_messages(self):
250250
import base64
251+
from gcloud.pubsub.message import Message
251252
PROJECT = 'PROJECT'
252253
SUB_NAME = 'sub_name'
253254
SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
@@ -256,15 +257,19 @@ def test_pull_wo_return_immediately_wo_max_messages(self):
256257
MSG_ID = 'BEADCAFE'
257258
PAYLOAD = b'This is the message text'
258259
B64 = base64.b64encode(PAYLOAD)
259-
MESSAGE = {'messageId': MSG_ID, 'data': B64, 'attributes': {}}
260+
MESSAGE = {'messageId': MSG_ID, 'data': B64}
260261
REC_MESSAGE = {'ackId': ACK_ID, 'message': MESSAGE}
261262
conn = _Connection({'receivedMessages': [REC_MESSAGE]})
262263
topic = _Topic(TOPIC_NAME, project=PROJECT, connection=conn)
263264
subscription = self._makeOne(SUB_NAME, topic)
264265
pulled = subscription.pull()
265266
self.assertEqual(len(pulled), 1)
266-
self.assertEqual(pulled[0]['ackId'], ACK_ID)
267-
self.assertEqual(pulled[0]['message'], MESSAGE)
267+
ack_id, message = pulled[0]
268+
self.assertEqual(ack_id, ACK_ID)
269+
self.assertTrue(isinstance(message, Message))
270+
self.assertEqual(message.data, PAYLOAD)
271+
self.assertEqual(message.message_id, MSG_ID)
272+
self.assertEqual(message.attrs, {})
268273
self.assertEqual(len(conn._requested), 1)
269274
req = conn._requested[0]
270275
self.assertEqual(req['method'], 'POST')
@@ -274,6 +279,7 @@ def test_pull_wo_return_immediately_wo_max_messages(self):
274279

275280
def test_pull_w_return_immediately_w_max_messages(self):
276281
import base64
282+
from gcloud.pubsub.message import Message
277283
PROJECT = 'PROJECT'
278284
SUB_NAME = 'sub_name'
279285
SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME)
@@ -282,15 +288,19 @@ def test_pull_w_return_immediately_w_max_messages(self):
282288
MSG_ID = 'BEADCAFE'
283289
PAYLOAD = b'This is the message text'
284290
B64 = base64.b64encode(PAYLOAD)
285-
MESSAGE = {'messageId': MSG_ID, 'data': B64, 'attributes': {}}
291+
MESSAGE = {'messageId': MSG_ID, 'data': B64, 'attributes': {'a': 'b'}}
286292
REC_MESSAGE = {'ackId': ACK_ID, 'message': MESSAGE}
287293
conn = _Connection({'receivedMessages': [REC_MESSAGE]})
288294
topic = _Topic(TOPIC_NAME, project=PROJECT, connection=conn)
289295
subscription = self._makeOne(SUB_NAME, topic)
290296
pulled = subscription.pull(return_immediately=True, max_messages=3)
291297
self.assertEqual(len(pulled), 1)
292-
self.assertEqual(pulled[0]['ackId'], ACK_ID)
293-
self.assertEqual(pulled[0]['message'], MESSAGE)
298+
ack_id, message = pulled[0]
299+
self.assertEqual(ack_id, ACK_ID)
300+
self.assertTrue(isinstance(message, Message))
301+
self.assertEqual(message.data, PAYLOAD)
302+
self.assertEqual(message.message_id, MSG_ID)
303+
self.assertEqual(message.attrs, {'a': 'b'})
294304
self.assertEqual(len(conn._requested), 1)
295305
req = conn._requested[0]
296306
self.assertEqual(req['method'], 'POST')

regression/pubsub.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ def test_list_subscriptions(self):
102102
self.assertEqual(len(created), len(subscriptions_to_create))
103103

104104
def test_message_pull_mode_e2e(self):
105-
from base64 import b64encode as b64
106105
TOPIC_NAME = 'subscribe-me'
107106
topic = Topic(TOPIC_NAME)
108107
self.assertFalse(topic.exists())
@@ -115,12 +114,13 @@ def test_message_pull_mode_e2e(self):
115114
self.to_delete.append(subscription)
116115

117116
MESSAGE = b'MESSAGE'
118-
EXTRA = b'EXTRA TWO'
117+
EXTRA = b'EXTRA'
119118
topic.publish(MESSAGE, extra=EXTRA)
120119

121120
received = subscription.pull()
122-
ack_ids = [msg['ackId'] for msg in received]
121+
ack_ids = [recv[0] for recv in received]
123122
subscription.acknowledge(ack_ids)
124-
one, = received
125-
self.assertEqual(one['message']['data'], b64(MESSAGE))
126-
self.assertEqual(one['message']['attributes'], {'extra': EXTRA})
123+
messages = [recv[1] for recv in received]
124+
message, = messages
125+
self.assertEqual(message.data, MESSAGE)
126+
self.assertEqual(message.attributes, {'extra': EXTRA})

0 commit comments

Comments
 (0)