Skip to content

Commit 1921e57

Browse files
authored
Add Event Bus (#184)
Major internal refactor to use an event bus to pass event/messages along. These changes are largely invisible user facing but sets up internal design for real time stats and information. - `--watch-config` logic refactored for events - remove multiple SSE api endpoints, replaced with /api/events - keep all functionality essentially the same - UI/backend sync is in near real time now
1 parent c867a6c commit 1921e57

File tree

13 files changed

+375
-359
lines changed

13 files changed

+375
-359
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/tidwall/gjson v1.18.0
1010
github.com/tidwall/sjson v1.2.5
1111
gopkg.in/yaml.v3 v3.0.1
12+
github.com/kelindar/event v1.5.2
1213
)
1314

1415
require (

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaU
3636
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
3737
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
3838
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
39+
github.com/kelindar/event v1.5.2 h1:qtgssZqMh/QQMCIxlbx4wU3DoMHOrJXKdiZhphJ4YbY=
40+
github.com/kelindar/event v1.5.2/go.mod h1:UxWPQjWK8u0o9Z3ponm2mgREimM95hm26/M9z8F488Q=
3941
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
4042
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
4143
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=

llama-swap.go

Lines changed: 101 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/fsnotify/fsnotify"
1616
"github.com/gin-gonic/gin"
17+
"github.com/kelindar/event"
1718
"github.com/mostlygeek/llama-swap/proxy"
1819
)
1920

@@ -53,144 +54,129 @@ func main() {
5354
gin.SetMode(gin.ReleaseMode)
5455
}
5556

56-
proxyManager := proxy.New(config)
57-
5857
// Setup channels for server management
59-
reloadChan := make(chan *proxy.ProxyManager)
6058
exitChan := make(chan struct{})
6159
sigChan := make(chan os.Signal, 1)
6260
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
6361

6462
// Create server with initial handler
6563
srv := &http.Server{
66-
Addr: *listenStr,
67-
Handler: proxyManager,
64+
Addr: *listenStr,
6865
}
6966

70-
// Start server
71-
fmt.Printf("llama-swap listening on %s\n", *listenStr)
72-
go func() {
73-
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
74-
fmt.Printf("Fatal server error: %v\n", err)
75-
close(exitChan)
76-
}
77-
}()
78-
79-
// Handle config reloads and signals
80-
go func() {
81-
currentManager := proxyManager
82-
for {
83-
select {
84-
case newManager := <-reloadChan:
85-
log.Println("Config change detected, waiting for in-flight requests to complete...")
86-
// Stop old manager processes gracefully (this waits for in-flight requests)
87-
currentManager.StopProcesses(proxy.StopWaitForInflightRequest)
88-
// Now do a full shutdown to clear the process map
89-
currentManager.Shutdown()
90-
currentManager = newManager
91-
srv.Handler = newManager
92-
log.Println("Server handler updated with new config")
93-
case sig := <-sigChan:
94-
fmt.Printf("Received signal %v, shutting down...\n", sig)
95-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
96-
defer cancel()
97-
currentManager.Shutdown()
98-
if err := srv.Shutdown(ctx); err != nil {
99-
fmt.Printf("Server shutdown error: %v\n", err)
100-
}
101-
close(exitChan)
67+
// Support for watching config and reloading when it changes
68+
reloadProxyManager := func() {
69+
if currentPM, ok := srv.Handler.(*proxy.ProxyManager); ok {
70+
config, err = proxy.LoadConfig(*configPath)
71+
if err != nil {
72+
fmt.Printf("Warning, unable to reload configuration: %v\n", err)
10273
return
10374
}
104-
}
105-
}()
10675

107-
// Start file watcher if requested
108-
if *watchConfig {
109-
absConfigPath, err := filepath.Abs(*configPath)
110-
if err != nil {
111-
log.Printf("Error getting absolute path for config: %v. File watching disabled.", err)
76+
fmt.Println("Configuration Changed")
77+
currentPM.Shutdown()
78+
srv.Handler = proxy.New(config)
79+
fmt.Println("Configuration Reloaded")
80+
81+
// wait a few seconds and tell any UI to reload
82+
time.AfterFunc(3*time.Second, func() {
83+
event.Emit(proxy.ConfigFileChangedEvent{
84+
ReloadingState: proxy.ReloadingStateEnd,
85+
})
86+
})
11287
} else {
113-
go watchConfigFileWithReload(absConfigPath, reloadChan)
88+
config, err = proxy.LoadConfig(*configPath)
89+
if err != nil {
90+
fmt.Printf("Error, unable to load configuration: %v\n", err)
91+
os.Exit(1)
92+
}
93+
srv.Handler = proxy.New(config)
11494
}
11595
}
11696

117-
// Wait for exit signal
118-
<-exitChan
119-
}
120-
121-
// watchConfigFileWithReload monitors the configuration file and sends new ProxyManager instances through reloadChan.
122-
func watchConfigFileWithReload(configPath string, reloadChan chan<- *proxy.ProxyManager) {
123-
watcher, err := fsnotify.NewWatcher()
124-
if err != nil {
125-
log.Printf("Error creating file watcher: %v. File watching disabled.", err)
126-
return
127-
}
128-
defer watcher.Close()
129-
130-
err = watcher.Add(configPath)
131-
if err != nil {
132-
log.Printf("Error adding config path (%s) to watcher: %v. File watching disabled.", configPath, err)
133-
return
134-
}
135-
136-
log.Printf("Watching config file for changes: %s", configPath)
97+
// load the initial proxy manager
98+
reloadProxyManager()
99+
debouncedReload := debounce(time.Second, reloadProxyManager)
100+
if *watchConfig {
101+
defer event.On(func(e proxy.ConfigFileChangedEvent) {
102+
if e.ReloadingState == proxy.ReloadingStateStart {
103+
debouncedReload()
104+
}
105+
})()
137106

138-
var debounceTimer *time.Timer
139-
debounceDuration := 2 * time.Second
107+
fmt.Println("Watching Configuration for changes")
108+
go func() {
109+
absConfigPath, err := filepath.Abs(*configPath)
110+
if err != nil {
111+
fmt.Printf("Error getting absolute path for watching config file: %v\n", err)
112+
return
113+
}
114+
watcher, err := fsnotify.NewWatcher()
115+
if err != nil {
116+
fmt.Printf("Error creating file watcher: %v. File watching disabled.\n", err)
117+
return
118+
}
140119

141-
for {
142-
select {
143-
case event, ok := <-watcher.Events:
144-
if !ok {
120+
err = watcher.Add(absConfigPath)
121+
if err != nil {
122+
fmt.Printf("Error adding config path (%s) to watcher: %v. File watching disabled.", absConfigPath, err)
145123
return
146124
}
147-
// We only care about writes/creates to the specific config file
148-
if event.Name == configPath && (event.Has(fsnotify.Write) || event.Has(fsnotify.Create) || event.Has(fsnotify.Remove)) {
149-
// Reset or start the debounce timer
150-
if debounceTimer != nil {
151-
debounceTimer.Stop()
152-
}
153-
debounceTimer = time.AfterFunc(debounceDuration, func() {
154-
log.Printf("Config file modified: %s, reloading...", event.Name)
155-
156-
// Try up to 3 times with exponential backoff
157-
var newConfig proxy.Config
158-
var err error
159-
for retries := 0; retries < 3; retries++ {
160-
// Load new configuration
161-
newConfig, err = proxy.LoadConfig(configPath)
162-
if err == nil {
163-
break
164-
}
165-
log.Printf("Error loading new config (attempt %d/3): %v", retries+1, err)
166-
if retries < 2 {
167-
time.Sleep(time.Duration(1<<retries) * time.Second)
168-
}
169-
}
170-
if err != nil {
171-
log.Printf("Failed to load new config after retries: %v", err)
172-
return
173-
}
174125

175-
// Create new ProxyManager with new config
176-
newPM := proxy.New(newConfig)
177-
reloadChan <- newPM
178-
log.Println("Config reloaded successfully")
179-
if (event.Has(fsnotify.Remove)) {
180-
// re-add watcher
181-
err = watcher.Add(configPath)
182-
if err != nil {
183-
log.Printf("Could not re-add watcher for %s: %s", configPath, err)
184-
}
126+
defer watcher.Close()
127+
for {
128+
select {
129+
case changeEvent := <-watcher.Events:
130+
if changeEvent.Name == absConfigPath && (changeEvent.Has(fsnotify.Write) || changeEvent.Has(fsnotify.Create) || changeEvent.Has(fsnotify.Remove)) {
131+
event.Emit(proxy.ConfigFileChangedEvent{
132+
ReloadingState: proxy.ReloadingStateStart,
133+
})
185134
}
186-
})
187-
}
188-
case err, ok := <-watcher.Errors:
189-
if !ok {
190-
log.Println("File watcher error channel closed.")
191-
return
135+
136+
case err := <-watcher.Errors:
137+
log.Printf("File watcher error: %v", err)
138+
}
192139
}
193-
log.Printf("File watcher error: %v", err)
140+
}()
141+
}
142+
143+
// shutdown on signal
144+
go func() {
145+
sig := <-sigChan
146+
fmt.Printf("Received signal %v, shutting down...\n", sig)
147+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
148+
defer cancel()
149+
150+
if pm, ok := srv.Handler.(*proxy.ProxyManager); ok {
151+
pm.Shutdown()
152+
} else {
153+
fmt.Println("srv.Handler is not of type *proxy.ProxyManager")
154+
}
155+
156+
if err := srv.Shutdown(ctx); err != nil {
157+
fmt.Printf("Server shutdown error: %v\n", err)
158+
}
159+
close(exitChan)
160+
}()
161+
162+
// Start server
163+
fmt.Printf("llama-swap listening on %s\n", *listenStr)
164+
go func() {
165+
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
166+
log.Fatalf("Fatal server error: %v\n", err)
167+
}
168+
}()
169+
170+
// Wait for exit signal
171+
<-exitChan
172+
}
173+
174+
func debounce(interval time.Duration, f func()) func() {
175+
var timer *time.Timer
176+
return func() {
177+
if timer != nil {
178+
timer.Stop()
194179
}
180+
timer = time.AfterFunc(interval, f)
195181
}
196182
}

proxy/events.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package proxy
2+
3+
// package level registry of the different event types
4+
5+
const ProcessStateChangeEventID = 0x01
6+
const ChatCompletionStatsEventID = 0x02
7+
const ConfigFileChangedEventID = 0x03
8+
const LogDataEventID = 0x04
9+
10+
type ProcessStateChangeEvent struct {
11+
ProcessName string
12+
NewState ProcessState
13+
OldState ProcessState
14+
}
15+
16+
func (e ProcessStateChangeEvent) Type() uint32 {
17+
return ProcessStateChangeEventID
18+
}
19+
20+
type ChatCompletionStats struct {
21+
TokensGenerated int
22+
}
23+
24+
func (e ChatCompletionStats) Type() uint32 {
25+
return ChatCompletionStatsEventID
26+
}
27+
28+
type ReloadingState int
29+
30+
const (
31+
ReloadingStateStart ReloadingState = iota
32+
ReloadingStateEnd
33+
)
34+
35+
type ConfigFileChangedEvent struct {
36+
ReloadingState ReloadingState
37+
}
38+
39+
func (e ConfigFileChangedEvent) Type() uint32 {
40+
return ConfigFileChangedEventID
41+
}
42+
43+
type LogDataEvent struct {
44+
Data []byte
45+
}
46+
47+
func (e LogDataEvent) Type() uint32 {
48+
return LogDataEventID
49+
}

proxy/logMonitor.go

Lines changed: 14 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@ package proxy
22

33
import (
44
"container/ring"
5+
"context"
56
"fmt"
67
"io"
78
"os"
89
"sync"
10+
11+
"github.com/kelindar/event"
912
)
1013

1114
type LogLevel int
@@ -18,7 +21,7 @@ const (
1821
)
1922

2023
type LogMonitor struct {
21-
clients map[chan []byte]bool
24+
eventbus *event.Dispatcher
2225
mu sync.RWMutex
2326
buffer *ring.Ring
2427
bufferMu sync.RWMutex
@@ -37,11 +40,11 @@ func NewLogMonitor() *LogMonitor {
3740

3841
func NewLogMonitorWriter(stdout io.Writer) *LogMonitor {
3942
return &LogMonitor{
40-
clients: make(map[chan []byte]bool),
41-
buffer: ring.New(10 * 1024), // keep 10KB of buffered logs
42-
stdout: stdout,
43-
level: LevelInfo,
44-
prefix: "",
43+
eventbus: event.NewDispatcher(),
44+
buffer: ring.New(10 * 1024), // keep 10KB of buffered logs
45+
stdout: stdout,
46+
level: LevelInfo,
47+
prefix: "",
4548
}
4649
}
4750

@@ -81,34 +84,14 @@ func (w *LogMonitor) GetHistory() []byte {
8184
return history
8285
}
8386

84-
func (w *LogMonitor) Subscribe() chan []byte {
85-
w.mu.Lock()
86-
defer w.mu.Unlock()
87-
88-
ch := make(chan []byte, 100)
89-
w.clients[ch] = true
90-
return ch
91-
}
92-
93-
func (w *LogMonitor) Unsubscribe(ch chan []byte) {
94-
w.mu.Lock()
95-
defer w.mu.Unlock()
96-
97-
delete(w.clients, ch)
98-
close(ch)
87+
func (w *LogMonitor) OnLogData(callback func(data []byte)) context.CancelFunc {
88+
return event.Subscribe(w.eventbus, func(e LogDataEvent) {
89+
callback(e.Data)
90+
})
9991
}
10092

10193
func (w *LogMonitor) broadcast(msg []byte) {
102-
w.mu.RLock()
103-
defer w.mu.RUnlock()
104-
105-
for client := range w.clients {
106-
select {
107-
case client <- msg:
108-
default:
109-
// If client buffer is full, skip
110-
}
111-
}
94+
event.Publish(w.eventbus, LogDataEvent{Data: msg})
11295
}
11396

11497
func (w *LogMonitor) SetPrefix(prefix string) {

0 commit comments

Comments
 (0)