Skip to content
This repository has been archived by the owner on Mar 22, 2018. It is now read-only.

Commit

Permalink
CommissaireService now takes advantage of commissaire.bus.BusMixin.
Browse files Browse the repository at this point in the history
  • Loading branch information
ashcrow authored and mbarnes committed Sep 20, 2016
1 parent a55fe11 commit ad50832
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 125 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
commissaire
kombu
python-etcd
requests
Expand Down
86 changes: 5 additions & 81 deletions src/commissaire_service/service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
import logging
import multiprocessing
import traceback
import uuid

from time import sleep

from commissaire.bus import BusMixin

from kombu import Connection, Exchange, Producer, Queue
from kombu.mixins import ConsumerMixin

Expand Down Expand Up @@ -110,7 +111,7 @@ def run(self):
sleep(1)


class CommissaireService(ConsumerMixin):
class CommissaireService(ConsumerMixin, BusMixin):
"""
An example prototype CommissaireService base class.
"""
Expand Down Expand Up @@ -147,16 +148,6 @@ def __init__(self, exchange_name, connection_url, qkwargs):
self.producer = Producer(self._channel, self._exchange)
self.logger.debug('Initializing finished')

@classmethod
def create_id(cls):
"""
Creates a new unique identifier.
:returns: A unique identification string.
:rtype: str
"""
return str(uuid.uuid4())

def get_consumers(self, Consumer, channel):
"""
Returns the a list of consumers to watch. Called by the parent Mixin.
Expand Down Expand Up @@ -295,81 +286,14 @@ def respond(self, queue_name, id, payload, **kwargs):
self.logger.debug('Sent response for message id "{}"'.format(id))
send_queue.close()

def request(self, routing_key, method, params={}, **kwargs):
"""
Sends a request to a simple queue. Requests create the initial response
queue and wait for a response.
:param routing_key: The routing key to publish on.
:type routing_key: str
:param method: The remote method to request.
:type method: str
:param params: Keyword parameters to pass to the remote method.
:type params: dict
:param kwargs: Keyword arguments to pass to SimpleQueue
:type kwargs: dict
:returns: Result
:rtype: tuple
"""
id = self.create_id()
response_queue_name = 'response-{}'.format(id)
self.logger.debug('Creating response queue "{}"'.format(
response_queue_name))
queue_opts = {
'auto_delete': True,
'durable': False,
}
if kwargs.get('queue_opts'):
queue_opts.update(kwargs.pop('queue_opts'))

self.logger.debug('Response queue arguments: {}'.format(kwargs))

response_queue = self.connection.SimpleQueue(
response_queue_name,
queue_opts=queue_opts,
**kwargs)

jsonrpc_msg = {
'jsonrpc': "2.0",
'id': id,
'method': method,
'params': params,
}
self.logger.debug('jsonrpc message for id "{}": "{}"'.format(
id, jsonrpc_msg))

self.producer.publish(
jsonrpc_msg,
routing_key,
declare=[self._exchange],
reply_to=response_queue_name)

self.logger.debug(
'Sent message id "{}" to "{}". Waiting on response...'.format(
id, response_queue_name))

result = response_queue.get(block=True, timeout=3)
result.ack()

if 'error' in result.payload.keys():
self.logger.warn(
'Error returned from the message id "{}"'.format(
id, result.payload))

self.logger.debug(
'Result retrieved from response queue "{}": payload="{}"'.format(
response_queue_name, result))
self.logger.debug('Closing queue {}'.format(response_queue_name))
response_queue.close()
return result.payload

def onconnection_revived(self): # pragma: no cover
"""
Called when a reconnection occurs.
"""
self.logger.info('Connection (re)established')

def on_consume_ready(self, connection, channel, consumers): # pragma: no cover # NOQA
def on_consume_ready(
self, connection, channel, consumers): # pragma: no cover
"""
Called when the service is ready to consume messages.
Expand Down
44 changes: 0 additions & 44 deletions test/test_service_commissaireservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,6 @@ def test_initialization(self):
self._producer.assert_called_once_with(
self.service_instance._channel, self.service_instance._exchange)

def test_create_id(self):
"""
Verify CommissaireService.create_id makes a unique identifier.
"""
uid = CommissaireService.create_id()
# It should be a string
self.assertIs(str, type(uid))
# And it should be 36 chars in length (uuid.uuid4())
self.assertEquals(len(ID), len(uid))

def test_get_consumers(self):
"""
Verify CommissaireService.get_consumers properly sets consumers.
Expand Down Expand Up @@ -134,40 +124,6 @@ def test_responds(self):
self.service_instance.connection.SimpleQueue.__call__(
).close.assert_called_once_with()

def test_request(self):
"""
Verify CommissaireService.request can request method calls.
"""
routing_key = 'routing_key'
method = 'ping'
params = {}
queue_opts={'durable': False, 'auto_delete': True}

self.service_instance.request(
routing_key, method, params=params)
# A new SimpleQueue should have been created
self.service_instance.connection.SimpleQueue.assert_called_once_with(
mock.ANY,
queue_opts=queue_opts
)
# A jsonrpc message should have been published to the bus
self.service_instance.producer.publish.assert_called_once_with(
{
'jsonrpc': "2.0",
'id': mock.ANY,
'method': method,
'params': params,
},
routing_key,
declare=[self.service_instance._exchange],
reply_to=mock.ANY)
# The simple queue should be used to get a response
self.service_instance.connection.SimpleQueue.__call__(
).get.assert_called_once_with(block=True, timeout=mock.ANY)
# And finally the queue should be closed
self.service_instance.connection.SimpleQueue.__call__(
).close.assert_called_once_with()

def test__wrap_on_message_with_exposed_method(self):
"""
Verify ServiceManager._wrap_on_message routes requests properly.
Expand Down

0 comments on commit ad50832

Please sign in to comment.