Skip to content
This repository has been archived by the owner on Mar 22, 2018. It is now read-only.

Commit

Permalink
Message bodies can now be delivered as dict or json strings.
Browse files Browse the repository at this point in the history
While the body should be a dict, json strings are now accepted and
loaded when they are passed to _wrap_on_message.
  • Loading branch information
ashcrow committed Sep 15, 2016
1 parent 1467771 commit 007c222
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 48 deletions.
101 changes: 58 additions & 43 deletions src/commissaire_service/service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"""
Service base class.
"""
import json
import logging
import multiprocessing
import uuid
Expand Down Expand Up @@ -180,75 +181,89 @@ def on_message(self, body, message):
Called when a non-jsonrpc message arrives.
:param body: Body of the message.
:type body: str
:type body: dict
:param message: The message instance.
:type message: kombu.message.Message
"""
self.logger.error(
'Dropping unknown message: payload="{}", properties="{}"'.format(
body, message.properties))
message.ack()

def _wrap_on_message(self, body, message):
"""
Wraps on_message for jsonrpc routing and logging.
:param body: Body of the message.
:type body: str
:type body: dict or json string
:param message: The message instance.
:type message: kombu.message.Message
"""
self.logger.debug('Received message "{}" {}'.format(
message.delivery_tag, body))
expected_method = message.delivery_info['routing_key'].rsplit(
'.', 1)[1]
# If we have a method and it matches the routing key treat it
# as a jsonrpc call
if (
isinstance(body, dict) and
'method' in body.keys() and
body.get('method') == expected_method):
try:

# If we don't get a valid message we default to -1 for the id
uid = -1
result = None
try:
# If we don't have a dict then it should be a json string
if isinstance(body, str):
body = json.loads(body)

# If we have a method and it matches the routing key treat it
# as a jsonrpc call
if (
isinstance(body, dict) and
'method' in body.keys() and
body.get('method') == expected_method):
uid = body.get('id', '-1')
method = getattr(self, 'on_{}'.format(body['method']))
if type(body['params']) is dict:
result = method(message=message, **body['params'])
else:
result = method(message, *body['params'])
except Exception as error:
jsonrpc_error_code = -32600
# If there is an attribute error then use the Method Not Found
# code in the error response
if type(error) is AttributeError:
jsonrpc_error_code = -32601
result = {
'jsonrpc': '2.0',
'id': body['id'],
'error': {
'code': jsonrpc_error_code,
'message': str(error),
'data': {
'exception': str(type(error))
}

self.logger.debug('Result for "{}": "{}"'.format(
uid, result))
# Otherwise send it to on_message
else:
self.on_message(body, message)
except Exception as error:
jsonrpc_error_code = -32600
# If there is an attribute error then use the Method Not Found
# code in the error response
if type(error) is AttributeError:
jsonrpc_error_code = -32601
elif type(error) is json.decoder.JSONDecodeError:
jsonrpc_error_code = -32700 # Parser error
result = {
'jsonrpc': '2.0',
'id': uid,
'error': {
'code': jsonrpc_error_code,
'message': str(error),
'data': {
'exception': str(type(error))
}
}
self.logger.warn(
'Exception raised during method call: {}: {}'.format(
type(error), error))
self.logger.debug('Result for "{}": "{}"'.format(
body['id'], result))
message.ack()
if message.properties.get('reply_to'):
self.logger.debug('Responding to {0}'.format(
message.properties['reply_to']))
response_queue = self.connection.SimpleQueue(
message.properties['reply_to'])
response_queue.put({
'result': result,
})
response_queue.close()
# Otherwise send it to on_message
else:
self.on_message(body, message)
}
self.logger.warn(
'Exception raised during method call: {}: {}'.format(
type(error), error))

# Reply back if needed
if message.properties.get('reply_to'):
self.logger.debug('Responding to {0}'.format(
message.properties['reply_to']))
response_queue = self.connection.SimpleQueue(
message.properties['reply_to'])
response_queue.put({
'result': json.dumps(result),
})
response_queue.close()

message.ack()
self.logger.debug('Message "{0}" {1} ackd'.format(
message.delivery_tag,
('was' if message.acknowledged else 'was not')))
Expand Down
9 changes: 4 additions & 5 deletions test/test_service_commissaireservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ def test_on_message(self):
"""
message = mock.MagicMock(properties={'properties': 'here'})
self.service_instance.on_message('test', message)
message.ack.assert_called_once_with()

def test_responds(self):
"""
Expand Down Expand Up @@ -183,7 +182,7 @@ def test__wrap_on_message_with_exposed_method(self):
payload=body,
properties={'reply_to': 'test_queue'},
delivery_info={'routing_key': 'test.method'})
self.service_instance.on_method = mock.MagicMock()
self.service_instance.on_method = mock.MagicMock(return_value='{}')
self.service_instance._wrap_on_message(body, message)
# The on_method should have been called
self.service_instance.on_method.assert_called_once_with(
Expand Down Expand Up @@ -211,10 +210,10 @@ def test__wrap_on_message_with_bad_message(self):
"""
Verify ServiceManager._wrap_on_message forwards to on_message on non jsonrpc messages.
"""
body = 'message'
self.service_instance.on_message = mock.MagicMock()
body = '[]'
message = mock.MagicMock(
payload=body,
properties={'reply_to': 'test_queue'})
self.service_instance._wrap_on_message(body, message)
# The message should be ackd
message.ack.assert_called_once_with()
self.assertEquals(1, self.service_instance.on_message.call_count)

0 comments on commit 007c222

Please sign in to comment.