Skip to content

Commit

Permalink
Fix #42 -- Aquire and dispose broker connection per request
Browse files Browse the repository at this point in the history
Akquire the connetion each time `quantity` is called instead of
only once during app loading in Django.
Dispose old connection after usage.
  • Loading branch information
codingjoe committed Oct 10, 2018
1 parent 0be7ea1 commit 57075ec
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 26 deletions.
8 changes: 6 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ cache:
- pip
services:
- rabbitmq
install: pip install -Ur requirements.txt
script: coverage run --source=hirefire -m pytest
- redis-server
env:
- TOXENV = redis
- TOXENV = rabbitmq
install: pip install -U tox codecov
script: tox -e "$TOXENV"
branches:
only:
- master
Expand Down
55 changes: 32 additions & 23 deletions hirefire/procs/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@

from celery.app import app_or_default

try:
from librabbitmq import ChannelError
except ImportError:
try:
from amqp.exceptions import ChannelError
except ImportError:
# No RabbitMQ API wrapper installed, different celery broker used
ChannelError = Exception

from ..utils import KeyDefaultDict
from . import Proc

Expand Down Expand Up @@ -233,35 +242,35 @@ def __init__(self, app=None, *args, **kwargs):
if app is not None:
self.app = app
self.app = app_or_default(self.app)
self.connection = self.app.connection()
self.channel = self.connection.channel()

@staticmethod
def _get_redis_task_count(channel, queue):
return channel.client.llen(queue)

@staticmethod
def _get_rabbitmq_task_count(channel, queue):
try:
return channel.queue_declare(queue=queue, passive=True).message_count
except ChannelError:
logger.warning("The requested queue %s has not been created yet", queue)
return 0

def quantity(self, cache=None, **kwargs):
"""
Returns the aggregated number of tasks of the proc queues.
"""
if hasattr(self.channel, '_size'):
# Redis
return sum(self.channel._size(queue) for queue in self.queues)
# AMQP
try:
from librabbitmq import ChannelError
except ImportError:
from amqp.exceptions import ChannelError
count = 0
for queue in self.queues:
try:
queue = self.channel.queue_declare(queue, passive=True)
except ChannelError:
# The requested queue has not been created yet
pass
else:
count += queue.message_count
with self.app.connection_or_acquire() as connection:
channel = connection.channel()

if cache is not None and self.inspect_statuses:
count += self.inspect_count(cache)

return count
# Redis
if hasattr(channel, '_size'):
return sum(self._get_redis_task_count(channel, queue) for queue in self.queues)

# RabbitMQ
count = sum(self._get_rabbitmq_task_count(channel, queue) for queue in self.queues)
if cache is not None and self.inspect_statuses:
count += self.inspect_count(cache)
return count

def inspect_count(self, cache):
"""Use Celery's inspect() methods to see tasks on workers."""
Expand Down
17 changes: 17 additions & 0 deletions tests/contrib/django/testapp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
2 changes: 1 addition & 1 deletion tests/contrib/django/testapp/procs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
class WorkerProc(CeleryProc):
name = 'worker'
queues = ['celery']
inspect_statuses = []
inspect_statuses = ['active', 'reserved', 'scheduled']
5 changes: 5 additions & 0 deletions tests/contrib/django/testapp/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,8 @@
# https://docs.djangoproject.com/en/2.1/howto/static-files/

STATIC_URL = '/static/'


# Celery

CELERY_BROKER_URL = os.getenv('CELERY_BROKER_URL', 'amqp://')
14 changes: 14 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[[tox]
envlist = redis,rabbitmq

[testenv]
deps = -rrequirements.txt
commands = coverage run --append --source=hirefire -m pytest

[testenv:redis]
setenv =
CELERY_BROKER_URL = amqp://

[testenv:rabbitmq]
setenv =
CELERY_BROKER_URL = redis://

0 comments on commit 57075ec

Please sign in to comment.