Skip to content

Commit

Permalink
pause subqueues and the root queue is paused
Browse files Browse the repository at this point in the history
  • Loading branch information
Florian Perucki committed Oct 21, 2016
1 parent 1638a89 commit 1ed40d1
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 6 deletions.
23 changes: 19 additions & 4 deletions mrq/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,21 +158,36 @@ def unserialize_job_ids(self, job_ids):
else:
return [x.encode('hex') for x in job_ids]

def _get_pausable_id(self):
"""
Get the queue id (either id or root_id) that should be used to pause/unpause the current queue
TODO: handle subqueues with more than one level, e.g. "queue/subqueue/"
"""
queue = self.id
delimiter = context.get_current_config().get("subqueues_delimiter")
if delimiter is not None and self.id.endswith(delimiter):
queue = self.root_id
return queue

def pause(self):
""" Adds this queue to the set of paused queues """
context.connections.redis.sadd(Queue.redis_key_paused_queues(), self.id)
context.connections.redis.sadd(Queue.redis_key_paused_queues(), self._get_pausable_id())

def is_paused(self):
"""
Returns wether the queue is paused or not.
Warning: this does NOT ensure that the queue was effectively added to
the list of paused queues. See the 'paused_queues_refresh_interval' option.
the set of paused queues. See the 'paused_queues_refresh_interval' option.
"""
return context.connections.redis.sismember(Queue.redis_key_paused_queues(), self.id)
root_is_paused = False
if self.root_id != self.id:
root_is_paused = context.connections.redis.sismember(Queue.redis_key_paused_queues(), self.root_id)

return root_is_paused or context.connections.redis.sismember(Queue.redis_key_paused_queues(), self.id)

def resume(self):
""" Resumes a paused queue """
context.connections.redis.srem(Queue.redis_key_paused_queues(), self.id)
context.connections.redis.srem(Queue.redis_key_paused_queues(), self._get_pausable_id())

def size(self):
""" Returns the total number of jobs on the queue """
Expand Down
9 changes: 7 additions & 2 deletions mrq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def greenlet_paused_queues(self):

while True:

# Update the process-local list of known queues
# Update the process-local list of paused queues
Queue.paused_queues = Queue.redis_paused_queues()
time.sleep(self.config["paused_queues_refresh_interval"])

Expand Down Expand Up @@ -483,7 +483,12 @@ def work_loop(self, max_jobs=None):
gevent.sleep(0.01)

jobs = []
available_queues = [queue for queue in self.queues if queue.id not in Queue.paused_queues]

available_queues = [
queue for queue in self.queues
if queue.root_id not in Queue.paused_queues and
queue.id not in Queue.paused_queues
]

for queue_i in xrange(len(available_queues)):

Expand Down
55 changes: 55 additions & 0 deletions tests/test_pause.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from mrq.job import Job
import pytest
from mrq.queue import Queue, send_task
import time
from mrq.context import set_current_config, get_config


def test_pause_resume(worker):
Expand Down Expand Up @@ -73,3 +75,56 @@ def test_pause_refresh_interval(worker):
assert job1["result"] == {"a": 41}

worker.stop()


def test_pause_subqueue(worker):

# set config in current context in order to have a subqueue delimiter
set_current_config(get_config(config_type="worker"))

worker.start(queues="high high/", flags="--subqueues_refresh_interval=1 --paused_queues_refresh_interval=1")

Queue("high").pause()

assert Queue("high/").is_paused()

# wait for the paused_queues list to be refreshed
time.sleep(2)

job_id1 = send_task(
"tests.tasks.general.MongoInsert", {"a": 41},
queue="high")

job_id2 = send_task(
"tests.tasks.general.MongoInsert", {"a": 43},
queue="high/subqueue")

# wait a bit to make sure the jobs status will still be queued
time.sleep(5)

job1 = Job(job_id1).fetch().data
job2 = Job(job_id2).fetch().data

assert job1["status"] == "queued"
assert job2["status"] == "queued"

assert worker.mongodb_jobs.tests_inserts.count() == 0

Queue("high/").resume()

Job(job_id1).wait(poll_interval=0.01)

Job(job_id2).wait(poll_interval=0.01)

job1 = Job(job_id1).fetch().data
job2 = Job(job_id2).fetch().data

assert job1["status"] == "success"
assert job1["result"] == {"a": 41}

assert job2["status"] == "success"
assert job2["result"] == {"a": 43}

assert worker.mongodb_jobs.tests_inserts.count() == 2

worker.stop()

0 comments on commit 1ed40d1

Please sign in to comment.