Merge branch 'master' of github.com:pricingassistant/mrq
* 'master' of github.com:pricingassistant/mrq:
  requirement update
  Max time (pricingassistant#152)
  more info on workers (pricingassistant#151)
  Save abort traceback (pricingassistant#149)
sylvinus committed Mar 19, 2017
2 parents 218c26a + 9d4086e commit f7aa5e4
Showing 9 changed files with 85 additions and 9 deletions.
1 change: 1 addition & 0 deletions docs/configuration.md
@@ -56,6 +56,7 @@ USE_LARGE_JOB_IDS = False #Do not use compacted job IDs in Redis. For compatibil
"""
QUEUES = ("default",) # The queues to listen on.Defaults to default , which will listen on all queues.
MAX_JOBS = 0 #Gevent:max number of jobs to do before quitting. Workaround for memory leaks in your tasks. Defaults to 0
MAX_TIME = 0 # max number of seconds a worker runs before quitting
MAX_MEMORY = 1 #Max memory (in Mb) after which the process will be shut down. Use with PROCESS = [1-N] to have supervisord automatically respawn the worker when this happens.Defaults to 1
GRENLETS = 1 #Max number of greenlets to use.Defaults to 1.
PROCESSES = 0 #Number of processes to launch with supervisord.Defaults to 0.
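
These options can be set from a Python config file passed to the worker with `--config` (the tests in this commit use `tests/fixtures/config-tracebackhistory.py` this way). A minimal sketch, with a hypothetical `mrq-config.py` and illustrative values:

```python
# mrq-config.py -- hypothetical config file, loaded with `mrq-worker --config mrq-config.py`
# Option names mirror the docs/configuration.md excerpt above

QUEUES = ("default",)  # Listen on the default queue
GREENLETS = 10         # Run up to 10 jobs concurrently in a single process
MAX_JOBS = 1000        # Recycle the worker after 1000 jobs (memory-leak workaround)
MAX_TIME = 3600        # Recycle the worker after one hour (0 = no limit)
MAX_MEMORY = 500       # Shut down past 500 Mb; supervisord can respawn the worker
```
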
3 changes: 3 additions & 0 deletions docs/workers.md
@@ -6,6 +6,9 @@ It is started with a list of queues to listen to, in a specific order.

It can be started with concurrency options (multiple processes and/or multiple greenlets). We call this whole group a single 'worker' even though it can dequeue multiple jobs in parallel.

If a worker is started with concurrency options, it polls for waiting jobs and dispatches them to its processes/greenlets.
For example, with the greenlets option, the worker is, under the hood, a single Python process holding a pool of greenlets that actually run the tasks.

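To make this concrete, here is a minimal, self-contained gevent sketch of the same model: one Python process dispatching jobs to a greenlet pool. It is not mrq's actual worker code; `handle_job` and the fake job range are made up for illustration:

```python
import gevent
from gevent.pool import Pool

def handle_job(job):
    # Stand-in for actually running a dequeued task
    gevent.sleep(1)
    print("done:", job)

pool = Pool(10)  # the worker's greenlet pool: at most 10 jobs run in parallel

for job in range(25):      # stand-in for jobs polled from the queues
    pool.wait_available()  # block until a greenlet slot frees up
    pool.spawn(handle_job, job)

pool.join()  # wait for in-flight jobs to finish before exiting
```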

## Statuses

7 changes: 7 additions & 0 deletions mrq/config.py
@@ -267,6 +267,13 @@ def add_parser_args(parser, config_type):
        help='Gevent: max number of jobs to do before quitting.' +
             ' Temp workaround for memory leaks')

    parser.add_argument(
        '--max_time',
        default=0.0,
        type=float,
        action='store',
        help='Max time a worker should run before quitting.')

    parser.add_argument(
        '--max_memory',
        default=0,
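
A standalone sketch of how the new flag behaves, assuming the same conversion the worker.py diff below applies (`timedelta(0)` is falsy in Python, so the default `0.0` means no time limit):

```python
import argparse
import datetime

parser = argparse.ArgumentParser()
parser.add_argument('--max_time', default=0.0, type=float, action='store',
                    help='Max time a worker should run before quitting.')

args = parser.parse_args(['--max_time', '3600'])

# Mirrors the Worker.__init__ change below: 0.0 collapses to None (no limit)
max_time = datetime.timedelta(seconds=args.max_time) or None
print(max_time)  # -> 1:00:00
```
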
2 changes: 1 addition & 1 deletion mrq/job.py
@@ -433,7 +433,7 @@ def save_abort(self):
"dateexpires": dateexpires
}

self._save_status("abort", updates)
self._save_status("abort", updates, exception=True)

def _save_status(self, status, updates=None, exception=False, w=None, j=None):

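Passing `exception=True` is what makes the abort traceback get recorded (the "Save abort traceback" item in the commit message). A simplified sketch of the pattern, assuming `_save_status` captures the active exception roughly like this; this is not mrq's exact implementation:

```python
import traceback

def save_status_sketch(status, updates=None, exception=False):
    updates = dict(updates or {})
    if exception:
        # Record the traceback of the exception currently being handled, so an
        # abort issued from inside an except block keeps the original error
        updates["traceback"] = traceback.format_exc()
    updates["status"] = status
    return updates  # mrq persists these fields to MongoDB instead of returning them
```
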
17 changes: 13 additions & 4 deletions mrq/worker.py
@@ -15,7 +15,6 @@
import sys
import json as json_stdlib
import ujson as json
import http.server
from bson import ObjectId
from collections import defaultdict

@@ -53,6 +52,7 @@ def __init__(self):

        self.done_jobs = 0
        self.max_jobs = self.config["max_jobs"]
        self.max_time = datetime.timedelta(seconds=self.config["max_time"]) or None

        self.connected = False  # MongoDB + Redis

@@ -419,7 +419,7 @@ def work(self):
"""
self.work_init()

self.work_loop(max_jobs=self.max_jobs)
self.work_loop(max_jobs=self.max_jobs, max_time=self.max_time)

return self.work_stop()

@@ -449,7 +449,7 @@ def work_init(self):

        self.install_signal_handlers()

    def work_loop(self, max_jobs=None):
    def work_loop(self, max_jobs=None, max_time=None):

        self.done_jobs = 0
        self.idle_wait_count = 0
@@ -459,6 +459,7 @@ def work_loop(self, max_jobs=None):
        try:

            queue_offset = 0
            max_time_reached = False

            while True:

@@ -472,6 +473,12 @@

                while True:

                    # we put this here to make sure we have a strict limit on max_time
                    if max_time and datetime.datetime.utcnow() - self.datestarted >= max_time:
                        self.log.info("Reached max_time=%s" % max_time.seconds)
                        max_time_reached = True
                        break

                    free_pool_slots = self.gevent_pool.free_count()

                    if max_jobs:
@@ -487,6 +494,9 @@
self.status = "full"
gevent.sleep(0.01)

if max_time_reached:
break

jobs = []

available_queues = [
@@ -561,7 +571,6 @@

self.log.debug("Joining the greenlet pool...")
self.status = "join"

self.gevent_pool.join(timeout=None, raise_error=False)
self.log.debug("Joined.")

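The heart of the max_time change is the timestamp check at the top of the inner dequeue loop. A self-contained sketch of the same pattern, with `dequeue_and_run()` standing in for the real dequeue-and-dispatch logic:

```python
import datetime
import time

def dequeue_and_run():
    time.sleep(0.1)  # stand-in: pretend we dequeued and ran one job
    return 1

def work_loop_sketch(max_time=None, max_jobs=None):
    datestarted = datetime.datetime.utcnow()
    done_jobs = 0
    while True:
        # Check elapsed time before dequeuing more work, so the limit is strict
        if max_time and datetime.datetime.utcnow() - datestarted >= max_time:
            print("Reached max_time=%s" % max_time.seconds)
            break
        if max_jobs and done_jobs >= max_jobs:
            break
        done_jobs += dequeue_and_run()

work_loop_sketch(max_time=datetime.timedelta(seconds=2))  # prints "Reached max_time=2"
```
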
2 changes: 1 addition & 1 deletion requirements-base.txt
@@ -10,5 +10,5 @@ termcolor==1.1.0
subprocess32==3.2.7; python_version < '3.2'
supervisor==3.0; python_version < '3.0'
git+git://github.com/Supervisor/supervisor.git@c18aecf1641d8953767e7010be8bae1924a133bf#egg=Supervisor; python_version >= '3.0'
future==0.15.2
future>=0.15.2
importlib==1.0.3; python_version < '2.7'
18 changes: 18 additions & 0 deletions tests/tasks/general.py
@@ -147,13 +147,31 @@ def run(self, params):
        raise Exception(params.get("message", ""))


class InAbortException(BaseException):
    pass


class Abort(Task):

    def run(self, params):

        abort_current_job()


class AbortOnFailed(Task):
    def run(self, params):

        log.info("Will abort this task")

        connections.mongodb_jobs.tests_inserts.insert(params)
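        # Raise and catch so that abort_current_job() is called with an active
        # exception; its traceback is then saved as original_traceback (see test_abort.py below)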
        try:
            raise InAbortException
        except InAbortException:
            abort_current_job()

        raise Exception("Should not be reached")


class ReturnParams(Task):

    def run(self, params):
23 changes: 21 additions & 2 deletions tests/test_abort.py
@@ -1,8 +1,8 @@
from mrq.job import Job
from mrq.queue import Queue
from datetime import datetime
from datetime import timedelta

from mrq.queue import Queue


def test_abort(worker):

@@ -19,3 +19,22 @@ def test_abort(worker):
assert job["status"] == "abort"
assert job.get("dateexpires") is not None
assert job["dateexpires"] < datetime.utcnow() + timedelta(hours=24)


def test_abort_traceback_history(worker):

    worker.start(flags="--config tests/fixtures/config-tracebackhistory.py")

    worker.send_task("tests.tasks.general.Abort", {"a": 41}, block=True, accept_statuses=["abort"])

    job = worker.mongodb_jobs.mrq_jobs.find()[0]

    assert len(job["traceback_history"]) == 1
    assert not job["traceback_history"][0].get("original_traceback")

    worker.send_task("tests.tasks.general.AbortOnFailed", {"a": 41}, block=True, accept_statuses=["abort"])

    job = worker.mongodb_jobs.mrq_jobs.find({"path": "tests.tasks.general.AbortOnFailed"})[0]

    assert len(job["traceback_history"]) == 1
    assert "InAbortException" in job["traceback_history"][0].get("original_traceback")
21 changes: 20 additions & 1 deletion tests/test_interrupts.py
@@ -1,7 +1,7 @@
import time
import datetime
from builtins import str
from mrq.job import Job
from mrq.job import Job, get_job_result
from mrq.queue import Queue
from bson import ObjectId
import pytest
@@ -322,6 +322,25 @@ def test_interrupt_maxjobs(worker):
assert Queue("default").size() == 7


def test_worker_interrupt_after_max_time(worker):
    worker.start(flags="--greenlets=2 --max_time=2", queues="test1 default")

    task_ids = worker.send_tasks("tests.tasks.general.Add", [{"a": i, "b": 1, "sleep": 3} for i in range(5)],
                                 block=False)

    time.sleep(5)

    results = [get_job_result(task_id) for task_id in task_ids]

    queued_tasks = [result for result in results if result['status'] == "queued"]
    successful_tasks = [(i, result) for i, result in enumerate(results) if result['status'] == "success"]

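    # With 2 greenlets and 3-second jobs, max_time=2 expires before a second
    # batch is dequeued: the first 2 jobs succeed, the other 3 stay queued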
    assert len(queued_tasks) == 3
    assert len(successful_tasks) == 2
    for i, result in successful_tasks:
        assert result['result'] == i + 1


def test_interrupt_maxconcurrency(worker):

    # The worker will raise a maxconcurrency on the second job