Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ Flask>=2.1.1
Flask-SQLAlchemy>=2.5.1
sqlalchemy>=1.4.42
psycopg2-binary
requests>=2.26.0
requests>=2.26.0
pysocks==1.7.1
187 changes: 152 additions & 35 deletions src/cryptoadvance/spectrum/elsock.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,57 @@
import logging
import random
import socket
import socks
import ssl
import sys
import threading
import time
from queue import Queue

from .util import FlaskThread, SpectrumException, handle_exception
from .util import FlaskThread, SpectrumInternalException, handle_exception

# TODO: normal handling of ctrl+C interrupt

logger = logging.getLogger(__name__)


class ElSockTimeoutException(Exception):
"""Called in different contexts where a timeout is relevant"""

pass


class ElectrumSocket:
"""An Electrum protocol implementation based on threads
Supports ssl, tor and uses callbacks for notification
and a callback if the socket has been recreated.

### Implementation description

This uses a _monitor_thread which is creating/controlling 4 other more technical threads:
* the write and recv threads are reading from self._requests and writing to self._results
( and writing to self._notifications for new blocks and new states of scripts )
* the notify thread is reading from self._notifications and callback for those
* the ping-loop is uses the call-method to ping the electrum server. If it's failing for tries_threshold
it'll exit

All of those threads are background-threads.

If any of the above threads exits (probably the ping-thread as some canary in the coalmine)
the monitor-loop will detect that and recreate everything (simply spoken).


"""

tries_threshold = 3 # how many tries the ping might fail before it's giving up
sleep_ping_loop = 10 # seconds
sleep_recv_loop = 0.01 # seconds
sleep_write_loop = 0.01
timeout = 10 # seconds for the call method
# fmt: off
call_timeout = 10 # the most relevant timeout as it affects business-methods (using the call-method)
sleep_ping_loop = 10 # every x seconds we test the ability to call (ping)
tries_threshold = 3 # how many tries the ping might fail before it's giving up (monitor-loop will reestablish connection then)

sleep_recv_loop = 0.01 # seconds , the shorter the better performance
sleep_write_loop = 0.01 # seconds , the shorter the better performance
socket_timeout = 10 # seconds for self._socket.recv(2048) (won't show up in the logs)
# fmt: on

def __init__(
self,
Expand All @@ -34,7 +61,9 @@ def __init__(
use_ssl=False,
callback=None,
socket_recreation_callback=None,
timeout=None,
socket_timeout=None,
call_timeout=None,
proxy_url=None,
):
"""
Initializes a new instance of the ElectrumSocket class.
Expand All @@ -49,16 +78,24 @@ def __init__(
Returns:
None
"""
logger.info(f"Initializing ElectrumSocket with {host}:{port} (ssl: {ssl})")
logger.info(
f"Initializing ElectrumSocket with {host}:{port} (ssl: {ssl}) (proxy: {proxy_url})"
)
self._host = host
self._port = port
self._use_ssl = use_ssl
self.proxy_url = proxy_url
assert type(self._host) == str
assert type(self._port) == int
assert type(self._use_ssl) == bool
self.running = True
self._callback = callback
self._timeout = timeout if timeout else self.__class__.timeout
self._socket_timeout = (
socket_timeout if socket_timeout else self.__class__.socket_timeout
)
self._call_timeout = (
call_timeout if call_timeout else self.__class__.call_timeout
)

self._results = {} # store results of the calls here
self._requests = []
Expand All @@ -72,35 +109,68 @@ def __init__(
self._on_recreation_callback = socket_recreation_callback

@property
def status(self):
def status(self) -> str:
"""Check the _monitor_loop for valid stati"""
if hasattr(self, "_status"):
return self._status
return "unknown"

@status.setter
def status(self, value):
def status(self, value: str):
"""Check the _monitor_loop for valid stati"""
logger.info(f"ElectrumSocket Status changed from {self.status} to {value}")
self._status = value

def _establish_socket(self):
@property
def uses_tor(self) -> bool:
"""Whether the underlying socket is using tor"""
if hasattr(self, "_uses_tor"):
return self._uses_tor
return False

@uses_tor.setter
def uses_tor(self, value: bool):
self._uses_tor = value

def _establish_socket(self) -> bool:
"""Establishes a new socket connection to the specified host and port.

If a socket connection already exists, it will be closed before creating a new one.
If SSL encryption is enabled, the socket will be wrapped with SSL.
The socket timeout is set to 5 seconds before connecting.
Once connected, the socket timeout is set to None, which means it is a blocking socket.
The socket_timeout is set to 5 seconds before connecting.
Once connected, the socket timeout is set to self._socket_timeout, which means it
is a non blocking socket.

Returns:
boolean if successfull
"""

# Just to be sure, maybe close it upfront
try:
# close if open
if hasattr(self, "_socket"):
if not self.is_socket_closed():
self._socket.close()
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# maybe use tor
if self.proxy_url:
try:
ip, port = parse_proxy_url(self.proxy_url)
socks.set_default_proxy(socks.PROXY_TYPE_SOCKS5, ip, port, True)
self.uses_tor = True
except SpectrumInternalException as e:
logger.error(f"Cannot use proxy_url : {e}")
self.uses_tor = False
else:
self.uses_tor = False
self._socket = (
socks.socksocket()
if self.uses_tor
else socket.socket(socket.AF_INET, socket.SOCK_STREAM)
)
logger.debug(f"socket created : {self._socket}")

# maybe use ssl
if self._use_ssl:
self._socket = ssl.wrap_socket(self._socket)
logger.debug(f"socket wrapped : {self._socket}")
Expand All @@ -112,14 +182,18 @@ def _establish_socket(self):
logger.error(f"Internet connection might not be up: {e}")
return False
logger.debug(f"socket connected: {self._socket}")
self._socket.settimeout(20) # That means it's a BLOCKING socket
logger.info(f"Successfully created Socket {self._socket}")
self._socket.settimeout(
self._socket_timeout
) # That means it's a NON-BLOCKING socket
logger.info(
f"Successfully created Socket {self._socket} (ssl={self._use_ssl}/tor={self.uses_tor})"
)
return True
except Exception as e:
logger.exception(e)
return False

def _create_threads(self):
def _create_threads(self) -> bool:
"""
Creates and starts the threads for:
* receiving notifications
Expand All @@ -139,7 +213,7 @@ def _create_threads(self):
logger.exception()
return False

def is_socket_closed(self):
def is_socket_closed(self) -> bool:
"""Checks whether the socket connection is closed or not.

Returns:
Expand All @@ -154,11 +228,20 @@ def is_socket_closed(self):

def _monitor_loop(self):
"""
The loop function for monitoring the socket connection.
If the ping thread is not alive, the socket connection and threads will be recreated.
An endless loop function for monitoring the socket connection.
If the ping thread is not alive, the socket connection and threads will be recreated via walking through
this state-machine:

[![](https://mermaid.ink/img/pako:eNqNkrFuwzAMRH_F4BjES0cPmdKxU7dGgcFITCJIFgOZKhoY_vcqVtoihWtUk8R7PAo4DqDZEDTQCwptLZ4idvX7kwpVPrvVvqrrTaUjodhwanvWjqSI7CbtENlRaJ31_kbIObOmL8i89rflr-Ij-OA8R96nzTrPa_8ZsPjpBZY-SCehNlKROU9H7w-oXelaACYDdrCGjmKH1uSAhluXAjlTRwqafDV0xORFgQpjRtPF5AifjRWO0BzR97QGTMKv16ChkZjoC7rn_E1dMLwx_7xpMnkpmzEtyPgJFpXCuw?type=png)](https://mermaid-js.github.io/mermaid-live-editor/edit#pako:eNqNkrFuwzAMRH_F4BjES0cPmdKxU7dGgcFITCJIFgOZKhoY_vcqVtoihWtUk8R7PAo4DqDZEDTQCwptLZ4idvX7kwpVPrvVvqrrTaUjodhwanvWjqSI7CbtENlRaJ31_kbIObOmL8i89rflr-Ij-OA8R96nzTrPa_8ZsPjpBZY-SCehNlKROU9H7w-oXelaACYDdrCGjmKH1uSAhluXAjlTRwqafDV0xORFgQpjRtPF5AifjRWO0BzR97QGTMKv16ChkZjoC7rn_E1dMLwx_7xpMnkpmzEtyPgJFpXCuw)

The states are stored in the `ElectrumSocket.state` property. The Constructor of the `ElectrumSocket` is hardly doing more than just setting up the `_monitor_thread` which is an endless loop going through these states:
* `creating_sockets` will create the sockets and pass to `creating_threads` or to `broken_creating_sockets` if that fails
* `broken_creating_sockets` will try to create the socket and sleep for some time if that fails (and endlessly try to do that)
* `creating_threads` will create the write/recv/ping/notify threads and start them
* `execute_recreation_callback` will call that callback after setting the status to `ok`
* the `ok` state will now simply check the other thready and if one of them is no longer alive (probably the ping-thread as he will exit if ping fails for 4 times) it will transition to `broken_killing_threads`
* `broken_killing_threads` will set `self.running` to false and wait for the threads to terminate. Especially the `recv` thread might not terminate until he get internet connection (again). This might take forever. If all threads are terminated, it will transition to `creating_socket`

Returns:
None
"""

self.status = "creating_socket"
Expand Down Expand Up @@ -244,7 +327,7 @@ def _monitor_loop(self):
) # to prevent high cpu load if this exception will occur endlessly

@property
def thread_status(self):
def thread_status(self) -> dict:
"""Returning a handy dict containing all informations about the current
thread_status.
e.g.:
Expand Down Expand Up @@ -285,9 +368,6 @@ def thread_status(self):
def _write_loop(self):
"""
The loop function for writing requests to the Electrum server.

Returns:
None
"""
sleep = self.sleep_write_loop
while self.running:
Expand Down Expand Up @@ -315,8 +395,25 @@ def recv_loop(self):
None
"""
sleep = self.sleep_recv_loop # This probably heavily impacts the sync-time
read_counter = 0
timeout_counter = 0
while self.running:
data = self._socket.recv(2048)
try:
data = self._socket.recv(2048)
read_counter += 1
except TimeoutError:
pass
# This might happen quite often as we're using a non-blocking socket here.
# And if no data is there to read from and the timeout is reached, we'll
# get this error. However it's not a real error-condition (imho)

# As i'm not 100% sure about that stuff, i'll keep that code around to uncomment any time:
# timeout_counter += 1
# logger.error(
# f"Timeout in recv-loop, happens in {timeout_counter}/{read_counter} * 100 = {timeout_counter/read_counter * 100 }% of all reads. "
# )
# logger.error(f"consider to increase socket_timeout which is currently {self._socket_timeout}")

while not data.endswith(b"\n"): # b"\n" is the end of the message
data += self._socket.recv(2048)
# data looks like this:
Expand Down Expand Up @@ -352,7 +449,7 @@ def _ping_loop(self):
except ElSockTimeoutException as e:
tries = tries + 1
logger.error(
f"Error in ping-loop ({tries}th time, next try in {self.sleep_ping_loop} seconds if threshold not met"
f"Timeout in ping-loop ({tries}th time, next try in {self.sleep_ping_loop} seconds if threshold not met"
)
if tries > self.tries_threshold:
logger.error(
Expand All @@ -378,17 +475,20 @@ def notify(self, data):
else:
logger.debug("Notification:", data)

def call(self, method, params=[]):
def call(self, method, params=[]) -> dict:
"""
Calls a method on the Electrum server and returns the response.

Args:
- method (str): The name of the method to call on the Electrum server.
- *params: The parameters to pass to the method.
- timeout (float): The timeout for the request. If not specified, the default timeout of the ElectrumSocket instance will be used.


Returns:
dict: The response from the Electrum server.

might raise a ElSockTimeoutException if self._call_timeout is over

"""
uid = random.randint(0, 1 << 32)
obj = {"jsonrpc": "2.0", "method": method, "params": params, "id": uid}
Expand All @@ -398,9 +498,9 @@ def call(self, method, params=[]):
while uid not in self._results: # wait for response
# time.sleep(1)
time.sleep(0.01)
if time.time() - start > self._timeout:
if time.time() - start > self._call_timeout:
raise ElSockTimeoutException(
f"Timeout ({self._timeout} seconds) waiting for {method} on {self._socket}"
f"Timeout in call ({self._call_timeout} seconds) waiting for {method} on {self._socket}"
)
res = self._results.pop(uid)
if "error" in res:
Expand All @@ -415,10 +515,11 @@ def ping(self):

def __del__(self):
logger.info("Closing socket ...")
self._socket.close()
if hasattr(self, "_socket"):
self._socket.close()


def create_and_start_bg_thread(func):
def create_and_start_bg_thread(func) -> FlaskThread:
"""Creates and starts a new background thread that executes the given function.

The thread is started as a daemon thread, which means it will automatically terminate
Expand All @@ -435,3 +536,19 @@ def create_and_start_bg_thread(func):
thread.start()
logger.info(f"Started bg thread for {func.__name__}")
return thread


def parse_proxy_url(proxy_url: str):
"""A proxy_url like socks5h://localhost:9050 will get parsed and returned into something like:
[ "localhost", "9050"]
the url HAS to start with socks5h
"""
if not proxy_url.startswith("socks5h://"):
raise SpectrumInternalException(f"Wrong schema for proxy_url: {proxy_url}")
proxy_url = proxy_url.replace("socks5h://", "")
arr = proxy_url.split(":")
if len(arr) != 2:
raise SpectrumInternalException(
f"Wrong uri has more than one ':' : {proxy_url}"
)
return arr[0], arr[1]
Loading