Skip to content

Commit

Permalink
Move worker pool monitoring to be time based instead of add_job based. (
Browse files Browse the repository at this point in the history
home-assistant#3439)

* Move worker pool monitoring to be time based instead of add_job based.

* Stub out worker pool monitor during tests

* Add test for monitor worker pool.

* Improve naming

* Test stop_monitor coroutine

* Add async_create_timer test

* Finish rename create_timer
  • Loading branch information
balloob authored Sep 20, 2016
1 parent d31f6bc commit be68fe0
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 79 deletions.
59 changes: 46 additions & 13 deletions homeassistant/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@
# Pattern for validating entity IDs (format: <domain>.<entity>)
ENTITY_ID_PATTERN = re.compile(r"^(\w+)\.(\w+)$")

# Interval at which we check if the pool is getting busy
MONITOR_POOL_INTERVAL = 30

_LOGGER = logging.getLogger(__name__)


Expand Down Expand Up @@ -190,7 +193,8 @@ def async_start(self):
This method is a coroutine.
"""
create_timer(self)
async_create_timer(self)
async_monitor_worker_pool(self)
self.bus.async_fire(EVENT_HOMEASSISTANT_START)
yield from self.loop.run_in_executor(None, self.pool.block_till_done)
self.state = CoreState.running
Expand Down Expand Up @@ -1075,13 +1079,8 @@ def as_dict(self):
}


def create_timer(hass, interval=TIMER_INTERVAL):
def async_create_timer(hass, interval=TIMER_INTERVAL):
"""Create a timer that will start on HOMEASSISTANT_START."""
# We want to be able to fire every time a minute starts (seconds=0).
# We want this so other modules can use that to make sure they fire
# every minute.
assert 60 % interval == 0, "60 % TIMER_INTERVAL should be 0!"

stop_event = asyncio.Event(loop=hass.loop)

# Setting the Event inside the loop by marking it as a coroutine
Expand Down Expand Up @@ -1160,14 +1159,48 @@ def job_handler(job):
# We do not want to crash our ThreadPool
_LOGGER.exception("BusHandler:Exception doing job")

def busy_callback(worker_count, current_jobs, pending_jobs_count):
"""Callback to be called when the pool queue gets too big."""
return util.ThreadPool(job_handler, worker_count)


def async_monitor_worker_pool(hass):
"""Create a monitor for the thread pool to check if pool is misbehaving."""
busy_threshold = hass.pool.worker_count * 3

handle = None

def schedule():
"""Schedule the monitor."""
nonlocal handle
handle = hass.loop.call_later(MONITOR_POOL_INTERVAL,
check_pool_threshold)

def check_pool_threshold():
"""Check pool size."""
nonlocal busy_threshold

pending_jobs = hass.pool.queue_size

if pending_jobs < busy_threshold:
schedule()
return

_LOGGER.warning(
"WorkerPool:All %d threads are busy and %d jobs pending",
worker_count, pending_jobs_count)
hass.pool.worker_count, pending_jobs)

for start, job in current_jobs:
_LOGGER.warning("WorkerPool:Current job from %s: %s",
for start, job in hass.pool.current_jobs:
_LOGGER.warning("WorkerPool:Current job started at %s: %s",
dt_util.as_local(start).isoformat(), job)

return util.ThreadPool(job_handler, worker_count, busy_callback)
busy_threshold *= 2

schedule()

schedule()

@asyncio.coroutine
def stop_monitor(event):
"""Stop the monitor."""
handle.cancel()

hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, stop_monitor)
2 changes: 1 addition & 1 deletion homeassistant/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def start(self):
'Unable to setup local API to receive events')

self.state = ha.CoreState.starting
ha.create_timer(self)
ha.async_create_timer(self)

self.bus.fire(ha.EVENT_HOMEASSISTANT_START,
origin=ha.EventOrigin.remote)
Expand Down
91 changes: 31 additions & 60 deletions homeassistant/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ class ThreadPool(object):
"""A priority queue-based thread pool."""

# pylint: disable=too-many-instance-attributes
def __init__(self, job_handler, worker_count=0, busy_callback=None):
def __init__(self, job_handler, worker_count=0):
"""Initialize the pool.
job_handler: method to be called from worker thread to handle job
Expand All @@ -318,84 +318,56 @@ def __init__(self, job_handler, worker_count=0, busy_callback=None):
pending_jobs_count
"""
self._job_handler = job_handler
self._busy_callback = busy_callback

self.worker_count = 0
self.busy_warning_limit = 0
self._work_queue = queue.PriorityQueue()
self.current_jobs = []
self._lock = threading.RLock()
self._quit_task = object()

self.running = True

for _ in range(worker_count):
self.add_worker()

@property
def queue_size(self):
"""Return estimated number of jobs that are waiting to be processed."""
return self._work_queue.qsize()

def add_worker(self):
"""Add worker to the thread pool and reset warning limit."""
with self._lock:
if not self.running:
raise RuntimeError("ThreadPool not running")
if not self.running:
raise RuntimeError("ThreadPool not running")

worker = threading.Thread(
target=self._worker,
name='ThreadPool Worker {}'.format(self.worker_count))
worker.daemon = True
worker.start()
threading.Thread(
target=self._worker, daemon=True,
name='ThreadPool Worker {}'.format(self.worker_count)).start()

self.worker_count += 1
self.busy_warning_limit = self.worker_count * 3
self.worker_count += 1

def remove_worker(self):
"""Remove worker from the thread pool and reset warning limit."""
with self._lock:
if not self.running:
raise RuntimeError("ThreadPool not running")
if not self.running:
raise RuntimeError("ThreadPool not running")

self._work_queue.put(PriorityQueueItem(0, self._quit_task))
self._work_queue.put(PriorityQueueItem(0, self._quit_task))

self.worker_count -= 1
self.busy_warning_limit = self.worker_count * 3
self.worker_count -= 1

def add_job(self, priority, job):
"""Add a job to the queue."""
with self._lock:
if not self.running:
raise RuntimeError("ThreadPool not running")

self._work_queue.put(PriorityQueueItem(priority, job))
if not self.running:
raise RuntimeError("ThreadPool not running")

# Check if our queue is getting too big.
if self._work_queue.qsize() > self.busy_warning_limit \
and self._busy_callback is not None:

# Increase limit we will issue next warning.
self.busy_warning_limit *= 2

self._busy_callback(
self.worker_count, self.current_jobs,
self._work_queue.qsize())
self._work_queue.put(PriorityQueueItem(priority, job))

def add_many_jobs(self, jobs):
"""Add a list of jobs to the queue."""
with self._lock:
if not self.running:
raise RuntimeError("ThreadPool not running")

for priority, job in jobs:
self._work_queue.put(PriorityQueueItem(priority, job))
if not self.running:
raise RuntimeError("ThreadPool not running")

# Check if our queue is getting too big.
if self._work_queue.qsize() > self.busy_warning_limit \
and self._busy_callback is not None:

# Increase limit we will issue next warning.
self.busy_warning_limit *= 2

self._busy_callback(
self.worker_count, self.current_jobs,
self._work_queue.qsize())
for priority, job in jobs:
self._work_queue.put(PriorityQueueItem(priority, job))

def block_till_done(self):
"""Block till current work is done."""
Expand All @@ -405,18 +377,17 @@ def stop(self):
"""Finish all the jobs and stops all the threads."""
self.block_till_done()

with self._lock:
if not self.running:
return
if not self.running:
return

# Tell the workers to quit
for _ in range(self.worker_count):
self.remove_worker()
# Tell the workers to quit
for _ in range(self.worker_count):
self.remove_worker()

self.running = False
self.running = False

# Wait till all workers have quit
self.block_till_done()
# Wait till all workers have quit
self.block_till_done()

def _worker(self):
"""Handle jobs for the thread pool."""
Expand Down
1 change: 1 addition & 0 deletions requirements_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ flake8>=3.0.4
pylint>=1.5.6
coveralls>=1.1
pytest>=2.9.2
pytest-asyncio>=0.5.0
pytest-cov>=2.3.1
pytest-timeout>=1.0.0
pytest-catchlog>=1.2.2
Expand Down
8 changes: 5 additions & 3 deletions tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ def start_hass():
"""Helper to start hass."""
with patch.object(hass.loop, 'run_forever', return_value=None):
with patch.object(hass, 'async_stop', return_value=fake_stop()):
with patch.object(ha, 'create_timer', return_value=None):
orig_start()
hass.block_till_done()
with patch.object(ha, 'async_create_timer', return_value=None):
with patch.object(ha, 'async_monitor_worker_pool',
return_value=None):
orig_start()
hass.block_till_done()

def stop_hass():
orig_stop()
Expand Down
83 changes: 82 additions & 1 deletion tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import os
import signal
import unittest
from unittest.mock import patch
from unittest.mock import patch, MagicMock
from datetime import datetime, timedelta

import pytz
Expand Down Expand Up @@ -459,3 +459,84 @@ def register_call(_):
pool.add_job(ha.JobPriority.EVENT_DEFAULT, (register_call, None))
pool.block_till_done()
self.assertEqual(1, len(calls))


class TestWorkerPoolMonitor(object):
"""Test monitor_worker_pool."""

@patch('homeassistant.core._LOGGER.warning')
def test_worker_pool_monitor(self, mock_warning, event_loop):
"""Test we log an error and increase threshold."""
hass = MagicMock()
hass.pool.worker_count = 3
schedule_handle = MagicMock()
hass.loop.call_later.return_value = schedule_handle

ha.async_monitor_worker_pool(hass)
assert hass.loop.call_later.called
assert hass.bus.async_listen_once.called
assert not schedule_handle.called

check_threshold = hass.loop.call_later.mock_calls[0][1][1]

hass.pool.queue_size = 8
check_threshold()
assert not mock_warning.called

hass.pool.queue_size = 9
check_threshold()
assert mock_warning.called

mock_warning.reset_mock()
assert not mock_warning.called

check_threshold()
assert not mock_warning.called

hass.pool.queue_size = 17
check_threshold()
assert not mock_warning.called

hass.pool.queue_size = 18
check_threshold()
assert mock_warning.called

event_loop.run_until_complete(
hass.bus.async_listen_once.mock_calls[0][1][1](None))
assert schedule_handle.cancel.called


class TestAsyncCreateTimer(object):
"""Test create timer."""

@patch('homeassistant.core.asyncio.Event')
@patch('homeassistant.core.dt_util.utcnow')
def test_create_timer(self, mock_utcnow, mock_event, event_loop):
"""Test create timer fires correctly."""
hass = MagicMock()
now = mock_utcnow()
event = mock_event()
now.second = 1
mock_utcnow.reset_mock()

ha.async_create_timer(hass)
assert len(hass.bus.async_listen_once.mock_calls) == 2
start_timer = hass.bus.async_listen_once.mock_calls[1][1][1]

event_loop.run_until_complete(start_timer(None))
assert hass.loop.create_task.called

timer = hass.loop.create_task.mock_calls[0][1][0]
event.is_set.side_effect = False, False, True
event_loop.run_until_complete(timer)
assert len(mock_utcnow.mock_calls) == 1

assert hass.loop.call_soon.called
event_type, event_data = hass.loop.call_soon.mock_calls[0][1][1:]

assert ha.EVENT_TIME_CHANGED == event_type
assert {ha.ATTR_NOW: now} == event_data

stop_timer = hass.bus.async_listen_once.mock_calls[0][1][1]
event_loop.run_until_complete(stop_timer(None))
assert event.set.called
2 changes: 1 addition & 1 deletion tests/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def setUpModule(): # pylint: disable=invalid-name
{http.DOMAIN: {http.CONF_API_PASSWORD: API_PASSWORD,
http.CONF_SERVER_PORT: SLAVE_PORT}})

with patch.object(ha, 'create_timer', return_value=None):
with patch.object(ha, 'async_create_timer', return_value=None):
slave.start()


Expand Down

0 comments on commit be68fe0

Please sign in to comment.