Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 44 additions & 37 deletions nostr/relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import time
from dataclasses import dataclass
from queue import Queue
from threading import Lock
from threading import Lock, Thread
from typing import Optional
from websocket import WebSocketApp
from .event import Event
Expand All @@ -21,7 +21,6 @@ def to_json_object(self) -> dict[str, bool]:




@dataclass
class RelayProxyConnectionConfig:
host: Optional[str] = None
Expand All @@ -37,15 +36,13 @@ class Relay:
policy: RelayPolicy = RelayPolicy()
ssl_options: Optional[dict] = None
proxy_config: RelayProxyConnectionConfig = None
error_threshold: int = 5

def __post_init__(self):
self.queue = Queue()
self.outgoing_messages = Queue()
self.subscriptions: dict[str, Subscription] = {}
self.num_sent_events: int = 0
self.connected: bool = False
self.reconnect: bool = True
self.error_counter: int = 0
self.error_threshold: int = 0
self.lock: Lock = Lock()
self.ws: WebSocketApp = WebSocketApp(
self.url,
Expand All @@ -54,42 +51,55 @@ def __post_init__(self):
on_error=self._on_error,
on_close=self._on_close
)
self._connection_thread: Thread = None

def connect(self):
self.ws.run_forever(
sslopt=self.ssl_options,
http_proxy_host=self.proxy_config.host if self.proxy_config is not None else None,
http_proxy_port=self.proxy_config.port if self.proxy_config is not None else None,
proxy_type=self.proxy_config.type if self.proxy_config is not None else None,
)
def connect(self, is_reconnect=False):
if not self.is_connected():
with self.lock:
self._connection_thread = Thread(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some DEBUG logging is probably going to be necessary to optionally enable monitoring the connect/reconnect cycles of each Relay. Need max visibility into what's going on when in threading hell.

target=self.ws.run_forever,
kwargs={
"sslopt": self.ssl_options,
"http_proxy_host": self.proxy_config.host if self.proxy_config is not None else None,
"http_proxy_port": self.proxy_config.port if self.proxy_config is not None else None,
"proxy_type": self.proxy_config.type if self.proxy_config is not None else None
},
name=f"{self.url}-connection"
Copy link

@earonesty earonesty Feb 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

daemon=True should probably be set

)
self._connection_thread.start()

if not is_reconnect:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be more direct here to save this as self._outgoing_message_thread and check that directly.

__init__():
  ...
  self._outgoing_message_thread = None

def connect(self):
  ...
  if not self._outgoing_message_thread:
    self._outgoing_message_thread = Thread(...)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then no is_reconnect needed

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. Will change this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't its instantiation also just be moved into the __post_init__? It doesn't seem like there would ever be a case where it would have to be re-created.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be, but I don't think it makes sense to start the outgoing_message_thread until the connection is made.

Thread(
target=self.outgoing_messages_worker,
name=f"{self.url}-outgoing-messages-worker",
daemon=True
).start()

time.sleep(1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we sleep here? Shouldn't be necessary with the queue and the connection state anymore.


def close(self):
self.ws.close()
if self.is_connected():
self.ws.close()

def check_reconnect(self):
try:
self.close()
except:
pass
self.connected = False
if self.reconnect:
time.sleep(1)
self.connect()
def is_connected(self) -> bool:
with self.lock:
if self._connection_thread is None or not self._connection_thread.is_alive():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you assume that the WebSocket is connected because the thread is alive? The connection could've been dropped by the relay. Does the thread necessarily come to a halt in case of an error?

If that's the case, I prefer the reconnect inside the thread instead of spawning a new one (as it was done before).

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I don't think the thread is killed on an error. However, I am explicitly closing the connection after error_threshold is reached which does kill the thread.

return False
else:
return True

def publish(self, message: str):
self.queue.put(message)
self.outgoing_messages.put(message)

def queue_worker(self):
def outgoing_messages_worker(self):
while True:
if self.connected:
message = self.queue.get()
if self.is_connected():
message = self.outgoing_messages.get()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment here that Queue.get() blocks and waits by default. Was confusing to read this without that prior knowledge.

Also obv verify that the blocking still allows this thread to terminate when the main thread exits.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's necessary since that is the normal way any Queue would work.

try:
self.ws.send(message)
self.num_sent_events += 1
except:
self.queue.put(message)
else:
time.sleep(0.1)
self.outgoing_messages.put(message)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually more robust error-handling is probably needed here. If the message is just undeliverable for some reason, it's pointless to return it to the Queue. Maybe for now at least dump the traceback so we're aware of the failure. Something like:

except:
    import traceback
    traceback.print_exc()

(I forget which traceback calls do what w/formatting, presenting, etc)

Also can't recall if prints from within threads always make it out to the console.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I disagree on the return into the Queue. Relays constantly drop connections and allow reconnect. This ensures that a message in the queue will be delivered when the connection is back up.

Since we keep track of how often an error was encountered (with self.error_counter) we should do something with it. Right now it just stops reconnecting after it reaches self.error_threshold but an exponentially increasing reconnect sleep timer would be more elegant IMO.


def add_subscription(self, id, filters: Filters):
with self.lock:
Expand All @@ -115,21 +125,18 @@ def to_json_object(self) -> dict:
}

def _on_open(self, class_obj):
self.connected = True
pass

def _on_close(self, class_obj, status_code, message):
self.connected = False
self.error_counter = 0

def _on_message(self, class_obj, message: str):
self.message_pool.add_message(message, self.url)

def _on_error(self, class_obj, error):
self.connected = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why this is removed. Errors are thrown when the relay disconnects the client. This is not a nostr error but a WebSocket error. I haven't encountered a case where an error was not a disconnect.

self.error_counter += 1
if self.error_threshold and self.error_counter > self.error_threshold:
pass
else:
self.check_reconnect()
if self.error_counter > self.error_threshold:
self.close()

def _is_valid_message(self, message: str) -> bool:
message = message.strip("\n")
Expand Down
32 changes: 19 additions & 13 deletions nostr/relay_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,20 @@ class RelayException(Exception):

@dataclass
class RelayManager:
connection_monitor_interval_secs: int = 5

def __post_init__(self):
self.relays: dict[str, Relay] = {}
self.message_pool: MessagePool = MessagePool()
self.lock: Lock = Lock()

threading.Thread(
target=self._relay_connection_monitor,
name="relay-connection-monitor",
daemon=True
).start()


def add_relay(
self,
url: str,
Expand All @@ -36,19 +45,7 @@ def add_relay(

with self.lock:
self.relays[url] = relay

threading.Thread(
target=relay.connect,
name=f"{relay.url}-thread"
).start()

threading.Thread(
target=relay.queue_worker,
name=f"{relay.url}-queue",
daemon=True
).start()

time.sleep(1)
relay.connect()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is much better!


def remove_relay(self, url: str):
with self.lock:
Expand Down Expand Up @@ -109,3 +106,12 @@ def publish_event(self, event: Event):
for relay in self.relays.values():
if relay.policy.should_write:
relay.publish(event.to_message())

def _relay_connection_monitor(self):
while True:
with self.lock:
for relay in self.relays.values():
if not relay.is_connected():
relay.connect(True)

time.sleep(self.connection_monitor_interval_secs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be the exponentially increasing sleep counter here.