Skip to content

Commit a0072d3

Browse files
authored
Feature: Implementing tor connections (#49)
* Implementing tor connections * added sock dependency * improvements and documentation * polishing and tests
1 parent 8fcde0a commit a0072d3

File tree

6 files changed

+246
-47
lines changed

6 files changed

+246
-47
lines changed

requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ Flask>=2.1.1
33
Flask-SQLAlchemy>=2.5.1
44
sqlalchemy>=1.4.42
55
psycopg2-binary
6-
requests>=2.26.0
6+
requests>=2.26.0
7+
pysocks==1.7.1

src/cryptoadvance/spectrum/elsock.py

Lines changed: 152 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,57 @@
22
import logging
33
import random
44
import socket
5+
import socks
56
import ssl
67
import sys
78
import threading
89
import time
910
from queue import Queue
1011

11-
from .util import FlaskThread, SpectrumException, handle_exception
12+
from .util import FlaskThread, SpectrumInternalException, handle_exception
1213

1314
# TODO: normal handling of ctrl+C interrupt
1415

1516
logger = logging.getLogger(__name__)
1617

1718

1819
class ElSockTimeoutException(Exception):
20+
"""Called in different contexts where a timeout is relevant"""
21+
1922
pass
2023

2124

2225
class ElectrumSocket:
26+
"""An Electrum protocol implementation based on threads
27+
Supports ssl, tor and uses callbacks for notification
28+
and a callback if the socket has been recreated.
29+
30+
### Implementation description
31+
32+
This uses a _monitor_thread which is creating/controlling 4 other more technical threads:
33+
* the write and recv threads are reading from self._requests and writing to self._results
34+
( and writing to self._notifications for new blocks and new states of scripts )
35+
* the notify thread is reading from self._notifications and callback for those
36+
* the ping-loop is uses the call-method to ping the electrum server. If it's failing for tries_threshold
37+
it'll exit
38+
39+
All of those threads are background-threads.
40+
41+
If any of the above threads exits (probably the ping-thread as some canary in the coalmine)
42+
the monitor-loop will detect that and recreate everything (simply spoken).
43+
44+
45+
"""
2346

24-
tries_threshold = 3 # how many tries the ping might fail before it's giving up
25-
sleep_ping_loop = 10 # seconds
26-
sleep_recv_loop = 0.01 # seconds
27-
sleep_write_loop = 0.01
28-
timeout = 10 # seconds for the call method
47+
# fmt: off
48+
call_timeout = 10 # the most relevant timeout as it affects business-methods (using the call-method)
49+
sleep_ping_loop = 10 # every x seconds we test the ability to call (ping)
50+
tries_threshold = 3 # how many tries the ping might fail before it's giving up (monitor-loop will reestablish connection then)
51+
52+
sleep_recv_loop = 0.01 # seconds , the shorter the better performance
53+
sleep_write_loop = 0.01 # seconds , the shorter the better performance
54+
socket_timeout = 10 # seconds for self._socket.recv(2048) (won't show up in the logs)
55+
# fmt: on
2956

3057
def __init__(
3158
self,
@@ -34,7 +61,9 @@ def __init__(
3461
use_ssl=False,
3562
callback=None,
3663
socket_recreation_callback=None,
37-
timeout=None,
64+
socket_timeout=None,
65+
call_timeout=None,
66+
proxy_url=None,
3867
):
3968
"""
4069
Initializes a new instance of the ElectrumSocket class.
@@ -49,16 +78,24 @@ def __init__(
4978
Returns:
5079
None
5180
"""
52-
logger.info(f"Initializing ElectrumSocket with {host}:{port} (ssl: {ssl})")
81+
logger.info(
82+
f"Initializing ElectrumSocket with {host}:{port} (ssl: {ssl}) (proxy: {proxy_url})"
83+
)
5384
self._host = host
5485
self._port = port
5586
self._use_ssl = use_ssl
87+
self.proxy_url = proxy_url
5688
assert type(self._host) == str
5789
assert type(self._port) == int
5890
assert type(self._use_ssl) == bool
5991
self.running = True
6092
self._callback = callback
61-
self._timeout = timeout if timeout else self.__class__.timeout
93+
self._socket_timeout = (
94+
socket_timeout if socket_timeout else self.__class__.socket_timeout
95+
)
96+
self._call_timeout = (
97+
call_timeout if call_timeout else self.__class__.call_timeout
98+
)
6299

63100
self._results = {} # store results of the calls here
64101
self._requests = []
@@ -72,35 +109,68 @@ def __init__(
72109
self._on_recreation_callback = socket_recreation_callback
73110

74111
@property
75-
def status(self):
112+
def status(self) -> str:
113+
"""Check the _monitor_loop for valid stati"""
76114
if hasattr(self, "_status"):
77115
return self._status
78116
return "unknown"
79117

80118
@status.setter
81-
def status(self, value):
119+
def status(self, value: str):
120+
"""Check the _monitor_loop for valid stati"""
82121
logger.info(f"ElectrumSocket Status changed from {self.status} to {value}")
83122
self._status = value
84123

85-
def _establish_socket(self):
124+
@property
125+
def uses_tor(self) -> bool:
126+
"""Whether the underlying socket is using tor"""
127+
if hasattr(self, "_uses_tor"):
128+
return self._uses_tor
129+
return False
130+
131+
@uses_tor.setter
132+
def uses_tor(self, value: bool):
133+
self._uses_tor = value
134+
135+
def _establish_socket(self) -> bool:
86136
"""Establishes a new socket connection to the specified host and port.
87137
88138
If a socket connection already exists, it will be closed before creating a new one.
89139
If SSL encryption is enabled, the socket will be wrapped with SSL.
90-
The socket timeout is set to 5 seconds before connecting.
91-
Once connected, the socket timeout is set to None, which means it is a blocking socket.
140+
The socket_timeout is set to 5 seconds before connecting.
141+
Once connected, the socket timeout is set to self._socket_timeout, which means it
142+
is a non blocking socket.
92143
93144
Returns:
94145
boolean if successfull
95146
"""
96147

97148
# Just to be sure, maybe close it upfront
98149
try:
150+
# close if open
99151
if hasattr(self, "_socket"):
100152
if not self.is_socket_closed():
101153
self._socket.close()
102-
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
154+
155+
# maybe use tor
156+
if self.proxy_url:
157+
try:
158+
ip, port = parse_proxy_url(self.proxy_url)
159+
socks.set_default_proxy(socks.PROXY_TYPE_SOCKS5, ip, port, True)
160+
self.uses_tor = True
161+
except SpectrumInternalException as e:
162+
logger.error(f"Cannot use proxy_url : {e}")
163+
self.uses_tor = False
164+
else:
165+
self.uses_tor = False
166+
self._socket = (
167+
socks.socksocket()
168+
if self.uses_tor
169+
else socket.socket(socket.AF_INET, socket.SOCK_STREAM)
170+
)
103171
logger.debug(f"socket created : {self._socket}")
172+
173+
# maybe use ssl
104174
if self._use_ssl:
105175
self._socket = ssl.wrap_socket(self._socket)
106176
logger.debug(f"socket wrapped : {self._socket}")
@@ -112,14 +182,18 @@ def _establish_socket(self):
112182
logger.error(f"Internet connection might not be up: {e}")
113183
return False
114184
logger.debug(f"socket connected: {self._socket}")
115-
self._socket.settimeout(20) # That means it's a BLOCKING socket
116-
logger.info(f"Successfully created Socket {self._socket}")
185+
self._socket.settimeout(
186+
self._socket_timeout
187+
) # That means it's a NON-BLOCKING socket
188+
logger.info(
189+
f"Successfully created Socket {self._socket} (ssl={self._use_ssl}/tor={self.uses_tor})"
190+
)
117191
return True
118192
except Exception as e:
119193
logger.exception(e)
120194
return False
121195

122-
def _create_threads(self):
196+
def _create_threads(self) -> bool:
123197
"""
124198
Creates and starts the threads for:
125199
* receiving notifications
@@ -139,7 +213,7 @@ def _create_threads(self):
139213
logger.exception()
140214
return False
141215

142-
def is_socket_closed(self):
216+
def is_socket_closed(self) -> bool:
143217
"""Checks whether the socket connection is closed or not.
144218
145219
Returns:
@@ -154,11 +228,20 @@ def is_socket_closed(self):
154228

155229
def _monitor_loop(self):
156230
"""
157-
The loop function for monitoring the socket connection.
158-
If the ping thread is not alive, the socket connection and threads will be recreated.
231+
An endless loop function for monitoring the socket connection.
232+
If the ping thread is not alive, the socket connection and threads will be recreated via walking through
233+
this state-machine:
234+
235+
[![](https://mermaid.ink/img/pako:eNqNkrFuwzAMRH_F4BjES0cPmdKxU7dGgcFITCJIFgOZKhoY_vcqVtoihWtUk8R7PAo4DqDZEDTQCwptLZ4idvX7kwpVPrvVvqrrTaUjodhwanvWjqSI7CbtENlRaJ31_kbIObOmL8i89rflr-Ij-OA8R96nzTrPa_8ZsPjpBZY-SCehNlKROU9H7w-oXelaACYDdrCGjmKH1uSAhluXAjlTRwqafDV0xORFgQpjRtPF5AifjRWO0BzR97QGTMKv16ChkZjoC7rn_E1dMLwx_7xpMnkpmzEtyPgJFpXCuw?type=png)](https://mermaid-js.github.io/mermaid-live-editor/edit#pako:eNqNkrFuwzAMRH_F4BjES0cPmdKxU7dGgcFITCJIFgOZKhoY_vcqVtoihWtUk8R7PAo4DqDZEDTQCwptLZ4idvX7kwpVPrvVvqrrTaUjodhwanvWjqSI7CbtENlRaJ31_kbIObOmL8i89rflr-Ij-OA8R96nzTrPa_8ZsPjpBZY-SCehNlKROU9H7w-oXelaACYDdrCGjmKH1uSAhluXAjlTRwqafDV0xORFgQpjRtPF5AifjRWO0BzR97QGTMKv16ChkZjoC7rn_E1dMLwx_7xpMnkpmzEtyPgJFpXCuw)
236+
237+
The states are stored in the `ElectrumSocket.state` property. The Constructor of the `ElectrumSocket` is hardly doing more than just setting up the `_monitor_thread` which is an endless loop going through these states:
238+
* `creating_sockets` will create the sockets and pass to `creating_threads` or to `broken_creating_sockets` if that fails
239+
* `broken_creating_sockets` will try to create the socket and sleep for some time if that fails (and endlessly try to do that)
240+
* `creating_threads` will create the write/recv/ping/notify threads and start them
241+
* `execute_recreation_callback` will call that callback after setting the status to `ok`
242+
* the `ok` state will now simply check the other thready and if one of them is no longer alive (probably the ping-thread as he will exit if ping fails for 4 times) it will transition to `broken_killing_threads`
243+
* `broken_killing_threads` will set `self.running` to false and wait for the threads to terminate. Especially the `recv` thread might not terminate until he get internet connection (again). This might take forever. If all threads are terminated, it will transition to `creating_socket`
159244
160-
Returns:
161-
None
162245
"""
163246

164247
self.status = "creating_socket"
@@ -244,7 +327,7 @@ def _monitor_loop(self):
244327
) # to prevent high cpu load if this exception will occur endlessly
245328

246329
@property
247-
def thread_status(self):
330+
def thread_status(self) -> dict:
248331
"""Returning a handy dict containing all informations about the current
249332
thread_status.
250333
e.g.:
@@ -285,9 +368,6 @@ def thread_status(self):
285368
def _write_loop(self):
286369
"""
287370
The loop function for writing requests to the Electrum server.
288-
289-
Returns:
290-
None
291371
"""
292372
sleep = self.sleep_write_loop
293373
while self.running:
@@ -315,8 +395,25 @@ def recv_loop(self):
315395
None
316396
"""
317397
sleep = self.sleep_recv_loop # This probably heavily impacts the sync-time
398+
read_counter = 0
399+
timeout_counter = 0
318400
while self.running:
319-
data = self._socket.recv(2048)
401+
try:
402+
data = self._socket.recv(2048)
403+
read_counter += 1
404+
except TimeoutError:
405+
pass
406+
# This might happen quite often as we're using a non-blocking socket here.
407+
# And if no data is there to read from and the timeout is reached, we'll
408+
# get this error. However it's not a real error-condition (imho)
409+
410+
# As i'm not 100% sure about that stuff, i'll keep that code around to uncomment any time:
411+
# timeout_counter += 1
412+
# logger.error(
413+
# f"Timeout in recv-loop, happens in {timeout_counter}/{read_counter} * 100 = {timeout_counter/read_counter * 100 }% of all reads. "
414+
# )
415+
# logger.error(f"consider to increase socket_timeout which is currently {self._socket_timeout}")
416+
320417
while not data.endswith(b"\n"): # b"\n" is the end of the message
321418
data += self._socket.recv(2048)
322419
# data looks like this:
@@ -352,7 +449,7 @@ def _ping_loop(self):
352449
except ElSockTimeoutException as e:
353450
tries = tries + 1
354451
logger.error(
355-
f"Error in ping-loop ({tries}th time, next try in {self.sleep_ping_loop} seconds if threshold not met"
452+
f"Timeout in ping-loop ({tries}th time, next try in {self.sleep_ping_loop} seconds if threshold not met"
356453
)
357454
if tries > self.tries_threshold:
358455
logger.error(
@@ -378,17 +475,20 @@ def notify(self, data):
378475
else:
379476
logger.debug("Notification:", data)
380477

381-
def call(self, method, params=[]):
478+
def call(self, method, params=[]) -> dict:
382479
"""
383480
Calls a method on the Electrum server and returns the response.
384481
385482
Args:
386483
- method (str): The name of the method to call on the Electrum server.
387484
- *params: The parameters to pass to the method.
388-
- timeout (float): The timeout for the request. If not specified, the default timeout of the ElectrumSocket instance will be used.
485+
389486
390487
Returns:
391488
dict: The response from the Electrum server.
489+
490+
might raise a ElSockTimeoutException if self._call_timeout is over
491+
392492
"""
393493
uid = random.randint(0, 1 << 32)
394494
obj = {"jsonrpc": "2.0", "method": method, "params": params, "id": uid}
@@ -398,9 +498,9 @@ def call(self, method, params=[]):
398498
while uid not in self._results: # wait for response
399499
# time.sleep(1)
400500
time.sleep(0.01)
401-
if time.time() - start > self._timeout:
501+
if time.time() - start > self._call_timeout:
402502
raise ElSockTimeoutException(
403-
f"Timeout ({self._timeout} seconds) waiting for {method} on {self._socket}"
503+
f"Timeout in call ({self._call_timeout} seconds) waiting for {method} on {self._socket}"
404504
)
405505
res = self._results.pop(uid)
406506
if "error" in res:
@@ -415,10 +515,11 @@ def ping(self):
415515

416516
def __del__(self):
417517
logger.info("Closing socket ...")
418-
self._socket.close()
518+
if hasattr(self, "_socket"):
519+
self._socket.close()
419520

420521

421-
def create_and_start_bg_thread(func):
522+
def create_and_start_bg_thread(func) -> FlaskThread:
422523
"""Creates and starts a new background thread that executes the given function.
423524
424525
The thread is started as a daemon thread, which means it will automatically terminate
@@ -435,3 +536,19 @@ def create_and_start_bg_thread(func):
435536
thread.start()
436537
logger.info(f"Started bg thread for {func.__name__}")
437538
return thread
539+
540+
541+
def parse_proxy_url(proxy_url: str):
542+
"""A proxy_url like socks5h://localhost:9050 will get parsed and returned into something like:
543+
[ "localhost", "9050"]
544+
the url HAS to start with socks5h
545+
"""
546+
if not proxy_url.startswith("socks5h://"):
547+
raise SpectrumInternalException(f"Wrong schema for proxy_url: {proxy_url}")
548+
proxy_url = proxy_url.replace("socks5h://", "")
549+
arr = proxy_url.split(":")
550+
if len(arr) != 2:
551+
raise SpectrumInternalException(
552+
f"Wrong uri has more than one ':' : {proxy_url}"
553+
)
554+
return arr[0], arr[1]

0 commit comments

Comments
 (0)