Skip to content

Commit

Permalink
Drain events before publish data. (celery#214)
Browse files Browse the repository at this point in the history
* Drain events before publish data.

Data are drained to checked if server sent connection blocked/unblocked notification.

* s/assert_called_with/assert_called_once_with/

* Add unittest of connection blocked when broker does not support it

* Improve naming of tests

* Added unittest for publishing when connection is closed
  • Loading branch information
matusvalo authored and Omer Katz committed Nov 7, 2018
1 parent 88794b4 commit b0bc72f
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 4 deletions.
9 changes: 9 additions & 0 deletions amqp/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -1732,6 +1732,15 @@ def _basic_publish(self, msg, exchange='', routing_key='',
if not self.connection:
raise RecoverableConnectionError(
'basic_publish: connection closed')

client_properties = self.connection.client_properties
if client_properties['capabilities']['connection.blocked']:
try:
# Check if an event was sent, such as the out of memory message
self.connection.drain_events(timeout=0)
except socket.timeout:
pass

try:
with self.connection.transport.having_timeout(timeout):
return self.send_method(
Expand Down
53 changes: 49 additions & 4 deletions t/unit/test_channel.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
from __future__ import absolute_import, unicode_literals

import pytest
from case import ContextMock, Mock, patch, ANY
import socket
from case import ContextMock, Mock, patch, ANY, MagicMock

from amqp import spec
from amqp.platform import pack
from amqp.serialization import dumps
from amqp.channel import Channel
from amqp.exceptions import ConsumerCancelled, NotFound, MessageNacked
from amqp.exceptions import ConsumerCancelled, NotFound, MessageNacked, \
RecoverableConnectionError


class test_Channel:

@pytest.fixture(autouse=True)
def setup_conn(self):
self.conn = Mock(name='connection')
self.conn = MagicMock(name='connection')
self.conn.channels = {}
self.conn._get_free_channel_id.return_value = 2
self.c = Channel(self.conn, 1)
Expand Down Expand Up @@ -366,7 +368,43 @@ def wait(method, *args, **kwargs):
spec.Basic.Nack, frame, None
)

def test_basic_publsh_confirm_callback(self):
def test_basic_publish_connection_blocked(self):
# Basic test checking that drain_events() is called
# before publishing message and send_method() is called
self.c._basic_publish('msg', 'ex', 'rkey')
self.conn.drain_events.assert_called_once_with(timeout=0)
self.c.send_method.assert_called_once_with(
spec.Basic.Publish, 'Bssbb',
(0, 'ex', 'rkey', False, False), 'msg',
)

self.c.send_method.reset_mock()

# Basic test checking that socket.timeout exception
# is ignored and send_method() is called.
self.conn.drain_events.side_effect = socket.timeout
self.c._basic_publish('msg', 'ex', 'rkey')
self.c.send_method.assert_called_once_with(
spec.Basic.Publish, 'Bssbb',
(0, 'ex', 'rkey', False, False), 'msg',
)

def test_basic_publish_connection_blocked_not_supported(self):
# Test veryfying that when server does not have
# connection.blocked capability, drain_events() are not called
self.conn.client_properties = {
'capabilities': {
'connection.blocked': False
}
}
self.c._basic_publish('msg', 'ex', 'rkey')
self.conn.drain_events.assert_not_called()
self.c.send_method.assert_called_once_with(
spec.Basic.Publish, 'Bssbb',
(0, 'ex', 'rkey', False, False), 'msg',
)

def test_basic_publish_confirm_callback(self):

def wait_nack(method, *args, **kwargs):
kwargs['callback'](spec.Basic.Nack)
Expand All @@ -388,6 +426,13 @@ def wait_ack(method, *args, **kwargs):
# it must nost raise exception
self.c.basic_publish_confirm(1, 2, arg=1)

def test_basic_publish_connection_closed(self):
self.c.collect()
with pytest.raises(RecoverableConnectionError) as excinfo:
self.c._basic_publish('msg', 'ex', 'rkey')
assert 'basic_publish: connection closed' in str(excinfo.value)
self.c.send_method.assert_not_called()

def test_basic_qos(self):
self.c.basic_qos(0, 123, False)
self.c.send_method.assert_called_with(
Expand Down

0 comments on commit b0bc72f

Please sign in to comment.