Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/Add bullmq redis for message queue processing #3568

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
add bullmq redis for message queue processing
  • Loading branch information
HenryHengZJ committed Nov 25, 2024
commit 00d8fb4cbb73c08fd23c711ceaba63d55e2c08cf
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
"start": "run-script-os",
"start:windows": "cd packages/server/bin && run start",
"start:default": "cd packages/server/bin && ./run start",
"start-worker": "run-script-os",
"start-worker:windows": "cd packages/server/bin && run worker",
"start-worker:default": "cd packages/server/bin && ./run worker",
"clean": "pnpm --filter \"./packages/**\" clean",
"nuke": "pnpm --filter \"./packages/**\" nuke && rimraf node_modules .turbo",
"format": "prettier --write \"**/*.{ts,tsx,md}\"",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
} from '../../../src/Interface'
import { AgentExecutor } from '../../../src/agents'
import { addImagesToMessages, llmSupportsVision } from '../../../src/multiModalUtils'
import { checkInputs, Moderation } from '../../moderation/Moderation'
import { checkInputs, Moderation, streamResponse } from '../../moderation/Moderation'
import { formatResponse } from '../../outputparsers/OutputParserHelpers'

const DEFAULT_PREFIX = `Assistant is a large language model trained by OpenAI.
Expand Down Expand Up @@ -124,10 +124,9 @@ class ConversationalAgent_Agents implements INode {
input = await checkInputs(moderations, input)
} catch (e) {
await new Promise((resolve) => setTimeout(resolve, 500))
// if (options.shouldStreamResponse) {
// streamResponse(options.sseStreamer, options.chatId, e.message)
// }
//streamResponse(options.socketIO && options.socketIOClientId, e.message, options.socketIO, options.socketIOClientId)
if (options.shouldStreamResponse) {
streamResponse(sseStreamer, chatId, e.message)
}
return formatResponse(e.message)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ class SqlDatabaseChain_Chains implements INode {
if (shouldStreamResponse) {
streamResponse(sseStreamer, chatId, e.message)
}
// streamResponse(options.socketIO && options.socketIOClientId, e.message, options.socketIO, options.socketIOClientId)
return formatResponse(e.message)
}
}
Expand Down
1 change: 0 additions & 1 deletion packages/components/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@
"redis": "^4.6.7",
"replicate": "^0.31.1",
"sanitize-filename": "^1.6.3",
"socket.io": "^4.6.1",
"srt-parser-2": "^1.2.3",
"typeorm": "^0.3.6",
"weaviate-ts-client": "^1.1.0",
Expand Down
3 changes: 0 additions & 3 deletions packages/components/src/Interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -402,12 +402,9 @@ export interface IStateWithMessages extends ICommonObject {
}

export interface IServerSideEventStreamer {
streamEvent(chatId: string, data: string): void
streamStartEvent(chatId: string, data: any): void

streamTokenEvent(chatId: string, data: string): void
streamCustomEvent(chatId: string, eventType: string, data: any): void

streamSourceDocumentsEvent(chatId: string, data: any): void
streamUsedToolsEvent(chatId: string, data: any): void
streamFileAnnotationsEvent(chatId: string, data: any): void
Expand Down
6 changes: 4 additions & 2 deletions packages/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
"nuke": "rimraf dist node_modules .turbo",
"start:windows": "cd bin && run start",
"start:default": "cd bin && ./run start",
"start-worker:windows": "cd bin && run worker",
"start-worker:default": "cd bin && ./run worker",
"dev": "tsc-watch --noClear -p ./tsconfig.json --onSuccess \"pnpm start\"",
"oclif-dev": "run-script-os",
"oclif-dev:windows": "cd bin && dev start",
Expand Down Expand Up @@ -54,7 +56,7 @@
},
"license": "SEE LICENSE IN LICENSE.md",
"dependencies": {
"@oclif/core": "^1.13.10",
"@oclif/core": "4.0.7",
"@opentelemetry/api": "^1.3.0",
"@opentelemetry/core": "1.27.0",
"@opentelemetry/exporter-metrics-otlp-grpc": "0.54.0",
Expand All @@ -73,6 +75,7 @@
"@types/uuid": "^9.0.7",
"async-mutex": "^0.4.0",
"axios": "1.6.2",
"bullmq": "^5.13.2",
"content-disposition": "0.5.4",
"cors": "^2.8.5",
"crypto-js": "^4.1.1",
Expand All @@ -96,7 +99,6 @@
"prom-client": "^15.1.3",
"reflect-metadata": "^0.1.13",
"sanitize-html": "^2.11.0",
"socket.io": "^4.6.1",
"sqlite3": "^5.1.6",
"typeorm": "^0.3.6",
"uuid": "^9.0.1",
Expand Down
50 changes: 49 additions & 1 deletion packages/server/src/Interface.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
import { IAction, ICommonObject, IFileUpload, INode, INodeData as INodeDataFromComponent, INodeParams } from 'flowise-components'
import {
IAction,
ICommonObject,
IFileUpload,
INode,
INodeData as INodeDataFromComponent,
INodeParams,
IServerSideEventStreamer
} from 'flowise-components'
import { DataSource } from 'typeorm'
import { CachePool } from './CachePool'
import { Telemetry } from './utils/telemetry'

export type MessageType = 'apiMessage' | 'userMessage'

Expand Down Expand Up @@ -26,6 +37,7 @@ export interface IChatFlow {
isPublic?: boolean
apikeyid?: string
analytic?: string
speechToText?: string
chatbotConfig?: string
followUpPrompts?: string
apiConfig?: string
Expand Down Expand Up @@ -288,5 +300,41 @@ export interface ICustomTemplate {
usecases?: string
}

export interface IFlowConfig {
chatflowid: string
chatId: string
sessionId: string
chatHistory: IMessage[]
apiMessageId: string
overrideConfig?: ICommonObject
}

export interface IExecuteFlowParams {
startingNodeIds: string[]
endingNodeIds: string[]
nodes: IReactFlowNode[]
edges: IReactFlowEdge[]
graph: INodeDirectedGraph
depthQueue: IDepthQueue
componentNodes: IComponentNodes
incomingInput: IncomingInput
flowConfig: IFlowConfig
chatflow: IChatFlow
memoryType: string
fileUploads: IFileUpload[]
uploadedFilesContent: string
userMessageDateTime: Date
appDataSource: DataSource
apiOverrideStatus: boolean
nodeOverrides: ICommonObject
variableOverrides: ICommonObject[]
sseStreamer: IServerSideEventStreamer
telemetry: Telemetry
cachePool: CachePool
baseURL: string
isStreamValid: boolean
isInternal: boolean
}

// DocumentStore related
export * from './Interface.DocumentStore'
175 changes: 175 additions & 0 deletions packages/server/src/commands/base.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import { Command, Flags } from '@oclif/core'
import path from 'path'
import dotenv from 'dotenv'
import logger from '../utils/logger'

dotenv.config({ path: path.join(__dirname, '..', '..', '.env'), override: true })

enum EXIT_CODE {
SUCCESS = 0,
FAILED = 1
}

export abstract class BaseCommand extends Command {
static flags = {
FLOWISE_USERNAME: Flags.string(),
FLOWISE_PASSWORD: Flags.string(),
FLOWISE_FILE_SIZE_LIMIT: Flags.string(),
PORT: Flags.string(),
CORS_ORIGINS: Flags.string(),
IFRAME_ORIGINS: Flags.string(),
DEBUG: Flags.string(),
BLOB_STORAGE_PATH: Flags.string(),
APIKEY_STORAGE_TYPE: Flags.string(),
APIKEY_PATH: Flags.string(),
SECRETKEY_PATH: Flags.string(),
FLOWISE_SECRETKEY_OVERWRITE: Flags.string(),
LOG_PATH: Flags.string(),
LOG_LEVEL: Flags.string(),
TOOL_FUNCTION_BUILTIN_DEP: Flags.string(),
TOOL_FUNCTION_EXTERNAL_DEP: Flags.string(),
NUMBER_OF_PROXIES: Flags.string(),
DISABLE_CHATFLOW_REUSE: Flags.string(),
DATABASE_TYPE: Flags.string(),
DATABASE_PATH: Flags.string(),
DATABASE_PORT: Flags.string(),
DATABASE_HOST: Flags.string(),
DATABASE_NAME: Flags.string(),
DATABASE_USER: Flags.string(),
DATABASE_PASSWORD: Flags.string(),
DATABASE_SSL: Flags.string(),
DATABASE_SSL_KEY_BASE64: Flags.string(),
LANGCHAIN_TRACING_V2: Flags.string(),
LANGCHAIN_ENDPOINT: Flags.string(),
LANGCHAIN_API_KEY: Flags.string(),
LANGCHAIN_PROJECT: Flags.string(),
DISABLE_FLOWISE_TELEMETRY: Flags.string(),
MODEL_LIST_CONFIG_JSON: Flags.string(),
STORAGE_TYPE: Flags.string(),
S3_STORAGE_BUCKET_NAME: Flags.string(),
S3_STORAGE_ACCESS_KEY_ID: Flags.string(),
S3_STORAGE_SECRET_ACCESS_KEY: Flags.string(),
S3_STORAGE_REGION: Flags.string(),
S3_ENDPOINT_URL: Flags.string(),
S3_FORCE_PATH_STYLE: Flags.string(),
SHOW_COMMUNITY_NODES: Flags.string(),
QUEUE_MODE: Flags.string(),
QUEUE_NAME: Flags.string(),
REDIS_HOST: Flags.string(),
REDIS_PORT: Flags.string()
}

protected async stopProcess() {
// Overridden method by child class
}

protected onTerminate() {
return async () => {
try {
// Shut down the app after timeout if it ever stuck removing pools
setTimeout(async () => {
logger.info('Flowise was forced to shut down after 30 secs')
await this.failExit()
}, 30000)

await this.stopProcess()
} catch (error) {
logger.error('There was an error shutting down Flowise...', error)
}
}
}

protected async gracefullyExit() {
process.exit(EXIT_CODE.SUCCESS)
}

protected async failExit() {
process.exit(EXIT_CODE.FAILED)
}

async init(): Promise<void> {
await super.init()

process.on('SIGTERM', this.onTerminate())
process.on('SIGINT', this.onTerminate())

// Prevent throw new Error from crashing the app
// TODO: Get rid of this and send proper error message to ui
process.on('uncaughtException', (err) => {
logger.error('uncaughtException: ', err)
})

process.on('unhandledRejection', (err) => {
logger.error('unhandledRejection: ', err)
})

const { flags } = await this.parse(BaseCommand)
if (flags.PORT) process.env.PORT = flags.PORT
if (flags.CORS_ORIGINS) process.env.CORS_ORIGINS = flags.CORS_ORIGINS
if (flags.IFRAME_ORIGINS) process.env.IFRAME_ORIGINS = flags.IFRAME_ORIGINS
if (flags.DEBUG) process.env.DEBUG = flags.DEBUG
if (flags.NUMBER_OF_PROXIES) process.env.NUMBER_OF_PROXIES = flags.NUMBER_OF_PROXIES
if (flags.DISABLE_CHATFLOW_REUSE) process.env.DISABLE_CHATFLOW_REUSE = flags.DISABLE_CHATFLOW_REUSE
if (flags.SHOW_COMMUNITY_NODES) process.env.SHOW_COMMUNITY_NODES = flags.SHOW_COMMUNITY_NODES

// Authorization
if (flags.FLOWISE_USERNAME) process.env.FLOWISE_USERNAME = flags.FLOWISE_USERNAME
if (flags.FLOWISE_PASSWORD) process.env.FLOWISE_PASSWORD = flags.FLOWISE_PASSWORD
if (flags.APIKEY_STORAGE_TYPE) process.env.APIKEY_STORAGE_TYPE = flags.APIKEY_STORAGE_TYPE
if (flags.APIKEY_PATH) process.env.APIKEY_PATH = flags.APIKEY_PATH

// API Configuration
if (flags.FLOWISE_FILE_SIZE_LIMIT) process.env.FLOWISE_FILE_SIZE_LIMIT = flags.FLOWISE_FILE_SIZE_LIMIT

// Credentials
if (flags.SECRETKEY_PATH) process.env.SECRETKEY_PATH = flags.SECRETKEY_PATH
if (flags.FLOWISE_SECRETKEY_OVERWRITE) process.env.FLOWISE_SECRETKEY_OVERWRITE = flags.FLOWISE_SECRETKEY_OVERWRITE

// Logs
if (flags.LOG_PATH) process.env.LOG_PATH = flags.LOG_PATH
if (flags.LOG_LEVEL) process.env.LOG_LEVEL = flags.LOG_LEVEL

// Tool functions
if (flags.TOOL_FUNCTION_BUILTIN_DEP) process.env.TOOL_FUNCTION_BUILTIN_DEP = flags.TOOL_FUNCTION_BUILTIN_DEP
if (flags.TOOL_FUNCTION_EXTERNAL_DEP) process.env.TOOL_FUNCTION_EXTERNAL_DEP = flags.TOOL_FUNCTION_EXTERNAL_DEP

// Database config
if (flags.DATABASE_TYPE) process.env.DATABASE_TYPE = flags.DATABASE_TYPE
if (flags.DATABASE_PATH) process.env.DATABASE_PATH = flags.DATABASE_PATH
if (flags.DATABASE_PORT) process.env.DATABASE_PORT = flags.DATABASE_PORT
if (flags.DATABASE_HOST) process.env.DATABASE_HOST = flags.DATABASE_HOST
if (flags.DATABASE_NAME) process.env.DATABASE_NAME = flags.DATABASE_NAME
if (flags.DATABASE_USER) process.env.DATABASE_USER = flags.DATABASE_USER
if (flags.DATABASE_PASSWORD) process.env.DATABASE_PASSWORD = flags.DATABASE_PASSWORD
if (flags.DATABASE_SSL) process.env.DATABASE_SSL = flags.DATABASE_SSL
if (flags.DATABASE_SSL_KEY_BASE64) process.env.DATABASE_SSL_KEY_BASE64 = flags.DATABASE_SSL_KEY_BASE64

// Langsmith tracing
if (flags.LANGCHAIN_TRACING_V2) process.env.LANGCHAIN_TRACING_V2 = flags.LANGCHAIN_TRACING_V2
if (flags.LANGCHAIN_ENDPOINT) process.env.LANGCHAIN_ENDPOINT = flags.LANGCHAIN_ENDPOINT
if (flags.LANGCHAIN_API_KEY) process.env.LANGCHAIN_API_KEY = flags.LANGCHAIN_API_KEY
if (flags.LANGCHAIN_PROJECT) process.env.LANGCHAIN_PROJECT = flags.LANGCHAIN_PROJECT

// Telemetry
if (flags.DISABLE_FLOWISE_TELEMETRY) process.env.DISABLE_FLOWISE_TELEMETRY = flags.DISABLE_FLOWISE_TELEMETRY

// Model list config
if (flags.MODEL_LIST_CONFIG_JSON) process.env.MODEL_LIST_CONFIG_JSON = flags.MODEL_LIST_CONFIG_JSON

// Storage
if (flags.STORAGE_TYPE) process.env.STORAGE_TYPE = flags.STORAGE_TYPE
if (flags.BLOB_STORAGE_PATH) process.env.BLOB_STORAGE_PATH = flags.BLOB_STORAGE_PATH
if (flags.S3_STORAGE_BUCKET_NAME) process.env.S3_STORAGE_BUCKET_NAME = flags.S3_STORAGE_BUCKET_NAME
if (flags.S3_STORAGE_ACCESS_KEY_ID) process.env.S3_STORAGE_ACCESS_KEY_ID = flags.S3_STORAGE_ACCESS_KEY_ID
if (flags.S3_STORAGE_SECRET_ACCESS_KEY) process.env.S3_STORAGE_SECRET_ACCESS_KEY = flags.S3_STORAGE_SECRET_ACCESS_KEY
if (flags.S3_STORAGE_REGION) process.env.S3_STORAGE_REGION = flags.S3_STORAGE_REGION
if (flags.S3_ENDPOINT_URL) process.env.S3_ENDPOINT_URL = flags.S3_ENDPOINT_URL
if (flags.S3_FORCE_PATH_STYLE) process.env.S3_FORCE_PATH_STYLE = flags.S3_FORCE_PATH_STYLE

// Queue
if (flags.QUEUE_MODE) process.env.QUEUE_MODE = flags.QUEUE_MODE
if (flags.QUEUE_NAME) process.env.QUEUE_NAME = flags.QUEUE_NAME
if (flags.REDIS_HOST) process.env.REDIS_HOST = flags.REDIS_HOST
if (flags.REDIS_PORT) process.env.REDIS_PORT = flags.REDIS_PORT
}
}
Loading
Loading