diff --git a/README.md b/README.md index 448c49c..1ca42d5 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,6 @@ The library is still under development. The current major limitations and deviat ### SOME/IP - Only events (and field notifiers) are supported. Methods (and field getters/setters) are not supported yet. -- Receiving events is not supported yet. The server-side only is supported for now. Client service instances for receiving SOME/IP events are supported soon. - Only UDP services are supported. - Only unicast services are supported. - SOME/IP-TP is not supported. diff --git a/example_apps/receive_events.py b/example_apps/receive_events.py new file mode 100644 index 0000000..d29678d --- /dev/null +++ b/example_apps/receive_events.py @@ -0,0 +1,79 @@ +import asyncio +import ipaddress +import logging + +from someipy.service_discovery import construct_service_discovery +from someipy.client_service_instance import construct_client_service_instance +from someipy.logging import set_someipy_log_level +from temperature_msg import TemparatureMsg + +SD_MULTICAST_GROUP = "224.224.224.245" +SD_PORT = 30490 +INTERFACE_IP = "127.0.0.1" + +SAMPLE_EVENTGROUP_ID = 20 + + +def temperature_callback(payload: bytes) -> None: + print(f"Received {len(payload)} bytes.") + temperature_msg = TemparatureMsg().deserialize(payload) + print(temperature_msg) + + +async def main(): + # It's possible to configure the logging level of the someipy library, e.g. logging.INFO, logging.DEBUG, logging.WARN, .. + set_someipy_log_level(logging.DEBUG) + + # Since the construction of the class ServiceDiscoveryProtocol is not trivial and would require an async __init__ function + # use the construct_service_discovery function + # The local interface IP address needs to be passed so that the src-address of all SD UDP packets is correctly set + service_discovery = await construct_service_discovery( + SD_MULTICAST_GROUP, SD_PORT, INTERFACE_IP + ) + + # 1. For receiving events use a ClientServiceInstance. Since the construction of the class ClientServiceInstance is not + # trivial and would require an async __init__ function use the construct_client_service_instance function + # 2. Pass the service and instance ID, version and endpoint and TTL. The endpoint is needed because it will be the dest IP + # and port to which the events are sent to and the client will listen to + # 3. The ServiceDiscoveryProtocol object has to be passed as well, so the ClientServiceInstance can offer his service to + # other ECUs + service_instance_temperature = await construct_client_service_instance( + service_id=1, + instance_id=1000, + major_version=1, + minor_version=0, + endpoint=(ipaddress.IPv4Address(INTERFACE_IP), 3002), + ttl=5, + sd_sender=service_discovery, + ) + + # It's possible to optionally register a callback function which will be called when an event from the + # subscribed event group is received. The callback function will get the bytes of the payload passed which + # can be deserialized in the callback function + service_instance_temperature.register_callback(temperature_callback) + + # In order to subscribe to an event group, just pass the event group ID to the subscribe_eventgroup method + service_instance_temperature.subscribe_eventgroup(SAMPLE_EVENTGROUP_ID) + + # The service instance has to be attached always to the ServiceDiscoveryProtocol object, so that the service instance + # is notified by the ServiceDiscoveryProtocol about e.g. offers from other ECUs and can also subscribe to offered + # services + service_discovery.attach(service_instance_temperature) + + try: + # Keep the task alive + await asyncio.Future() + except asyncio.CancelledError as e: + print("Shutdown..") + finally: + print("Service Discovery close..") + service_discovery.close() + + print("End main task..") + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt as e: + pass diff --git a/src/someipy/client_service_instance.py b/src/someipy/client_service_instance.py new file mode 100644 index 0000000..d9a252f --- /dev/null +++ b/src/someipy/client_service_instance.py @@ -0,0 +1,159 @@ +import asyncio +from typing import Tuple, Union, Any, Callable + +from someipy._internal.someip_sd_header import ( + SdService, + TransportLayerProtocol, + SdEventGroupEntry, +) +from someipy._internal.someip_sd_builder import build_subscribe_eventgroup_entry +from someipy._internal.someip_header import ( + get_payload_from_someip_message, + SomeIpHeader, +) +from someipy._internal.service_discovery_abcs import ( + ServiceDiscoveryObserver, + ServiceDiscoverySender, +) +from someipy._internal.utils import create_udp_socket, EndpointType, DatagramAdapter +from someipy._internal.logging import get_logger + + +_logger = get_logger("client_service_instance") + + +class ClientServiceInstance(ServiceDiscoveryObserver): + service_id: int + instance_id: int + major_version: int + minor_version: int + endpoint: EndpointType + ttl: int + sd_sender: ServiceDiscoverySender + + eventgroup_to_subscribe: int + expect_ack: bool + + callback: Callable[[bytes], None] + + def __init__( + self, + service_id: int, + instance_id: int, + major_version: int, + minor_version: int, + endpoint: EndpointType, + ttl: int = 0, + sd_sender=None, + ): + self.service_id = service_id + self.instance_id = instance_id + self.major_version = major_version + self.minor_version = minor_version + self.endpoint = endpoint + self.ttl = ttl + self.sd_sender = sd_sender + + self.eventgroup_to_subscribe = -1 + self.expect_ack = False + + self.unicast_transport = None + + self.callback = None + + def register_callback(self, callback: Callable[[bytes], None]) -> None: + self.callback = callback + + def datagram_received(self, data: bytes, addr: Tuple[Union[str, Any], int]) -> None: + # TODO: Test if there is a subscription active for the received data + if self.callback is not None: + header = SomeIpHeader.from_buffer(data) + payload = get_payload_from_someip_message(header, data) + self.callback(payload) + + def connection_lost(self, exc: Exception) -> None: + pass + + def subscribe_eventgroup(self, eventgroup_id: int): + # TODO: Currently only one eventgroup per service is supported + self.eventgroup_to_subscribe = eventgroup_id + + def stop_subscribe_eventgroup(self, eventgroup_id: int): + # TODO: Implement StopSubscribe + raise NotImplementedError + + def find_service_update(self): + # Not needed in client service instance + pass + + def offer_service_update(self, offered_service: SdService): + if ( + self.eventgroup_to_subscribe != -1 + and offered_service.service_id == self.service_id + and offered_service.instance_id == self.instance_id + ): + ( + session_id, + reboot_flag, + ) = self.sd_sender.get_unicast_session_handler().update_session() + + subscribe_sd_header = build_subscribe_eventgroup_entry( + service_id=self.service_id, + instance_id=self.instance_id, + major_version=self.major_version, + ttl=self.ttl, + event_group_id=self.eventgroup_to_subscribe, + session_id=session_id, + reboot_flag=reboot_flag, + endpoint=self.endpoint, + protocol=TransportLayerProtocol.UDP, + ) + + # TODO: Subscription shall be only active when ACK is received + self.expect_ack = True + + _logger.debug( + f"Send subscribe for instance 0x{self.instance_id:04X}, service: 0x{self.service_id:04X}, evengroup ID: {self.eventgroup_to_subscribe} TTL: {self.ttl}, version: {self.major_version}.{self.minor_version}, session ID: {session_id}" + ) + + self.sd_sender.send_unicast( + buffer=subscribe_sd_header.to_buffer(), + dest_ip=offered_service.endpoint[0], + ) + + def subscribe_eventgroup_update(self, _, __) -> None: + # Not needed for client instance + pass + + def subscribe_ack_eventgroup_update( + self, event_group_entry: SdEventGroupEntry + ) -> None: + if self.expect_ack: + self.expect_ack = False + _logger.debug( + f"Received subscribe ACK for instance 0x{event_group_entry.sd_entry.instance_id:04X}, service 0x{event_group_entry.sd_entry.service_id:04X}, eventgroup 0x{event_group_entry.eventgroup_id:04X}" + ) + + +async def construct_client_service_instance( + service_id: int, + instance_id: int, + major_version: int, + minor_version: int, + endpoint: EndpointType, + ttl: int = 0, + sd_sender=None, +) -> ClientServiceInstance: + client_instance = ClientServiceInstance( + service_id, instance_id, major_version, minor_version, endpoint, ttl, sd_sender + ) + + loop = asyncio.get_running_loop() + rcv_socket = create_udp_socket(str(endpoint[0]), endpoint[1]) + + unicast_transport, _ = await loop.create_datagram_endpoint( + lambda: DatagramAdapter(target=client_instance), sock=rcv_socket + ) + client_instance.unicast_transport = unicast_transport + + return client_instance diff --git a/src/someipy/server_service_instance.py b/src/someipy/server_service_instance.py index bcbbf6f..853fa33 100644 --- a/src/someipy/server_service_instance.py +++ b/src/someipy/server_service_instance.py @@ -1,14 +1,22 @@ from dataclasses import dataclass from typing import List -from someipy.service_discovery import * from someipy._internal.message_types import MessageType -from someipy._internal.someip_sd_builder import * +from someipy._internal.someip_sd_builder import ( + build_subscribe_eventgroup_ack_entry, + build_offer_service_sd_header, + build_subscribe_eventgroup_ack_sd_header, +) from someipy._internal.someip_header import SomeIpHeader from someipy._internal.someip_sd_header import ( SdService, TransportLayerProtocol, SdEventGroupEntry, + SdIPV4EndpointOption, +) +from someipy._internal.service_discovery_abcs import ( + ServiceDiscoveryObserver, + ServiceDiscoverySender, ) from someipy._internal.simple_timer import SimplePeriodicTimer @@ -79,7 +87,7 @@ def __init__( def add_eventgroup(self, eventgroup: EventGroup): ids = [e.eventgroup_id for e in self.eventgroups] - if not eventgroup.eventgroup_id in ids: + if eventgroup.eventgroup_id not in ids: self.eventgroups.append(eventgroup) def find_service_update(self): @@ -122,7 +130,7 @@ def subscribe_eventgroup_update( sd_event_group: SdEventGroupEntry, ipv4_endpoint_option: SdIPV4EndpointOption, ) -> None: - eventgroup_ids = [e.eventgroup_id for e in self.eventgroups] + # eventgroup_ids = [e.eventgroup_id for e in self.eventgroups] # [PRS_SOMEIPSD_00829] When receiving a SubscribeEventgroupAck or Sub- # scribeEventgroupNack the Service ID, Instance ID, Eventgroup ID, and Major Ver-