Skip to content

Commit

Permalink
Introduce client service instance class for receiving events
Browse files Browse the repository at this point in the history
  • Loading branch information
chrizog committed Feb 8, 2024
1 parent 8fec5b6 commit 479b2c2
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 5 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ The library is still under development. The current major limitations and deviat
### SOME/IP

- Only events (and field notifiers) are supported. Methods (and field getters/setters) are not supported yet.
- Receiving events is not supported yet. The server-side only is supported for now. Client service instances for receiving SOME/IP events are supported soon.
- Only UDP services are supported.
- Only unicast services are supported.
- SOME/IP-TP is not supported.
Expand Down
79 changes: 79 additions & 0 deletions example_apps/receive_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import asyncio
import ipaddress
import logging

from someipy.service_discovery import construct_service_discovery
from someipy.client_service_instance import construct_client_service_instance
from someipy.logging import set_someipy_log_level
from temperature_msg import TemparatureMsg

SD_MULTICAST_GROUP = "224.224.224.245"
SD_PORT = 30490
INTERFACE_IP = "127.0.0.1"

SAMPLE_EVENTGROUP_ID = 20


def temperature_callback(payload: bytes) -> None:
print(f"Received {len(payload)} bytes.")
temperature_msg = TemparatureMsg().deserialize(payload)
print(temperature_msg)


async def main():
# It's possible to configure the logging level of the someipy library, e.g. logging.INFO, logging.DEBUG, logging.WARN, ..
set_someipy_log_level(logging.DEBUG)

# Since the construction of the class ServiceDiscoveryProtocol is not trivial and would require an async __init__ function
# use the construct_service_discovery function
# The local interface IP address needs to be passed so that the src-address of all SD UDP packets is correctly set
service_discovery = await construct_service_discovery(
SD_MULTICAST_GROUP, SD_PORT, INTERFACE_IP
)

# 1. For receiving events use a ClientServiceInstance. Since the construction of the class ClientServiceInstance is not
# trivial and would require an async __init__ function use the construct_client_service_instance function
# 2. Pass the service and instance ID, version and endpoint and TTL. The endpoint is needed because it will be the dest IP
# and port to which the events are sent to and the client will listen to
# 3. The ServiceDiscoveryProtocol object has to be passed as well, so the ClientServiceInstance can offer his service to
# other ECUs
service_instance_temperature = await construct_client_service_instance(
service_id=1,
instance_id=1000,
major_version=1,
minor_version=0,
endpoint=(ipaddress.IPv4Address(INTERFACE_IP), 3002),
ttl=5,
sd_sender=service_discovery,
)

# It's possible to optionally register a callback function which will be called when an event from the
# subscribed event group is received. The callback function will get the bytes of the payload passed which
# can be deserialized in the callback function
service_instance_temperature.register_callback(temperature_callback)

# In order to subscribe to an event group, just pass the event group ID to the subscribe_eventgroup method
service_instance_temperature.subscribe_eventgroup(SAMPLE_EVENTGROUP_ID)

# The service instance has to be attached always to the ServiceDiscoveryProtocol object, so that the service instance
# is notified by the ServiceDiscoveryProtocol about e.g. offers from other ECUs and can also subscribe to offered
# services
service_discovery.attach(service_instance_temperature)

try:
# Keep the task alive
await asyncio.Future()
except asyncio.CancelledError as e:
print("Shutdown..")
finally:
print("Service Discovery close..")
service_discovery.close()

print("End main task..")


if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt as e:
pass
159 changes: 159 additions & 0 deletions src/someipy/client_service_instance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import asyncio
from typing import Tuple, Union, Any, Callable

from someipy._internal.someip_sd_header import (
SdService,
TransportLayerProtocol,
SdEventGroupEntry,
)
from someipy._internal.someip_sd_builder import build_subscribe_eventgroup_entry
from someipy._internal.someip_header import (
get_payload_from_someip_message,
SomeIpHeader,
)
from someipy._internal.service_discovery_abcs import (
ServiceDiscoveryObserver,
ServiceDiscoverySender,
)
from someipy._internal.utils import create_udp_socket, EndpointType, DatagramAdapter
from someipy._internal.logging import get_logger


_logger = get_logger("client_service_instance")


class ClientServiceInstance(ServiceDiscoveryObserver):
service_id: int
instance_id: int
major_version: int
minor_version: int
endpoint: EndpointType
ttl: int
sd_sender: ServiceDiscoverySender

eventgroup_to_subscribe: int
expect_ack: bool

callback: Callable[[bytes], None]

def __init__(
self,
service_id: int,
instance_id: int,
major_version: int,
minor_version: int,
endpoint: EndpointType,
ttl: int = 0,
sd_sender=None,
):
self.service_id = service_id
self.instance_id = instance_id
self.major_version = major_version
self.minor_version = minor_version
self.endpoint = endpoint
self.ttl = ttl
self.sd_sender = sd_sender

self.eventgroup_to_subscribe = -1
self.expect_ack = False

self.unicast_transport = None

self.callback = None

def register_callback(self, callback: Callable[[bytes], None]) -> None:
self.callback = callback

def datagram_received(self, data: bytes, addr: Tuple[Union[str, Any], int]) -> None:
# TODO: Test if there is a subscription active for the received data
if self.callback is not None:
header = SomeIpHeader.from_buffer(data)
payload = get_payload_from_someip_message(header, data)
self.callback(payload)

def connection_lost(self, exc: Exception) -> None:
pass

def subscribe_eventgroup(self, eventgroup_id: int):
# TODO: Currently only one eventgroup per service is supported
self.eventgroup_to_subscribe = eventgroup_id

def stop_subscribe_eventgroup(self, eventgroup_id: int):
# TODO: Implement StopSubscribe
raise NotImplementedError

def find_service_update(self):
# Not needed in client service instance
pass

def offer_service_update(self, offered_service: SdService):
if (
self.eventgroup_to_subscribe != -1
and offered_service.service_id == self.service_id
and offered_service.instance_id == self.instance_id
):
(
session_id,
reboot_flag,
) = self.sd_sender.get_unicast_session_handler().update_session()

subscribe_sd_header = build_subscribe_eventgroup_entry(
service_id=self.service_id,
instance_id=self.instance_id,
major_version=self.major_version,
ttl=self.ttl,
event_group_id=self.eventgroup_to_subscribe,
session_id=session_id,
reboot_flag=reboot_flag,
endpoint=self.endpoint,
protocol=TransportLayerProtocol.UDP,
)

# TODO: Subscription shall be only active when ACK is received
self.expect_ack = True

_logger.debug(
f"Send subscribe for instance 0x{self.instance_id:04X}, service: 0x{self.service_id:04X}, evengroup ID: {self.eventgroup_to_subscribe} TTL: {self.ttl}, version: {self.major_version}.{self.minor_version}, session ID: {session_id}"
)

self.sd_sender.send_unicast(
buffer=subscribe_sd_header.to_buffer(),
dest_ip=offered_service.endpoint[0],
)

def subscribe_eventgroup_update(self, _, __) -> None:
# Not needed for client instance
pass

def subscribe_ack_eventgroup_update(
self, event_group_entry: SdEventGroupEntry
) -> None:
if self.expect_ack:
self.expect_ack = False
_logger.debug(
f"Received subscribe ACK for instance 0x{event_group_entry.sd_entry.instance_id:04X}, service 0x{event_group_entry.sd_entry.service_id:04X}, eventgroup 0x{event_group_entry.eventgroup_id:04X}"
)


async def construct_client_service_instance(
service_id: int,
instance_id: int,
major_version: int,
minor_version: int,
endpoint: EndpointType,
ttl: int = 0,
sd_sender=None,
) -> ClientServiceInstance:
client_instance = ClientServiceInstance(
service_id, instance_id, major_version, minor_version, endpoint, ttl, sd_sender
)

loop = asyncio.get_running_loop()
rcv_socket = create_udp_socket(str(endpoint[0]), endpoint[1])

unicast_transport, _ = await loop.create_datagram_endpoint(
lambda: DatagramAdapter(target=client_instance), sock=rcv_socket
)
client_instance.unicast_transport = unicast_transport

return client_instance
16 changes: 12 additions & 4 deletions src/someipy/server_service_instance.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
from dataclasses import dataclass
from typing import List

from someipy.service_discovery import *
from someipy._internal.message_types import MessageType
from someipy._internal.someip_sd_builder import *
from someipy._internal.someip_sd_builder import (
build_subscribe_eventgroup_ack_entry,
build_offer_service_sd_header,
build_subscribe_eventgroup_ack_sd_header,
)
from someipy._internal.someip_header import SomeIpHeader
from someipy._internal.someip_sd_header import (
SdService,
TransportLayerProtocol,
SdEventGroupEntry,
SdIPV4EndpointOption,
)
from someipy._internal.service_discovery_abcs import (
ServiceDiscoveryObserver,
ServiceDiscoverySender,
)

from someipy._internal.simple_timer import SimplePeriodicTimer
Expand Down Expand Up @@ -79,7 +87,7 @@ def __init__(

def add_eventgroup(self, eventgroup: EventGroup):
ids = [e.eventgroup_id for e in self.eventgroups]
if not eventgroup.eventgroup_id in ids:
if eventgroup.eventgroup_id not in ids:
self.eventgroups.append(eventgroup)

def find_service_update(self):
Expand Down Expand Up @@ -122,7 +130,7 @@ def subscribe_eventgroup_update(
sd_event_group: SdEventGroupEntry,
ipv4_endpoint_option: SdIPV4EndpointOption,
) -> None:
eventgroup_ids = [e.eventgroup_id for e in self.eventgroups]
# eventgroup_ids = [e.eventgroup_id for e in self.eventgroups]

# [PRS_SOMEIPSD_00829] When receiving a SubscribeEventgroupAck or Sub-
# scribeEventgroupNack the Service ID, Instance ID, Eventgroup ID, and Major Ver-
Expand Down

0 comments on commit 479b2c2

Please sign in to comment.