Skip to content

Commit

Permalink
Adapt server to support close and run forever
Browse files Browse the repository at this point in the history
This commit adapts the capnp server class to support the close
method and run forever method. The close method is used to
stop a running server safely. Since Python does not gurantee that
the destructor is called it is recommended to manage the livetime
by one self. The run forever method is a simple helper function
that is often usefull if a server is running in the main thread.
  • Loading branch information
tobiasah committed Jul 23, 2024
1 parent 8b441b8 commit 9b506b3
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 4 deletions.
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ ignore = [
"PLR0911",
# Missing return type annotation for special method
"ANN204",
# Too many arguments in function definition
"PLR0913",
]

[tool.ruff.lint.flake8-annotations]
Expand Down
2 changes: 1 addition & 1 deletion src/labone/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async def create_from_session(
)

@staticmethod
async def create( # noqa: PLR0913
async def create(
serial: str,
*,
host: str,
Expand Down
2 changes: 1 addition & 1 deletion src/labone/nodetree/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ class ResultNode(MetaNode):
The time the results where created.
"""

def __init__( # noqa: PLR0913
def __init__(
self,
tree_manager: NodeTreeManager,
path_segments: tuple[NormalizedPathSegment, ...],
Expand Down
47 changes: 47 additions & 0 deletions src/labone/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
from __future__ import annotations

import typing as t
from asyncio import CancelledError, Future, get_running_loop
from signal import SIGINT, SIGTERM

import zhinst.comms
from typing_extensions import TypeAlias

from labone.core.errors import LabOneCoreError
from labone.core.helper import get_default_context

CapnpResult: TypeAlias = dict[str, t.Any]
Expand Down Expand Up @@ -43,6 +46,8 @@ class CapnpServer:
def __init__(self, schema: zhinst.comms.SchemaLoader):
self._schema = schema
self._registered_callbacks: dict[tuple[int, int], t.Callable] = {}
self._run_forever_future: Future[None] | None = None
self._capnp_server: zhinst.comms.DynamicServer | None = None
self._load_callbacks()

def _load_callbacks(self) -> None:
Expand Down Expand Up @@ -96,13 +101,55 @@ async def start(
port: port to listen on.
open_overwrite: Flag if the server should be reachable from outside.
"""
if self._capnp_server:
msg = f"server {self!r} is already running"
raise LabOneCoreError(msg)
self._capnp_server = await context.listen(
port=port,
openOverride=open_overwrite,
callback=self._capnp_callback,
schema=self._schema,
)

def close(self) -> None:
"""Close the server."""
if self._capnp_server is None:
msg = f"server {self!r} is not running"
raise LabOneCoreError(msg)
self._capnp_server.close()

async def run_forever(self) -> None:
"""Run the server forever.
Useful for running the server in the main thread.
This method is a coroutine that will block until the server until a
CancelledError is raised. After a CancelledError the server is shutdown
properly and the functions returns.
"""
if self._run_forever_future is not None:
msg = f"server {self!r} is already being awaited on run_forever()"
raise LabOneCoreError(msg)
if self._capnp_server is None:
msg = f"server {self!r} is not running"
raise LabOneCoreError(msg)

self._run_forever_future = get_running_loop().create_future()

loop = get_running_loop()
for signal_enum in [SIGINT, SIGTERM]:
loop.add_signal_handler(signal_enum, self.close)

try:
await self._run_forever_future
except CancelledError:
try:
self.close()
finally:
raise
finally:
self._run_forever_future = None

async def start_pipe(
self,
context: zhinst.comms.CapnpContext | None = None,
Expand Down
2 changes: 1 addition & 1 deletion tests/core/test_shf_vector_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def test_shf_scope_vector(
],
)
@pytest.mark.parametrize(("x", "y"), [(0, 0), (1, 1), (32, 743)])
def test_shf_demodulator_vector( # noqa: PLR0913
def test_shf_demodulator_vector(
vector_length,
scaling,
timestamp_delta,
Expand Down
2 changes: 1 addition & 1 deletion tests/mock/ab_hpk_automatic_functionality_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async def new_test_function(*args, **kwargs):
exception_mock = e
assert (exception is None) == (exception_mock is None)
if exception is not None:
assert type(exception) == type(exception_mock)
assert type(exception) is type(exception_mock)
assert string_output.getvalue() == string_output_mock.getvalue()

return new_test_function
Expand Down

0 comments on commit 9b506b3

Please sign in to comment.