1616import asyncio
1717from enum import Enum
1818import struct
19- from typing import Iterable , Tuple , Callable , Set , List
19+ from typing import Dict , Iterable , Tuple , Callable , Set , List
2020
2121from someipy import Service
2222from someipy ._internal .method_result import MethodResult
@@ -86,7 +86,10 @@ class ClientServiceInstance(ServiceDiscoveryObserver):
8686 _callback : Callable [[bytes ], None ]
8787 _found_services : Iterable [FoundService ]
8888 _subscription_active : bool
89- _method_call_future : asyncio .Future
89+
90+ _method_call_futures : Dict [int , asyncio .Future ]
91+ _client_id : int
92+ _session_id : int
9093
9194 def __init__ (
9295 self ,
@@ -97,6 +100,7 @@ def __init__(
97100 someip_endpoint : SomeipEndpoint ,
98101 ttl : int = 0 ,
99102 sd_sender = None ,
103+ client_id : int = 0 ,
100104 ):
101105 self ._service = service
102106 self ._instance_id = instance_id
@@ -119,7 +123,10 @@ def __init__(
119123
120124 self ._found_services = []
121125 self ._subscription_active = False
122- self ._method_call_future = None
126+ self ._method_call_futures : Dict [int , asyncio .Future ] = {}
127+ self ._client_id = client_id
128+
129+ self ._session_id = 0 # Starts from 1 to 0xFFFF
123130
124131 def register_callback (self , callback : Callable [[SomeIpMessage ], None ]) -> None :
125132 """
@@ -157,7 +164,7 @@ async def call_method(self, method_id: int, payload: bytes) -> MethodResult:
157164 payload (bytes): The payload to send with the method call.
158165
159166 Returns:
160- MethodResult: The result of the method call which can contain an error or a successfull result including the response payload.
167+ MethodResult: The result of the method call which can contain an error or a successful result including the response payload.
161168
162169 Raises:
163170 RuntimeError: If the TCP connection to the server cannot be established or if the server service has not been found yet.
@@ -174,20 +181,25 @@ async def call_method(self, method_id: int, payload: bytes) -> MethodResult:
174181 f"Method 0x{ method_id :04x} called, but service 0x{ self ._service .id :04X} with instance 0x{ self ._instance_id :04X} not found yet."
175182 )
176183
184+ # Session ID is a 16-bit value and should be incremented for each method call starting from 1
185+ self ._session_id = (self ._session_id + 1 ) % 0xFFFF
186+ session_id = self ._session_id
187+
177188 header = SomeIpHeader (
178189 service_id = self ._service .id ,
179190 method_id = method_id ,
180- client_id = 0x00 ,
181- session_id = 0x00 ,
191+ client_id = self . _client_id ,
192+ session_id = session_id ,
182193 protocol_version = 0x01 ,
183- interface_version = 0x00 ,
194+ interface_version = self . _service . major_version ,
184195 message_type = MessageType .REQUEST .value ,
185196 return_code = 0x00 ,
186197 length = len (payload ) + 8 ,
187198 )
188199 someip_message = SomeIpMessage (header , payload )
189200
190- self ._method_call_future = asyncio .get_running_loop ().create_future ()
201+ call_future = asyncio .get_running_loop ().create_future ()
202+ self ._method_call_futures [session_id ] = call_future
191203
192204 dst_address = str (self ._found_services [0 ].service .endpoint [0 ])
193205 dst_port = self ._found_services [0 ].service .endpoint [1 ]
@@ -234,20 +246,28 @@ async def call_method(self, method_id: int, payload: bytes) -> MethodResult:
234246 endpoint_to_str_int_tuple (self ._found_services [0 ].service .endpoint ),
235247 )
236248
237- # After sending the method call wait for maximum three seconds
249+ # After sending the method call wait for maximum 10 seconds
238250 try :
239- await asyncio .wait_for (self . _method_call_future , 3 .0 )
251+ await asyncio .wait_for (call_future , 10 .0 )
240252 except asyncio .TimeoutError :
253+
254+ # Remove the call_future from self._method_call_futures
255+ del self ._method_call_futures [session_id ]
256+
241257 get_logger (_logger_name ).error (
242258 f"Waiting on response for method call 0x{ method_id :04X} timed out."
243259 )
244260 raise
245261
246- return self ._method_call_future .result ()
262+ method_result = call_future .result ()
263+ del self ._method_call_futures [session_id ]
264+ return method_result
247265
248266 def someip_message_received (
249267 self , someip_message : SomeIpMessage , addr : Tuple [str , int ]
250268 ) -> None :
269+
270+ # Handling a notification message
251271 if (
252272 someip_message .header .client_id == 0x00
253273 and someip_message .header .message_type == MessageType .NOTIFICATION .value
@@ -257,16 +277,33 @@ def someip_message_received(
257277 self ._callback (someip_message )
258278 return
259279
280+ # Handling a response message
260281 if (
261282 someip_message .header .message_type == MessageType .RESPONSE .value
262283 or someip_message .header .message_type == MessageType .ERROR .value
263284 ):
264- if self ._method_call_future is not None :
285+ if someip_message .header .session_id not in self ._method_call_futures .keys ():
286+ return
287+ if someip_message .header .client_id != self ._client_id :
288+ return
289+
290+ call_future = None
291+ try :
292+ call_future = self ._method_call_futures [
293+ someip_message .header .session_id
294+ ]
295+ except :
296+ get_logger (_logger_name ).error (
297+ f"Received response for unknown session ID { someip_message .header .session_id } "
298+ )
299+ return
300+
301+ if call_future is not None :
265302 result = MethodResult ()
266303 result .message_type = MessageType (someip_message .header .message_type )
267304 result .return_code = ReturnCode (someip_message .header .return_code )
268305 result .payload = someip_message .payload
269- self . _method_call_future .set_result (result )
306+ call_future .set_result (result )
270307 return
271308
272309 def subscribe_eventgroup (self , eventgroup_id : int ):
@@ -314,64 +351,72 @@ def handle_find_service(self):
314351 def handle_offer_service (self , offered_service : SdService ):
315352 if self ._service .id != offered_service .service_id :
316353 return
317- if self ._instance_id != offered_service .instance_id :
354+ if (
355+ self ._instance_id != 0xFFFF
356+ and offered_service .instance_id != 0xFFFF
357+ and self ._instance_id != offered_service .instance_id
358+ ):
359+ # 0xFFFF allows to handle any instance ID
360+ return
361+ if self ._service .major_version != offered_service .major_version :
318362 return
319-
320363 if (
321- offered_service . service_id == self ._service .id
322- and offered_service . instance_id == self . _instance_id
364+ self ._service .minor_version != 0xFFFFFFFF
365+ and self . _service . minor_version != offered_service . minor_version
323366 ):
324- if FoundService ( offered_service ) not in self . _found_services :
325- self . _found_services . append ( FoundService ( offered_service ))
367+ # 0xFFFFFFFF allows to handle any minor version
368+ return
326369
327- if len ( self . _eventgroups_to_subscribe ) == 0 :
328- return
370+ if FoundService ( offered_service ) not in self . _found_services :
371+ self . _found_services . append ( FoundService ( offered_service ))
329372
330- # Try to subscribe to requested event groups
331- for eventgroup_to_subscribe in self ._eventgroups_to_subscribe :
332- (
333- session_id ,
334- reboot_flag ,
335- ) = self ._sd_sender .get_unicast_session_handler ().update_session ()
336-
337- # Improvement: Pack all entries into a single SD message
338- subscribe_sd_header = build_subscribe_eventgroup_sd_header (
339- service_id = self ._service .id ,
340- instance_id = self ._instance_id ,
341- major_version = self ._service .major_version ,
342- ttl = self ._ttl ,
343- event_group_id = eventgroup_to_subscribe ,
344- session_id = session_id ,
345- reboot_flag = reboot_flag ,
346- endpoint = self ._endpoint ,
347- protocol = self ._protocol ,
348- )
373+ if len (self ._eventgroups_to_subscribe ) == 0 :
374+ return
349375
350- get_logger (_logger_name ).debug (
351- f"Send subscribe for instance 0x{ self ._instance_id :04X} , service: 0x{ self ._service .id :04X} , "
352- f"eventgroup ID: { eventgroup_to_subscribe } TTL: { self ._ttl } , version: "
353- f"session ID: { session_id } "
354- )
376+ # Try to subscribe to requested event groups
377+ for eventgroup_to_subscribe in self ._eventgroups_to_subscribe :
378+ (
379+ session_id ,
380+ reboot_flag ,
381+ ) = self ._sd_sender .get_unicast_session_handler ().update_session ()
382+ # Improvement: Pack all entries into a single SD message
383+ subscribe_sd_header = build_subscribe_eventgroup_sd_header (
384+ service_id = self ._service .id ,
385+ instance_id = self ._instance_id ,
386+ major_version = self ._service .major_version ,
387+ ttl = self ._ttl ,
388+ event_group_id = eventgroup_to_subscribe ,
389+ session_id = session_id ,
390+ reboot_flag = reboot_flag ,
391+ endpoint = self ._endpoint ,
392+ protocol = self ._protocol ,
393+ )
355394
356- if self ._protocol == TransportLayerProtocol .TCP :
357- if self ._tcp_task is None :
358- get_logger (_logger_name ).debug (
359- f"Create new TCP task for client of 0x{ self ._instance_id :04X} , 0x{ self ._service .id :04X} "
360- )
361- self ._tcp_task = asyncio .create_task (
362- self .setup_tcp_connection (
363- str (self ._endpoint [0 ]),
364- self ._endpoint [1 ],
365- str (offered_service .endpoint [0 ]),
366- offered_service .endpoint [1 ],
367- )
395+ get_logger (_logger_name ).debug (
396+ f"Send subscribe for instance 0x{ self ._instance_id :04X} , service: 0x{ self ._service .id :04X} , "
397+ f"eventgroup ID: { eventgroup_to_subscribe } TTL: { self ._ttl } , version: "
398+ f"session ID: { session_id } "
399+ )
400+
401+ if self ._protocol == TransportLayerProtocol .TCP :
402+ if self ._tcp_task is None :
403+ get_logger (_logger_name ).debug (
404+ f"Create new TCP task for client of 0x{ self ._instance_id :04X} , 0x{ self ._service .id :04X} "
405+ )
406+ self ._tcp_task = asyncio .create_task (
407+ self .setup_tcp_connection (
408+ str (self ._endpoint [0 ]),
409+ self ._endpoint [1 ],
410+ str (offered_service .endpoint [0 ]),
411+ offered_service .endpoint [1 ],
368412 )
413+ )
369414
370- self ._expected_acks .append (ExpectedAck (eventgroup_to_subscribe ))
371- self ._sd_sender .send_unicast (
372- buffer = subscribe_sd_header .to_buffer (),
373- dest_ip = offered_service .endpoint [0 ],
374- )
415+ self ._expected_acks .append (ExpectedAck (eventgroup_to_subscribe ))
416+ self ._sd_sender .send_unicast (
417+ buffer = subscribe_sd_header .to_buffer (),
418+ dest_ip = offered_service .endpoint [0 ],
419+ )
375420
376421 def handle_stop_offer_service (self , offered_service : SdService ) -> None :
377422 if self ._service .id != offered_service .service_id :
@@ -489,6 +534,7 @@ async def construct_client_service_instance(
489534 ttl : int = 0 ,
490535 sd_sender = None ,
491536 protocol = TransportLayerProtocol .UDP ,
537+ client_id : int = 0 ,
492538) -> ClientServiceInstance :
493539 """
494540 Asynchronously constructs a ClientServerInstance. Based on the given transport protocol, proper endpoints are setup before constructing the actual ServerServiceInstance.
@@ -523,6 +569,7 @@ async def construct_client_service_instance(
523569 udp_endpoint ,
524570 ttl ,
525571 sd_sender ,
572+ client_id ,
526573 )
527574
528575 udp_endpoint .set_someip_callback (client_instance .someip_message_received )
0 commit comments