From 91132d96447268a5cba30dae7929a3ca3fab6c32 Mon Sep 17 00:00:00 2001 From: Jaakko Heusala Date: Sun, 9 Feb 2025 15:17:32 +0200 Subject: [PATCH] Fixed active peer detection --- wg-discovery.py | 41 ++++++++++++++++++----------------------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/wg-discovery.py b/wg-discovery.py index 5e90ee9..92d7cac 100644 --- a/wg-discovery.py +++ b/wg-discovery.py @@ -367,7 +367,7 @@ def query_peer_data(url, max_retries=1): def run_auto_discovery(wg_interface, local_port, use_sudo, discovery_interval, max_workers, max_retries): """ Optimized auto-discovery process: - 1. Detect active peers. + 1. Detect active peers (reachable via ping). 2. Identify a subset of active discovery peers (not all active peers are discovery peers). 3. Use discovery peers to find updated endpoints for inactive peers. """ @@ -381,45 +381,38 @@ def run_auto_discovery(wg_interface, local_port, use_sudo, discovery_interval, m return total_peers = len(allowed_ips_map) - active_peers = {} - discovery_peers = {} + reachable_peers = {} # Peers that respond to ping on their internal WireGuard IP + discovery_peers = {} # Peers that respond to GET /v1/endpoints inactive_peers = {} - peer_query_results = {} discovery_peer_responses = {} - # Detect active peers efficiently + # Detect ping-reachable peers using parallel execution with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - future_to_peer = {executor.submit(query_peer, peer_key, allowed_ip, local_port, max_retries): peer_key + future_to_peer = {executor.submit(is_internal_ip_reachable, allowed_ip): peer_key for peer_key, allowed_ip in allowed_ips_map.items()} for future in concurrent.futures.as_completed(future_to_peer): peer_key = future_to_peer[future] - try: - _, allowed_ip, success, _ = future.result() - peer_query_results[peer_key] = success - if success: - active_peers[peer_key] = allowed_ip - except Exception as e: - logging.error("Error querying peer %s: %s", peer_key, e) - peer_query_results[peer_key] = False + if future.result(): + reachable_peers[peer_key] = allowed_ips_map[peer_key] # Identify active discovery peers and cache their responses with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - future_to_disc_peer = {executor.submit(query_peer_data, f"http://{allowed_ip}:{local_port}/v1/endpoints", max_retries): peer_key - for peer_key, allowed_ip in active_peers.items()} + future_to_disc_peer = { + executor.submit(query_peer_data, f"http://{allowed_ip}:{local_port}/v1/endpoints", max_retries): peer_key + for peer_key, allowed_ip in reachable_peers.items() + } for future in concurrent.futures.as_completed(future_to_disc_peer): peer_key = future_to_disc_peer[future] try: response = future.result() if response: - discovery_peers[peer_key] = active_peers[peer_key] + discovery_peers[peer_key] = reachable_peers[peer_key] discovery_peer_responses[peer_key] = response except Exception as e: logging.error("Error querying discovery peer %s: %s", peer_key, e) - # Identify inactive peers - for peer_key in remote_endpoints: - if peer_key not in active_peers: - inactive_peers[peer_key] = remote_endpoints[peer_key] + # Identify inactive peers (not reachable via ping) + inactive_peers = {peer_key: remote_endpoints[peer_key] for peer_key in remote_endpoints if peer_key not in reachable_peers} # Attempt to update inactive peers using cached discovery responses for peer_key, old_endpoint in inactive_peers.items(): @@ -449,8 +442,10 @@ def run_auto_discovery(wg_interface, local_port, use_sudo, discovery_interval, m logging.error("Failed to update peer %s: %s", peer_key, e) elapsed = time.time() - start_time - logging.info("Auto-discovery completed in %.2f seconds: total peers=%d, active=%d, discovery peers=%d, inactive=%d", - elapsed, total_peers, len(active_peers), len(discovery_peers), len(inactive_peers)) + logging.info("Auto-discovery completed in %.2f seconds: total peers=%d, reachable=%d, discovery peers=%d, inactive=%d", + elapsed, total_peers, len(reachable_peers), len(discovery_peers), len(inactive_peers)) + + # Schedule next discovery Timer(discovery_interval, run_auto_discovery, args=(wg_interface, local_port, use_sudo, discovery_interval, max_workers, max_retries)).start()