Skip to content

Commit

Permalink
fix up disconnection logic
Browse files Browse the repository at this point in the history
Also, fine-tune timeouts for initial directory connections. The default
TCP timeout of 30 seconds in Twisted's TCP4ClientEndpoint is too slow, so
we now use 10 seconds, which seems reasonable. The time we wait for a
connection before declaring that message channel setup must continue
without a particular directory is a separate limit: we wait a maximum of
22 seconds (2 tries at the 10s timeout, with breathing room) before giving
up on that directory for initial startup, but we do *not* give up retrying
to connect to that directory.
Additionally, if after that initial 22 seconds we have failed to
handshake with *any* directory, we give an error message that the onion
message channel is not functional, and stop trying to do anything.
  • Loading branch information
AdamISZ committed Apr 1, 2022
1 parent 755b508 commit 3a428fa
Showing 1 changed file with 37 additions and 9 deletions.
46 changes: 37 additions & 9 deletions jmdaemon/jmdaemon/onionmc.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

NOT_SERVING_ONION_HOSTNAME = "NOT-SERVING-ONION"

# How many seconds to wait before treating an onion
# as unreachable
CONNECT_TO_ONION_TIMEOUT = 10

def location_tuple_to_str(t: Tuple[str, int]) -> str:
    """Render a (host, port) location tuple as the string "host:port"."""
    host = t[0]
    port = t[1]
    return "{}:{}".format(host, port)

Expand Down Expand Up @@ -191,6 +195,13 @@ def register_disconnection(self, p: OnionLineProtocol) -> None:
return
del self.peers[peer_location]

def disconnect_inbound_peer(self, inbound_peer_str: str) -> None:
    """Close the connection to an inbound peer.

    Looks up `inbound_peer_str` in `self.peers` and asks the transport of
    the protocol instance stored there to drop its connection. If no peer
    is registered under that key, a warning is logged and nothing else
    happens.

    :param inbound_peer_str: key under which the peer's protocol instance
        is stored in `self.peers`.
    """
    if inbound_peer_str not in self.peers:
        log.warn("cannot disconnect peer at {}, not found".format(
            inbound_peer_str))
        # Bail out here: without this return, the lookup below raises
        # KeyError for exactly the case that was just reported.
        return
    proto = self.peers[inbound_peer_str]
    proto.transport.loseConnection()

def receive_message(self, message: OnionCustomMessage,
p: OnionLineProtocol) -> None:
self.client.receive_msg(message, network_addr_to_string(
Expand Down Expand Up @@ -470,11 +481,12 @@ def connect(self) -> None:
self.tcp_connector = reactor.connectTCP(self.hostname, self.port,
self.factory)
else:
# non-default timeout; needs to be OOM lower than our
# non-default timeout; needs to be much lower than our
# 'wait at least a minute for the IRC connections to come up',
# which is used for *all* message channels, together.
torEndpoint = TCP4ClientEndpoint(reactor, self.socks5_host,
self.socks5_port, timeout=10)
self.socks5_port,
timeout=CONNECT_TO_ONION_TIMEOUT)
onionEndpoint = TorSocksEndpoint(torEndpoint, self.hostname,
self.port)
self.reconnecting_service = ClientService(onionEndpoint, self.factory)
Expand Down Expand Up @@ -527,12 +539,14 @@ def disconnect(self) -> None:
if not (self.hostname and self.port > 0):
raise OnionPeerConnectionError(
"Cannot disconnect without host, port info")
d = self.factory.proto_client.transport.loseConnection()
d.addCallback(self.complete_disconnection)
d.addErrback(log.warn, "Failed to disconnect from peer {}.".format(
self.peer_location()))
if self.factory:
d = self.reconnecting_service.stopService()
d.addCallback(self.complete_disconnection)
else:
self.messagechannel.proto_factory.disconnect_inbound_peer(
self.alternate_location)

def complete_disconnection(self) -> None:
def complete_disconnection(self, r) -> None:
log.debug("Disconnected from peer: {}".format(self.peer_location()))
self.update_status(PEER_STATUS_DISCONNECTED)
self.factory = None
Expand Down Expand Up @@ -1078,11 +1092,15 @@ def process_handshake(self, peerid: str, message: str,
# at all.
if not accepted:
log.warn("Directory: {} rejected our handshake.".format(peerid))
# explicitly choose to disconnect (if other side already did,
# this is no-op).
peer.disconnect()
return
if not (app_name == JM_APP_NAME and is_directory and JM_VERSION \
<= proto_max and JM_VERSION >= proto_min and accepted):
log.warn("Handshake from directory is incompatible or "
"rejected: {}".format(handshake_json))
peer.disconnect()
return
if not net == self.btc_network:
log.warn("Handshake from directory is on an incompatible "
Expand Down Expand Up @@ -1124,7 +1142,7 @@ def process_handshake(self, peerid: str, message: str,
if not net == self.btc_network:
log.warn("Handshake from peer is on an incompatible "
"network: {}".format(net))
return
accepted = False
# If accepted, we should update the peer to have the full
# location which in general will not yet be present, so as to
# allow publishing their location via `getpeerlist`. Note
Expand Down Expand Up @@ -1271,8 +1289,18 @@ def wait_for_directories(self) -> None:
# so we are guaranteed to have only directory peers.
if len(self.get_connected_directory_peers()) < len(self.peers):
self.directory_wait_counter += 1
if self.directory_wait_counter < 5: # < 50 seconds
# < 2*11 = 22 seconds; compare with CONNECT_TO_ONION_TIMEOUT;
# with current vals, we get to try twice before entirely
# giving up.
if self.directory_wait_counter < 11:
return
if len(self.get_connected_directory_peers()) == 0:
# at least one handshake must have succeeded, for us
# to continue.
log.error("We failed to connect and handshake with "
"ANY directories; onion messaging is not functioning.")
self.wait_for_directories_loop.stop()
return
# This is what triggers the start of taker/maker workflows.
# Note that even if the preceding (max) 22 seconds failed to
# connect all our configured dps, we will keep trying and they
Expand Down

0 comments on commit 3a428fa

Please sign in to comment.