Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 82 additions & 15 deletions apps/sim/app/api/wand-generate/route.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { unstable_noStore as noStore } from 'next/cache'
import { type NextRequest, NextResponse } from 'next/server'
import OpenAI, { AzureOpenAI } from 'openai'
import { env } from '@/lib/env'
import { createLogger } from '@/lib/logs/console/logger'

export const dynamic = 'force-dynamic'
export const runtime = 'edge'
export const maxDuration = 60

const logger = createLogger('WandGenerateAPI')
Expand Down Expand Up @@ -62,7 +62,6 @@ export async function POST(req: NextRequest) {
}

try {
noStore()
const body = (await req.json()) as RequestBody

const { prompt, systemPrompt, stream = false, history = [] } = body
Expand Down Expand Up @@ -107,14 +106,38 @@ export async function POST(req: NextRequest) {
`[${requestId}] Starting streaming request to ${useWandAzure ? 'Azure OpenAI' : 'OpenAI'}`
)

const streamCompletion = await client.chat.completions.create({
model: useWandAzure ? wandModelName : 'gpt-4o',
messages: messages,
temperature: 0.3,
max_tokens: 10000,
stream: true,
logger.info(
`[${requestId}] About to create stream with model: ${useWandAzure ? wandModelName : 'gpt-4o'}`
)

// Add AbortController with timeout
const abortController = new AbortController()
const timeoutId = setTimeout(() => {
abortController.abort('Stream timeout after 30 seconds')
}, 30000)

// Forward request abort signal if available
req.signal?.addEventListener('abort', () => {
abortController.abort('Request cancelled by client')
})

const streamCompletion = await client.chat.completions.create(
{
model: useWandAzure ? wandModelName : 'gpt-4o',
messages: messages,
temperature: 0.3,
max_tokens: 10000,
stream: true,
stream_options: { include_usage: true },
},
{
signal: abortController.signal, // Add AbortSignal
}
)

clearTimeout(timeoutId) // Clear timeout after successful creation
logger.info(`[${requestId}] Stream created successfully, starting reader pattern`)

logger.debug(`[${requestId}] Stream connection established successfully`)

return new Response(
Expand All @@ -123,27 +146,71 @@ export async function POST(req: NextRequest) {
const encoder = new TextEncoder()

try {
logger.info(`[${requestId}] Starting streaming with timeout protection`)
let chunkCount = 0
let hasUsageData = false

// Use for await with AbortController timeout protection
for await (const chunk of streamCompletion) {
const content = chunk.choices[0]?.delta?.content || ''
chunkCount++

if (chunkCount === 1) {
logger.info(`[${requestId}] Received first chunk via for await`)
}

// Process the chunk
const content = chunk.choices?.[0]?.delta?.content || ''
if (content) {
// Use SSE format identical to chat streaming
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ chunk: content })}\n\n`)
)
}

// Check for usage data
if (chunk.usage) {
hasUsageData = true
logger.info(
`[${requestId}] Received usage data: ${JSON.stringify(chunk.usage)}`
)
}

// Log every 5th chunk to avoid spam
if (chunkCount % 5 === 0) {
logger.debug(`[${requestId}] Processed ${chunkCount} chunks so far`)
}
}

logger.info(
`[${requestId}] Reader pattern completed. Total chunks: ${chunkCount}, Usage data received: ${hasUsageData}`
)

// Send completion signal in SSE format
logger.info(`[${requestId}] Sending completion signal`)
controller.enqueue(encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`))

logger.info(`[${requestId}] Closing controller`)
controller.close()
logger.info(`[${requestId}] Wand generation streaming completed`)

logger.info(`[${requestId}] Wand generation streaming completed successfully`)
} catch (streamError: any) {
logger.error(`[${requestId}] Streaming error`, { error: streamError.message })
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({ error: 'Streaming failed', done: true })}\n\n`
if (streamError.name === 'AbortError') {
logger.info(
`[${requestId}] Stream was aborted (timeout or cancel): ${streamError.message}`
)
)
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({ error: 'Stream cancelled', done: true })}\n\n`
)
)
} else {
logger.error(`[${requestId}] Streaming error`, { error: streamError.message })
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({ error: 'Streaming failed', done: true })}\n\n`
)
)
}
controller.close()
}
},
Expand Down