Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 44 additions & 6 deletions src/cryptoadvance/spectrum/elsock.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class ElectrumSocket:
call_timeout = 10 # the most relevant timeout as it affects business-methods (using the call-method)
sleep_ping_loop = 10 # every x seconds we test the ability to call (ping)
tries_threshold = 3 # how many tries the ping might fail before it's giving up (monitor-loop will reestablish connection then)
wait_on_exit_timeout= 120 # needs to be bigger than the socket_timeout

sleep_recv_loop = 0.01 # seconds , the shorter the better performance but 0.001 might be much worse
sleep_write_loop = 0.01 # seconds , the shorter the better performance but 0.001 might be much worse
Expand Down Expand Up @@ -96,10 +97,16 @@ def __init__(
self._call_timeout = (
call_timeout if call_timeout else self.__class__.call_timeout
)
self.wait_on_exit_timeout = (
self.__class__.call_timeout
if self._socket_timeout * 3 < self.__class__.call_timeout
else self._socket_timeout * 5
)

self._results = {} # store results of the calls here
self._requests = []
self._notifications = []
self._wanted_status = "ok" # "ok" or "down"
# The monitor-thread will create the other threads
self._monitor_thread = create_and_start_bg_thread(self._monitor_loop)
while not (self.status == "ok" or self.status.startswith("broken_")):
Expand All @@ -109,6 +116,12 @@ def __init__(
# therefore setting it at the very end:
self._on_recreation_callback = socket_recreation_callback

def shutdown(self):
self._wanted_status = "down"

def startup(self):
self._wanted_status = "ok"

@property
def status(self) -> str:
"""Check the _monitor_loop for valid stati"""
Expand Down Expand Up @@ -309,31 +322,55 @@ def _monitor_loop(self):
while self.thread_status[
"all_alive"
]: # most relevant is the ping_status
if self._wanted_status != "ok":
self.status = "broken_killing_threads"
break
time.sleep(1)
self.status = "broken_killing_threads"
logger.info(
f"Issue with Electrum deteted, threads died: {','.join(self.thread_status['not_alive'])}"
)
if self._wanted_status != "down":
logger.info(
f"Issue with Electrum deteted, threads died: {','.join(self.thread_status['not_alive'])}"
)
else:
logger.info(f"Shutting down ElectrumSocket ...")

if self.status == "broken_killing_threads":
logger.info("trying to stop all threads ...")
self.running = False
# self._socket.setblocking(False)
counter = 0
log_frequency = 2
start = time.time()
while self.thread_status["any_alive"]:
# Should we have a timeout? What to do then?
wait_time = time.time() - start
if wait_time > self.wait_on_exit_timeout:
logger.error(
f"Timeout waiting for threads: {' '.join(self.thread_status['alive'])}"
)
break
if counter % log_frequency == 0:
logger.info(
f"Waiting for those threads to exit: {' '.join(self.thread_status['alive'])} ({counter}/{log_frequency})"
f"Waiting for those threads to exit: {' '.join(self.thread_status['alive'])} ({counter}/{log_frequency}) ({wait_time}) ({self.socket_timeout})"
)
if counter > 10:
log_frequency += 1
time.sleep(5)
counter += 1
self.status = "creating_socket"
if self._wanted_status == "down":
self.status = "down"
else:
self.status = "creating_socket"
self.running = True

if self.status == "down":
logger.info(
"ElSock shutdown. Waiting for further wanted_status requests"
)
while self._wanted_status == "down":
time.sleep(1)
self.status = "creating_socket"

except Exception as e:
logger.error(
"Monitoring Loop of Electrum-Socket got an Exception. This is critical if it's happening often!"
Expand Down Expand Up @@ -430,8 +467,9 @@ def recv_loop(self):
# f"Timeout in recv-loop, happens in {timeout_counter}/{read_counter} * 100 = {timeout_counter/read_counter * 100 }% of all reads. "
# )
# logger.error(f"consider to increase socket_timeout which is currently {self._socket_timeout}")

while not data.endswith(b"\n"): # b"\n" is the end of the message
if not self.running:
break
data += self._socket.recv(2048)
# data looks like this:
# b'{"jsonrpc": "2.0", "result": {"hex": "...", "height": 761086}, "id": 2210736436}\n'
Expand Down
18 changes: 9 additions & 9 deletions src/cryptoadvance/spectrum/spectrum.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def __init__(

def stop(self):
logger.info("Stopping Spectrum")
del self.sock
self.sock.shutdown()

def is_connected(self) -> bool:
"""Returns True if there is a socket connection, False otherwise."""
Expand Down Expand Up @@ -178,7 +178,6 @@ def _sync(self):
it calls a sync_script function to update the state. It also logs progress
every 100 scripts subscribed to and updates self.progress_percent
"""
ts = 0
try:
if self.sock.status != "ok":
logger.info("Syncprocess not starting, in offline-mode")
Expand Down Expand Up @@ -225,14 +224,14 @@ def _sync(self):
if res != sc.state:
self.sync_script(sc, res)
self.progress_percent = 100
except Exception as e:
logger.exception(e)
finally:
self._sync_in_progress = False
ts_diff_s = int((datetime.now() - ts).total_seconds())
logger.info(
f"Syncprocess finished syncing {all_scripts_len} scripts in {ts_diff_s} with {self.sync_speed} scripts/s)"
)
except Exception as e:
logger.exception(e)
finally:
self._sync_in_progress = False

def sync(self, asyncc=True):
if asyncc:
Expand Down Expand Up @@ -471,9 +470,10 @@ def jsonrpc(self, obj, wallet_name=None, catch_exceptions=True):
method = obj.get("method")
id = obj.get("id", 0)
params = obj.get("params", [])
logger.debug(
f"RPC called {method} {'wallet_name: ' + wallet_name if wallet_name else ''}"
)
if not self.app.config.get("SUPPRESS_JSONRPC_LOGGING", False):
logger.debug(
f"RPC called {method} {'wallet_name: ' + wallet_name if wallet_name else ''}"
)
try:
args = None
kwargs = None
Expand Down
12 changes: 12 additions & 0 deletions tests/test_spectrum_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ def test_unknownmethod(caplog, client):
assert result.status_code == 200


def test_suppress_logging(caplog, client, app):
caplog.set_level(logging.INFO)
caplog.set_level(logging.DEBUG, logger="cryptoadvance.spectrum")
result = client.post("/", json={"method": "getmininginfo"})
assert result.status_code == 200
assert "RPC called getmininginfo" in caplog.text
app.config["SUPPRESS_JSONRPC_LOGGING"] = True
result = client.post("/", json={"method": "getmininginfo"})
assert result.status_code == 200
assert caplog.text.count("RPC called getmininginfo") == 1


def test_getmininginfo(caplog, client):
caplog.set_level(logging.INFO)
caplog.set_level(logging.DEBUG, logger="cryptoadvance.spectrum")
Expand Down