Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions apps/sim/app/api/files/presigned/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,9 @@ async function handleS3PresignedUrl(
)
}

// For chat images, use direct S3 URLs since they need to be permanently accessible
// For other files, use serve path for access control
// For chat images and knowledge base files, use direct URLs since they need to be accessible by external services
const finalPath =
uploadType === 'chat'
uploadType === 'chat' || uploadType === 'knowledge-base'
? `https://${config.bucket}.s3.${config.region}.amazonaws.com/${uniqueKey}`
: `/api/files/serve/s3/${encodeURIComponent(uniqueKey)}`

Expand Down
60 changes: 60 additions & 0 deletions apps/sim/background/knowledge-processing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { task } from '@trigger.dev/sdk'
import { processDocumentAsync } from '@/lib/knowledge/documents/service'
import { createLogger } from '@/lib/logs/console/logger'

// Logger scoped to the Trigger.dev knowledge-processing task.
const logger = createLogger('TriggerKnowledgeProcessing')

/**
 * Payload for the `knowledge-process-document` Trigger.dev task.
 *
 * Built by `processDocumentsWithQueue` in the documents service, one payload
 * per uploaded document.
 */
export type DocumentProcessingPayload = {
  // Knowledge base the document belongs to.
  knowledgeBaseId: string
  // Database id of the document row to process.
  documentId: string
  // File metadata forwarded to `processDocumentAsync`.
  docData: {
    filename: string
    // URL the processor fetches the file from — presumably a direct S3 URL
    // for knowledge-base uploads; confirm against the presigned-URL route.
    fileUrl: string
    fileSize: number
    mimeType: string
  }
  // Chunking options; callers fill in defaults (e.g. chunkSize 1024,
  // chunkOverlap 200) before triggering, so these are optional here.
  processingOptions: {
    chunkSize?: number
    minCharactersPerChunk?: number
    recipe?: string
    lang?: string
    chunkOverlap?: number
  }
  // Correlation id used to tie task log lines back to the originating request.
  requestId: string
}

/**
 * Trigger.dev task that ingests a single knowledge-base document.
 *
 * Delegates the actual work to `processDocumentAsync` and logs start,
 * success, and failure with the request's correlation id. Errors are
 * rethrown so Trigger.dev's retry policy (up to 3 attempts, exponential
 * backoff between 1s and 10s) takes effect.
 */
export const processDocument = task({
  id: 'knowledge-process-document',
  // Hard cap of 5 minutes per attempt.
  maxDuration: 300,
  retry: {
    maxAttempts: 3,
    factor: 2,
    minTimeoutInMs: 1000,
    maxTimeoutInMs: 10000,
  },
  queue: {
    // At most 20 documents processed concurrently across all runs.
    concurrencyLimit: 20,
    name: 'document-processing-queue',
  },
  run: async (payload: DocumentProcessingPayload) => {
    const { requestId, docData } = payload

    logger.info(`[${requestId}] Starting Trigger.dev processing for document: ${docData.filename}`)

    try {
      await processDocumentAsync(
        payload.knowledgeBaseId,
        payload.documentId,
        docData,
        payload.processingOptions
      )

      logger.info(`[${requestId}] Successfully processed document: ${docData.filename}`)

      return {
        success: true,
        documentId: payload.documentId,
        filename: docData.filename,
        processingTime: Date.now(),
      }
    } catch (error) {
      logger.error(`[${requestId}] Failed to process document: ${docData.filename}`, error)
      // Rethrow so the task is marked failed and retried per the policy above.
      throw error
    }
  },
})
109 changes: 102 additions & 7 deletions apps/sim/lib/knowledge/documents/service.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import crypto, { randomUUID } from 'crypto'
import { tasks } from '@trigger.dev/sdk'
import { and, asc, desc, eq, inArray, isNull, sql } from 'drizzle-orm'
import { getSlotsForFieldType, type TAG_SLOT_CONFIG } from '@/lib/constants/knowledge'
import { generateEmbeddings } from '@/lib/embeddings/utils'
import { env } from '@/lib/env'
import { processDocument } from '@/lib/knowledge/documents/document-processor'
import { getNextAvailableSlot } from '@/lib/knowledge/tags/service'
import { createLogger } from '@/lib/logs/console/logger'
import { getRedisClient } from '@/lib/redis'
import type { DocumentProcessingPayload } from '@/background/knowledge-processing'
import { db } from '@/db'
import { document, embedding, knowledgeBaseTagDefinitions } from '@/db/schema'
import { DocumentProcessingQueue } from './queue'
Expand Down Expand Up @@ -181,14 +184,55 @@ export async function processDocumentTags(
}

/**
* Process documents with Redis queue when available, fallback to concurrency control
* Process documents with best available method: Trigger.dev > Redis queue > in-memory concurrency control
*/
export async function processDocumentsWithQueue(
createdDocuments: DocumentData[],
knowledgeBaseId: string,
processingOptions: ProcessingOptions,
requestId: string
): Promise<void> {
// Priority 1: Trigger.dev
if (isTriggerAvailable()) {
try {
logger.info(
`[${requestId}] Using Trigger.dev background processing for ${createdDocuments.length} documents`
)

const triggerPayloads = createdDocuments.map((doc) => ({
knowledgeBaseId,
documentId: doc.documentId,
docData: {
filename: doc.filename,
fileUrl: doc.fileUrl,
fileSize: doc.fileSize,
mimeType: doc.mimeType,
},
processingOptions: {
chunkSize: processingOptions.chunkSize || 1024,
minCharactersPerChunk: processingOptions.minCharactersPerChunk || 1,
recipe: processingOptions.recipe || 'default',
lang: processingOptions.lang || 'en',
chunkOverlap: processingOptions.chunkOverlap || 200,
},
requestId,
}))

const result = await processDocumentsWithTrigger(triggerPayloads, requestId)

if (result.success) {
logger.info(
`[${requestId}] Successfully triggered background processing: ${result.message}`
)
return
}
logger.warn(`[${requestId}] Trigger.dev failed: ${result.message}, falling back to Redis`)
} catch (error) {
logger.warn(`[${requestId}] Trigger.dev processing failed, falling back to Redis:`, error)
}
}

// Priority 2: Redis queue
const queue = getDocumentQueue()
const redisClient = getRedisClient()

Expand All @@ -213,6 +257,7 @@ export async function processDocumentsWithQueue(

await Promise.all(jobPromises)

// Start Redis background processing
queue
.processJobs(async (job) => {
const data = job.data as DocumentJobData
Expand All @@ -221,7 +266,6 @@ export async function processDocumentsWithQueue(
})
.catch((error) => {
logger.error(`[${requestId}] Error in Redis queue processing:`, error)
// Don't throw here - let the processing continue with fallback if needed
})

logger.info(`[${requestId}] All documents queued for Redis processing`)
Expand All @@ -231,7 +275,10 @@ export async function processDocumentsWithQueue(
}
}

logger.info(`[${requestId}] Using fallback in-memory processing (Redis not available or failed)`)
// Priority 3: In-memory processing
logger.info(
`[${requestId}] Using fallback in-memory processing (neither Trigger.dev nor Redis available)`
)
await processDocumentsWithConcurrencyControl(
createdDocuments,
knowledgeBaseId,
Expand Down Expand Up @@ -500,6 +547,51 @@ export async function processDocumentAsync(
}
}

/**
 * Whether Trigger.dev background processing can be used.
 *
 * True when TRIGGER_SECRET_KEY is set and TRIGGER_DEV_ENABLED has not been
 * explicitly set to `false`.
 */
export function isTriggerAvailable(): boolean {
  const hasSecret = Boolean(env.TRIGGER_SECRET_KEY)
  const notDisabled = env.TRIGGER_DEV_ENABLED !== false
  return hasSecret && notDisabled
}

/**
 * Process documents using Trigger.dev.
 *
 * Fires one `knowledge-process-document` task per document (in parallel) and
 * returns the resulting job ids. Trigger failures are reported through the
 * returned `success` flag rather than thrown, so callers can fall back to
 * other processing strategies.
 *
 * @param documents - One fully-populated payload per document to process.
 * @param requestId - Correlation id used to tie log lines to the request.
 * @returns Outcome summary; `jobIds` is present only on success.
 * @throws Error when Trigger.dev is not configured (TRIGGER_SECRET_KEY missing).
 */
export async function processDocumentsWithTrigger(
  documents: DocumentProcessingPayload[],
  requestId: string
): Promise<{ success: boolean; message: string; jobIds?: string[] }> {
  if (!isTriggerAvailable()) {
    throw new Error('Trigger.dev is not configured - TRIGGER_SECRET_KEY missing')
  }

  try {
    logger.info(`[${requestId}] Triggering background processing for ${documents.length} documents`)

    // Named `payload` (not `document`) to avoid shadowing the `document`
    // table imported from '@/db/schema' at module scope.
    const jobPromises = documents.map(async (payload) => {
      const job = await tasks.trigger('knowledge-process-document', payload)
      return job.id
    })

    const jobIds = await Promise.all(jobPromises)

    logger.info(`[${requestId}] Triggered ${jobIds.length} document processing jobs`)

    return {
      success: true,
      message: `${documents.length} document processing jobs triggered`,
      jobIds,
    }
  } catch (error) {
    logger.error(`[${requestId}] Failed to trigger document processing jobs:`, error)

    return {
      success: false,
      message: error instanceof Error ? error.message : 'Failed to trigger background jobs',
    }
  }
}

/**
* Create document records in database with tags
*/
Expand Down Expand Up @@ -644,8 +736,8 @@ export async function getDocuments(
search,
limit = 50,
offset = 0,
sortBy = 'uploadedAt',
sortOrder = 'desc',
sortBy = 'filename',
sortOrder = 'asc',
} = options

// Build where conditions
Expand Down Expand Up @@ -696,7 +788,10 @@ export async function getDocuments(
}
}

const orderByClause = sortOrder === 'asc' ? asc(getOrderByColumn()) : desc(getOrderByColumn())
// Use stable secondary sort to prevent shifting when primary values are identical
const primaryOrderBy = sortOrder === 'asc' ? asc(getOrderByColumn()) : desc(getOrderByColumn())
const secondaryOrderBy =
sortBy === 'filename' ? desc(document.uploadedAt) : asc(document.filename)

const documents = await db
.select({
Expand Down Expand Up @@ -725,7 +820,7 @@ export async function getDocuments(
})
.from(document)
.where(and(...whereConditions))
.orderBy(orderByClause)
.orderBy(primaryOrderBy, secondaryOrderBy)
.limit(limit)
.offset(offset)

Expand Down
4 changes: 2 additions & 2 deletions apps/sim/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
"@radix-ui/react-tooltip": "^1.1.6",
"@react-email/components": "^0.0.34",
"@sentry/nextjs": "^9.15.0",
"@trigger.dev/sdk": "4.0.0",
"@trigger.dev/sdk": "4.0.1",
"@types/pg": "8.15.5",
"@types/three": "0.177.0",
"@vercel/og": "^0.6.5",
Expand Down Expand Up @@ -134,7 +134,7 @@
"@testing-library/jest-dom": "^6.6.3",
"@testing-library/react": "^16.3.0",
"@testing-library/user-event": "^14.6.1",
"@trigger.dev/build": "4.0.0",
"@trigger.dev/build": "4.0.1",
"@types/html-to-text": "^9.0.4",
"@types/js-yaml": "4.0.9",
"@types/jsdom": "21.1.7",
Expand Down
12 changes: 5 additions & 7 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.