@@ -294,9 +294,44 @@ func (bifrost *Bifrost) TranscriptionStreamRequest(ctx context.Context, req *sch
294
294
return bifrost .handleStreamRequest (ctx , req , schemas .TranscriptionStreamRequest )
295
295
}
296
296
297
+ // RemovePlugin removes a plugin from the server.
298
+ func (bifrost * Bifrost ) RemovePlugin (name string ) error {
299
+ var pluginToCleanup schemas.Plugin
300
+ for {
301
+ oldPlugins := bifrost .plugins .Load ()
302
+ if oldPlugins == nil {
303
+ return nil
304
+ }
305
+ // Create new slice with replaced plugin
306
+ newPlugins := make ([]schemas.Plugin , len (* oldPlugins ))
307
+ copy (newPlugins , * oldPlugins )
308
+ for i , p := range newPlugins {
309
+ if p .GetName () == name {
310
+ pluginToCleanup = p
311
+ bifrost .logger .Debug ("removing plugin %s" , name )
312
+ newPlugins = append (newPlugins [:i ], newPlugins [i + 1 :]... )
313
+ break
314
+ }
315
+ }
316
+ // Atomic compare-and-swap
317
+ if bifrost .plugins .CompareAndSwap (oldPlugins , & newPlugins ) {
318
+ // Cleanup the old plugin
319
+ if pluginToCleanup != nil {
320
+ err := pluginToCleanup .Cleanup ()
321
+ if err != nil {
322
+ bifrost .logger .Warn ("failed to cleanup old plugin %s: %v" , pluginToCleanup .GetName (), err )
323
+ }
324
+ }
325
+ return nil
326
+ }
327
+ // Retrying as swapping did not work
328
+ }
329
+ }
330
+
297
331
// ReloadPlugin reloads a plugin with new instance
298
332
// During the reload - it's stop the world phase where we take a global lock on the plugin mutex
299
- func (bifrost * Bifrost ) ReloadPlugin (plugin schemas.Plugin ) error {
333
+ func (bifrost * Bifrost ) ReloadPlugin (plugin schemas.Plugin ) error {
334
+ var pluginToCleanup schemas.Plugin
300
335
for {
301
336
oldPlugins := bifrost .plugins .Load ()
302
337
if oldPlugins == nil {
@@ -308,22 +343,32 @@ func (bifrost *Bifrost) ReloadPlugin(plugin schemas.Plugin) error {
308
343
found := false
309
344
for i , p := range newPlugins {
310
345
if p .GetName () == plugin .GetName () {
346
+ // Cleaning up old plugin before replacing it
347
+ pluginToCleanup = p
348
+ bifrost .logger .Debug ("replacing plugin %s with new instance" , plugin .GetName ())
311
349
newPlugins [i ] = plugin
312
350
found = true
313
351
break
314
352
}
315
353
}
316
- if ! found {
354
+ if ! found {
317
355
// This means that user is adding a new plugin
356
+ bifrost .logger .Debug ("adding new plugin %s" , plugin .GetName ())
318
357
newPlugins = append (newPlugins , plugin )
319
358
}
320
359
// Atomic compare-and-swap
321
360
if bifrost .plugins .CompareAndSwap (oldPlugins , & newPlugins ) {
361
+ // Cleanup the old plugin
362
+ if pluginToCleanup != nil {
363
+ err := pluginToCleanup .Cleanup ()
364
+ if err != nil {
365
+ bifrost .logger .Warn ("failed to cleanup old plugin %s: %v" , pluginToCleanup .GetName (), err )
366
+ }
367
+ }
322
368
return nil
323
369
}
324
370
// Retrying as swapping did not work
325
371
}
326
-
327
372
}
328
373
329
374
// UpdateProviderConcurrency dynamically updates the queue size and concurrency for an existing provider.
@@ -1023,7 +1068,7 @@ func (bifrost *Bifrost) tryRequest(req *schemas.BifrostRequest, ctx context.Cont
1023
1068
1024
1069
msg := bifrost .getChannelMessage (* preReq , requestType )
1025
1070
msg .Context = ctx
1026
-
1071
+ startTime := time . Now ()
1027
1072
select {
1028
1073
case queue <- * msg :
1029
1074
// Message was sent successfully
@@ -1049,6 +1094,10 @@ func (bifrost *Bifrost) tryRequest(req *schemas.BifrostRequest, ctx context.Cont
1049
1094
var resp * schemas.BifrostResponse
1050
1095
select {
1051
1096
case result = <- msg .Response :
1097
+ latency := time .Since (startTime ).Milliseconds ()
1098
+ if result .ExtraFields .Latency == nil {
1099
+ result .ExtraFields .Latency = & latency
1100
+ }
1052
1101
resp , bifrostErr := pipeline .RunPostHooks (& ctx , result , nil , len (* bifrost .plugins .Load ()))
1053
1102
if bifrostErr != nil {
1054
1103
bifrost .releaseChannelMessage (msg )
@@ -1172,7 +1221,7 @@ func (bifrost *Bifrost) tryStreamRequest(req *schemas.BifrostRequest, ctx contex
1172
1221
// Marking final chunk
1173
1222
ctx = context .WithValue (ctx , schemas .BifrostContextKeyStreamEndIndicator , true )
1174
1223
// On error we will complete post-hooks
1175
- recoveredResp , recoveredErr := pipeline .RunPostHooks (& ctx , nil , & bifrostErrVal , len (bifrost .plugins ))
1224
+ recoveredResp , recoveredErr := pipeline .RunPostHooks (& ctx , nil , & bifrostErrVal , len (* bifrost .plugins . Load () ))
1176
1225
bifrost .releaseChannelMessage (msg )
1177
1226
if recoveredErr != nil {
1178
1227
return nil , recoveredErr
@@ -1332,7 +1381,7 @@ func handleProviderRequest(provider schemas.Provider, req *ChannelMessage, key s
1332
1381
case schemas .TextCompletionRequest :
1333
1382
return provider .TextCompletion (req .Context , req .Model , key , * req .Input .TextCompletionInput , req .Params )
1334
1383
case schemas .ChatCompletionRequest :
1335
- return provider .ChatCompletion (req .Context , req .Model , key , * req .Input .ChatCompletionInput , req .Params )
1384
+ return provider .ChatCompletion (req .Context , req .Model , key , req .Input .ChatCompletionInput , req .Params )
1336
1385
case schemas .EmbeddingRequest :
1337
1386
return provider .Embedding (req .Context , req .Model , key , req .Input .EmbeddingInput , req .Params )
1338
1387
case schemas .SpeechRequest :
@@ -1353,7 +1402,7 @@ func handleProviderRequest(provider schemas.Provider, req *ChannelMessage, key s
1353
1402
func handleProviderStreamRequest (provider schemas.Provider , req * ChannelMessage , key schemas.Key , postHookRunner schemas.PostHookRunner , reqType schemas.RequestType ) (chan * schemas.BifrostStream , * schemas.BifrostError ) {
1354
1403
switch reqType {
1355
1404
case schemas .ChatCompletionStreamRequest :
1356
- return provider .ChatCompletionStream (req .Context , postHookRunner , req .Model , key , * req .Input .ChatCompletionInput , req .Params )
1405
+ return provider .ChatCompletionStream (req .Context , postHookRunner , req .Model , key , req .Input .ChatCompletionInput , req .Params )
1357
1406
case schemas .SpeechStreamRequest :
1358
1407
return provider .SpeechStream (req .Context , postHookRunner , req .Model , key , req .Input .SpeechInput , req .Params )
1359
1408
case schemas .TranscriptionStreamRequest :
@@ -1375,6 +1424,7 @@ func (p *PluginPipeline) RunPreHooks(ctx *context.Context, req *schemas.BifrostR
1375
1424
var shortCircuit * schemas.PluginShortCircuit
1376
1425
var err error
1377
1426
for i , plugin := range p .plugins {
1427
+ p .logger .Debug ("running pre-hook for plugin %s" , plugin .GetName ())
1378
1428
req , shortCircuit , err = plugin .PreHook (ctx , req )
1379
1429
if err != nil {
1380
1430
p .preHookErrors = append (p .preHookErrors , err )
@@ -1391,17 +1441,19 @@ func (p *PluginPipeline) RunPreHooks(ctx *context.Context, req *schemas.BifrostR
1391
1441
// RunPostHooks executes PostHooks in reverse order for the plugins whose PreHook ran.
1392
1442
// Accepts the response and error, and allows plugins to transform either (e.g., recover from error, or invalidate a response).
1393
1443
// Returns the final response and error after all hooks. If both are set, error takes precedence unless error is nil.
1394
- func (p * PluginPipeline ) RunPostHooks (ctx * context.Context , resp * schemas.BifrostResponse , bifrostErr * schemas.BifrostError , count int ) (* schemas.BifrostResponse , * schemas.BifrostError ) {
1444
+ // runFrom is the count of plugins whose PreHooks ran; PostHooks will run in reverse from index (runFrom - 1) down to 0
1445
+ func (p * PluginPipeline ) RunPostHooks (ctx * context.Context , resp * schemas.BifrostResponse , bifrostErr * schemas.BifrostError , runFrom int ) (* schemas.BifrostResponse , * schemas.BifrostError ) {
1395
1446
// Defensive: ensure count is within valid bounds
1396
- if count < 0 {
1397
- count = 0
1447
+ if runFrom < 0 {
1448
+ runFrom = 0
1398
1449
}
1399
- if count > len (p .plugins ) {
1400
- count = len (p .plugins )
1450
+ if runFrom > len (p .plugins ) {
1451
+ runFrom = len (p .plugins )
1401
1452
}
1402
1453
var err error
1403
- for i := count - 1 ; i >= 0 ; i -- {
1454
+ for i := runFrom - 1 ; i >= 0 ; i -- {
1404
1455
plugin := p .plugins [i ]
1456
+ p .logger .Debug ("running post-hook for plugin %s" , plugin .GetName ())
1405
1457
resp , bifrostErr , err = plugin .PostHook (ctx , resp , bifrostErr )
1406
1458
if err != nil {
1407
1459
p .postHookErrors = append (p .postHookErrors , err )
@@ -1618,4 +1670,5 @@ func (bifrost *Bifrost) Shutdown() {
1618
1670
bifrost .logger .Warn (fmt .Sprintf ("Error cleaning up plugin: %s" , err .Error ()))
1619
1671
}
1620
1672
}
1673
+ bifrost .logger .Info ("all request channels closed" )
1621
1674
}
0 commit comments