Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 41 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,25 +1,56 @@
rq-gevent-worker
================

Implement a new worker based on gevent
Implement a new worker based on gevent for Python 3 and rq 0.7.1 and gevent 1.2.1

[![Downloads](https://pypip.in/download/rq-gevent-worker/badge.svg)](https://pypi.python.org/pypi/rq-gevent-worker/)
https://pypi.python.org/pypi/rq-gevent-worker/

##Install

$ pip install rq-gevent-worker

##Usage

$ rqgeventworker -h
put rq_gevent_worker.py in site-packages/rq/cli

then
vim your_venv/bin/rqworkr

to this

```python

#!/home/mehdi/venvs/test_rq/bin/python

# -*- coding: utf-8 -*-
import re
import sys
from gevent import monkey, get_hub
from gevent.hub import LoopExit

from rq.cli import worker
from rq.cli.rq_gevent_worker import GeventWorker
from rq import Connection, Queue, Worker


monkey.patch_all()
if __name__ == '__main__':
with Connection():
q = Queue()
GeventWorker(q).work()
# sys.argv[0] = re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0])
# sys.exit(GeventWorker())



```

$ export PYTHONPATH=<your project import path>:$PYTHONPATH; rqgeventworker

##Test

$ pip install -r requirements.txt
Tested with
1. rq==0.7.1
2. Gevent 1.2.7
3. python 3.5.2

$ py.test tests

##Under The Hood
TODO
Expand All @@ -28,19 +59,8 @@ TODO

* Add a command line option to specify gevent pool size

##Note

###Crash
Official `Worker` use `os.fork()` to spawn a child process to execute a job,
so if the job cause the process crash, the worker process is still alive.

When using gevent, we use the same process to execute job, the job may
cause the whole worker process crash.

###Why not `rqworker -w <geventworker>`
Because we need gevent monkey patch at the start of the process, rqworker import
many modules before importing geventworker, so it will cause geventworker not work normally.

##Declaration

Most of the code is from [lechup](https://gist.github.com/lechup/d886e89490b2f6c737d7) and [jhorman](https://gist.github.com/jhorman/e16ed695845fca683057),
Most of the code is from [zhangliyong](https://github.com/zhangliyong/rq-gevent-worker)
19 changes: 10 additions & 9 deletions rq_gevent_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from gevent import monkey, get_hub
from gevent.hub import LoopExit
monkey.patch_all()
# monkey.patch_all()

import signal
import gevent
Expand Down Expand Up @@ -57,15 +57,15 @@ def request_force_stop():
raise SystemExit()

def request_stop():
if not self._stopped:
if not self._stop_requested:
gevent.signal(signal.SIGINT, request_force_stop)
gevent.signal(signal.SIGTERM, request_force_stop)

self.log.warning('Warm shut down requested.')
self.log.warning('Stopping after all greenlets are finished. '
'Press Ctrl+C again for a cold shutdown.')

self._stopped = True
self._stop_requested = True
self.gevent_pool.join()

raise StopRequested()
Expand All @@ -85,7 +85,7 @@ def work(self, burst=False):

The return value indicates whether any jobs were processed.
"""
setup_loghandlers()
setup_loghandlers("INFO")
self._install_signal_handlers()

self.did_perform_work = False
Expand All @@ -94,7 +94,7 @@ def work(self, burst=False):
self.set_state('starting')
try:
while True:
if self.stopped:
if self._stop_requested:
self.log.info('Stopping on request.')
break

Expand Down Expand Up @@ -130,23 +130,23 @@ def job_done(child):
if job.get_status() == JobStatus.FINISHED:
queue.enqueue_dependents(job)

child_greenlet = self.gevent_pool.spawn(self.perform_job, job)
child_greenlet = self.gevent_pool.spawn(self.perform_job, job,queue)
child_greenlet.link(job_done)

def dequeue_job_and_maintain_ttl(self, timeout):
if self._stopped:
if self._stop_requested:
raise StopRequested()

result = None
while True:
if self._stopped:
if self._stop_requested:
raise StopRequested()

self.heartbeat()

while self.gevent_pool.full():
gevent.sleep(0.1)
if self._stopped:
if self._stop_requested:
raise StopRequested()

try:
Expand Down Expand Up @@ -174,3 +174,4 @@ def main():

sys.argv.extend(['-w', 'rq_gevent_worker.GeventWorker'])
rq_main()