Skip to content

Commit 31a0bac

Browse files
authored
Fixes: thread issue (#51)
* timeout for waiting on threads ending * fixes for exception-handling * using shutdown * option to suppress jsonrpc_logging in debug * breaking more than often the loop * remove unnecessary if
1 parent 4ea3221 commit 31a0bac

File tree

3 files changed

+65
-15
lines changed

3 files changed

+65
-15
lines changed

src/cryptoadvance/spectrum/elsock.py

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class ElectrumSocket:
4848
call_timeout = 10 # the most relevant timeout as it affects business-methods (using the call-method)
4949
sleep_ping_loop = 10 # every x seconds we test the ability to call (ping)
5050
tries_threshold = 3 # how many tries the ping might fail before it's giving up (monitor-loop will reestablish connection then)
51+
wait_on_exit_timeout= 120 # needs to be bigger than the socket_timeout
5152

5253
sleep_recv_loop = 0.01 # seconds , the shorter the better performance but 0.001 might be much worse
5354
sleep_write_loop = 0.01 # seconds , the shorter the better performance but 0.001 might be much worse
@@ -96,10 +97,16 @@ def __init__(
9697
self._call_timeout = (
9798
call_timeout if call_timeout else self.__class__.call_timeout
9899
)
100+
self.wait_on_exit_timeout = (
101+
self.__class__.call_timeout
102+
if self._socket_timeout * 3 < self.__class__.call_timeout
103+
else self._socket_timeout * 5
104+
)
99105

100106
self._results = {} # store results of the calls here
101107
self._requests = []
102108
self._notifications = []
109+
self._wanted_status = "ok" # "ok" or "down"
103110
# The monitor-thread will create the other threads
104111
self._monitor_thread = create_and_start_bg_thread(self._monitor_loop)
105112
while not (self.status == "ok" or self.status.startswith("broken_")):
@@ -109,6 +116,12 @@ def __init__(
109116
# therefore setting it at the very end:
110117
self._on_recreation_callback = socket_recreation_callback
111118

119+
def shutdown(self):
120+
self._wanted_status = "down"
121+
122+
def startup(self):
123+
self._wanted_status = "ok"
124+
112125
@property
113126
def status(self) -> str:
114127
"""Check the _monitor_loop for valid stati"""
@@ -309,31 +322,55 @@ def _monitor_loop(self):
309322
while self.thread_status[
310323
"all_alive"
311324
]: # most relevant is the ping_status
325+
if self._wanted_status != "ok":
326+
self.status = "broken_killing_threads"
327+
break
312328
time.sleep(1)
313329
self.status = "broken_killing_threads"
314-
logger.info(
315-
f"Issue with Electrum deteted, threads died: {','.join(self.thread_status['not_alive'])}"
316-
)
330+
if self._wanted_status != "down":
331+
logger.info(
332+
f"Issue with Electrum deteted, threads died: {','.join(self.thread_status['not_alive'])}"
333+
)
334+
else:
335+
logger.info(f"Shutting down ElectrumSocket ...")
317336

318337
if self.status == "broken_killing_threads":
319338
logger.info("trying to stop all threads ...")
320339
self.running = False
321340
# self._socket.setblocking(False)
322341
counter = 0
323342
log_frequency = 2
343+
start = time.time()
324344
while self.thread_status["any_alive"]:
325345
# Should we have a timeout? What to do then?
346+
wait_time = time.time() - start
347+
if wait_time > self.wait_on_exit_timeout:
348+
logger.error(
349+
f"Timeout waiting for threads: {' '.join(self.thread_status['alive'])}"
350+
)
351+
break
326352
if counter % log_frequency == 0:
327353
logger.info(
328-
f"Waiting for those threads to exit: {' '.join(self.thread_status['alive'])} ({counter}/{log_frequency})"
354+
f"Waiting for those threads to exit: {' '.join(self.thread_status['alive'])} ({counter}/{log_frequency}) ({wait_time}) ({self.socket_timeout})"
329355
)
330356
if counter > 10:
331357
log_frequency += 1
332358
time.sleep(5)
333359
counter += 1
334-
self.status = "creating_socket"
360+
if self._wanted_status == "down":
361+
self.status = "down"
362+
else:
363+
self.status = "creating_socket"
335364
self.running = True
336365

366+
if self.status == "down":
367+
logger.info(
368+
"ElSock shutdown. Waiting for further wanted_status requests"
369+
)
370+
while self._wanted_status == "down":
371+
time.sleep(1)
372+
self.status = "creating_socket"
373+
337374
except Exception as e:
338375
logger.error(
339376
"Monitoring Loop of Electrum-Socket got an Exception. This is critical if it's happening often!"
@@ -430,8 +467,9 @@ def recv_loop(self):
430467
# f"Timeout in recv-loop, happens in {timeout_counter}/{read_counter} * 100 = {timeout_counter/read_counter * 100 }% of all reads. "
431468
# )
432469
# logger.error(f"consider to increase socket_timeout which is currently {self._socket_timeout}")
433-
434470
while not data.endswith(b"\n"): # b"\n" is the end of the message
471+
if not self.running:
472+
break
435473
data += self._socket.recv(2048)
436474
# data looks like this:
437475
# b'{"jsonrpc": "2.0", "result": {"hex": "...", "height": 761086}, "id": 2210736436}\n'

src/cryptoadvance/spectrum/spectrum.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ def __init__(
141141

142142
def stop(self):
143143
logger.info("Stopping Spectrum")
144-
del self.sock
144+
self.sock.shutdown()
145145

146146
def is_connected(self) -> bool:
147147
"""Returns True if there is a socket connection, False otherwise."""
@@ -178,7 +178,6 @@ def _sync(self):
178178
it calls a sync_script function to update the state. It also logs progress
179179
every 100 scripts subscribed to and updates self.progress_percent
180180
"""
181-
ts = 0
182181
try:
183182
if self.sock.status != "ok":
184183
logger.info("Syncprocess not starting, in offline-mode")
@@ -225,14 +224,14 @@ def _sync(self):
225224
if res != sc.state:
226225
self.sync_script(sc, res)
227226
self.progress_percent = 100
228-
except Exception as e:
229-
logger.exception(e)
230-
finally:
231-
self._sync_in_progress = False
232227
ts_diff_s = int((datetime.now() - ts).total_seconds())
233228
logger.info(
234229
f"Syncprocess finished syncing {all_scripts_len} scripts in {ts_diff_s} with {self.sync_speed} scripts/s)"
235230
)
231+
except Exception as e:
232+
logger.exception(e)
233+
finally:
234+
self._sync_in_progress = False
236235

237236
def sync(self, asyncc=True):
238237
if asyncc:
@@ -471,9 +470,10 @@ def jsonrpc(self, obj, wallet_name=None, catch_exceptions=True):
471470
method = obj.get("method")
472471
id = obj.get("id", 0)
473472
params = obj.get("params", [])
474-
logger.debug(
475-
f"RPC called {method} {'wallet_name: ' + wallet_name if wallet_name else ''}"
476-
)
473+
if not self.app.config.get("SUPPRESS_JSONRPC_LOGGING", False):
474+
logger.debug(
475+
f"RPC called {method} {'wallet_name: ' + wallet_name if wallet_name else ''}"
476+
)
477477
try:
478478
args = None
479479
kwargs = None

tests/test_spectrum_rpc.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,18 @@ def test_unknownmethod(caplog, client):
2424
assert result.status_code == 200
2525

2626

27+
def test_suppress_logging(caplog, client, app):
28+
caplog.set_level(logging.INFO)
29+
caplog.set_level(logging.DEBUG, logger="cryptoadvance.spectrum")
30+
result = client.post("/", json={"method": "getmininginfo"})
31+
assert result.status_code == 200
32+
assert "RPC called getmininginfo" in caplog.text
33+
app.config["SUPPRESS_JSONRPC_LOGGING"] = True
34+
result = client.post("/", json={"method": "getmininginfo"})
35+
assert result.status_code == 200
36+
assert caplog.text.count("RPC called getmininginfo") == 1
37+
38+
2739
def test_getmininginfo(caplog, client):
2840
caplog.set_level(logging.INFO)
2941
caplog.set_level(logging.DEBUG, logger="cryptoadvance.spectrum")

0 commit comments

Comments
 (0)