Skip to content

Commit

Permalink
pool usage average populated from report greenlet
Browse files Browse the repository at this point in the history
  • Loading branch information
ggueret committed Jul 21, 2016
1 parent ed9bafe commit 1c37410
Showing 1 changed file with 4 additions and 14 deletions.
18 changes: 4 additions & 14 deletions mrq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ def __init__(self):
self.process = psutil.Process(os.getpid())
self.greenlet = gevent.getcurrent()
self.graceful_stop = None
self.pool_usage_average = 0

self.idle_event = gevent.event.Event()
self.idle_wait_count = 0
Expand All @@ -68,6 +67,7 @@ def __init__(self):
self.name = "%s.%s" % (socket.gethostname().split(".")[0], os.getpid())

self.pool_size = self.config["greenlets"]
self.pool_usage_average = MovingAverage((60 / self.config["report_interval"] or 1))

from .logger import LogHandler
self.log_handler = LogHandler(quiet=self.config["quiet"])
Expand Down Expand Up @@ -227,16 +227,6 @@ def greenlet_subqueues(self):

time.sleep(self.config["subqueues_refresh_interval"])

def greenlet_pool_usage_average(self, interval=10):

moving_average = MovingAverage(6)

while True:
free_pool_slots = self.gevent_pool.free_count()
total_started = (self.pool_size - free_pool_slots)
self.pool_usage_average = moving_average.next(total_started)
time.sleep(interval)

def get_memory(self):
mmaps = self.process.get_memory_maps()
mem = {
Expand Down Expand Up @@ -316,11 +306,13 @@ def get_worker_report(self, with_memory=False):
else:
io[k] = sorted(v.items(), reverse=True, key=lambda x: x[1])

used_pool_slots = self.gevent_pool.free_count() - self.pool_size

return {
"status": self.status,
"config": {k: v for k, v in self.config.iteritems() if k in whitelisted_config},
"done_jobs": self.done_jobs,
"pool_usage_average": self.pool_usage_average,
"pool_usage_average": self.pool_usage_average.next(used_pool_slots),
"datestarted": self.datestarted,
"datereported": datetime.datetime.utcnow(),
"name": self.name,
Expand Down Expand Up @@ -430,8 +422,6 @@ def work_init(self):

self.greenlets["logs"] = gevent.spawn(self.greenlet_logs)

self.greenlets["pool_usage_average"] = gevent.spawn(self.greenlet_pool_usage_average)

if self.config["scheduler"]:
self.greenlets["scheduler"] = gevent.spawn(self.greenlet_scheduler)

Expand Down

0 comments on commit 1c37410

Please sign in to comment.