Skip to content

Commit e59957a

Browse files
committed
fix: improve streaming responses and tool message handling
- Add is_error field support for tool messages in Anthropic integration
- Fix OpenAI content serialization for assistant and tool messages
- Add StreamOptions to model parameters for better streaming control
- Correct Anthropic streaming response format with proper usage tracking
- Fix tool result handling to support error states and multiple tool results
- Add stream index parameter to streaming response converters
- Ensure proper content block handling across both providers
1 parent 74e2674 commit e59957a

File tree

6 files changed

+175
-77
lines changed

6 files changed

+175
-77
lines changed

core/providers/anthropic.go

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -515,6 +515,10 @@ func prepareAnthropicChatRequest(messages []schemas.BifrostMessage, params *sche
515515
"tool_use_id": *msg.ToolMessage.ToolCallID,
516516
}
517517

518+
if msg.ToolMessage.IsError != nil {
519+
toolCallResult["is_error"] = *msg.ToolMessage.IsError
520+
}
521+
518522
var toolCallResultContent []map[string]interface{}
519523

520524
if msg.Content.ContentStr != nil {

core/providers/openai.go

Lines changed: 31 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -222,12 +222,23 @@ func prepareOpenAIChatRequest(messages []schemas.BifrostMessage, params *schemas
222222
for _, msg := range messages {
223223
if msg.Role == schemas.ModelChatMessageRoleAssistant {
224224
assistantMessage := map[string]interface{}{
225-
"role": msg.Role,
226-
"content": msg.Content,
225+
"role": msg.Role,
227226
}
228227
if msg.AssistantMessage != nil && msg.AssistantMessage.ToolCalls != nil {
229228
assistantMessage["tool_calls"] = *msg.AssistantMessage.ToolCalls
230229
}
230+
if msg.Content.ContentStr != nil {
231+
assistantMessage["content"] = *msg.Content.ContentStr
232+
} else if msg.Content.ContentBlocks != nil && len(*msg.Content.ContentBlocks) > 0 {
233+
var sb strings.Builder
234+
for _, block := range *msg.Content.ContentBlocks {
235+
if block.Text != nil && *block.Text != "" {
236+
sb.WriteString(*block.Text)
237+
sb.WriteString(" ")
238+
}
239+
}
240+
assistantMessage["content"] = sb.String()
241+
}
231242
formattedMessages = append(formattedMessages, assistantMessage)
232243
} else {
233244
message := map[string]interface{}{
@@ -250,6 +261,24 @@ func prepareOpenAIChatRequest(messages []schemas.BifrostMessage, params *schemas
250261

251262
if msg.ToolMessage != nil && msg.ToolMessage.ToolCallID != nil {
252263
message["tool_call_id"] = *msg.ToolMessage.ToolCallID
264+
if msg.IsError != nil {
265+
message["is_error"] = *msg.IsError
266+
}
267+
268+
content := message["content"]
269+
if contentBlocks, ok := content.([]schemas.ContentBlock); ok {
270+
var sb strings.Builder
271+
for _, block := range contentBlocks {
272+
if block.Text != nil && *block.Text != "" {
273+
sb.WriteString(*block.Text)
274+
sb.WriteString(" ")
275+
} else if block.ImageURL != nil {
276+
sb.WriteString(block.ImageURL.URL)
277+
sb.WriteString(" ")
278+
}
279+
}
280+
message["content"] = sb.String()
281+
}
253282
}
254283

255284
formattedMessages = append(formattedMessages, message)

core/schemas/bifrost.go

Lines changed: 20 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,11 @@ const (
1010
DefaultInitialPoolSize = 100
1111
)
1212

13+
// StreamOptions represents the options for streaming requests.
14+
type StreamOptions struct {
15+
IncludeUsage bool `json:"include_usage"`
16+
}
17+
1318
// BifrostConfig represents the configuration for initializing a Bifrost instance.
1419
// It contains the necessary components for setting up the system including account details,
1520
// plugins, logging, and initial pool size.
@@ -161,19 +166,20 @@ type Fallback struct {
161166
// your request to the model. Bifrost follows a standard set of parameters which
162167
// mapped to the provider's parameters.
163168
type ModelParameters struct {
164-
ToolChoice *ToolChoice `json:"tool_choice,omitempty"` // Whether to call a tool
165-
Tools *[]Tool `json:"tools,omitempty"` // Tools to use
166-
Temperature *float64 `json:"temperature,omitempty"` // Controls randomness in the output
167-
TopP *float64 `json:"top_p,omitempty"` // Controls diversity via nucleus sampling
168-
TopK *int `json:"top_k,omitempty"` // Controls diversity via top-k sampling
169-
MaxTokens *int `json:"max_tokens,omitempty"` // Maximum number of tokens to generate
170-
StopSequences *[]string `json:"stop_sequences,omitempty"` // Sequences that stop generation
171-
PresencePenalty *float64 `json:"presence_penalty,omitempty"` // Penalizes repeated tokens
172-
FrequencyPenalty *float64 `json:"frequency_penalty,omitempty"` // Penalizes frequent tokens
173-
ParallelToolCalls *bool `json:"parallel_tool_calls,omitempty"` // Enables parallel tool calls
174-
EncodingFormat *string `json:"encoding_format,omitempty"` // Format for embedding output (e.g., "float", "base64")
175-
Dimensions *int `json:"dimensions,omitempty"` // Number of dimensions for embedding output
176-
User *string `json:"user,omitempty"` // User identifier for tracking
169+
ToolChoice *ToolChoice `json:"tool_choice,omitempty"` // Whether to call a tool
170+
Tools *[]Tool `json:"tools,omitempty"` // Tools to use
171+
Temperature *float64 `json:"temperature,omitempty"` // Controls randomness in the output
172+
TopP *float64 `json:"top_p,omitempty"` // Controls diversity via nucleus sampling
173+
TopK *int `json:"top_k,omitempty"` // Controls diversity via top-k sampling
174+
MaxTokens *int `json:"max_tokens,omitempty"` // Maximum number of tokens to generate
175+
StopSequences *[]string `json:"stop_sequences,omitempty"` // Sequences that stop generation
176+
PresencePenalty *float64 `json:"presence_penalty,omitempty"` // Penalizes repeated tokens
177+
FrequencyPenalty *float64 `json:"frequency_penalty,omitempty"` // Penalizes frequent tokens
178+
ParallelToolCalls *bool `json:"parallel_tool_calls,omitempty"` // Enables parallel tool calls
179+
EncodingFormat *string `json:"encoding_format,omitempty"` // Format for embedding output (e.g., "float", "base64")
180+
Dimensions *int `json:"dimensions,omitempty"` // Number of dimensions for embedding output
181+
User *string `json:"user,omitempty"` // User identifier for tracking
182+
StreamOptions *StreamOptions `json:"stream_options,omitempty"` // Stream options for streaming requests
177183
// Dynamic parameters that can be provider-specific, they are directly
178184
// added to the request as is.
179185
ExtraParams map[string]interface{} `json:"-"`
@@ -351,6 +357,7 @@ type ContentBlock struct {
351357
// ToolMessage represents a message from a tool
352358
type ToolMessage struct {
353359
ToolCallID *string `json:"tool_call_id,omitempty"`
360+
IsError *bool `json:"is_error,omitempty"`
354361
}
355362

356363
// AssistantMessage represents a message from an assistant

transports/bifrost-http/integrations/anthropic/types.go

Lines changed: 114 additions & 62 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,6 @@ package anthropic
33
import (
44
"encoding/json"
55
"fmt"
6-
"strings"
76

87
bifrost "github.com/maximhq/bifrost/core"
98
"github.com/maximhq/bifrost/core/schemas"
@@ -21,6 +20,7 @@ type AnthropicContentBlock struct {
2120
Name *string `json:"name,omitempty"` // For tool_use content
2221
Input interface{} `json:"input,omitempty"` // For tool_use content
2322
Content AnthropicContent `json:"content,omitempty"` // For tool_result content
23+
IsError *bool `json:"is_error,omitempty"` // For tool_result content
2424
Source *AnthropicImageSource `json:"source,omitempty"` // For image content
2525
}
2626

@@ -63,17 +63,18 @@ type AnthropicToolChoice struct {
6363

6464
// AnthropicMessageRequest represents an Anthropic messages API request
6565
type AnthropicMessageRequest struct {
66-
Model string `json:"model"`
67-
MaxTokens int `json:"max_tokens"`
68-
Messages []AnthropicMessage `json:"messages"`
69-
System *AnthropicContent `json:"system,omitempty"`
70-
Temperature *float64 `json:"temperature,omitempty"`
71-
TopP *float64 `json:"top_p,omitempty"`
72-
TopK *int `json:"top_k,omitempty"`
73-
StopSequences *[]string `json:"stop_sequences,omitempty"`
74-
Stream *bool `json:"stream,omitempty"`
75-
Tools *[]AnthropicTool `json:"tools,omitempty"`
76-
ToolChoice *AnthropicToolChoice `json:"tool_choice,omitempty"`
66+
Model string `json:"model"`
67+
MaxTokens int `json:"max_tokens"`
68+
Messages []AnthropicMessage `json:"messages"`
69+
System *AnthropicContent `json:"system,omitempty"`
70+
Temperature *float64 `json:"temperature,omitempty"`
71+
TopP *float64 `json:"top_p,omitempty"`
72+
TopK *int `json:"top_k,omitempty"`
73+
StopSequences *[]string `json:"stop_sequences,omitempty"`
74+
Stream *bool `json:"stream,omitempty"`
75+
StreamOptions *schemas.StreamOptions `json:"stream_options,omitempty"`
76+
Tools *[]AnthropicTool `json:"tools,omitempty"`
77+
ToolChoice *AnthropicToolChoice `json:"tool_choice,omitempty"`
7778
}
7879

7980
// IsStreamingRequested implements the StreamingRequest interface
@@ -126,6 +127,23 @@ type AnthropicStreamResponse struct {
126127
Usage *AnthropicUsage `json:"usage,omitempty"`
127128
}
128129

130+
func (s *AnthropicStreamResponse) ToSSE() string {
131+
jsonData, err := json.Marshal(s)
132+
if err != nil {
133+
return "event: error\ndata: {\"type\": \"error\", \"error\": {\"type\": \"internal_error\", \"message\": \"Failed to marshal stream response\"}}\n\n"
134+
}
135+
return fmt.Sprintf("event: %s\ndata: %s\n\n", s.Type, string(jsonData))
136+
}
137+
138+
func (s *AnthropicStreamResponse) SetModel(model string) {
139+
if s.Model != nil {
140+
*s.Model = model
141+
}
142+
if s.Message != nil && s.Message.Model != "" {
143+
s.Message.Model = model
144+
}
145+
}
146+
129147
// AnthropicStreamMessage represents the message structure in streaming events
130148
type AnthropicStreamMessage struct {
131149
ID string `json:"id"`
@@ -238,6 +256,8 @@ func (r *AnthropicMessageRequest) ConvertToBifrostRequest() *schemas.BifrostRequ
238256
var toolCalls []schemas.ToolCall
239257
var contentBlocks []schemas.ContentBlock
240258

259+
skipAppendMessage := false
260+
241261
for _, content := range *msg.Content.ContentBlocks {
242262
switch content.Type {
243263
case "text":
@@ -281,48 +301,69 @@ func (r *AnthropicMessageRequest) ConvertToBifrostRequest() *schemas.BifrostRequ
281301
toolCalls = append(toolCalls, tc)
282302
}
283303
case "tool_result":
284-
if content.ToolUseID != nil {
285-
bifrostMsg.ToolMessage = &schemas.ToolMessage{
286-
ToolCallID: content.ToolUseID,
287-
}
288-
if content.Content.ContentStr != nil {
289-
contentBlocks = append(contentBlocks, schemas.ContentBlock{
290-
Type: schemas.ContentBlockTypeText,
291-
Text: content.Content.ContentStr,
292-
})
293-
} else if content.Content.ContentBlocks != nil {
294-
for _, block := range *content.Content.ContentBlocks {
295-
if block.Text != nil {
296-
contentBlocks = append(contentBlocks, schemas.ContentBlock{
297-
Type: schemas.ContentBlockTypeText,
298-
Text: block.Text,
299-
})
300-
} else if block.Source != nil {
301-
contentBlocks = append(contentBlocks, schemas.ContentBlock{
302-
Type: schemas.ContentBlockTypeImage,
303-
ImageURL: &schemas.ImageURLStruct{
304-
URL: func() string {
305-
if block.Source.Data != nil {
306-
mime := "image/png"
307-
if block.Source.MediaType != nil && *block.Source.MediaType != "" {
308-
mime = *block.Source.MediaType
309-
}
310-
return "data:" + mime + ";base64," + *block.Source.Data
311-
}
312-
if block.Source.URL != nil {
313-
return *block.Source.URL
304+
if content.ToolUseID == nil || *content.ToolUseID == "" {
305+
continue
306+
}
307+
308+
skipAppendMessage = true
309+
310+
bifrostMsg.Role = schemas.ModelChatMessageRoleTool
311+
bifrostMsg.ToolMessage = &schemas.ToolMessage{
312+
ToolCallID: content.ToolUseID,
313+
IsError: content.IsError,
314+
}
315+
if content.Content.ContentStr != nil {
316+
contentBlocks = append(contentBlocks, schemas.ContentBlock{
317+
Type: schemas.ContentBlockTypeText,
318+
Text: content.Content.ContentStr,
319+
})
320+
} else if content.Content.ContentBlocks != nil {
321+
for _, block := range *content.Content.ContentBlocks {
322+
if block.Text != nil {
323+
contentBlocks = append(contentBlocks, schemas.ContentBlock{
324+
Type: schemas.ContentBlockTypeText,
325+
Text: block.Text,
326+
})
327+
} else if block.Source != nil {
328+
contentBlocks = append(contentBlocks, schemas.ContentBlock{
329+
Type: schemas.ContentBlockTypeImage,
330+
ImageURL: &schemas.ImageURLStruct{
331+
URL: func() string {
332+
if block.Source.Data != nil {
333+
mime := "image/png"
334+
if block.Source.MediaType != nil && *block.Source.MediaType != "" {
335+
mime = *block.Source.MediaType
314336
}
315-
return ""
316-
}()},
317-
})
318-
}
337+
return "data:" + mime + ";base64," + *block.Source.Data
338+
}
339+
if block.Source.URL != nil {
340+
return *block.Source.URL
341+
}
342+
return ""
343+
}()},
344+
})
319345
}
320346
}
321-
bifrostMsg.Role = schemas.ModelChatMessageRoleTool
322347
}
348+
349+
if len(contentBlocks) > 0 {
350+
blocks := make([]schemas.ContentBlock, len(contentBlocks))
351+
copy(blocks, contentBlocks)
352+
bifrostMsg.Content = schemas.MessageContent{
353+
ContentBlocks: &blocks,
354+
}
355+
messages = append(messages, bifrostMsg)
356+
bifrostMsg = schemas.BifrostMessage{}
357+
contentBlocks = contentBlocks[:0]
358+
}
359+
continue
323360
}
324361
}
325362

363+
if skipAppendMessage {
364+
continue
365+
}
366+
326367
// Concatenate all text contents
327368
if len(contentBlocks) > 0 {
328369
bifrostMsg.Content = schemas.MessageContent{
@@ -360,6 +401,9 @@ func (r *AnthropicMessageRequest) ConvertToBifrostRequest() *schemas.BifrostRequ
360401
if r.StopSequences != nil {
361402
params.StopSequences = r.StopSequences
362403
}
404+
if r.StreamOptions != nil {
405+
params.StreamOptions = r.StreamOptions
406+
}
363407

364408
bifrostReq.Params = params
365409
}
@@ -450,6 +494,9 @@ func DeriveAnthropicFromBifrostResponse(bifrostResp *schemas.BifrostResponse) *A
450494
InputTokens: bifrostResp.Usage.PromptTokens,
451495
OutputTokens: bifrostResp.Usage.CompletionTokens,
452496
}
497+
if bifrostResp.Usage.TokenDetails != nil {
498+
anthropicResp.Usage.CacheReadInputTokens = bifrostResp.Usage.TokenDetails.CachedTokens
499+
}
453500
}
454501

455502
// Convert choices to content
@@ -521,9 +568,9 @@ func DeriveAnthropicFromBifrostResponse(bifrostResp *schemas.BifrostResponse) *A
521568
}
522569

523570
// DeriveAnthropicStreamFromBifrostResponse converts a Bifrost streaming response to Anthropic SSE string format
524-
func DeriveAnthropicStreamFromBifrostResponse(bifrostResp *schemas.BifrostResponse, streamIndex int) string {
571+
func DeriveAnthropicStreamFromBifrostResponse(bifrostResp *schemas.BifrostResponse, streamIndex int) []*AnthropicStreamResponse {
525572
if bifrostResp == nil {
526-
return ""
573+
return nil
527574
}
528575

529576
var streamRespList []*AnthropicStreamResponse
@@ -550,6 +597,13 @@ func DeriveAnthropicStreamFromBifrostResponse(bifrostResp *schemas.BifrostRespon
550597
if bifrostResp.Usage.TokenDetails != nil {
551598
usage.CacheReadInputTokens = bifrostResp.Usage.TokenDetails.CachedTokens
552599
}
600+
} else {
601+
// Default to 1 token for input and output, e.g. for DeepSeek api.
602+
// Return the actual usage in the final message delta.
603+
usage = &AnthropicUsage{
604+
InputTokens: 1,
605+
OutputTokens: 1,
606+
}
553607
}
554608
streamResp.Type = "message_start"
555609
streamResp.Message = &AnthropicStreamMessage{
@@ -625,14 +679,19 @@ func DeriveAnthropicStreamFromBifrostResponse(bifrostResp *schemas.BifrostRespon
625679
})
626680

627681
// Handle message delta
682+
usage := &AnthropicUsage{
683+
OutputTokens: bifrostResp.Usage.CompletionTokens,
684+
InputTokens: bifrostResp.Usage.PromptTokens,
685+
}
686+
if bifrostResp.Usage.TokenDetails != nil {
687+
usage.CacheReadInputTokens = bifrostResp.Usage.TokenDetails.CachedTokens
688+
}
628689
streamResp = &AnthropicStreamResponse{
629690
Type: "message_delta",
630691
Delta: &AnthropicStreamDelta{
631692
StopReason: choice.FinishReason,
632693
},
633-
Usage: &AnthropicUsage{
634-
OutputTokens: bifrostResp.Usage.CompletionTokens,
635-
},
694+
Usage: usage,
636695
}
637696
}
638697

@@ -668,24 +727,17 @@ func DeriveAnthropicStreamFromBifrostResponse(bifrostResp *schemas.BifrostRespon
668727

669728
}
670729

671-
var sb strings.Builder
730+
result := make([]*AnthropicStreamResponse, 0, len(streamRespList))
672731
for _, streamResp := range streamRespList {
673732
// Ignore empty stream responses
674733
if streamResp.Type == "" {
675734
continue
676735
}
677736

678-
// Marshal to JSON and format as SSE
679-
jsonData, err := json.Marshal(streamResp)
680-
if err != nil {
681-
return ""
682-
}
683-
684-
// Format as Anthropic SSE
685-
sb.WriteString(fmt.Sprintf("event: %s\ndata: %s\n\n", streamResp.Type, jsonData))
737+
result = append(result, streamResp)
686738
}
687739

688-
return sb.String()
740+
return result
689741
}
690742

691743
// DeriveAnthropicErrorFromBifrostError derives a AnthropicMessageError from a BifrostError

0 commit comments

Comments
 (0)