Skip to content
This repository has been archived by the owner on Jun 25, 2019. It is now read-only.

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
necrophonic committed Feb 16, 2017
2 parents 1f47f93 + 890d12a commit 267c606
Show file tree
Hide file tree
Showing 24 changed files with 338 additions and 377 deletions.
7 changes: 3 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
language: python
python:
- "3.5"
- "3.4"
install:
- "pip install -r requirements.txt"
- "pip install -r test_requirements.txt"
- make
script:
- flake8
- python -m unittest tests/**/*.py
- make test
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
### Unreleased

### 1.0.0 2017-02-2016
- Initial release
5 changes: 1 addition & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
FROM onsdigital/flask-crypto
FROM onsdigital/flask-crypto-queue

ADD app /app
ADD requirements.txt /requirements.txt
ADD startup.sh /startup.sh

RUN mkdir -p /app/logs

RUN pip3 install --no-cache-dir -U -I -r /requirements.txt

ENTRYPOINT ./startup.sh
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
build:
pip3 install -r requirements.txt

test:
pip3 install -r test_requirements.txt
flake8 --exclude lib
python3 -m unittest tests/*.py
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# sdx-downstream-ctp

[![Build Status](https://travis-ci.org/ONSdigital/sdx-downstream-ctp.svg?branch=develop)](https://travis-ci.org/ONSdigital/sdx-downstream-ctp)

The sdx-downstream-ctp app is used within the Office for National Statistics (ONS) for consuming decrypted Survey Data Exchange (SDX) Surveys from sdx-store and delivering them to CTP.

## Installation
Expand All @@ -22,6 +24,4 @@ The following envioronment variables can be set:

`SDX_STORE_URL` - The URL of the sdx-store service, defaults to http://sdx-store:5000

`SDX_TRANSFORM_CTP_URL` - The URL of the sdx-transform-cs service, defaults to http://sdx-transform-ctp:5000

`SDX_SEQUENCE_URL` - The URL of the sdx-transform-cs service, defaults to http://sdx-sequence:5000
70 changes: 41 additions & 29 deletions app/async_consumer.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from app import settings
import pika
import logging
from structlog import wrap_logger
import time

LOGGER = settings.logger
logger = wrap_logger(logging.getLogger(__name__))


class AsyncConsumer(object):
Expand Down Expand Up @@ -49,29 +51,29 @@ def connect(self):
self._url = settings.RABBIT_URLS[server_choice]

try:
LOGGER.info('Connecting to queue', attempt=count)
logger.info('Connecting to queue', attempt=count)
return pika.SelectConnection(pika.URLParameters(self._url),
self.on_connection_open,
stop_ioloop_on_close=False)
except pika.exceptions.AMQPConnectionError as e:
LOGGER.error("Connection error", exception=e)
logger.error("Connection error", exception=e)
count += 1
LOGGER.error("Connection sleep", no_of_seconds=count)
logger.error("Connection sleep", no_of_seconds=count)
time.sleep(count)

continue

def close_connection(self):
"""This method closes the connection to RabbitMQ."""
LOGGER.info('Closing connection')
logger.info('Closing connection')
self._connection.close()

def add_on_connection_close_callback(self):
"""This method adds an on close callback that will be invoked by pika
when RabbitMQ closes the connection to the publisher unexpectedly.
"""
LOGGER.info('Adding connection close callback')
logger.info('Adding connection close callback')
self._connection.add_on_close_callback(self.on_connection_closed)

def on_connection_closed(self, connection, reply_code, reply_text):
Expand All @@ -88,7 +90,7 @@ def on_connection_closed(self, connection, reply_code, reply_text):
if self._closing:
self._connection.ioloop.stop()
else:
LOGGER.warning('Connection closed, reopening in 5 seconds',
logger.warning('Connection closed, reopening in 5 seconds',
reply_code=reply_code, reply_text=reply_text)
self._connection.add_timeout(5, self.reconnect)

Expand All @@ -100,7 +102,7 @@ def on_connection_open(self, unused_connection):
:type unused_connection: pika.SelectConnection
"""
LOGGER.info('Connection opened')
logger.info('Connection opened')
self.add_on_connection_close_callback()
self.open_channel()

Expand All @@ -119,7 +121,7 @@ def add_on_channel_close_callback(self):
RabbitMQ unexpectedly closes the channel.
"""
LOGGER.info('Adding channel close callback')
logger.info('Adding channel close callback')
self._channel.add_on_close_callback(self.on_channel_closed)

def on_channel_closed(self, channel, reply_code, reply_text):
Expand All @@ -134,7 +136,7 @@ def on_channel_closed(self, channel, reply_code, reply_text):
:param str reply_text: The text reason the channel was closed
"""
LOGGER.warning('Channel was closed', channel=channel,
logger.warning('Channel was closed', channel=channel,
reply_code=reply_code, reply_text=reply_text)
self._connection.close()

Expand All @@ -147,7 +149,7 @@ def on_channel_open(self, channel):
:param pika.channel.Channel channel: The channel object
"""
LOGGER.info('Channel opened')
logger.info('Channel opened')
self._channel = channel
self.add_on_channel_close_callback()
self.setup_exchange(self.EXCHANGE)
Expand All @@ -160,7 +162,7 @@ def setup_exchange(self, exchange_name):
:param str|unicode exchange_name: The name of the exchange to declare
"""
LOGGER.info('Declaring exchange', name=exchange_name)
logger.info('Declaring exchange', name=exchange_name)
self._channel.exchange_declare(self.on_exchange_declareok,
exchange_name,
self.EXCHANGE_TYPE)
Expand All @@ -172,7 +174,7 @@ def on_exchange_declareok(self, unused_frame):
:param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
"""
LOGGER.info('Exchange declared')
logger.info('Exchange declared')
self.setup_queue(self.QUEUE)

def setup_queue(self, queue_name):
Expand All @@ -183,8 +185,8 @@ def setup_queue(self, queue_name):
:param str|unicode queue_name: The name of the queue to declare.
"""
LOGGER.info('Declaring queue', name=queue_name)
self._channel.queue_declare(self.on_queue_declareok, queue_name)
logger.info('Declaring queue', name=queue_name)
self._channel.queue_declare(self.on_queue_declareok, queue_name, durable=False)

def on_queue_declareok(self, method_frame):
"""Method invoked by pika when the Queue.Declare RPC call made in
Expand All @@ -196,7 +198,7 @@ def on_queue_declareok(self, method_frame):
:param pika.frame.Method method_frame: The Queue.DeclareOk frame
"""
LOGGER.info('Binding to rabbit', exchange=self.EXCHANGE, queue=self.QUEUE)
logger.info('Binding to rabbit', exchange=self.EXCHANGE, queue=self.QUEUE)
self._channel.queue_bind(self.on_bindok, self.QUEUE, self.EXCHANGE)

def add_on_cancel_callback(self):
Expand All @@ -205,7 +207,7 @@ def add_on_cancel_callback(self):
on_consumer_cancelled will be invoked by pika.
"""
LOGGER.info('Adding consumer cancellation callback')
logger.info('Adding consumer cancellation callback')
self._channel.add_on_cancel_callback(self.on_consumer_cancelled)

def on_consumer_cancelled(self, method_frame):
Expand All @@ -215,7 +217,7 @@ def on_consumer_cancelled(self, method_frame):
:param pika.frame.Method method_frame: The Basic.Cancel frame
"""
LOGGER.info('Consumer was cancelled remotely, shutting down: %r',
logger.info('Consumer was cancelled remotely, shutting down: %r',
method_frame)
if self._channel:
self._channel.close()
Expand All @@ -227,17 +229,26 @@ def acknowledge_message(self, delivery_tag, **kwargs):
:param int delivery_tag: The delivery tag from the Basic.Deliver frame
"""
LOGGER.info('Acknowledging message', delivery_tag=delivery_tag, **kwargs)
logger.info('Acknowledging message', delivery_tag=delivery_tag, **kwargs)
self._channel.basic_ack(delivery_tag)

def nack_message(self, delivery_tag, **kwargs):
"""Negative acknowledge a message
:param int delivery_tag: The deliver tag from the Basic.Deliver frame
"""
logger.info('Nacking message', delivery_tag=delivery_tag, **kwargs)
self._channel.basic_nack(delivery_tag)

def reject_message(self, delivery_tag, **kwargs):
"""Reject the message delivery from RabbitMQ by sending a
Basic.Reject RPC method for the delivery tag.
:param int delivery_tag: The delivery tag from the Basic.Deliver frame
"""
LOGGER.info('Rejecting message', delivery_tag=delivery_tag, **kwargs)
logger.info('Rejecting message', delivery_tag=delivery_tag, **kwargs)
self._channel.basic_reject(delivery_tag, requeue=False)

def on_message(self, unused_channel, basic_deliver, properties, body):
Expand All @@ -254,7 +265,7 @@ def on_message(self, unused_channel, basic_deliver, properties, body):
:param str|unicode body: The message body
"""
LOGGER.info('Received message',
logger.info('Received message',
delivery_tag=basic_deliver.delivery_tag, app_id=properties.app_id, msg=body)
self.acknowledge_message(basic_deliver.delivery_tag)

Expand All @@ -267,7 +278,7 @@ def on_cancelok(self, unused_frame):
:param pika.frame.Method unused_frame: The Basic.CancelOk frame
"""
LOGGER.info('RabbitMQ acknowledged the cancellation of the consumer')
logger.info('RabbitMQ acknowledged the cancellation of the consumer')
self.close_channel()

def stop_consuming(self):
Expand All @@ -276,7 +287,7 @@ def stop_consuming(self):
"""
if self._channel:
LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ')
logger.info('Sending a Basic.Cancel RPC command to RabbitMQ')
self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)

def start_consuming(self):
Expand All @@ -289,8 +300,9 @@ def start_consuming(self):
will invoke when a message is fully received.
"""
LOGGER.info('Issuing consumer related RPC commands')
logger.info('Issuing consumer related RPC commands')
self.add_on_cancel_callback()
self._channel.basic_qos(prefetch_count=1)
self._consumer_tag = self._channel.basic_consume(self.on_message,
self.QUEUE)

Expand All @@ -302,15 +314,15 @@ def on_bindok(self, unused_frame):
:param pika.frame.Method unused_frame: The Queue.BindOk response frame
"""
LOGGER.info('Queue bound')
logger.info('Queue bound')
self.start_consuming()

def close_channel(self):
"""Call to close the channel with RabbitMQ cleanly by issuing the
Channel.Close RPC command.
"""
LOGGER.info('Closing the channel')
logger.info('Closing the channel')
self._channel.close()

def open_channel(self):
Expand All @@ -319,7 +331,7 @@ def open_channel(self):
on_channel_open callback will be invoked by pika.
"""
LOGGER.info('Creating a new channel')
logger.info('Creating a new channel')
self._connection.channel(on_open_callback=self.on_channel_open)

def run(self):
Expand All @@ -341,8 +353,8 @@ def stop(self):
the IOLoop will be buffered but not processed.
"""
LOGGER.info('Stopping')
logger.info('Stopping')
self._closing = True
self.stop_consuming()
self._stopped = True
LOGGER.info('Stopped')
logger.info('Stopped')
67 changes: 36 additions & 31 deletions app/consumer.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,58 @@
from app.settings import logger
from app.async_consumer import AsyncConsumer
from app.helpers.request_helper import get_doc_from_store
from app.processors.common_software_processor import CTPProcessor
from .processor import CTPProcessor
from app import settings
from app.helpers.sdxftp import SDXFTP
from app.helpers.exceptions import BadMessageError, RetryableError


def get_delivery_count_from_properties(properties):
"""
Returns the delivery count for a message from the rabbit queue. The
value is auto-set by rabbitmq.
"""
delivery_count = 0
if properties.headers and 'x-delivery-count' in properties.headers:
delivery_count = properties.headers['x-delivery-count']

return delivery_count
return delivery_count + 1


class Consumer(AsyncConsumer):

def __init__(self):
self._ftp = SDXFTP(logger, settings.FTP_HOST, settings.FTP_USER, settings.FTP_PASS)
super(Consumer, self).__init__()

def on_message(self, unused_channel, basic_deliver, properties, body):
logger.info('Received message', delivery_tag=basic_deliver.delivery_tag, app_id=properties.app_id, body=body.decode("utf-8"))

delivery_count = get_delivery_count_from_properties(properties)
delivery_count += 1

logger.info(
'Received message',
queue=self.QUEUE,
delivery_tag=basic_deliver.delivery_tag,
delivery_count=delivery_count,
app_id=properties.app_id
)

mongo_id = body.decode("utf-8")
document = get_doc_from_store(mongo_id)
processor = CTPProcessor(logger, document, self._ftp)

try:
mongo_id = body.decode("utf-8")
document = get_doc_from_store(mongo_id)

processor = self.get_processor(document)

if processor:
processed_ok = processor.process()

if processed_ok:
self.acknowledge_message(basic_deliver.delivery_tag, tx_id=processor.tx_id)
else:
if delivery_count == settings.QUEUE_MAX_MESSAGE_DELIVERIES:
logger.error("Reached maximum number of retries", tx_id=processor.tx_id, delivery_count=delivery_count, message=mongo_id)
self.reject_message(basic_deliver.delivery_tag, tx_id=processor.tx_id)
else:
pass

except Exception as e:
logger.error("ResponseProcessor failed", exception=e, tx_id=processor.tx_id)

def get_processor(self, survey):
if survey.get('survey_id') == 'census':
return CTPProcessor(logger, survey)
else:
logger.error("Missing or not supported survey id")
return None
processor.process()
self.acknowledge_message(basic_deliver.delivery_tag, tx_id=processor.tx_id)
logger.info("Processed successfully", tx_id=processor.tx_id)

except BadMessageError as e:
# If it's a bad message then we have to reject it
self.reject_message(basic_deliver.delivery_tag, tx_id=processor.tx_id)
logger.error("Bad message", action="rejected", exception=e, delivery_count=delivery_count, tx_id=processor.tx_id)

except (RetryableError, Exception) as e:
self.nack_message(basic_deliver.delivery_tag, tx_id=processor.tx_id)
logger.error("Failed to process", action="nack for retry", exception=e, delivery_count=delivery_count, tx_id=processor.tx_id)


def main():
Expand Down
Loading

0 comments on commit 267c606

Please sign in to comment.