Skip to content

Commit

Permalink
Testing default discovery port as 51880
Browse files Browse the repository at this point in the history
  • Loading branch information
Jaakko Heusala committed Feb 9, 2025
1 parent 13c6c9b commit 42f1ef3
Showing 1 changed file with 34 additions and 41 deletions.
75 changes: 34 additions & 41 deletions wg-discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,28 +332,28 @@ def log_message(self, fmt, *args):
logging.info("%s - %s", self.address_string(), fmt % args)


def query_peer(peer_key, allowed_ip, local_port, max_retries=1):
def query_peer(peer_key, addr, port, max_retries=1):
"""
Query a peer's /v1/endpoints URL.
Returns a tuple: (peer_key, allowed_ip, success flag, error message).
:rtype: object
Query a peer's /v1/endpoints URL using the configured remote HTTP port.
Returns a tuple: (peer_key, addr, success flag, error message).
"""
url = f"http://{allowed_ip}:{local_port}/v1/endpoints"
url = f"http://{addr}:{port}/v1/endpoints"
last_error = ""
for i in range(max_retries):
try:
with urllib.request.urlopen(url, timeout=5) as response:
if response.status == 200:
return peer_key, allowed_ip, True, ""
return peer_key, addr, True, ""
except Exception as e:
last_error = str(e)
return peer_key, allowed_ip, False, last_error
return peer_key, addr, False, last_error


def query_peer_data(url, max_retries=1):
def query_peer_data(allowed_ip, remote_port, max_retries=1):
"""
Query a given URL and return JSON data.
Query a given peer's discovery service URL and return JSON data.
"""
url = f"http://{allowed_ip}:{remote_port}/v1/endpoints"
for i in range(max_retries):
try:
with urllib.request.urlopen(url, timeout=5) as response:
Expand All @@ -364,7 +364,7 @@ def query_peer_data(url, max_retries=1):
return None


def run_auto_discovery(wg_interface, local_port, use_sudo, discovery_interval, max_workers, max_retries):
def run_auto_discovery(wg_interface, local_port, remote_port, use_sudo, discovery_interval, max_workers, max_retries):
"""
Optimized auto-discovery process:
1. Detect active peers (reachable via ping).
Expand All @@ -381,12 +381,11 @@ def run_auto_discovery(wg_interface, local_port, use_sudo, discovery_interval, m
return

total_peers = len(allowed_ips_map)
reachable_peers = {} # Peers that respond to ping on their internal WireGuard IP
discovery_peers = {} # Peers that respond to GET /v1/endpoints
inactive_peers = {}
reachable_peers = {}
discovery_peers = {}
discovery_peer_responses = {}

# Detect ping-reachable peers using parallel execution
# Detect ping-reachable peers
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_peer = {executor.submit(is_internal_ip_reachable, allowed_ip): peer_key
for peer_key, allowed_ip in allowed_ips_map.items()}
Expand All @@ -395,10 +394,10 @@ def run_auto_discovery(wg_interface, local_port, use_sudo, discovery_interval, m
if future.result():
reachable_peers[peer_key] = allowed_ips_map[peer_key]

# Identify active discovery peers and cache their responses
# Identify active discovery peers
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_disc_peer = {
executor.submit(query_peer_data, f"http://{allowed_ip}:{local_port}/v1/endpoints", max_retries): peer_key
executor.submit(query_peer_data, allowed_ip, remote_port, max_retries): peer_key
for peer_key, allowed_ip in reachable_peers.items()
}
for future in concurrent.futures.as_completed(future_to_disc_peer):
Expand All @@ -411,7 +410,7 @@ def run_auto_discovery(wg_interface, local_port, use_sudo, discovery_interval, m
except Exception as e:
logging.error("Error querying discovery peer %s: %s", peer_key, e)

# Identify inactive peers (not reachable via ping)
# Identify inactive peers
inactive_peers = {peer_key: remote_endpoints[peer_key] for peer_key in remote_endpoints if peer_key not in reachable_peers}

# Attempt to update inactive peers using cached discovery responses
Expand All @@ -428,66 +427,59 @@ def run_auto_discovery(wg_interface, local_port, use_sudo, discovery_interval, m
with cache_lock:
current_endpoint = cached_endpoints.get(peer_key)

# Prevent redundant updates
if current_endpoint == new_endpoint:
logging.debug("Skipping redundant update for peer %s: already set to %s", peer_key, new_endpoint)
continue

try:
wg_set_peer_endpoint(wg_interface, peer_key, new_endpoint, use_sudo)
with cache_lock:
cached_endpoints[peer_key] = new_endpoint # Update cache to prevent redundant updates
cached_endpoints[peer_key] = new_endpoint
logging.info("Updated peer %s endpoint from %s to %s", peer_key, old_endpoint, new_endpoint)
except Exception as e:
logging.error("Failed to update peer %s: %s", peer_key, e)

elapsed = time.time() - start_time

# Log the inactive peer keys
inactive_peer_keys = list(inactive_peers.keys())
logging.info("Auto-discovery completed in %.2f seconds: total peers=%d, reachable=%d, discovery peers=%d, inactive=%d (%s)",
elapsed, total_peers, len(reachable_peers), len(discovery_peers), len(inactive_peers),
", ".join(inactive_peer_keys) if inactive_peer_keys else "None")

# Schedule next discovery
Timer(discovery_interval, run_auto_discovery, args=(wg_interface, local_port, use_sudo, discovery_interval, max_workers, max_retries)).start()
Timer(discovery_interval, run_auto_discovery, args=(wg_interface, local_port, remote_port, use_sudo, discovery_interval, max_workers, max_retries)).start()


def run_server(bind_ip, port, wg_interface, allowed_ips, use_sudo, drop_user, drop_group,
auto_discovery, discovery_interval, max_workers, max_retries):
handler_class = partial(WGEndpointHandler,
wg_interface=wg_interface,
allowed_ips=allowed_ips,
use_sudo=use_sudo)
with socketserver.TCPServer((bind_ip, port), handler_class) as httpd:
logging.info("Starting WG endpoint discovery service on http://%s:%d/", bind_ip, port)
def run_server(bind_ip, local_port, wg_interface, allowed_ips, use_sudo, drop_user, drop_group,
auto_discovery, discovery_interval, max_workers, max_retries, remote_port):
handler_class = partial(WGEndpointHandler, wg_interface=wg_interface, allowed_ips=allowed_ips, use_sudo=use_sudo)
with socketserver.TCPServer((bind_ip, local_port), handler_class) as httpd:
logging.info("Starting WG endpoint discovery service on http://%s:%d/", bind_ip, local_port)
if drop_user:
try:
drop_privileges(drop_user, drop_group)
except Exception as e:
logging.error("Failed to drop privileges: %s", e)
sys.exit(1)
# Start the cache update worker thread (event-driven).

cache_thread = threading.Thread(target=cache_update_worker, args=(wg_interface, use_sudo))
cache_thread.daemon = True
cache_thread.start()
logging.debug("Cache update worker thread started.")

if auto_discovery:
run_auto_discovery(wg_interface, port, use_sudo, discovery_interval, max_workers, max_retries)
run_auto_discovery(wg_interface, local_port, remote_port, use_sudo, discovery_interval, max_workers, max_retries)
logging.debug("Auto-discovery process started.")
try:
httpd.serve_forever()
except KeyboardInterrupt:
logging.info("Shutting down WG endpoint discovery service")
httpd.server_close()


def parse_args():
parser = argparse.ArgumentParser(description='Dynamic WireGuard endpoint service')
parser.add_argument('--wg-interface', default='wg0', help='Name of the WireGuard interface (default: wg0)')
parser.add_argument('--bind-ip', default=None,
help='IP address to bind the HTTP server to (default: IP of the WG interface)')
parser.add_argument('--port', type=int, default=51820, help='Port number for the HTTP server (default: 51820)')
parser.add_argument('--local-port', type=int, default=51880, help='Port number for the local HTTP server (default: 51880)')
parser.add_argument('--remote-port', type=int, default=51880, help='Port number to contact remote discovery services (default: 51880)')
parser.add_argument('--allowed-ips', default='',
help='Comma-separated list of allowed source IP addresses (default: empty, will add bind IP automatically)')
parser.add_argument('--use-sudo', action='store_true', help='Use sudo when running wg commands (default: False)')
Expand Down Expand Up @@ -532,7 +524,6 @@ def main():
logging.error("Could not determine IP for interface %s: %s", wg_interface, e)
sys.exit(1)

port = args.port
allowed_ips = parse_allowed_ips(args.allowed_ips)
allowed_ips.add(bind_ip)
use_sudo = args.use_sudo
Expand All @@ -542,12 +533,14 @@ def main():
discovery_interval = args.discovery_interval
max_workers = args.max_workers
max_retries = args.max_retries
local_port = args.local_port
remote_port = args.remote_port

logging.info("Configuration: wg_interface=%s, bind_ip=%s, port=%d, allowed_ips=%s, use_sudo=%s, auto_discovery=%s, discovery_interval=%d, cache_freshness=%d, cache_wait_timeout=%d, max_workers=%d, max_retries=%d, user=%s, group=%s",
wg_interface, bind_ip, port, allowed_ips, use_sudo, auto_discovery,
logging.info("Configuration: wg_interface=%s, bind_ip=%s, local_port=%d, allowed_ips=%s, use_sudo=%s, auto_discovery=%s, discovery_interval=%d, cache_freshness=%d, cache_wait_timeout=%d, max_workers=%d, max_retries=%d, user=%s, group=%s",
wg_interface, bind_ip, local_port, allowed_ips, use_sudo, auto_discovery,
discovery_interval, args.cache_freshness, args.cache_wait_timeout, max_workers, max_retries, drop_user, drop_group)
run_server(bind_ip, port, wg_interface, allowed_ips, use_sudo, drop_user, drop_group,
auto_discovery, discovery_interval, max_workers, max_retries)
run_server(bind_ip, local_port, wg_interface, allowed_ips, use_sudo, drop_user, drop_group,
auto_discovery, discovery_interval, max_workers, max_retries, remote_port)


if __name__ == "__main__":
Expand Down

0 comments on commit 42f1ef3

Please sign in to comment.