Skip to content

Commit

Permalink
Enable receiving events via TCP
Browse files Browse the repository at this point in the history
  • Loading branch information
chrizog committed May 22, 2024
1 parent 3c951e1 commit 64c5356
Show file tree
Hide file tree
Showing 10 changed files with 383 additions and 19 deletions.
103 changes: 103 additions & 0 deletions example_apps/receive_events_tcp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import asyncio
import ipaddress
import logging

from someipy import ServiceBuilder, EventGroup, TransportLayerProtocol, SomeIpMessage
from someipy.service_discovery import construct_service_discovery
from someipy.client_service_instance import construct_client_service_instance
from someipy.logging import set_someipy_log_level
from temperature_msg import TemparatureMsg

SD_MULTICAST_GROUP = "224.224.224.245"
SD_PORT = 30490
INTERFACE_IP = "127.0.0.1"

SAMPLE_SERVICE_ID = 0x1234
SAMPLE_INSTANCE_ID = 0x5678
SAMPLE_EVENTGROUP_ID = 0x0321
SAMPLE_EVENT_ID = 0x0123

def temperature_callback(someip_message: SomeIpMessage) -> None:
"""
Callback function that is called when a temperature message is received.
Args:
someip_message (SomeIpMessage): The SomeIpMessage object containing the received message.
Returns:
None: This function does not return anything.
"""
try:
print(f"Received {len(someip_message.payload)} bytes. Try to deserialize..")
temperature_msg = TemparatureMsg().deserialize(someip_message.payload)
print(temperature_msg)
except Exception as e:
print(f"Error in deserialization: {e}")

async def main():
# It's possible to configure the logging level of the someipy library, e.g. logging.INFO, logging.DEBUG, logging.WARN, ..
set_someipy_log_level(logging.DEBUG)

# Since the construction of the class ServiceDiscoveryProtocol is not trivial and would require an async __init__ function
# use the construct_service_discovery function
# The local interface IP address needs to be passed so that the src-address of all SD UDP packets is correctly set
service_discovery = await construct_service_discovery(
SD_MULTICAST_GROUP, SD_PORT, INTERFACE_IP
)

# 1. For receiving events use a ClientServiceInstance. Since the construction of the class ClientServiceInstance is not
# trivial and would require an async __init__ function use the construct_client_service_instance function
# 2. Pass the service and instance ID, version and endpoint and TTL. The endpoint is needed because it will be the dest IP
# and port to which the events are sent to and the client will listen to
# 3. The ServiceDiscoveryProtocol object has to be passed as well, so the ClientServiceInstance can offer his service to
# other ECUs
temperature_eventgroup = EventGroup(
id=SAMPLE_EVENTGROUP_ID, event_ids=[SAMPLE_EVENT_ID]
)
temperature_service = (
ServiceBuilder()
.with_service_id(SAMPLE_SERVICE_ID)
.with_major_version(1)
.with_eventgroup(temperature_eventgroup)
.build()
)

service_instance_temperature = await construct_client_service_instance(
service=temperature_service,
instance_id=SAMPLE_INSTANCE_ID,
endpoint=(ipaddress.IPv4Address(INTERFACE_IP), 3002),
ttl=5,
sd_sender=service_discovery,
protocol=TransportLayerProtocol.TCP
)

# It's possible to optionally register a callback function which will be called when an event from the
# subscribed event group is received. The callback function will get the bytes of the payload passed which
# can be deserialized in the callback function
service_instance_temperature.register_callback(temperature_callback)

# In order to subscribe to an event group, just pass the event group ID to the subscribe_eventgroup method
service_instance_temperature.subscribe_eventgroup(SAMPLE_EVENTGROUP_ID)

# The service instance has to be attached always to the ServiceDiscoveryProtocol object, so that the service instance
# is notified by the ServiceDiscoveryProtocol about e.g. offers from other ECUs and can also subscribe to offered
# services
service_discovery.attach(service_instance_temperature)

try:
# Keep the task alive
await asyncio.Future()
except asyncio.CancelledError as e:
print("Shutdown..")
finally:
print("Service Discovery close..")
service_discovery.close()

print("End main task..")


if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt as e:
pass
19 changes: 16 additions & 3 deletions example_apps/receive_events_udp.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,22 @@


def temperature_callback(someip_message: SomeIpMessage) -> None:
print(f"Received {len(someip_message.payload)} bytes.")
temperature_msg = TemparatureMsg().deserialize(someip_message.payload)
print(temperature_msg)
"""
Callback function that is called when a temperature message is received.
Args:
someip_message (SomeIpMessage): The SomeIpMessage object containing the received message.
Returns:
None: This function does not return anything.
"""
try:
print(f"Received {len(someip_message.payload)} bytes. Try to deserialize..")
temperature_msg = TemparatureMsg().deserialize(someip_message.payload)
print(temperature_msg)
except Exception as e:
print(f"Error in deserialization: {e}")



async def main():
Expand Down
4 changes: 2 additions & 2 deletions src/someipy/_internal/tcp_client_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,15 @@ def __init__(self, client_manager: TcpClientManager):

def connection_made(self, transport: asyncio.BaseTransport):
peername: Tuple = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
# print('Connection from {}'.format(peername))
self._transport = transport
self._ip_addr_client = peername[0]
self._port_client = peername[1]

self._client_manager.register_client(self)

def data_received(self, data: bytes):
print('Data received {}: {}'.format(self.ip_addr, data))
# print('Data received {}: {}'.format(self.ip_addr, data))

# Push data to processor
result = self._data_processor.process_data(data)
Expand Down
33 changes: 33 additions & 0 deletions src/someipy/_internal/tcp_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import asyncio
from someipy._internal.logging import get_logger

_logger_name = "tcp_connection"

class TcpConnection():
def __init__(self, ip_server: str, port: int):
self.ip_server = ip_server
self.port = port
self.reader = None
self.writer = None

async def connect(self, src_ip: str, src_port: int):
try:
local_addr = (src_ip, src_port)
self.reader, self.writer = await asyncio.open_connection(self.ip_server, self.port, local_addr=local_addr)
get_logger(_logger_name).debug(f"Connected to {self.ip_server}:{self.port}")
except Exception as e:
get_logger(_logger_name).error(f"Error connecting to {self.ip_server}:{self.port}: {e}")

def is_open(self):
if self.writer is None or self.writer.is_closing():
return False
return not self.reader.at_eof()

async def close(self):
if self.writer:
self.writer.close()
await self.writer.wait_closed()
self.writer = None
get_logger(_logger_name).debug(f"Connection to {self.ip_server}:{self.port} closed")
else:
get_logger(_logger_name).debug("No connection to close")
109 changes: 96 additions & 13 deletions src/someipy/client_service_instance.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
from enum import Enum
import struct
from typing import Tuple, Callable, Set, List

from someipy import Service
Expand All @@ -7,21 +9,21 @@
TransportLayerProtocol,
SdEventGroupEntry,
)
from someipy._internal.someip_header import SomeIpHeader, get_payload_from_someip_message
from someipy._internal.someip_sd_builder import build_subscribe_eventgroup_entry
from someipy._internal.service_discovery_abcs import (
ServiceDiscoveryObserver,
ServiceDiscoverySender,
)
from someipy._internal.tcp_client_manager import TcpClientManager, TcpClientProtocol
from someipy._internal.utils import create_udp_socket, EndpointType
from someipy._internal.logging import get_logger
from someipy._internal.message_types import MessageType
from someipy._internal.someip_endpoint import (
SomeipEndpoint,
TCPSomeipEndpoint,
UDPSomeipEndpoint,
SomeIpMessage,
)
from someipy._internal.tcp_connection import TcpConnection

_logger_name = "client_service_instance"

Expand Down Expand Up @@ -65,6 +67,11 @@ def __init__(
self._expected_acks = []
self._callback = None

self._tcp_connection: TcpConnection = None

self._tcp_connect_lock = asyncio.Lock()
self._tcp_task = None

def register_callback(self, callback: Callable[[SomeIpMessage], None]) -> None:
self._callback = callback

Expand Down Expand Up @@ -133,12 +140,98 @@ def offer_service_update(self, offered_service: SdService):
f"session ID: {session_id}"
)

if self._protocol == TransportLayerProtocol.TCP:
if self._tcp_task is None:
get_logger(_logger_name).debug(f"Create new TCP task for client of 0x{self._instance_id:04X}, 0x{self._service.id:04X}")
self._tcp_task = asyncio.create_task(self.setup_tcp_connection(str(self._endpoint[0]), self._endpoint[1], str(offered_service.endpoint[0]), offered_service.endpoint[1]))

self._expected_acks.append(ExpectedAck(eventgroup_to_subscribe))
self._sd_sender.send_unicast(
buffer=subscribe_sd_header.to_buffer(),
dest_ip=offered_service.endpoint[0],
)

async def setup_tcp_connection(self, src_ip: str, src_port: int, dst_ip: str, dst_port: int):
# TODO: Check for stop condition
while True:
try:
get_logger(_logger_name).debug(f"Try to open TCP connection to ({dst_ip}, {dst_port})")
self._tcp_connection = TcpConnection(dst_ip, dst_port)
await self._tcp_connection.connect(src_ip, src_port)

class State(Enum):
HEADER = 1
PAYLOAD = 2
PENDING = 3
state = State.HEADER

expected_bytes = 8 # 2x 32-bit for header
header_data = bytes()
data: bytes = bytes()
count = 0
get_logger(_logger_name).debug(f"Start TCP read on port {src_port}")

while self._tcp_connection.is_open():
try:
if state == State.HEADER:
while len(data) < expected_bytes:
new_data = await asyncio.wait_for(self._tcp_connection.reader.read(8), 3.0)
data += new_data
service_id, method_id, length = struct.unpack(">HHI", data[0:8])

count += 1
# print(f"{count} Received {len(data)} bytes: Service ID: 0x{service_id:02x} Method ID: 0x{method_id:02x} Length: {length}")

header_data = data[0:8]

# The length bytes also covers 8 bytes header data without payload
expected_bytes = length
state = State.PAYLOAD

elif state == State.PAYLOAD:
data = bytes()
while len(data) < expected_bytes:
new_data = await asyncio.wait_for(self._tcp_connection.reader.read(expected_bytes), 3.0)
data += new_data

# print(f"Received {len(data)} bytes from expected {expected_bytes}")

header_data = header_data + data[0:8]
payload_data = data[8:]

message_data = header_data + payload_data
# hex_representation = ' '.join(f'0x{byte:02x}' for byte in message_data)
# print(hex_representation)
someip_header = SomeIpHeader.from_buffer(buf=message_data)
# print(str(someip_header))
payload_data = get_payload_from_someip_message(someip_header, message_data)
# hex_representation = ' '.join(f'0x{byte:02x}' for byte in payload_data)
# print(hex_representation)

if self._callback is not None:
self._callback(SomeIpMessage(someip_header, payload_data))

if len(data) == expected_bytes:
data = bytes()
else:
data = data[expected_bytes:]
state = State.HEADER
expected_bytes = 8

except TimeoutError:
get_logger(_logger_name).debug(f"Timeout reading from TCP connection ({src_ip}, {src_port})")


except Exception as e:
get_logger(_logger_name).error(f"Exception in setup_tcp_connection: {e}")
finally:
# 3. If the connection is closed, try to reconnect at beginning of loop (1)
await self._tcp_connection.close()

# Sleep for a while before reconnect
await asyncio.sleep(1)


def subscribe_eventgroup_update(self, _, __) -> None:
# Not needed for client instance
pass
Expand Down Expand Up @@ -196,22 +289,12 @@ async def construct_client_service_instance(

elif protocol == TransportLayerProtocol.TCP:

tcp_client_manager = TcpClientManager()
loop = asyncio.get_running_loop()
server = await loop.create_server(
lambda: TcpClientProtocol(client_manager=tcp_client_manager),
str(endpoint[0]),
endpoint[1],
)

tcp_someip_endpoint = TCPSomeipEndpoint(server, tcp_client_manager)

server_instance = ClientServiceInstance(
service,
instance_id,
endpoint,
TransportLayerProtocol.TCP,
tcp_someip_endpoint,
None,
ttl,
sd_sender,
)
Expand Down
4 changes: 4 additions & 0 deletions test_apps/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
cmake_minimum_required (VERSION 3.13)

project(someipy_test_apps)

set(CMAKE_BUILD_TYPE Debug)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14")


find_package(vsomeip3 CONFIG REQUIRED)

set(CMAKE_INSTALL_PREFIX "${CMAKE_SOURCE_DIR}/install")
Expand All @@ -24,3 +27,4 @@ endfunction()
create_test_target("send_events")

create_test_target("receive_events_udp")
create_test_target("receive_events_tcp")
Loading

0 comments on commit 64c5356

Please sign in to comment.