Skip to content

Commit

Permalink
Implement asynchronous method handlers on server side
Browse files Browse the repository at this point in the history
  • Loading branch information
chrizog committed Oct 24, 2024
1 parent ec96e56 commit 0e73f12
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 11 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ The library is still under development. The current major limitations and deviat

- Configuration and load balancing options in SOME/IP SD messages are not supported.
- TTL of Service Discovery entries is not checked yet.
- The Initial Wait Phase and Repetition Phase of the Service Discovery specification are skipped. For simplification, the Main Phase is directly entered, i.e. SD Offer Entries are immediately sent cyclically.
- The Initial Wait Phase and Repetition Phase of the Service Discovery specification are skipped. The Main Phase is directly entered, i.e. SD Offer Entries are immediately sent cyclically.

### De-/Serialization

Expand Down
2 changes: 1 addition & 1 deletion example_apps/offer_method_tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
SAMPLE_METHOD_ID = 0x0123


def add_method_handler(input_data: bytes, addr: Tuple[str, int]) -> MethodResult:
async def add_method_handler(input_data: bytes, addr: Tuple[str, int]) -> MethodResult:
print(
f"Received data: {' '.join(f'0x{b:02x}' for b in input_data)} from IP: {addr[0]} Port: {addr[1]}"
)
Expand Down
2 changes: 1 addition & 1 deletion example_apps/offer_method_udp.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
SAMPLE_METHOD_ID = 0x0123


def add_method_handler(input_data: bytes, addr: Tuple[str, int]) -> MethodResult:
async def add_method_handler(input_data: bytes, addr: Tuple[str, int]) -> MethodResult:
print(
f"Received data: {' '.join(f'0x{b:02x}' for b in input_data)} from IP: {addr[0]} Port: {addr[1]}"
)
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = someipy
version = 0.0.8
version = 0.0.9
author = Christian H.
author_email = someipy.package@gmail.com
description = A Python package implementing the SOME/IP protocol
Expand Down
4 changes: 2 additions & 2 deletions src/someipy/client_service_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,9 @@ async def call_method(self, method_id: int, payload: bytes) -> MethodResult:
endpoint_to_str_int_tuple(self._found_services[0].service.endpoint),
)

# After sending the method call wait for maximum one second
# After sending the method call wait for maximum three seconds
try:
await asyncio.wait_for(self._method_call_future, 1.0)
await asyncio.wait_for(self._method_call_future, 3.0)
except asyncio.TimeoutError:
get_logger(_logger_name).error(
f"Waiting on response for method call 0x{method_id:04X} timed out."
Expand Down
45 changes: 40 additions & 5 deletions src/someipy/server_service_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ class ServerServiceInstance(ServiceDiscoveryObserver):
_subscribers: Subscribers
_offer_timer: SimplePeriodicTimer

_handler_tasks: set[asyncio.Task]
_is_running: bool

def __init__(
self,
service: Service,
Expand All @@ -92,6 +95,9 @@ def __init__(
self._subscribers = Subscribers()
self._offer_timer = None

self._handler_tasks = set()
self._is_running = True

def send_event(self, event_group_id: int, event_id: int, payload: bytes) -> None:
"""
Sends an event to subscribers with the given event group ID, event ID, and payload.
Expand Down Expand Up @@ -135,6 +141,18 @@ def send_event(self, event_group_id: int, event_id: int, payload: bytes) -> None
endpoint_to_str_int_tuple(sub.endpoint),
)

async def _handle_method_call(self, method_handler, dst_addr, header_to_return):
result = await method_handler
header_to_return.message_type = result.message_type.value
header_to_return.return_code = result.return_code.value
payload_to_return = result.payload

# Update length in header to the correct length
header_to_return.length = 8 + len(payload_to_return)
self._someip_endpoint.sendto(
header_to_return.to_buffer() + payload_to_return, dst_addr
)

def someip_message_received(
self, message: SomeIpMessage, addr: Tuple[str, int]
) -> None:
Expand All @@ -155,6 +173,10 @@ def someip_message_received(
- The protocol and interface version are not checked yet.
- If the message type in the received header is not a request, a warning is logged.
"""

if not self._is_running:
return

header = message.header
payload_to_return = bytes()
header_to_return = header
Expand Down Expand Up @@ -193,12 +215,15 @@ def send_response():
and header.return_code == 0x00
):
method_handler = self._service.methods[header.method_id].method_handler
result = method_handler(message.payload, addr)
coro = method_handler(message.payload, addr)

header_to_return.message_type = result.message_type.value
header_to_return.return_code = result.return_code.value
payload_to_return = result.payload
send_response()
# If a method is called, do it in a separate task to allow for asynchronous processing inside
# method handlers
new_task = asyncio.create_task(
self._handle_method_call(coro, addr, header_to_return)
)
self._handler_tasks.add(new_task)
new_task.add_done_callback(self._handler_tasks.discard)

else:
get_logger(_logger_name).warning(
Expand Down Expand Up @@ -400,6 +425,16 @@ async def stop_offer(self):
)
self._sd_sender.send_multicast(sd_header.to_buffer())

# Stop processing incoming calls
self._is_running = False

# Cancel all running handler tasks
for task in self._handler_tasks:
task.cancel()

# Wait for all tasks to be canceled
await asyncio.gather(*self._handler_tasks)


async def construct_server_service_instance(
service: Service,
Expand Down

0 comments on commit 0e73f12

Please sign in to comment.