Skip to content

Commit

Permalink
Updated process & tests for traceback_histories
Browse files Browse the repository at this point in the history
  • Loading branch information
Pauleuh committed Sep 21, 2016
1 parent 5001128 commit b8a704d
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 19 deletions.
28 changes: 14 additions & 14 deletions mrq/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import encodings
import copy_reg
from . import context
from config import get_config
from mrq.context import get_current_config


class Job(object):
Expand Down Expand Up @@ -358,15 +358,15 @@ def _save_traceback_history(self, status, trace, job_exc):
new_history = {
"date": failure_date,
"status": status,
"traceback": trace,
"exceptiontype": job_exc
"exceptiontype": job_exc.__name__
}

if self.original_exception:
new_history["original_traceback"] = new_history
traces = trace.split("---- Original exception: -----")
if len(traces) > 1:
new_history["original_traceback"] = traces[1]
new_history["traceback"] = traces[0]
self.collection.update({
"_id": self.id
}, {"$add": {"traceback_history": new_history}})
}, {"$push": {"traceback_history": new_history}})

def save_success(self, result=None):

Expand Down Expand Up @@ -399,7 +399,7 @@ def save_abort(self):
self._save_status("abort", updates)

def _save_status(self, status, updates=None, exception=False, w=None, j=None):
print "saving status"

if self.id is None:
return

Expand Down Expand Up @@ -431,12 +431,12 @@ def _save_status(self, status, updates=None, exception=False, w=None, j=None):
trace = traceback.format_exc()
context.log.error(trace)
db_updates["traceback"] = trace
db_updates["exceptiontype"] = sys.exc_info()[0].__name__
print "hello"
print get_config()
if get_config().get("save_traceback_history"):
print "saving traceback history"
self._save_traceback_history(status, trace, db_updates["exceptiontype"])
exc = sys.exc_info()[0]
db_updates["exceptiontype"] = exc.__name__

if get_current_config().get("save_traceback_history"):

self._save_traceback_history(status, trace, exc)

# In the most common case, we allow an optimization on Mongo writes
if status == "success":
Expand Down
24 changes: 24 additions & 0 deletions tests/tasks/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,30 @@ def run(self, params):
raise Exception("Should not be reached")


class InRetryException(BaseException):
pass


class RetryOnFailed(Task):

def run(self, params):

log.info("Retrying in %s on %s" %
(params.get("delay"), params.get("queue")))

connections.mongodb_jobs.tests_inserts.insert(params)
try:
raise InRetryException
except InRetryException:
retry_current_job(
queue=params.get("queue"),
delay=params.get("delay"),
max_retries=params.get("max_retries")
)

raise Exception("Should not be reached")


class WaitForFlag(Task):

def run(self, params):
Expand Down
30 changes: 25 additions & 5 deletions tests/test_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,34 @@ def test_retry_max_retries_zero(worker):


def test_retry_traceback_history(worker):
print "hello, starting worker"

worker.start(flags="--config tests/fixtures/config-tracebackhistory.py")
# delay = 0 should requeue right away.
print "hello, started worker"

worker.send_task(
"tests.tasks.general.Retry", {"queue": "noexec", "delay": 60}, block=True, accept_statuses=["retry"])
print "task launched"
"tests.tasks.general.Retry", {"queue": "noexec", "delay": 60}, block=True, accept_statuses=["retry"]
)

job = worker.mongodb_jobs.mrq_jobs.find()[0]
time.sleep(1)

assert len(job["traceback_history"]) == 1
assert not job["traceback_history"][0].get("original_traceback")

worker.send_task(
"tests.tasks.general.RetryOnFailed", {"queue": "default", "delay": 1}, block=True, accept_statuses=["retry"]
)

job = worker.mongodb_jobs.mrq_jobs.find({
"path": "tests.tasks.general.RetryOnFailed"})[0]

assert len(job["traceback_history"]) == 1
assert "InRetryException" in job["traceback_history"][0].get("original_traceback")
time.sleep(2)
worker.send_task("mrq.basetasks.cleaning.RequeueRetryJobs", {}, block=True)
time.sleep(2)
job = worker.mongodb_jobs.mrq_jobs.find({
"path": "tests.tasks.general.RetryOnFailed"})[0]

print job["traceback_history"]
assert len(job["traceback_history"]) == 2

0 comments on commit b8a704d

Please sign in to comment.