Add unbounded broflake widget proxy lifecycle manager#336
Add unbounded broflake widget proxy lifecycle manager#336myleshorton wants to merge 7 commits intomainfrom
Conversation
Adds vpn/unbounded.go which manages the broflake widget proxy lifecycle: start/stop based on local setting and config, emit connection events via the radiance event bus, and wire into radiance startup/shutdown. Includes the UnboundedKey setting and broflake dependency. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Require all three conditions: local opt-in, server feature flag, and server-provided unbounded config. Cache lastCfg from config events so SetUnbounded can start immediately when the user toggles on and config is already available. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This pull request adds lifecycle management for the "unbounded" broflake widget proxy, which enables peer-to-peer proxy capabilities. The implementation integrates with Radiance's configuration and event systems to automatically start/stop the proxy based on server configuration and local user settings.
Changes:
- New lifecycle manager for broflake widget proxy with automatic start/stop based on settings and server config
- Event emission for consumer connection state changes via the radiance event bus
- Integration into Radiance startup and shutdown sequences
- Addition of UnboundedKey setting and broflake/quic-go dependencies
Reviewed changes
Copilot reviewed 4 out of 5 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| vpn/unbounded.go | New file implementing unboundedManager with lifecycle management, config subscription, and event emission for connection changes |
| radiance.go | Wires unbounded subscription into initialization (line 149) and shutdown (line 151) |
| common/settings/settings.go | Adds UnboundedKey constant for user opt-in setting |
| go.mod | Adds broflake dependency, updates common package, adds replace directives for go-nats and quic-go forks |
| go.sum | Updates checksums for new and updated dependencies |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| unbounded.start(cfg.Unbounded) | ||
| } else if !shouldRun && running { | ||
| unbounded.stop() |
There was a problem hiding this comment.
There is a potential race condition between reading the running state and calling start/stop. Between lines 77-78 (reading the running state) and lines 81-84 (deciding to start/stop), another goroutine could change the state. While start() has a guard to return early if already running, stop() does not verify the state was actually "running" when the decision was made. Consider holding the lock across the entire check-and-act sequence, or use atomic operations for state management.
| unbounded.start(cfg.Unbounded) | |
| } else if !shouldRun && running { | |
| unbounded.stop() | |
| // start() is internally guarded against being called when already running. | |
| unbounded.start(cfg.Unbounded) | |
| } else if !shouldRun && running { | |
| // Re-check running state under lock before calling stop() | |
| unbounded.mu.Lock() | |
| runningNow := unbounded.cancel != nil | |
| unbounded.mu.Unlock() | |
| if runningNow { | |
| unbounded.stop() | |
| } |
| } | ||
| } | ||
|
|
||
| _, ui, err := clientcore.NewBroflake(bfOpt, rtcOpt, egOpt) |
There was a problem hiding this comment.
The first return value from clientcore.NewBroflake is being ignored. If this represents a resource that needs cleanup or provides important functionality, it should be stored and potentially cleaned up when the widget is stopped. Review the broflake API documentation to determine if this value should be used.
| package vpn | ||
|
|
||
| import ( | ||
| "context" | ||
| "log/slog" | ||
| "net" | ||
| "sync" | ||
|
|
||
| C "github.com/getlantern/common" | ||
|
|
||
| "github.com/getlantern/broflake/clientcore" | ||
|
|
||
| "github.com/getlantern/radiance/common/settings" | ||
| "github.com/getlantern/radiance/config" | ||
| "github.com/getlantern/radiance/events" | ||
| ) | ||
|
|
||
| // UnboundedConnectionEvent is emitted when a consumer connection changes state | ||
| // in the broflake widget proxy. State: 1 = connected, -1 = disconnected. | ||
| type UnboundedConnectionEvent struct { | ||
| events.Event | ||
| State int `json:"state"` | ||
| WorkerIdx int `json:"workerIdx"` | ||
| Addr string `json:"addr"` | ||
| } | ||
|
|
||
| var unbounded = &unboundedManager{} | ||
|
|
||
| type unboundedManager struct { | ||
| mu sync.Mutex | ||
| cancel context.CancelFunc | ||
| lastCfg *C.UnboundedConfig // most recent config from server | ||
| } | ||
|
|
||
| func UnboundedEnabled() bool { | ||
| return settings.GetBool(settings.UnboundedKey) | ||
| } | ||
|
|
||
| func SetUnbounded(enable bool) error { | ||
| if UnboundedEnabled() == enable { | ||
| return nil | ||
| } | ||
| if err := settings.Set(settings.UnboundedKey, enable); err != nil { | ||
| return err | ||
| } | ||
| slog.Info("Updated Unbounded widget proxy", "enabled", enable) | ||
| if enable { | ||
| unbounded.mu.Lock() | ||
| cfg := unbounded.lastCfg | ||
| unbounded.mu.Unlock() | ||
| if cfg != nil { | ||
| unbounded.start(cfg) | ||
| } else { | ||
| slog.Info("Unbounded: enabled locally, will start when server config arrives") | ||
| } | ||
| } else { | ||
| unbounded.stop() | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // InitUnboundedSubscription subscribes to config changes and starts/stops the | ||
| // broflake widget proxy based on three conditions: | ||
| // 1. settings.UnboundedKey is true (local opt-in) | ||
| // 2. cfg.Features["unbounded"] is true (server says run it) | ||
| // 3. cfg.Unbounded != nil (server provided discovery/egress URLs) | ||
| func InitUnboundedSubscription() { | ||
| events.Subscribe(func(evt config.NewConfigEvent) { | ||
| if evt.New == nil { | ||
| return | ||
| } | ||
| cfg := evt.New.ConfigResponse | ||
|
|
||
| // Always store the latest unbounded config for use by SetUnbounded | ||
| unbounded.mu.Lock() | ||
| unbounded.lastCfg = cfg.Unbounded | ||
| running := unbounded.cancel != nil | ||
| unbounded.mu.Unlock() | ||
|
|
||
| shouldRun := shouldRunUnbounded(cfg) | ||
| if shouldRun && !running { | ||
| unbounded.start(cfg.Unbounded) | ||
| } else if !shouldRun && running { | ||
| unbounded.stop() | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| func shouldRunUnbounded(cfg C.ConfigResponse) bool { | ||
| if !settings.GetBool(settings.UnboundedKey) { | ||
| return false | ||
| } | ||
| if !cfg.Features[C.UNBOUNDED] { | ||
| return false | ||
| } | ||
| if cfg.Unbounded == nil { | ||
| return false | ||
| } | ||
| return true | ||
| } | ||
|
|
||
| func (m *unboundedManager) start(ucfg *C.UnboundedConfig) { | ||
| m.mu.Lock() | ||
| defer m.mu.Unlock() | ||
| if m.cancel != nil { | ||
| return // already running | ||
| } | ||
|
|
||
| ctx, cancel := context.WithCancel(context.Background()) | ||
| m.cancel = cancel | ||
|
|
||
| go func() { | ||
| slog.Info("Unbounded: starting broflake widget proxy") | ||
|
|
||
| bfOpt := clientcore.NewDefaultBroflakeOptions() | ||
| bfOpt.ClientType = "widget" | ||
| if ucfg != nil { | ||
| if ucfg.CTableSize > 0 { | ||
| bfOpt.CTableSize = ucfg.CTableSize | ||
| } | ||
| if ucfg.PTableSize > 0 { | ||
| bfOpt.PTableSize = ucfg.PTableSize | ||
| } | ||
| } | ||
|
|
||
| // Wire up connection change callback to emit radiance events | ||
| bfOpt.OnConnectionChangeFunc = func(state int, workerIdx int, addr net.IP) { | ||
| addrStr := "" | ||
| if addr != nil { | ||
| addrStr = addr.String() | ||
| } | ||
| slog.Debug("Unbounded: consumer connection change", "state", state, "workerIdx", workerIdx, "addr", addrStr) | ||
| events.Emit(UnboundedConnectionEvent{ | ||
| State: state, | ||
| WorkerIdx: workerIdx, | ||
| Addr: addrStr, | ||
| }) | ||
| } | ||
|
|
||
| rtcOpt := clientcore.NewDefaultWebRTCOptions() | ||
| if ucfg != nil { | ||
| if ucfg.DiscoverySrv != "" { | ||
| rtcOpt.DiscoverySrv = ucfg.DiscoverySrv | ||
| } | ||
| if ucfg.DiscoveryEndpoint != "" { | ||
| rtcOpt.Endpoint = ucfg.DiscoveryEndpoint | ||
| } | ||
| } | ||
|
|
||
| egOpt := clientcore.NewDefaultEgressOptions() | ||
| if ucfg != nil { | ||
| if ucfg.EgressAddr != "" { | ||
| egOpt.Addr = ucfg.EgressAddr | ||
| } | ||
| if ucfg.EgressEndpoint != "" { | ||
| egOpt.Endpoint = ucfg.EgressEndpoint | ||
| } | ||
| } | ||
|
|
||
| _, ui, err := clientcore.NewBroflake(bfOpt, rtcOpt, egOpt) | ||
| if err != nil { | ||
| slog.Error("Unbounded: failed to create broflake widget", "error", err) | ||
| m.mu.Lock() | ||
| m.cancel = nil | ||
| m.mu.Unlock() | ||
| return | ||
| } | ||
|
|
||
| slog.Info("Unbounded: broflake widget proxy started") | ||
| <-ctx.Done() | ||
| slog.Info("Unbounded: stopping broflake widget proxy") | ||
| ui.Stop() | ||
| slog.Info("Unbounded: broflake widget proxy stopped") | ||
| }() | ||
| } | ||
|
|
||
| func (m *unboundedManager) stop() { | ||
| m.mu.Lock() | ||
| defer m.mu.Unlock() | ||
| if m.cancel != nil { | ||
| m.cancel() | ||
| m.cancel = nil | ||
| } | ||
| } | ||
|
|
||
| func StopUnbounded(_ context.Context) error { | ||
| unbounded.stop() | ||
| return nil | ||
| } |
There was a problem hiding this comment.
The new unbounded.go file lacks test coverage. The vpn package has comprehensive tests for other components (see vpn/vpn_test.go, vpn/split_tunnel_test.go, etc.). Consider adding tests to verify: (1) the lifecycle management (start/stop), (2) the interaction between SetUnbounded and config updates, (3) proper cleanup on shutdown, and (4) the race condition scenarios between concurrent start/stop calls.
| cancel context.CancelFunc | ||
| lastCfg *C.UnboundedConfig // most recent config from server | ||
| } | ||
|
|
There was a problem hiding this comment.
The exported function UnboundedEnabled lacks a documentation comment. Following Go conventions and the patterns seen in this codebase (e.g., QuickConnect, ConnectToServer in vpn/vpn.go), all exported functions should have documentation comments that describe their purpose and behavior.
| // UnboundedEnabled reports whether the Unbounded widget proxy is enabled in settings. |
| func SetUnbounded(enable bool) error { | ||
| if UnboundedEnabled() == enable { | ||
| return nil | ||
| } | ||
| if err := settings.Set(settings.UnboundedKey, enable); err != nil { | ||
| return err | ||
| } | ||
| slog.Info("Updated Unbounded widget proxy", "enabled", enable) | ||
| if enable { | ||
| unbounded.mu.Lock() | ||
| cfg := unbounded.lastCfg | ||
| unbounded.mu.Unlock() | ||
| if cfg != nil { | ||
| unbounded.start(cfg) | ||
| } else { | ||
| slog.Info("Unbounded: enabled locally, will start when server config arrives") | ||
| } | ||
| } else { | ||
| unbounded.stop() | ||
| } | ||
| return nil | ||
| } |
There was a problem hiding this comment.
The exported function SetUnbounded lacks a documentation comment. Following Go conventions and the patterns seen in this codebase (e.g., QuickConnect, ConnectToServer in vpn/vpn.go), all exported functions should have documentation comments that describe their purpose, parameters, and behavior.
| func StopUnbounded(_ context.Context) error { | ||
| unbounded.stop() | ||
| return nil | ||
| } |
There was a problem hiding this comment.
The exported function StopUnbounded lacks a documentation comment. Following Go conventions and the patterns seen in this codebase (e.g., Disconnect in vpn/vpn.go), all exported functions should have documentation comments that describe their purpose and behavior.
| <-ctx.Done() | ||
| slog.Info("Unbounded: stopping broflake widget proxy") | ||
| ui.Stop() | ||
| slog.Info("Unbounded: broflake widget proxy stopped") |
There was a problem hiding this comment.
The goroutine started in the start method may not clean up the cancel function in m.cancel when the context is cancelled normally. After line 173 (ui.Stop()), the goroutine should also set m.cancel to nil under lock to maintain consistency with the error path (lines 163-165). Without this, a subsequent call to stop() would attempt to cancel an already-completed context.
| slog.Info("Unbounded: broflake widget proxy stopped") | |
| slog.Info("Unbounded: broflake widget proxy stopped") | |
| m.mu.Lock() | |
| m.cancel = nil | |
| m.mu.Unlock() |
| // replace github.com/getlantern/common => ../common | ||
| replace github.com/enobufs/go-nats => github.com/noahlevenson/go-nats v0.0.0-20230720174341-49df1f749775 | ||
|
|
||
| replace github.com/quic-go/quic-go => github.com/getlantern/quic-go-unbounded-fork v0.51.3-unbounded |
There was a problem hiding this comment.
The version tag 'v0.51.3-unbounded' uses a non-standard format. Semantic versioning typically uses pre-release identifiers with a hyphen (e.g., v0.51.3-alpha.1), but 'unbounded' appears to be a fork identifier rather than a pre-release version. This is valid for Go modules, but consider using a more conventional approach like using a pseudo-version (v0.0.0-YYYYMMDDHHMMSS-commithash) or a proper pre-release tag to avoid confusion with actual semver pre-releases.
- Add doc comments on UnboundedEnabled, SetUnbounded, StopUnbounded - Nil out cancel func after ui.Stop() in goroutine to prevent stale ref - Add idempotency comment on stop() call in config subscription - Add unbounded_test.go with tests for shouldRunUnbounded, toggle, stop-when-not-running, and start/stop lifecycle Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Document that start() and stop() are internally guarded and idempotent, making the TOCTOU race between reading running state and acting harmless - Add comment explaining why BroflakeConn return value is unused (widget proxies donate bandwidth, they don't route traffic through the conn) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Use per-test settings init with t.Cleanup(settings.Reset) instead of TestMain. The boxoptions tests call settings.Reset in their cleanup, which wipes the settings singleton (including file path) between tests. With TestMain the settings were initialized once but then reset by other tests running in between, causing "settings file path is not set". Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 6 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| _, ui, err := clientcore.NewBroflake(bfOpt, rtcOpt, egOpt) | ||
| if err != nil { | ||
| slog.Error("Unbounded: failed to create broflake widget", "error", err) | ||
| m.mu.Lock() | ||
| m.cancel = nil | ||
| m.mu.Unlock() | ||
| return | ||
| } |
There was a problem hiding this comment.
When NewBroflake returns an error, the cancel function is set to nil at lines 172-174, but the context created at line 116 is never canceled. This means the context will leak and never be garbage collected. Consider calling cancel() before setting m.cancel to nil, or restructure the code to avoid creating the context until after NewBroflake succeeds.
| unbounded.stop() | ||
| assert.Nil(t, unbounded.cancel) |
There was a problem hiding this comment.
This test has a race condition. It reads unbounded.cancel without holding the mutex, while the stop() method being tested acquires the mutex before accessing the field. The test should acquire the mutex before reading unbounded.cancel to avoid data races, similar to how it's done correctly in TestStartStopLifecycle at lines 68-70 and 74-76.
| unbounded.stop() | |
| assert.Nil(t, unbounded.cancel) | |
| unbounded.stop() | |
| unbounded.mu.Lock() | |
| assert.Nil(t, unbounded.cancel) | |
| unbounded.mu.Unlock() |
| unbounded.mu.Lock() | ||
| unbounded.cancel = nil | ||
| unbounded.mu.Unlock() | ||
|
|
||
| unbounded.mu.Lock() | ||
| assert.Nil(t, unbounded.cancel) | ||
| unbounded.mu.Unlock() | ||
|
|
||
| // stop is safe when already stopped | ||
| unbounded.stop() | ||
| unbounded.mu.Lock() | ||
| assert.Nil(t, unbounded.cancel) |
There was a problem hiding this comment.
This test doesn't verify any meaningful behavior. It just sets cancel to nil and then immediately checks that it's nil, without actually testing the start/stop lifecycle. Consider either removing this test or extending it to actually test the start/stop lifecycle by calling start() with a valid config and then verifying that stop() properly cancels the operation.
| unbounded.mu.Lock() | |
| unbounded.cancel = nil | |
| unbounded.mu.Unlock() | |
| unbounded.mu.Lock() | |
| assert.Nil(t, unbounded.cancel) | |
| unbounded.mu.Unlock() | |
| // stop is safe when already stopped | |
| unbounded.stop() | |
| unbounded.mu.Lock() | |
| assert.Nil(t, unbounded.cancel) | |
| // Ensure we start from a stopped state. | |
| unbounded.mu.Lock() | |
| unbounded.cancel = nil | |
| unbounded.mu.Unlock() | |
| // stop is safe when already stopped | |
| unbounded.stop() | |
| unbounded.mu.Lock() | |
| assert.Nil(t, unbounded.cancel, "cancel should remain nil when stopping an already stopped unbounded") | |
| unbounded.mu.Unlock() | |
| // Simulate a running state by setting a non-nil cancel function. | |
| unbounded.mu.Lock() | |
| unbounded.cancel = func() {} | |
| assert.NotNil(t, unbounded.cancel, "cancel should be non-nil in simulated running state") | |
| unbounded.mu.Unlock() | |
| // Now stopping should clear the cancel function and return to a stopped state. | |
| unbounded.stop() | |
| unbounded.mu.Lock() | |
| assert.Nil(t, unbounded.cancel, "cancel should be nil after stopping from a running state") |
Summary
vpn/unbounded.go: manages broflake widget proxy start/stop based on local setting and configUnboundedConnectionEventvia radiance event bus on consumer connection changesInitUnboundedSubscription) and shutdown (StopUnbounded)UnboundedKeysetting and broflake dependencyDepends on:
🤖 Generated with Claude Code