-
Notifications
You must be signed in to change notification settings - Fork 87
refactor relay threads #71
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,7 @@ | |
import time | ||
from dataclasses import dataclass | ||
from queue import Queue | ||
from threading import Lock | ||
from threading import Lock, Thread | ||
from typing import Optional | ||
from websocket import WebSocketApp | ||
from .event import Event | ||
|
@@ -21,7 +21,6 @@ def to_json_object(self) -> dict[str, bool]: | |
|
||
|
||
|
||
|
||
@dataclass | ||
class RelayProxyConnectionConfig: | ||
host: Optional[str] = None | ||
|
@@ -37,15 +36,13 @@ class Relay: | |
policy: RelayPolicy = RelayPolicy() | ||
ssl_options: Optional[dict] = None | ||
proxy_config: RelayProxyConnectionConfig = None | ||
error_threshold: int = 5 | ||
|
||
def __post_init__(self): | ||
self.queue = Queue() | ||
self.outgoing_messages = Queue() | ||
self.subscriptions: dict[str, Subscription] = {} | ||
self.num_sent_events: int = 0 | ||
self.connected: bool = False | ||
self.reconnect: bool = True | ||
self.error_counter: int = 0 | ||
self.error_threshold: int = 0 | ||
self.lock: Lock = Lock() | ||
self.ws: WebSocketApp = WebSocketApp( | ||
self.url, | ||
|
@@ -54,42 +51,55 @@ def __post_init__(self): | |
on_error=self._on_error, | ||
on_close=self._on_close | ||
) | ||
self._connection_thread: Thread = None | ||
|
||
def connect(self): | ||
self.ws.run_forever( | ||
sslopt=self.ssl_options, | ||
http_proxy_host=self.proxy_config.host if self.proxy_config is not None else None, | ||
http_proxy_port=self.proxy_config.port if self.proxy_config is not None else None, | ||
proxy_type=self.proxy_config.type if self.proxy_config is not None else None, | ||
) | ||
def connect(self, is_reconnect=False): | ||
if not self.is_connected(): | ||
with self.lock: | ||
self._connection_thread = Thread( | ||
target=self.ws.run_forever, | ||
kwargs={ | ||
"sslopt": self.ssl_options, | ||
"http_proxy_host": self.proxy_config.host if self.proxy_config is not None else None, | ||
"http_proxy_port": self.proxy_config.port if self.proxy_config is not None else None, | ||
"proxy_type": self.proxy_config.type if self.proxy_config is not None else None | ||
}, | ||
name=f"{self.url}-connection" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. daemon=True should probably be set |
||
) | ||
self._connection_thread.start() | ||
|
||
if not is_reconnect: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would be more direct here to safe this as self._outgoing_message_thread, and check that directly.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Then no There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good call. Will change this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Couldn't its instantiation also just be moved into the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It could be I don't think it makes sense to start the |
||
Thread( | ||
target=self.outgoing_messages_worker, | ||
name=f"{self.url}-outgoing-messages-worker", | ||
daemon=True | ||
).start() | ||
|
||
time.sleep(1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we sleep here? Shouldn't be necessary with the queue and the connection state anymore. |
||
|
||
def close(self): | ||
self.ws.close() | ||
if self.is_connected(): | ||
self.ws.close() | ||
|
||
def check_reconnect(self): | ||
try: | ||
self.close() | ||
except: | ||
pass | ||
self.connected = False | ||
if self.reconnect: | ||
time.sleep(1) | ||
self.connect() | ||
def is_connected(self) -> bool: | ||
with self.lock: | ||
if self._connection_thread is None or not self._connection_thread.is_alive(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do you assume that the Websocket is connected because the thread is alive? The connection could've been dropped by the relay. Does the thread necessarily come ton a halt in case of an error? If that's the case, I prefer the reconnect inside the thread instead of spawning a new one (as it was done before). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, I don't think the thread is killed on an error. However, I am explicitly closing the connection after |
||
return False | ||
else: | ||
return True | ||
|
||
def publish(self, message: str): | ||
self.queue.put(message) | ||
self.outgoing_messages.put(message) | ||
|
||
def queue_worker(self): | ||
def outgoing_messages_worker(self): | ||
while True: | ||
if self.connected: | ||
message = self.queue.get() | ||
if self.is_connected(): | ||
message = self.outgoing_messages.get() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add comment here that Also obv verify that the blocking still allows this thread to terminate when the main thread exits. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think it's necessary since that is the normal way any Queue would work. |
||
try: | ||
self.ws.send(message) | ||
self.num_sent_events += 1 | ||
except: | ||
self.queue.put(message) | ||
else: | ||
time.sleep(0.1) | ||
self.outgoing_messages.put(message) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Eventually more robust error-handling is probably needed here. If the message is just undeliverable for some reason, it's pointless to return it to the Queue. Maybe for now at least dump the traceback so we're aware of the failure. Something like:
(I forget which traceback calls do what w/formatting, presenting, etc) Also can't recall if prints from within threads always make it out to the console. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I disagree on the return into the Queue. Relays constantly drop connections and allow reconnect. This ensures that a message in the queue will be delivered when the connection is back up. Since we keep track of how often an error was encountered (with |
||
|
||
def add_subscription(self, id, filters: Filters): | ||
with self.lock: | ||
|
@@ -115,21 +125,18 @@ def to_json_object(self) -> dict: | |
} | ||
|
||
def _on_open(self, class_obj): | ||
self.connected = True | ||
pass | ||
|
||
def _on_close(self, class_obj, status_code, message): | ||
self.connected = False | ||
self.error_counter = 0 | ||
|
||
def _on_message(self, class_obj, message: str): | ||
self.message_pool.add_message(message, self.url) | ||
|
||
def _on_error(self, class_obj, error): | ||
self.connected = False | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand why this is removed. Errors are thrown when the relay disconnects the client. This is not a nostr error but a WebSocket error. I haven't encountered a case where an error was not a disconnect. |
||
self.error_counter += 1 | ||
if self.error_threshold and self.error_counter > self.error_threshold: | ||
pass | ||
else: | ||
self.check_reconnect() | ||
if self.error_counter > self.error_threshold: | ||
self.close() | ||
|
||
def _is_valid_message(self, message: str) -> bool: | ||
message = message.strip("\n") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,11 +20,20 @@ class RelayException(Exception): | |
|
||
@dataclass | ||
class RelayManager: | ||
connection_monitor_interval_secs: int = 5 | ||
|
||
def __post_init__(self): | ||
self.relays: dict[str, Relay] = {} | ||
self.message_pool: MessagePool = MessagePool() | ||
self.lock: Lock = Lock() | ||
|
||
threading.Thread( | ||
target=self._relay_connection_monitor, | ||
name="relay-connection-monitor", | ||
daemon=True | ||
).start() | ||
|
||
|
||
def add_relay( | ||
self, | ||
url: str, | ||
|
@@ -36,19 +45,7 @@ def add_relay( | |
|
||
with self.lock: | ||
self.relays[url] = relay | ||
|
||
threading.Thread( | ||
target=relay.connect, | ||
name=f"{relay.url}-thread" | ||
).start() | ||
|
||
threading.Thread( | ||
target=relay.queue_worker, | ||
name=f"{relay.url}-queue", | ||
daemon=True | ||
).start() | ||
|
||
time.sleep(1) | ||
relay.connect() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is much better! |
||
|
||
def remove_relay(self, url: str): | ||
with self.lock: | ||
|
@@ -109,3 +106,12 @@ def publish_event(self, event: Event): | |
for relay in self.relays.values(): | ||
if relay.policy.should_write: | ||
relay.publish(event.to_message()) | ||
|
||
def _relay_connection_monitor(self): | ||
while True: | ||
with self.lock: | ||
for relay in self.relays.values(): | ||
if not relay.is_connected(): | ||
relay.connect(True) | ||
|
||
time.sleep(self.connection_monitor_interval_secs) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could be the exponentially increasing sleep counter here. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some DEBUG logging is probably going to be necessary to optionally enable monitoring the connect/reconnect cycles of each
Relay
. Need max visibility into what's going on when in threading hell.