Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion internal/upstream/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type Manager struct {
shutdownCtx context.Context
shutdownCancel context.CancelFunc
shuttingDown bool // Flag to prevent reconnections during shutdown
shutdownWg sync.WaitGroup // Tracks all background goroutines for clean shutdown

// Docker recovery state
dockerRecoveryMu sync.RWMutex
Expand Down Expand Up @@ -147,11 +148,13 @@ func NewManager(logger *zap.Logger, globalConfig *config.Config, boltStorage *st

// Start database event monitor for cross-process OAuth completion notifications
if boltStorage != nil {
manager.shutdownWg.Add(1)
go manager.startOAuthEventMonitor(shutdownCtx)
}

// Start Docker recovery monitor (internal feature, always enabled)
if storageMgr != nil {
manager.shutdownWg.Add(1)
go manager.startDockerRecoveryMonitor(shutdownCtx)
}

Expand Down Expand Up @@ -360,6 +363,27 @@ func (m *Manager) RemoveServer(id string) {
func (m *Manager) ShutdownAll(ctx context.Context) error {
m.logger.Info("Shutting down all upstream servers")

// Cancel shutdown context to stop background goroutines (OAuth monitor, Docker recovery)
if m.shutdownCancel != nil {
m.shutdownCancel()
}

// Wait for background goroutines to exit (with timeout)
waitDone := make(chan struct{})
go func() {
m.shutdownWg.Wait()
close(waitDone)
}()

select {
case <-waitDone:
m.logger.Info("All background goroutines exited cleanly")
case <-time.After(3 * time.Second):
m.logger.Warn("Background goroutines did not exit within 3 seconds, proceeding with shutdown")
case <-ctx.Done():
m.logger.Warn("Shutdown context cancelled while waiting for goroutines", zap.Error(ctx.Err()))
}

// Set shutdown flag to prevent any reconnection attempts
m.mu.Lock()
m.shuttingDown = true
Expand Down Expand Up @@ -980,11 +1004,14 @@ func (m *Manager) ConnectAll(ctx context.Context) error {

// DisconnectAll disconnects from all servers
func (m *Manager) DisconnectAll() error {
// Cancel shutdown context to stop the OAuth event monitor and Docker recovery monitor
if m.shutdownCancel != nil {
m.shutdownCancel()
}

// Wait for background goroutines to exit
m.shutdownWg.Wait()

m.mu.RLock()
clients := make([]*managed.Client, 0, len(m.clients))
for _, client := range m.clients {
Expand Down Expand Up @@ -1460,6 +1487,7 @@ func (m *Manager) ForceReconnectAll(reason string) *ReconnectResult {

// startOAuthEventMonitor monitors the database for OAuth completion events from CLI processes
func (m *Manager) startOAuthEventMonitor(ctx context.Context) {
defer m.shutdownWg.Done()
m.logger.Info("Starting OAuth event monitor for cross-process notifications")

ticker := time.NewTicker(5 * time.Second) // Check every 5 seconds
Expand Down Expand Up @@ -1703,6 +1731,7 @@ func (m *Manager) InvalidateAllToolCountCaches() {

// startDockerRecoveryMonitor monitors Docker availability and triggers recovery when needed
func (m *Manager) startDockerRecoveryMonitor(ctx context.Context) {
defer m.shutdownWg.Done()
m.logger.Info("Starting Docker recovery monitor")

// Load existing recovery state (always persist for reliability)
Expand Down Expand Up @@ -1890,6 +1919,7 @@ func (m *Manager) handleDockerUnavailable(ctx context.Context) {
}()

// Restart monitoring
m.shutdownWg.Add(1)
go m.startDockerRecoveryMonitor(ctx)
return
}
Expand Down
Loading