Skip to content

Commit

Permalink
Move delivery_info to constructor of Message. (celery#235)
Browse files Browse the repository at this point in the history
  • Loading branch information
matusvalo authored and auvipy committed Jan 10, 2019
1 parent 6c8b4b4 commit 9dcc0df
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 7 deletions.
5 changes: 2 additions & 3 deletions amqp/basic_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,10 @@ class Message(GenericContent):
('cluster_id', 's')
]

#: set by basic_consume/basic_get
delivery_info = None

def __init__(self, body='', children=None, channel=None, **properties):
super(Message, self).__init__(**properties)
#: set by basic_consume/basic_get
self.delivery_info = None
self.body = body
self.channel = channel

Expand Down
66 changes: 65 additions & 1 deletion t/integration/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import socket
import pytest
from case import patch, call, Mock
from case import patch, call, Mock, ANY
from amqp import spec, Connection, Channel, sasl, Message
from amqp.platform import pack
from amqp.exceptions import ConnectionError, \
Expand Down Expand Up @@ -596,6 +596,70 @@ def test_queue_purge(self):
None
)

def test_basic_deliver(self):
# Test checking delivering single message
callback_mock = Mock()
frame_writer_cls_mock = Mock()
conn = Connection(frame_writer=frame_writer_cls_mock)
consumer_tag = 'amq.ctag-PCmzXGkhCw_v0Zq7jXyvkg'
with patch.object(conn, 'Transport') as transport_mock:
handshake(conn, transport_mock)
ch = create_channel(1, conn, transport_mock)

# Inject ConsumeOk response from Broker
transport_mock().read_frame.side_effect = [
# Inject Consume-ok response
build_frame_type_1(
spec.Basic.ConsumeOk,
channel=1,
args=(consumer_tag,),
arg_format='s'
),
# Inject basic-deliver response
build_frame_type_1(
spec.Basic.Deliver,
channel=1,
arg_format='sLbss',
args=(
# consumer-tag, delivery-tag, redelivered,
consumer_tag, 1, False,
# exchange-name, routing-key
'foo_exchange', 'routing-key'
)
),
build_frame_type_2(
channel=1,
body_len=12,
properties=b'0\x00\x00\x00\x00\x00\x01'
),
build_frame_type_3(
channel=1,
body=b'Hello World!'
),
]
frame_writer_mock = frame_writer_cls_mock()
frame_writer_mock.reset_mock()
ch.basic_consume('my_queue', callback=callback_mock)
conn.drain_events()
callback_mock.assert_called_once_with(ANY)
msg = callback_mock.call_args[0][0]
assert isinstance(msg, Message)
assert msg.body_size == 12
assert msg.body == b'Hello World!'
assert msg.frame_method == spec.Basic.Deliver
assert msg.delivery_tag == 1
assert msg.ready is True
assert msg.delivery_info == {
'consumer_tag': 'amq.ctag-PCmzXGkhCw_v0Zq7jXyvkg',
'delivery_tag': 1,
'redelivered': False,
'exchange': 'foo_exchange',
'routing_key': 'routing-key'
}
assert msg.properties == {
'application_headers': {}, 'delivery_mode': 1
}

def test_queue_get(self):
# Test verifying getting message from queue
frame_writer_cls_mock = Mock()
Expand Down
3 changes: 2 additions & 1 deletion t/unit/test_basic_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ def test_message(self):
channel=Mock(name='channel'),
application_headers={'h': 'v'},
)
m.delivery_info = {'delivery_tag': '1234'},
m.delivery_info = {'delivery_tag': '1234'}
assert m.body == 'foo'
assert m.channel
assert m.headers == {'h': 'v'}
assert m.delivery_tag == '1234'
22 changes: 20 additions & 2 deletions t/unit/test_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from case import ContextMock, Mock, patch, ANY, MagicMock

from amqp import spec
from amqp.basic_message import Message
from amqp.platform import pack
from amqp.serialization import dumps
from amqp.channel import Channel
Expand Down Expand Up @@ -328,11 +329,20 @@ def test_basic_consume_no_wait_no_consumer_tag(self):
assert 123 not in self.c.callbacks

def test_on_basic_deliver(self):
msg = Mock()
msg = Message()
self.c._on_basic_deliver(123, '321', False, 'ex', 'rkey', msg)
callback = self.c.callbacks[123] = Mock(name='cb')

self.c._on_basic_deliver(123, '321', False, 'ex', 'rkey', msg)
callback.assert_called_with(msg)
assert msg.channel == self.c
assert msg.delivery_info == {
'consumer_tag': 123,
'delivery_tag': '321',
'redelivered': False,
'exchange': 'ex',
'routing_key': 'rkey',
}

def test_basic_get(self):
self.c._on_get_empty = Mock()
Expand All @@ -356,11 +366,19 @@ def test_on_get_empty(self):
self.c._on_get_empty(1)

def test_on_get_ok(self):
msg = Mock()
msg = Message()
m = self.c._on_get_ok(
'dtag', 'redelivered', 'ex', 'rkey', 'mcount', msg,
)
assert m is msg
assert m.channel == self.c
assert m.delivery_info == {
'delivery_tag': 'dtag',
'redelivered': 'redelivered',
'exchange': 'ex',
'routing_key': 'rkey',
'message_count': 'mcount',
}

def test_basic_publish(self):
self.c.connection.transport.having_timeout = ContextMock()
Expand Down

0 comments on commit 9dcc0df

Please sign in to comment.