Skip to content

Commit

Permalink
Fixed active peer detection
Browse files Browse the repository at this point in the history
  • Loading branch information
Jaakko Heusala committed Feb 9, 2025
1 parent 2144461 commit 91132d9
Showing 1 changed file with 18 additions and 23 deletions.
41 changes: 18 additions & 23 deletions wg-discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ def query_peer_data(url, max_retries=1):
def run_auto_discovery(wg_interface, local_port, use_sudo, discovery_interval, max_workers, max_retries):
"""
Optimized auto-discovery process:
1. Detect active peers.
1. Detect active peers (reachable via ping).
2. Identify a subset of active discovery peers (not all active peers are discovery peers).
3. Use discovery peers to find updated endpoints for inactive peers.
"""
Expand All @@ -381,45 +381,38 @@ def run_auto_discovery(wg_interface, local_port, use_sudo, discovery_interval, m
return

total_peers = len(allowed_ips_map)
active_peers = {}
discovery_peers = {}
reachable_peers = {} # Peers that respond to ping on their internal WireGuard IP
discovery_peers = {} # Peers that respond to GET /v1/endpoints
inactive_peers = {}
peer_query_results = {}
discovery_peer_responses = {}

# Detect active peers efficiently
# Detect ping-reachable peers using parallel execution
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_peer = {executor.submit(query_peer, peer_key, allowed_ip, local_port, max_retries): peer_key
future_to_peer = {executor.submit(is_internal_ip_reachable, allowed_ip): peer_key
for peer_key, allowed_ip in allowed_ips_map.items()}
for future in concurrent.futures.as_completed(future_to_peer):
peer_key = future_to_peer[future]
try:
_, allowed_ip, success, _ = future.result()
peer_query_results[peer_key] = success
if success:
active_peers[peer_key] = allowed_ip
except Exception as e:
logging.error("Error querying peer %s: %s", peer_key, e)
peer_query_results[peer_key] = False
if future.result():
reachable_peers[peer_key] = allowed_ips_map[peer_key]

# Identify active discovery peers and cache their responses
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_disc_peer = {executor.submit(query_peer_data, f"http://{allowed_ip}:{local_port}/v1/endpoints", max_retries): peer_key
for peer_key, allowed_ip in active_peers.items()}
future_to_disc_peer = {
executor.submit(query_peer_data, f"http://{allowed_ip}:{local_port}/v1/endpoints", max_retries): peer_key
for peer_key, allowed_ip in reachable_peers.items()
}
for future in concurrent.futures.as_completed(future_to_disc_peer):
peer_key = future_to_disc_peer[future]
try:
response = future.result()
if response:
discovery_peers[peer_key] = active_peers[peer_key]
discovery_peers[peer_key] = reachable_peers[peer_key]
discovery_peer_responses[peer_key] = response
except Exception as e:
logging.error("Error querying discovery peer %s: %s", peer_key, e)

# Identify inactive peers
for peer_key in remote_endpoints:
if peer_key not in active_peers:
inactive_peers[peer_key] = remote_endpoints[peer_key]
# Identify inactive peers (not reachable via ping)
inactive_peers = {peer_key: remote_endpoints[peer_key] for peer_key in remote_endpoints if peer_key not in reachable_peers}

# Attempt to update inactive peers using cached discovery responses
for peer_key, old_endpoint in inactive_peers.items():
Expand Down Expand Up @@ -449,8 +442,10 @@ def run_auto_discovery(wg_interface, local_port, use_sudo, discovery_interval, m
logging.error("Failed to update peer %s: %s", peer_key, e)

elapsed = time.time() - start_time
logging.info("Auto-discovery completed in %.2f seconds: total peers=%d, active=%d, discovery peers=%d, inactive=%d",
elapsed, total_peers, len(active_peers), len(discovery_peers), len(inactive_peers))
logging.info("Auto-discovery completed in %.2f seconds: total peers=%d, reachable=%d, discovery peers=%d, inactive=%d",
elapsed, total_peers, len(reachable_peers), len(discovery_peers), len(inactive_peers))

# Schedule next discovery
Timer(discovery_interval, run_auto_discovery, args=(wg_interface, local_port, use_sudo, discovery_interval, max_workers, max_retries)).start()


Expand Down

0 comments on commit 91132d9

Please sign in to comment.