Skip to content

Commit

Permalink
update worker shutdown gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
HenryHengZJ committed Jan 22, 2025
1 parent 67aa942 commit 2262965
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 38 deletions.
10 changes: 7 additions & 3 deletions packages/server/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export default class Worker extends BaseCommand {
await appDataSource.initialize()
await appDataSource.runMigrations({ transaction: 'each' })

// Initialize chatflow pool
// Initialize abortcontroller pool
const abortControllerPool = new AbortControllerPool()

// Init telemetry
Expand All @@ -85,10 +85,14 @@ export default class Worker extends BaseCommand {

async stopProcess() {
try {
const queueManager = QueueManager.getInstance()
const predictionWorker = queueManager.getQueue('prediction').getWorker()
logger.info(`Shutting down Flowise Prediction Worker ${this.predictionWorkerId}...`)
await predictionWorker.close()

const upsertWorker = queueManager.getQueue('upsert').getWorker()
logger.info(`Shutting down Flowise Upsertion Worker ${this.upsertionWorkerId}...`)
//const serverApp = Server.getInstance()
//if (serverApp) await serverApp.stopApp()
await upsertWorker.close()
} catch (error) {
logger.error('There was an error shutting down Flowise Worker...', error)
await this.failExit()
Expand Down
4 changes: 2 additions & 2 deletions packages/server/src/controllers/internal-predictions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const createInternalPrediction = async (req: Request, res: Response, next: NextF
const createAndStreamInternalPrediction = async (req: Request, res: Response, next: NextFunction) => {
const chatId = req.body.chatId
const sseStreamer = getRunningExpressApp().sseStreamer
const redisSubscriber = getRunningExpressApp().redisSubscriber

try {
sseStreamer.addClient(chatId, res)
res.setHeader('Content-Type', 'text/event-stream')
Expand All @@ -33,7 +33,7 @@ const createAndStreamInternalPrediction = async (req: Request, res: Response, ne
res.flushHeaders()

if (process.env.MODE === MODE.QUEUE) {
redisSubscriber.subscribe(chatId)
getRunningExpressApp().redisSubscriber.subscribe(chatId)
}

const apiResponse = await utilBuildChatflow(req, true)
Expand Down
7 changes: 3 additions & 4 deletions packages/server/src/controllers/predictions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
const isStreamingRequested = req.body.streaming === 'true' || req.body.streaming === true
if (streamable?.isStreaming && isStreamingRequested) {
const sseStreamer = getRunningExpressApp().sseStreamer
const redisSubscriber = getRunningExpressApp().redisSubscriber

let chatId = req.body.chatId
if (!req.body.chatId) {
Expand All @@ -72,11 +71,11 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
res.flushHeaders()

if (process.env.MODE === MODE.QUEUE) {
redisSubscriber.subscribe(chatId)
getRunningExpressApp().redisSubscriber.subscribe(chatId)
}

const apiResponse = await predictionsServices.buildChatflow(req)
if (apiResponse) sseStreamer.streamMetadataEvent(apiResponse.chatId, apiResponse)
sseStreamer.streamMetadataEvent(apiResponse.chatId, apiResponse)
} catch (error) {
if (chatId) {
sseStreamer.streamErrorEvent(chatId, getErrorMessage(error))
Expand All @@ -87,7 +86,7 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
}
} else {
const apiResponse = await predictionsServices.buildChatflow(req)
if (apiResponse) return res.json(apiResponse)
return res.json(apiResponse)
}
} else {
const isStreamingRequested = req.body.streaming === 'true' || req.body.streaming === true
Expand Down
8 changes: 7 additions & 1 deletion packages/server/src/queue/BaseQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export abstract class BaseQueue {
protected queue: Queue
protected queueEvents: QueueEvents
protected connection: RedisOptions
private worker: Worker

constructor(queueName: string, connection: RedisOptions) {
this.connection = connection
Expand All @@ -27,13 +28,17 @@ export abstract class BaseQueue {

abstract getQueue(): Queue

public getWorker(): Worker {
return this.worker
}

public async addJob(jobData: any): Promise<Job> {
const jobId = jobData.id || uuidv4()
return await this.queue.add(jobId, jobData, { removeOnFail: true })
}

public createWorker(concurrency: number = WORKER_CONCURRENCY): Worker {
return new Worker(
this.worker = new Worker(
this.queue.name,
async (job: Job) => {
const start = new Date().getTime()
Expand All @@ -48,6 +53,7 @@ export abstract class BaseQueue {
concurrency
}
)
return this.worker
}

public async getJobs(): Promise<Job[]> {
Expand Down
28 changes: 0 additions & 28 deletions packages/server/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import {
IMessage,
FlowiseMemory,
IFileUpload,
mapExtToInputField,
getS3Config
} from 'flowise-components'
import { randomBytes } from 'crypto'
Expand Down Expand Up @@ -1759,33 +1758,6 @@ export const getAPIOverrideConfig = (chatflow: IChatFlow) => {
const apiOverrideStatus: boolean =
apiConfig.overrideConfig && apiConfig.overrideConfig.status ? apiConfig.overrideConfig.status : false

/* For "files" input, add a new node override with the actual input name such as pdfFile, txtFile, etc, to allow overriding the input
* https://github.com/FlowiseAI/Flowise/pull/3569
*/
for (const nodeLabel in nodeOverrides) {
const params = nodeOverrides[nodeLabel]
const enabledFileParam = params.find((param) => param.enabled && param.name === 'files')
if (enabledFileParam) {
if (enabledFileParam.type.includes(',')) {
const fileInputFieldsFromExt = enabledFileParam.type.split(',').map((fileType) => mapExtToInputField(fileType.trim()))
for (const fileInputFieldFromExt of fileInputFieldsFromExt) {
if (nodeOverrides[nodeLabel].some((param) => param.name === fileInputFieldFromExt)) {
continue
}
nodeOverrides[nodeLabel].push({
...enabledFileParam,
name: fileInputFieldFromExt
})
}
} else {
const fileInputFieldFromExt = mapExtToInputField(enabledFileParam.type)
nodeOverrides[nodeLabel].push({
...enabledFileParam,
name: fileInputFieldFromExt
})
}
}
}
return { nodeOverrides, variableOverrides, apiOverrideStatus }
} catch (error) {
return { nodeOverrides: {}, variableOverrides: [], apiOverrideStatus: false }
Expand Down

0 comments on commit 2262965

Please sign in to comment.