diff --git a/docs/sources/changes.rst b/docs/sources/changes.rst index f9e29c3..a436dcb 100644 --- a/docs/sources/changes.rst +++ b/docs/sources/changes.rst @@ -7,8 +7,10 @@ Version History 1.4.0_dev --------- +- Library class FritzStatus with additional properties: *attenuation*, *str_attenuation*, *noise_margin* and *str_noise_margin* (#69) +- Library class FritzHost with additional method *get_host_name* (#75) +- Namespace prefix for xml-arguments removed (#66) - Test extended for Python 3.9 (#73) -- Library class FritzStatus has additional properties: *attenuation*, *str_attenuation*, *noise_margin* and *str_noise_margin* (#69). 1.3.4 diff --git a/fritzconnection/core/fritzmonitor.py b/fritzconnection/core/fritzmonitor.py index 5ed3f5c..4ab9b56 100644 --- a/fritzconnection/core/fritzmonitor.py +++ b/fritzconnection/core/fritzmonitor.py @@ -16,6 +16,7 @@ import queue import socket import threading +import time FRITZ_IP_ADDRESS = "169.254.1.1" @@ -24,6 +25,9 @@ FRITZ_MONITOR_CHUNK_SIZE = 1024 * 4 FRITZ_MONITOR_SOCKET_TIMEOUT = 10 +RECONNECT_DELAY = 60 # time in seconds to wait for retry after connection lost +RECONNECT_TRIES = 5 # number of tries to reconnect before giving up + class EventReporter: """ @@ -81,10 +85,28 @@ def __init__( self.monitor_thread = None self.encoding = encoding + @property + def has_monitor_thread(self): + """ + Returns True if a monitor-thread has been created. + That should be the case after calling start() and before calling stop(). + """ + return bool(self.monitor_thread) + + @property + def is_alive(self): + """ + Returns True if there is a monitor-thread and the thread is running. + Returns False otherwise. + """ + return self.has_monitor_thread and self.monitor_thread.is_alive() + def start( self, queue_size=FRITZ_MONITOR_QUEUE_SIZE, block_on_filled_queue=False, + reconnect_delay=RECONNECT_DELAY, + reconnect_tries=RECONNECT_TRIES, sock=None, ): """ @@ -104,6 +126,8 @@ def start( "monitor_queue": monitor_queue, "sock": sock, "block_on_filled_queue": block_on_filled_queue, + "reconnect_delay": reconnect_delay, + "reconnect_tries": reconnect_tries, } # clear event object in case the instance gets 'reused': self.stop_flag.clear() @@ -137,7 +161,35 @@ def _get_connected_socket(self, sock=None): raise OSError(msg) return sock - def _monitor(self, monitor_queue, sock, block_on_filled_queue): + def _reconnect_socket( + self, sock, reconnect_delay=RECONNECT_DELAY, reconnect_tries=RECONNECT_TRIES + ): + """ + Try to reconnect a lost connection on the given socket. + Returns True on success and False otherwise. + """ + while reconnect_tries > 0: + time.sleep(reconnect_delay) + try: + self._get_connected_socket(sock) + except OSError: + reconnect_tries -= 1 + else: + return True + return False + + def _monitor( + self, + monitor_queue, + sock, + block_on_filled_queue, + reconnect_delay, + reconnect_tries, + ): + """ + The internal monitor routine running in a separate thread. + """ + # Instantiat an EventReporter to push event to the event_queue. event_reporter = EventReporter( monitor_queue=monitor_queue, block_on_filled_queue=block_on_filled_queue ) @@ -147,15 +199,28 @@ def _monitor(self, monitor_queue, sock, block_on_filled_queue): except socket.timeout: # without a timeout an open socket will never return from a # connection closed by a router (may be of limited resources). + # Therefore be sure to set a timeout at socket creation (elsewhere). # So just try again after timeout. continue if not raw_data: # empty response indicates a lost connection. # try to reconnect. - ... + success = self._reconnect_socket( + sock, + reconnect_delay=reconnect_delay, + reconnect_tries=reconnect_tries, + ) + if not success: + # reconnet has failed: terminate the thread + break else: # sock.recv returns a bytearray to decode: response = raw_data.decode(self.encoding) event_reporter.add(response) # clean up on terminating thread: - sock.close() + try: + sock.close() + except OSError: + pass + # reset monitor_thread to be able to restart the again + self.monitor_thread = None diff --git a/fritzconnection/tests/test_fritzmonitor.py b/fritzconnection/tests/test_fritzmonitor.py index c7f2220..afe6d8a 100644 --- a/fritzconnection/tests/test_fritzmonitor.py +++ b/fritzconnection/tests/test_fritzmonitor.py @@ -9,6 +9,10 @@ class MockSocket: + """ + A socket dummy to simulate receiving data and optional timeouts. + """ + def __init__(self, mock_data=None, timeout=None, raise_connect_timeout=False): self.mock_data = mock_data self.timeout = timeout @@ -34,6 +38,54 @@ def recv(self, chunk_size=None): return chunk.encode("utf-8") +class MockReconnectSocket(MockSocket): + """ + Dummy socket to simulate a single reconnect. + """ + + def __init__(self, mock_data=None): + self.data_provider = self._returner(mock_data) + self.connect_called_num = 0 + + def connect(self, *args): + self.connect_called_num += 1 + + def recv(self, chuck_size=None): + try: + data = next(self.data_provider) + except StopIteration: + data = " " + return data.encode("utf-8") + + @staticmethod + def _returner(mock_data): + for data in mock_data: + yield data + + +class MockReconnectFailSocket(MockReconnectSocket): + """ + Dummy socket to simulate multiple faild reconnections. + """ + + def __init__(self, mock_data=None, timeouts=0): + super().__init__(mock_data) + self._connector = self.connector(timeouts) + + def connect(self, *args): + super().connect(*args) + if not next(self._connector): + raise socket.timeout("mock reconnect timeout") + + @staticmethod + def connector(timeouts): + yield True + for _ in range(timeouts): + yield False + while True: + yield True + + def test_init_fritzmonitor(): fm = FritzMonitor() assert fm.monitor_thread == None @@ -65,9 +117,9 @@ def test_event_reporter(data, expected): def test_start_stop(): mock_socket = MockSocket(timeout=0.01) fm = FritzMonitor() - event_queue = fm.start(sock=mock_socket) - assert isinstance(event_queue, queue.Queue) - assert isinstance(fm.monitor_thread, threading.Thread) + assert fm.monitor_thread is None + fm.start(sock=mock_socket) + assert fm.monitor_thread is not None assert fm.monitor_thread.is_alive() is True thread = fm.monitor_thread fm.stop() @@ -75,11 +127,41 @@ def test_start_stop(): assert fm.monitor_thread is None -def test_start_twice(): +def test_start_stop_properties(): mock_socket = MockSocket(timeout=0.01) fm = FritzMonitor() + assert fm.has_monitor_thread is False + assert fm.is_alive is False _ = fm.start(sock=mock_socket) - pytest.raises(RuntimeError, fm.start) + assert fm.has_monitor_thread is True + assert fm.is_alive is True + fm.stop() + assert fm.has_monitor_thread is False + assert fm.is_alive is False + + +def test_queue_and_threading_instances(): + mock_socket = MockSocket(timeout=0.01) + fm = FritzMonitor() + event_queue = fm.start(sock=mock_socket) + assert isinstance(event_queue, queue.Queue) + assert isinstance(fm.monitor_thread, threading.Thread) + fm.stop() + + +def test_start_twice(): + """ + It is a failure to start a running instance again. + """ + mock_socket = MockSocket(timeout=0.01) + fm = FritzMonitor() + fm.start(sock=mock_socket) + with pytest.raises(RuntimeError): + # start running instance again: should raise a RuntimeError + fm.start(sock=mock_socket) + fm.stop() + # but starting now again should work: + fm.start(sock=mock_socket) fm.stop() @@ -142,3 +224,149 @@ def test_get_events(mock_data, chunk_size, expected_events): assert received_event_num == n fm.stop() assert thread.is_alive() is False + + +def test_reconnect(): + data = ["first\n", "", "second\n"] + mock_socket = MockReconnectSocket(data) + fm = FritzMonitor() + event_queue = fm.start(sock=mock_socket, reconnect_delay=0.001) + for expected in [data[0], data[2]]: + # should not raise _queue.Empty: + assert event_queue.get(timeout=0.1) == expected.strip() + fm.stop() + assert mock_socket.connect_called_num == 2 + + +@pytest.mark.parametrize( + "timeouts, tries, expected_result", + [ + (0, 0, True), + (0, 1, True), + (1, 1, False), + (1, 2, True), + (4, 5, True), + (5, 5, False), + (6, 5, False), + ], +) +def test_MockReconnectFailSocket(timeouts, tries, expected_result): + """ + Internal test to check whether the MockReconnectFailSocket class works as expected. + """ + sock = MockReconnectFailSocket(timeouts=timeouts) + assert sock.connect_called_num == 0 + sock.connect() + assert sock.connect_called_num == 1 + result = True # got connection + for cycle in range(tries): + try: + sock.connect() + except OSError: + result = False + else: + result = True + break + finally: + assert ( + sock.connect_called_num == cycle + 2 + ) # cycle is zero based plus initional connection + assert result == expected_result + + +@pytest.mark.parametrize( + "timeouts", list(range(6)), +) +def test__get_connected_socket(timeouts): + socket = MockReconnectFailSocket(timeouts=timeouts) + fm = FritzMonitor() + s = fm._get_connected_socket(sock=socket) # make initional connection + assert s == socket + for _ in range(timeouts): + with pytest.raises(OSError): + fm._get_connected_socket(sock=socket) + fm._get_connected_socket(sock=socket) + + +@pytest.mark.parametrize( + "timeouts, tries, expected_result", + [ + (0, 0, False), + (0, 1, True), + (1, 0, False), + (1, 1, False), + (1, 2, True), + (4, 5, True), + (5, 5, False), + (6, 5, False), + ], +) +def test__reconnect_socket(timeouts, tries, expected_result): + mock_socket = MockReconnectFailSocket(timeouts=timeouts) + fm = FritzMonitor() + socket = fm._get_connected_socket(sock=mock_socket) # make initional connection + result = fm._reconnect_socket( + sock=socket, reconnect_delay=0.001, reconnect_tries=tries + ) + assert result == expected_result + + +@pytest.mark.parametrize( + "data, timeouts, tries, success", + [ + (["first\n", "second\n"], 0, 0, True), + (["first\n", "", "second\n"], 1, 0, False), + (["first\n", "", "second\n"], 0, 1, True), + (["first\n", "", "second\n"], 1, 1, False), + (["first\n", "", "second\n"], 1, 2, True), + # default for tries: 5 + (["first\n", "", "second\n"], 3, 5, True), + (["first\n", "", "second\n"], 4, 5, True), + (["first\n", "", "second\n"], 5, 5, False), + ], +) +def test_terminate_thread_on_failed_reconnection(data, timeouts, tries, success): + """ + Check for thread-termination in case reconnection fails. + """ + mock_socket = MockReconnectFailSocket(data, timeouts=timeouts) + fm = FritzMonitor() + fm.start(sock=mock_socket, reconnect_delay=0.001, reconnect_tries=tries) + # give thread some time: + time.sleep(0.01) + if success: + assert fm.is_alive is True + else: + assert fm.is_alive is False + assert fm.monitor_thread is None + fm.stop() + + +def test_restart_failed_monitor(): + """ + Check whether a fritzmonitor instance with a lost connection can get started again. + Starting the same instance twice does (and should) not work. + See test_start_twice(). + But after a failed reconnect (a lost connection) the same instance without calling stop() + """ + socket = MockReconnectFailSocket( + mock_data=["first\n", "", "second\n"], timeouts=16 + ) # just some timeouts + fm = FritzMonitor() + fm.start( + sock=socket, reconnect_delay=0.001, reconnect_tries=5 + ) # set default explicit for clarity + # give socket some time to lose connection: + time.sleep(0.01) + assert fm.is_alive is False + assert fm.stop_flag.is_set() is False + # dont' call stop here! + # fm.stop() + socket = MockSocket(timeout=0.01) # socket not losing connection + # should not trigger a RuntimeError + fm.start( + sock=socket, reconnect_delay=0.001, reconnect_tries=5 + ) # set default explicit for clarity + assert fm.is_alive is True + fm.stop() +