diff --git a/gcloud/pubsub/message.py b/gcloud/pubsub/message.py index 38b907e62b69..1a04c3ee6fa7 100644 --- a/gcloud/pubsub/message.py +++ b/gcloud/pubsub/message.py @@ -15,6 +15,11 @@ """Define API Topics.""" import base64 +import datetime + +import pytz + +_RFC3339_MICROS = '%Y-%m-%dT%H:%M:%S.%fZ' class Message(object): @@ -44,6 +49,24 @@ def attributes(self): self._attributes = {} return self._attributes + @property + def timestamp(self): + """Return sortable timestamp from attributes, if passed. + + Allows sorting messages in publication order (assuming consistent + clocks across all publishers). + + :rtype: datetime + :returns: timestamp (in UTC timezone) parsed from RFC 3339 timestamp + :raises: ValueError if timestamp not in ``attributes``, or if it does + not match the RFC 3339 format. + """ + stamp = self.attributes.get('timestamp') + if stamp is None: + raise ValueError('No timestamp') + return datetime.datetime.strptime(stamp, _RFC3339_MICROS).replace( + tzinfo=pytz.UTC) + @classmethod def from_api_repr(cls, api_repr): """Factory: construct message from API representation. diff --git a/gcloud/pubsub/test_message.py b/gcloud/pubsub/test_message.py index 32b2854a4eaa..6cf97f677ec2 100644 --- a/gcloud/pubsub/test_message.py +++ b/gcloud/pubsub/test_message.py @@ -66,3 +66,39 @@ def test_from_api_repr_w_attributes(self): self.assertEqual(message.data, DATA) self.assertEqual(message.message_id, MESSAGE_ID) self.assertEqual(message.attributes, ATTRS) + + def test_timestamp_no_attributes(self): + DATA = b'DEADBEEF' + MESSAGE_ID = b'12345' + message = self._makeOne(data=DATA, message_id=MESSAGE_ID) + + def _to_fail(): + return message.timestamp + + self.assertRaises(ValueError, _to_fail) + + def test_timestamp_wo_timestamp_in_attributes(self): + DATA = b'DEADBEEF' + MESSAGE_ID = b'12345' + ATTRS = {'a': 'b'} + message = self._makeOne(data=DATA, message_id=MESSAGE_ID, + attributes=ATTRS) + + def _to_fail(): + return message.timestamp + + self.assertRaises(ValueError, _to_fail) + + def test_timestamp_w_timestamp_in_attributes(self): + from datetime import datetime + from pytz import utc + from gcloud.pubsub.message import _RFC3339_MICROS + DATA = b'DEADBEEF' + MESSAGE_ID = b'12345' + TIMESTAMP = '2015-04-10T18:42:27.131956Z' + naive = datetime.strptime(TIMESTAMP, _RFC3339_MICROS) + timestamp = naive.replace(tzinfo=utc) + ATTRS = {'timestamp': TIMESTAMP} + message = self._makeOne(data=DATA, message_id=MESSAGE_ID, + attributes=ATTRS) + self.assertEqual(message.timestamp, timestamp) diff --git a/regression/pubsub.py b/regression/pubsub.py index 208f10f6d0fa..7c0db3cffc6e 100644 --- a/regression/pubsub.py +++ b/regression/pubsub.py @@ -103,7 +103,7 @@ def test_list_subscriptions(self): def test_message_pull_mode_e2e(self): TOPIC_NAME = 'subscribe-me' - topic = Topic(TOPIC_NAME) + topic = Topic(TOPIC_NAME, timestamp_messages=True) self.assertFalse(topic.exists()) topic.create() self.to_delete.append(topic) @@ -113,14 +113,23 @@ def test_message_pull_mode_e2e(self): subscription.create() self.to_delete.append(subscription) - MESSAGE = b'MESSAGE' - EXTRA = 'EXTRA' - topic.publish(MESSAGE, extra=EXTRA) + MESSAGE_1 = b'MESSAGE ONE' + MESSAGE_2 = b'MESSAGE ONE' + EXTRA_1 = 'EXTRA 1' + EXTRA_2 = 'EXTRA 2' + topic.publish(MESSAGE_1, extra=EXTRA_1) + topic.publish(MESSAGE_2, extra=EXTRA_2) - received = subscription.pull() + received = subscription.pull(max_messages=2) ack_ids = [recv[0] for recv in received] subscription.acknowledge(ack_ids) messages = [recv[1] for recv in received] - message, = messages - self.assertEqual(message.data, MESSAGE) - self.assertEqual(message.attributes, {'extra': EXTRA}) + + def _by_timestamp(message): + return message.timestamp + + message1, message2 = sorted(messages, key=_by_timestamp) + self.assertEqual(message1.data, MESSAGE_1) + self.assertEqual(message1.attributes['extra'], EXTRA_1) + self.assertEqual(message2.data, MESSAGE_2) + self.assertEqual(message2.attributes['extra'], EXTRA_2)