From 0e73f12fa177a4e8d8b9b1880a2a10b851865bf4 Mon Sep 17 00:00:00 2001 From: Christian Date: Thu, 24 Oct 2024 18:05:09 +0200 Subject: [PATCH] Implement asynchronous method handlers on server side --- README.md | 2 +- example_apps/offer_method_tcp.py | 2 +- example_apps/offer_method_udp.py | 2 +- setup.cfg | 2 +- src/someipy/client_service_instance.py | 4 +-- src/someipy/server_service_instance.py | 45 +++++++++++++++++++++++--- 6 files changed, 46 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 343b2b6..b98759d 100644 --- a/README.md +++ b/README.md @@ -63,7 +63,7 @@ The library is still under development. The current major limitations and deviat - Configuration and load balancing options in SOME/IP SD messages are not supported. - TTL of Service Discovery entries is not checked yet. -- The Initial Wait Phase and Repetition Phase of the Service Discovery specification are skipped. For simplification, the Main Phase is directly entered, i.e. SD Offer Entries are immediately sent cyclically. +- The Initial Wait Phase and Repetition Phase of the Service Discovery specification are skipped. The Main Phase is directly entered, i.e. SD Offer Entries are immediately sent cyclically. ### De-/Serialization diff --git a/example_apps/offer_method_tcp.py b/example_apps/offer_method_tcp.py index b07acad..1ac8571 100644 --- a/example_apps/offer_method_tcp.py +++ b/example_apps/offer_method_tcp.py @@ -21,7 +21,7 @@ SAMPLE_METHOD_ID = 0x0123 -def add_method_handler(input_data: bytes, addr: Tuple[str, int]) -> MethodResult: +async def add_method_handler(input_data: bytes, addr: Tuple[str, int]) -> MethodResult: print( f"Received data: {' '.join(f'0x{b:02x}' for b in input_data)} from IP: {addr[0]} Port: {addr[1]}" ) diff --git a/example_apps/offer_method_udp.py b/example_apps/offer_method_udp.py index e8a0731..73025ca 100644 --- a/example_apps/offer_method_udp.py +++ b/example_apps/offer_method_udp.py @@ -21,7 +21,7 @@ SAMPLE_METHOD_ID = 0x0123 -def add_method_handler(input_data: bytes, addr: Tuple[str, int]) -> MethodResult: +async def add_method_handler(input_data: bytes, addr: Tuple[str, int]) -> MethodResult: print( f"Received data: {' '.join(f'0x{b:02x}' for b in input_data)} from IP: {addr[0]} Port: {addr[1]}" ) diff --git a/setup.cfg b/setup.cfg index 95b8a18..4a40668 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = someipy -version = 0.0.8 +version = 0.0.9 author = Christian H. author_email = someipy.package@gmail.com description = A Python package implementing the SOME/IP protocol diff --git a/src/someipy/client_service_instance.py b/src/someipy/client_service_instance.py index b2dce58..8d1d3ad 100644 --- a/src/someipy/client_service_instance.py +++ b/src/someipy/client_service_instance.py @@ -234,9 +234,9 @@ async def call_method(self, method_id: int, payload: bytes) -> MethodResult: endpoint_to_str_int_tuple(self._found_services[0].service.endpoint), ) - # After sending the method call wait for maximum one second + # After sending the method call wait for maximum three seconds try: - await asyncio.wait_for(self._method_call_future, 1.0) + await asyncio.wait_for(self._method_call_future, 3.0) except asyncio.TimeoutError: get_logger(_logger_name).error( f"Waiting on response for method call 0x{method_id:04X} timed out." diff --git a/src/someipy/server_service_instance.py b/src/someipy/server_service_instance.py index 79b3295..c06f309 100644 --- a/src/someipy/server_service_instance.py +++ b/src/someipy/server_service_instance.py @@ -69,6 +69,9 @@ class ServerServiceInstance(ServiceDiscoveryObserver): _subscribers: Subscribers _offer_timer: SimplePeriodicTimer + _handler_tasks: set[asyncio.Task] + _is_running: bool + def __init__( self, service: Service, @@ -92,6 +95,9 @@ def __init__( self._subscribers = Subscribers() self._offer_timer = None + self._handler_tasks = set() + self._is_running = True + def send_event(self, event_group_id: int, event_id: int, payload: bytes) -> None: """ Sends an event to subscribers with the given event group ID, event ID, and payload. @@ -135,6 +141,18 @@ def send_event(self, event_group_id: int, event_id: int, payload: bytes) -> None endpoint_to_str_int_tuple(sub.endpoint), ) + async def _handle_method_call(self, method_handler, dst_addr, header_to_return): + result = await method_handler + header_to_return.message_type = result.message_type.value + header_to_return.return_code = result.return_code.value + payload_to_return = result.payload + + # Update length in header to the correct length + header_to_return.length = 8 + len(payload_to_return) + self._someip_endpoint.sendto( + header_to_return.to_buffer() + payload_to_return, dst_addr + ) + def someip_message_received( self, message: SomeIpMessage, addr: Tuple[str, int] ) -> None: @@ -155,6 +173,10 @@ def someip_message_received( - The protocol and interface version are not checked yet. - If the message type in the received header is not a request, a warning is logged. """ + + if not self._is_running: + return + header = message.header payload_to_return = bytes() header_to_return = header @@ -193,12 +215,15 @@ def send_response(): and header.return_code == 0x00 ): method_handler = self._service.methods[header.method_id].method_handler - result = method_handler(message.payload, addr) + coro = method_handler(message.payload, addr) - header_to_return.message_type = result.message_type.value - header_to_return.return_code = result.return_code.value - payload_to_return = result.payload - send_response() + # If a method is called, do it in a separate task to allow for asynchronous processing inside + # method handlers + new_task = asyncio.create_task( + self._handle_method_call(coro, addr, header_to_return) + ) + self._handler_tasks.add(new_task) + new_task.add_done_callback(self._handler_tasks.discard) else: get_logger(_logger_name).warning( @@ -400,6 +425,16 @@ async def stop_offer(self): ) self._sd_sender.send_multicast(sd_header.to_buffer()) + # Stop processing incoming calls + self._is_running = False + + # Cancel all running handler tasks + for task in self._handler_tasks: + task.cancel() + + # Wait for all tasks to be canceled + await asyncio.gather(*self._handler_tasks) + async def construct_server_service_instance( service: Service,