Skip to content

Commit 9d790b6

Browse files
waleedlatif1waleedlatif
andcommitted
fix(deployed-chat): allow non-streaming responses in deployed chat, allow partial failure responses in deployed chat (#833)
* fix(deployed-chat): allow non-streaming responses in deployed chat, allow partial failure responses in deployed chat * fix(csp): runtime variable resolution for CSP * cleanup --------- Co-authored-by: waleedlatif <waleedlatif@waleedlatifs-MacBook-Pro.local>
1 parent 6db7c0d commit 9d790b6

File tree

10 files changed

+446
-47
lines changed

10 files changed

+446
-47
lines changed

apps/sim/app/api/chat/utils.test.ts

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,4 +350,77 @@ describe('Chat API Utils', () => {
350350
expect(result3.error).toBe('Email not authorized')
351351
})
352352
})
353+
354+
describe('Execution Result Processing', () => {
355+
it('should process logs regardless of overall success status', () => {
356+
// Test that logs are processed even when overall execution fails
357+
// This is key for partial success scenarios
358+
const executionResult = {
359+
success: false, // Overall execution failed
360+
output: {},
361+
logs: [
362+
{
363+
blockId: 'agent1',
364+
startedAt: '2023-01-01T00:00:00Z',
365+
endedAt: '2023-01-01T00:00:01Z',
366+
durationMs: 1000,
367+
success: true,
368+
output: { content: 'Agent 1 succeeded' },
369+
error: undefined,
370+
},
371+
{
372+
blockId: 'agent2',
373+
startedAt: '2023-01-01T00:00:00Z',
374+
endedAt: '2023-01-01T00:00:01Z',
375+
durationMs: 500,
376+
success: false,
377+
output: null,
378+
error: 'Agent 2 failed',
379+
},
380+
],
381+
metadata: { duration: 1000 },
382+
}
383+
384+
// Test the key logic: logs should be processed regardless of overall success
385+
expect(executionResult.success).toBe(false)
386+
expect(executionResult.logs).toBeDefined()
387+
expect(executionResult.logs).toHaveLength(2)
388+
389+
// First log should be successful
390+
expect(executionResult.logs[0].success).toBe(true)
391+
expect(executionResult.logs[0].output?.content).toBe('Agent 1 succeeded')
392+
393+
// Second log should be failed
394+
expect(executionResult.logs[1].success).toBe(false)
395+
expect(executionResult.logs[1].error).toBe('Agent 2 failed')
396+
})
397+
398+
it('should handle ExecutionResult vs StreamingExecution types correctly', () => {
399+
const executionResult = {
400+
success: true,
401+
output: { content: 'test' },
402+
logs: [],
403+
metadata: { duration: 100 },
404+
}
405+
406+
// Test direct ExecutionResult
407+
const directResult = executionResult
408+
const extractedDirect = directResult
409+
expect(extractedDirect).toBe(executionResult)
410+
411+
// Test StreamingExecution with embedded ExecutionResult
412+
const streamingResult = {
413+
stream: new ReadableStream(),
414+
execution: executionResult,
415+
}
416+
417+
// Simulate the type extraction logic from executeWorkflowForChat
418+
const extractedFromStreaming =
419+
streamingResult && typeof streamingResult === 'object' && 'execution' in streamingResult
420+
? streamingResult.execution
421+
: streamingResult
422+
423+
expect(extractedFromStreaming).toBe(executionResult)
424+
})
425+
})
353426
})

apps/sim/app/api/chat/utils.ts

Lines changed: 122 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import { getBlock } from '@/blocks'
1414
import { db } from '@/db'
1515
import { chat, environment as envTable, userStats, workflow } from '@/db/schema'
1616
import { Executor } from '@/executor'
17-
import type { BlockLog } from '@/executor/types'
17+
import type { BlockLog, ExecutionResult } from '@/executor/types'
1818
import { Serializer } from '@/serializer'
1919
import { mergeSubblockState } from '@/stores/workflows/server-utils'
2020
import type { WorkflowState } from '@/stores/workflows/workflow/types'
@@ -549,6 +549,7 @@ export async function executeWorkflowForChat(
549549
async start(controller) {
550550
const encoder = new TextEncoder()
551551
const streamedContent = new Map<string, string>()
552+
const streamedBlocks = new Set<string>() // Track which blocks have started streaming
552553

553554
const onStream = async (streamingExecution: any): Promise<void> => {
554555
if (!streamingExecution.stream) return
@@ -557,6 +558,15 @@ export async function executeWorkflowForChat(
557558
const reader = streamingExecution.stream.getReader()
558559
if (blockId) {
559560
streamedContent.set(blockId, '')
561+
562+
// Add separator if this is not the first block to stream
563+
if (streamedBlocks.size > 0) {
564+
// Send separator before the new block starts
565+
controller.enqueue(
566+
encoder.encode(`data: ${JSON.stringify({ blockId, chunk: '\n\n' })}\n\n`)
567+
)
568+
}
569+
streamedBlocks.add(blockId)
560570
}
561571
try {
562572
while (true) {
@@ -615,25 +625,117 @@ export async function executeWorkflowForChat(
615625
throw error
616626
}
617627

618-
if (result && 'success' in result) {
619-
// Update streamed content and apply tokenization
620-
if (result.logs) {
621-
result.logs.forEach((log: BlockLog) => {
622-
if (streamedContent.has(log.blockId)) {
623-
const content = streamedContent.get(log.blockId)
624-
if (log.output) {
625-
log.output.content = content
628+
// Handle both ExecutionResult and StreamingExecution types
629+
const executionResult =
630+
result && typeof result === 'object' && 'execution' in result
631+
? (result.execution as ExecutionResult)
632+
: (result as ExecutionResult)
633+
634+
if (executionResult?.logs) {
635+
// Update streamed content and apply tokenization - process regardless of overall success
636+
// This ensures partial successes (some agents succeed, some fail) still return results
637+
638+
// Add newlines between different agent outputs for better readability
639+
const processedOutputs = new Set<string>()
640+
executionResult.logs.forEach((log: BlockLog) => {
641+
if (streamedContent.has(log.blockId)) {
642+
const content = streamedContent.get(log.blockId)
643+
if (log.output && content) {
644+
// Add newline separation between different outputs (but not before the first one)
645+
const separator = processedOutputs.size > 0 ? '\n\n' : ''
646+
log.output.content = separator + content
647+
processedOutputs.add(log.blockId)
648+
}
649+
}
650+
})
651+
652+
// Also process non-streamed outputs from selected blocks (like function blocks)
653+
// This uses the same logic as the chat panel to ensure identical behavior
654+
const nonStreamingLogs = executionResult.logs.filter(
655+
(log: BlockLog) => !streamedContent.has(log.blockId)
656+
)
657+
658+
// Extract the exact same functions used by the chat panel
659+
const extractBlockIdFromOutputId = (outputId: string): string => {
660+
return outputId.includes('_') ? outputId.split('_')[0] : outputId.split('.')[0]
661+
}
662+
663+
const extractPathFromOutputId = (outputId: string, blockId: string): string => {
664+
return outputId.substring(blockId.length + 1)
665+
}
666+
667+
const parseOutputContentSafely = (output: any): any => {
668+
if (!output?.content) {
669+
return output
670+
}
671+
672+
if (typeof output.content === 'string') {
673+
try {
674+
return JSON.parse(output.content)
675+
} catch (e) {
676+
// Fallback to original structure if parsing fails
677+
return output
678+
}
679+
}
680+
681+
return output
682+
}
683+
684+
// Filter outputs that have matching logs (exactly like chat panel)
685+
const outputsToRender = selectedOutputIds.filter((outputId) => {
686+
const blockIdForOutput = extractBlockIdFromOutputId(outputId)
687+
return nonStreamingLogs.some((log) => log.blockId === blockIdForOutput)
688+
})
689+
690+
// Process each selected output (exactly like chat panel)
691+
for (const outputId of outputsToRender) {
692+
const blockIdForOutput = extractBlockIdFromOutputId(outputId)
693+
const path = extractPathFromOutputId(outputId, blockIdForOutput)
694+
const log = nonStreamingLogs.find((l) => l.blockId === blockIdForOutput)
695+
696+
if (log) {
697+
let outputValue: any = log.output
698+
699+
if (path) {
700+
// Parse JSON content safely (exactly like chat panel)
701+
outputValue = parseOutputContentSafely(outputValue)
702+
703+
const pathParts = path.split('.')
704+
for (const part of pathParts) {
705+
if (outputValue && typeof outputValue === 'object' && part in outputValue) {
706+
outputValue = outputValue[part]
707+
} else {
708+
outputValue = undefined
709+
break
710+
}
626711
}
627712
}
628-
})
629713

630-
// Process all logs for streaming tokenization
631-
const processedCount = processStreamingBlockLogs(result.logs, streamedContent)
632-
logger.info(`[CHAT-API] Processed ${processedCount} blocks for streaming tokenization`)
714+
if (outputValue !== undefined) {
715+
// Add newline separation between different outputs
716+
const separator = processedOutputs.size > 0 ? '\n\n' : ''
717+
718+
// Format the output exactly like the chat panel
719+
const formattedOutput =
720+
typeof outputValue === 'string' ? outputValue : JSON.stringify(outputValue, null, 2)
721+
722+
// Update the log content
723+
if (!log.output.content) {
724+
log.output.content = separator + formattedOutput
725+
} else {
726+
log.output.content = separator + formattedOutput
727+
}
728+
processedOutputs.add(log.blockId)
729+
}
730+
}
633731
}
634732

635-
const { traceSpans, totalDuration } = buildTraceSpans(result)
636-
const enrichedResult = { ...result, traceSpans, totalDuration }
733+
// Process all logs for streaming tokenization
734+
const processedCount = processStreamingBlockLogs(executionResult.logs, streamedContent)
735+
logger.info(`Processed ${processedCount} blocks for streaming tokenization`)
736+
737+
const { traceSpans, totalDuration } = buildTraceSpans(executionResult)
738+
const enrichedResult = { ...executionResult, traceSpans, totalDuration }
637739
if (conversationId) {
638740
if (!enrichedResult.metadata) {
639741
enrichedResult.metadata = {
@@ -646,7 +748,7 @@ export async function executeWorkflowForChat(
646748
const executionId = uuidv4()
647749
logger.debug(`Generated execution ID for deployed chat: ${executionId}`)
648750

649-
if (result.success) {
751+
if (executionResult.success) {
650752
try {
651753
await db
652754
.update(userStats)
@@ -669,12 +771,12 @@ export async function executeWorkflowForChat(
669771
}
670772

671773
// Complete logging session (for both success and failure)
672-
if (result && 'success' in result) {
673-
const { traceSpans } = buildTraceSpans(result)
774+
if (executionResult?.logs) {
775+
const { traceSpans } = buildTraceSpans(executionResult)
674776
await loggingSession.safeComplete({
675777
endedAt: new Date().toISOString(),
676-
totalDurationMs: result.metadata?.duration || 0,
677-
finalOutput: result.output,
778+
totalDurationMs: executionResult.metadata?.duration || 0,
779+
finalOutput: executionResult.output,
678780
traceSpans,
679781
})
680782
}

apps/sim/app/chat/[subdomain]/hooks/use-chat-streaming.ts

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import { useRef, useState } from 'react'
44
import { createLogger } from '@/lib/logs/console/logger'
55
import type { ChatMessage } from '@/app/chat/[subdomain]/components/message/message'
6+
// No longer need complex output extraction - backend handles this
7+
import type { ExecutionResult } from '@/executor/types'
68

79
const logger = createLogger('UseChatStreaming')
810

@@ -96,6 +98,8 @@ export function useChatStreaming() {
9698
let accumulatedText = ''
9799
let lastAudioPosition = 0
98100

101+
// Track which blocks have streamed content (like chat panel)
102+
const messageIdMap = new Map<string, string>()
99103
const messageId = crypto.randomUUID()
100104
setMessages((prev) => [
101105
...prev,
@@ -148,13 +152,49 @@ export function useChatStreaming() {
148152
const { blockId, chunk: contentChunk, event: eventType } = json
149153

150154
if (eventType === 'final' && json.data) {
155+
// The backend has already processed and combined all outputs
156+
// We just need to extract the combined content and use it
157+
const result = json.data as ExecutionResult
158+
159+
// Collect all content from logs that have output.content (backend processed)
160+
let combinedContent = ''
161+
if (result.logs) {
162+
const contentParts: string[] = []
163+
164+
// Get content from all logs that have processed content
165+
result.logs.forEach((log) => {
166+
if (log.output?.content && typeof log.output.content === 'string') {
167+
// The backend already includes proper separators, so just collect the content
168+
contentParts.push(log.output.content)
169+
}
170+
})
171+
172+
// Join without additional separators since backend already handles this
173+
combinedContent = contentParts.join('')
174+
}
175+
176+
// Update the existing streaming message with the final combined content
151177
setMessages((prev) =>
152-
prev.map((msg) => (msg.id === messageId ? { ...msg, isStreaming: false } : msg))
178+
prev.map((msg) =>
179+
msg.id === messageId
180+
? {
181+
...msg,
182+
content: combinedContent || accumulatedText, // Use combined content or fallback to streamed
183+
isStreaming: false,
184+
}
185+
: msg
186+
)
153187
)
188+
154189
return
155190
}
156191

157192
if (blockId && contentChunk) {
193+
// Track that this block has streamed content (like chat panel)
194+
if (!messageIdMap.has(blockId)) {
195+
messageIdMap.set(blockId, messageId)
196+
}
197+
158198
accumulatedText += contentChunk
159199
setMessages((prev) =>
160200
prev.map((msg) =>

0 commit comments

Comments
 (0)