Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shutdown database engine before waiting for executor shutdown #117339

Merged
merged 12 commits into from
Aug 22, 2024
12 changes: 4 additions & 8 deletions homeassistant/components/recorder/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,13 +367,6 @@ def async_add_executor_job[_T](
"""Add an executor job from within the event loop."""
return self.hass.loop.run_in_executor(self._db_executor, target, *args)

def _stop_executor(self) -> None:
"""Stop the executor."""
if self._db_executor is None:
return
self._db_executor.shutdown()
self._db_executor = None

@callback
def _async_check_queue(self, *_: Any) -> None:
"""Periodic check of the queue size to ensure we do not exhaust memory.
Expand Down Expand Up @@ -1501,5 +1494,8 @@ def _shutdown(self) -> None:
try:
self._end_session()
finally:
self._stop_executor()
if self._db_executor:
self._db_executor.shutdown(join_threads_or_timeout=False)
self._close_connection()
if self._db_executor:
self._db_executor.join_threads_or_timeout()
7 changes: 5 additions & 2 deletions homeassistant/util/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,13 @@ def join_or_interrupt_threads(
class InterruptibleThreadPoolExecutor(ThreadPoolExecutor):
"""A ThreadPoolExecutor instance that will not deadlock on shutdown."""

def shutdown(self, *args: Any, **kwargs: Any) -> None:
def shutdown(
self, *args: Any, join_threads_or_timeout: bool = True, **kwargs: Any
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need the join_threads_or_timeout argument?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we do. Not sure if we ever did.

dropped in 8a9fe6a

Copy link
Member Author

@bdraco bdraco Aug 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, it breaks: in the recorder case we need to wait before joining https://github.com/home-assistant/core/pull/117339/files#diff-909bb3e2ee175002975532b69a04af5a78fd77c6ca3259f3848b7c1e0678cba1R1498 because we want the connection closed before we join, but we do not need that for the default executor shutdown case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain this a bit more? Who's calling this function in the default shutdown case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shutdown gets called from https://github.com/python/cpython/blob/8edfa0b0b4ae4235bb3262d952c23e7581516d4f/Lib/asyncio/base_events.py#L606 via

loop.run_until_complete(loop.shutdown_default_executor())

We want the database executor to gracefully close the connection before it can reach the code that would forcefully interrupt the threads that fail to join gracefully. For the default executor we don't need to do that (current behavior), since there isn't a way to gracefully shut its threads down if they don't join on their own.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I see. Please amend the docstring to explain this, and also explain why we call shutdown with join_threads_or_timeout=False.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adjusted the docstring and added some more comments

) -> None:
"""Shutdown with interrupt support added."""
super().shutdown(wait=False, cancel_futures=True)
self.join_threads_or_timeout()
if join_threads_or_timeout:
self.join_threads_or_timeout()

def join_threads_or_timeout(self) -> None:
"""Join threads or timeout."""
Expand Down
19 changes: 9 additions & 10 deletions tests/components/recorder/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,10 @@ async def test_shutdown_before_startup_finishes(
await hass.async_block_till_done()
await hass.async_stop()

def _run_information_with_session():
instance.recorder_and_worker_thread_ids.add(threading.get_ident())
return run_information_with_session(session)

run_info = await instance.async_add_executor_job(_run_information_with_session)
# The database executor is shutdown so we must run the
# query in the main thread for testing
instance.recorder_and_worker_thread_ids.add(threading.get_ident())
run_info = run_information_with_session(session)

assert run_info.run_id == 1
assert run_info.start is not None
Expand Down Expand Up @@ -216,19 +215,19 @@ async def test_shutdown_closes_connections(
instance = recorder.get_instance(hass)
await instance.async_db_ready
await hass.async_block_till_done()
pool = instance.engine.pool
pool.shutdown = Mock()
pool = instance.engine

def _ensure_connected():
with session_scope(hass=hass, read_only=True) as session:
list(session.query(States))

await instance.async_add_executor_job(_ensure_connected)

hass.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE)
await hass.async_block_till_done()
with patch.object(pool, "dispose", wraps=pool.dispose) as dispose:
hass.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE)
await hass.async_block_till_done()

assert len(pool.shutdown.mock_calls) == 1
assert len(dispose.mock_calls) == 1
with pytest.raises(RuntimeError):
assert instance.get_session()

Expand Down
Loading