2323#
2424
2525from __future__ import annotations
26+ from json import JSONDecodeError
2627from typing import Any , Callable , Generator , Union
2728
29+ from ..json_utils .errors import JSONRPCError , NODE_UNKNOWN , RECEIVER_UNKNOWN
30+ from ..json_utils .json_objects import Notification
2831from ..core .message import Message , MessageTypes
2932from ..core .data_message import DataMessage
3033from ..json_utils .rpc_generator import RPCGenerator
@@ -57,11 +60,10 @@ def convert_data_message_to_messages(
5760 self , data_message : DataMessage , receivers : Union [set [Union [bytes , str ]], set [bytes ]],
5861 ) -> Generator [Message , Any , Any ]:
5962 cid = data_message .conversation_id
60- data = self .rpc_generator .build_request_str (method = "add_subscription_message" )
6163 for receiver in receivers :
6264 yield Message (
6365 receiver = receiver ,
64- data = data ,
66+ data = Notification ( "add_subscription_message" ) ,
6567 conversation_id = cid ,
6668 additional_payload = data_message .payload ,
6769 message_type = MessageTypes .JSON ,
@@ -72,3 +74,32 @@ def send_message(self, message: DataMessage) -> None:
7274 for msg in self .convert_data_message_to_messages (message , self .subscribers ):
7375 # ideas: change to ask and check, whether it succeeded, otherwise remove subscriber
7476 self .send_control_message (msg )
77+
78+ # TODO should unregister subscribers to which a message could not be forwarded
79+ def handle_json_error (self , message : Message ) -> None :
80+ """Unregister unavailable subscribers.
81+
82+ Call this method from the message handler, for example.
83+ """
84+ try :
85+ data : dict [str , Any ] = message .data # type: ignore
86+ except JSONDecodeError as exc :
87+ self .log .exception (f"Could not decode json message { message } " , exc_info = exc )
88+ return
89+ try :
90+ self .rpc_generator .get_result_from_response (data )
91+ except JSONRPCError as exc :
92+ error_code = exc .rpc_error .code
93+ try :
94+ error_data = exc .rpc_error .data
95+ except AttributeError :
96+ return
97+ if error_code == RECEIVER_UNKNOWN :
98+ self .unregister_subscriber (error_data )
99+ if error_code == NODE_UNKNOWN :
100+ if isinstance (error_data , str ):
101+ error_data = error_data .encode ()
102+ for subscriber in self .subscribers :
103+ if subscriber .startswith (error_data ):
104+ self .unregister_subscriber (subscriber )
105+
0 commit comments