Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/go-playground/validator/v10 v10.20.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kelindar/event v1.5.2 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaU
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kelindar/event v1.5.2 h1:qtgssZqMh/QQMCIxlbx4wU3DoMHOrJXKdiZhphJ4YbY=
github.com/kelindar/event v1.5.2/go.mod h1:UxWPQjWK8u0o9Z3ponm2mgREimM95hm26/M9z8F488Q=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
Expand Down
200 changes: 85 additions & 115 deletions llama-swap.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/fsnotify/fsnotify"
"github.com/gin-gonic/gin"
"github.com/kelindar/event"
"github.com/mostlygeek/llama-swap/proxy"
)

Expand Down Expand Up @@ -53,144 +54,113 @@ func main() {
gin.SetMode(gin.ReleaseMode)
}

proxyManager := proxy.New(config)

// Setup channels for server management
reloadChan := make(chan *proxy.ProxyManager)
exitChan := make(chan struct{})
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

// Create server with initial handler
srv := &http.Server{
Addr: *listenStr,
Handler: proxyManager,
Addr: *listenStr,
}

// Start server
fmt.Printf("llama-swap listening on %s\n", *listenStr)
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
fmt.Printf("Fatal server error: %v\n", err)
close(exitChan)
}
}()

// Handle config reloads and signals
go func() {
currentManager := proxyManager
for {
select {
case newManager := <-reloadChan:
log.Println("Config change detected, waiting for in-flight requests to complete...")
// Stop old manager processes gracefully (this waits for in-flight requests)
currentManager.StopProcesses(proxy.StopWaitForInflightRequest)
// Now do a full shutdown to clear the process map
currentManager.Shutdown()
currentManager = newManager
srv.Handler = newManager
log.Println("Server handler updated with new config")
case sig := <-sigChan:
fmt.Printf("Received signal %v, shutting down...\n", sig)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
currentManager.Shutdown()
if err := srv.Shutdown(ctx); err != nil {
fmt.Printf("Server shutdown error: %v\n", err)
}
close(exitChan)
// Support for watching config and reloading when it changes
reloadProxyManager := func() {
if currentPM, ok := srv.Handler.(*proxy.ProxyManager); ok {
config, err = proxy.LoadConfig(*configPath)
if err != nil {
fmt.Printf("Warning, unable to reload configuration: %v\n", err)
return
}
}
}()

// Start file watcher if requested
if *watchConfig {
absConfigPath, err := filepath.Abs(*configPath)
if err != nil {
log.Printf("Error getting absolute path for config: %v. File watching disabled.", err)
fmt.Println("Configuration Changed")
currentPM.Shutdown()
srv.Handler = proxy.New(config)
fmt.Println("Configuration Reloaded")
} else {
go watchConfigFileWithReload(absConfigPath, reloadChan)
config, err = proxy.LoadConfig(*configPath)
if err != nil {
fmt.Printf("Error, unable to load configuration: %v\n", err)
os.Exit(1)
}
srv.Handler = proxy.New(config)
}
}

// Wait for exit signal
<-exitChan
}
// load the initial proxy manager
reloadProxyManager()
debouncedReload := debounce(time.Second, reloadProxyManager)
if *watchConfig {
defer event.On(func(e proxy.ConfigFileChangedEvent) {
debouncedReload()
})()

fmt.Println("Watching Configuration for changes")
go func() {
absConfigPath, err := filepath.Abs(*configPath)
if err != nil {
fmt.Printf("Error getting absolute path for watching config file: %v\n", err)
return
}
watcher, err := fsnotify.NewWatcher()
if err != nil {
fmt.Printf("Error creating file watcher: %v. File watching disabled.\n", err)
return
}

// watchConfigFileWithReload monitors the configuration file and sends new ProxyManager instances through reloadChan.
func watchConfigFileWithReload(configPath string, reloadChan chan<- *proxy.ProxyManager) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Printf("Error creating file watcher: %v. File watching disabled.", err)
return
}
defer watcher.Close()
err = watcher.Add(absConfigPath)
if err != nil {
fmt.Printf("Error adding config path (%s) to watcher: %v. File watching disabled.", absConfigPath, err)
return
}

err = watcher.Add(configPath)
if err != nil {
log.Printf("Error adding config path (%s) to watcher: %v. File watching disabled.", configPath, err)
return
defer watcher.Close()
for {
select {
case changeEvent := <-watcher.Events:
if changeEvent.Name == absConfigPath && (changeEvent.Has(fsnotify.Write) || changeEvent.Has(fsnotify.Create) || changeEvent.Has(fsnotify.Remove)) {
event.Emit(proxy.ConfigFileChangedEvent{})
}

case err := <-watcher.Errors:
log.Printf("File watcher error: %v", err)
}
}
}()
}

log.Printf("Watching config file for changes: %s", configPath)
// shutdown on signal
go func() {
sig := <-sigChan
fmt.Printf("Received signal %v, shutting down...\n", sig)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

srv.Handler.(*proxy.ProxyManager).Shutdown()
if err := srv.Shutdown(ctx); err != nil {
fmt.Printf("Server shutdown error: %v\n", err)
}
close(exitChan)
}()

var debounceTimer *time.Timer
debounceDuration := 2 * time.Second
// Start server
fmt.Printf("llama-swap listening on %s\n", *listenStr)
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Fatal server error: %v\n", err)
}
}()

for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
// We only care about writes/creates to the specific config file
if event.Name == configPath && (event.Has(fsnotify.Write) || event.Has(fsnotify.Create) || event.Has(fsnotify.Remove)) {
// Reset or start the debounce timer
if debounceTimer != nil {
debounceTimer.Stop()
}
debounceTimer = time.AfterFunc(debounceDuration, func() {
log.Printf("Config file modified: %s, reloading...", event.Name)

// Try up to 3 times with exponential backoff
var newConfig proxy.Config
var err error
for retries := 0; retries < 3; retries++ {
// Load new configuration
newConfig, err = proxy.LoadConfig(configPath)
if err == nil {
break
}
log.Printf("Error loading new config (attempt %d/3): %v", retries+1, err)
if retries < 2 {
time.Sleep(time.Duration(1<<retries) * time.Second)
}
}
if err != nil {
log.Printf("Failed to load new config after retries: %v", err)
return
}
// Wait for exit signal
<-exitChan
}

// Create new ProxyManager with new config
newPM := proxy.New(newConfig)
reloadChan <- newPM
log.Println("Config reloaded successfully")
if (event.Has(fsnotify.Remove)) {
// re-add watcher
err = watcher.Add(configPath)
if err != nil {
log.Printf("Could not re-add watcher for %s: %s", configPath, err)
}
}
})
}
case err, ok := <-watcher.Errors:
if !ok {
log.Println("File watcher error channel closed.")
return
}
log.Printf("File watcher error: %v", err)
func debounce(interval time.Duration, f func()) func() {
var timer *time.Timer
return func() {
if timer != nil {
timer.Stop()
}
timer = time.AfterFunc(interval, f)
}
}
31 changes: 31 additions & 0 deletions proxy/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package proxy

// package level registry of the different event types

const ProcessStateChangeEventID = 0x01
const ChatCompletionStatsEventID = 0x02
const ConfigFileChangedEventID = 0x03

type ProcessStateChangeEvent struct {
ProcessName string
NewState ProcessState
OldState ProcessState
}

func (e ProcessStateChangeEvent) Type() uint32 {
return ProcessStateChangeEventID
}

type ChatCompletionStats struct {
TokensGenerated int
}

func (e ChatCompletionStats) Type() uint32 {
return ChatCompletionStatsEventID
}

type ConfigFileChangedEvent struct{}

func (e ConfigFileChangedEvent) Type() uint32 {
return ConfigFileChangedEventID
}
53 changes: 22 additions & 31 deletions proxy/logMonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,25 @@ package proxy

import (
"container/ring"
"context"
"fmt"
"io"
"os"
"sync"

"github.com/kelindar/event"
)

type LogLevel int

type LogDataEvent struct {
Data []byte
}

func (e LogDataEvent) Type() uint32 {
return 0x01
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Event type ID conflicts with ProcessStateChangeEvent.

The LogDataEvent.Type() returns 0x01, which is the same ID used by ProcessStateChangeEvent in proxy/events.go. This will cause event routing issues. Consider moving this event definition to events.go with a unique ID.

Move the LogDataEvent definition to proxy/events.go:

// In proxy/events.go
const LogDataEventID = 0x04

type LogDataEvent struct {
    Data []byte
}

func (e LogDataEvent) Type() uint32 {
    return LogDataEventID
}

Then remove the duplicate definition from this file.

🤖 Prompt for AI Agents
In proxy/logMonitor.go around lines 20 to 22, the LogDataEvent.Type() method
returns an event type ID 0x01, which conflicts with ProcessStateChangeEvent's ID
in proxy/events.go. To fix this, move the LogDataEvent struct and its Type()
method to proxy/events.go, assign it a unique ID such as 0x04, and remove the
LogDataEvent definition from logMonitor.go to avoid duplicate event definitions
and routing conflicts.


const (
LevelDebug LogLevel = iota
LevelInfo
Expand All @@ -18,7 +29,7 @@ const (
)

type LogMonitor struct {
clients map[chan []byte]bool
eventbus *event.Dispatcher
mu sync.RWMutex
buffer *ring.Ring
bufferMu sync.RWMutex
Expand All @@ -37,11 +48,11 @@ func NewLogMonitor() *LogMonitor {

func NewLogMonitorWriter(stdout io.Writer) *LogMonitor {
return &LogMonitor{
clients: make(map[chan []byte]bool),
buffer: ring.New(10 * 1024), // keep 10KB of buffered logs
stdout: stdout,
level: LevelInfo,
prefix: "",
eventbus: event.NewDispatcher(),
buffer: ring.New(10 * 1024), // keep 10KB of buffered logs
stdout: stdout,
level: LevelInfo,
prefix: "",
}
}

Expand Down Expand Up @@ -81,34 +92,14 @@ func (w *LogMonitor) GetHistory() []byte {
return history
}

func (w *LogMonitor) Subscribe() chan []byte {
w.mu.Lock()
defer w.mu.Unlock()

ch := make(chan []byte, 100)
w.clients[ch] = true
return ch
}

func (w *LogMonitor) Unsubscribe(ch chan []byte) {
w.mu.Lock()
defer w.mu.Unlock()

delete(w.clients, ch)
close(ch)
func (w *LogMonitor) OnLogData(callback func(data []byte)) context.CancelFunc {
return event.Subscribe(w.eventbus, func(e LogDataEvent) {
callback(e.Data)
})
}

func (w *LogMonitor) broadcast(msg []byte) {
w.mu.RLock()
defer w.mu.RUnlock()

for client := range w.clients {
select {
case client <- msg:
default:
// If client buffer is full, skip
}
}
event.Publish(w.eventbus, LogDataEvent{Data: msg})
}

func (w *LogMonitor) SetPrefix(prefix string) {
Expand Down
Loading
Loading