Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/CodeInterpreter E2B Credential #3206

Merged
merged 30 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
6f2f182
Base changes for ServerSide Events (instead of socket.io)
vinodkiran Aug 25, 2024
183e951
lint fixes
vinodkiran Aug 25, 2024
f88694e
adding of interface and separate methods for streaming events
vinodkiran Aug 26, 2024
7ec4fa1
lint
vinodkiran Aug 26, 2024
c5991dd
Merge branch 'main' into feature/sse
vinodkiran Aug 26, 2024
d3c5438
first draft, handles both internal and external prediction end points.
vinodkiran Sep 1, 2024
f0c259f
lint fixes
vinodkiran Sep 1, 2024
ddc10b3
Merge branch 'main' into feature/sse
vinodkiran Sep 1, 2024
57d0a75
additional internal end point for streaming and associated changes
vinodkiran Sep 2, 2024
6487381
return streamresponse as true to build agent flow
HenryHengZJ Sep 3, 2024
768ec51
1) JSON formatting for internal events
vinodkiran Sep 3, 2024
1174c0d
1) convert internal event to metadata to maintain consistency with ex…
vinodkiran Sep 3, 2024
d24c9a4
fix action and metadata streaming
HenryHengZJ Sep 3, 2024
8e00eb2
fix for error when agent flow is aborted
vinodkiran Sep 4, 2024
77163b9
Merge branch 'main' into FEATURE/sse
vinodkiran Sep 4, 2024
1c936b9
prevent subflows from streaming and other code cleanup
vinodkiran Sep 4, 2024
3a66e2b
prevent streaming from enclosed tools
vinodkiran Sep 4, 2024
0a520d8
add fix for preventing chaintool streaming
HenryHengZJ Sep 4, 2024
6367d48
update lock file
HenryHengZJ Sep 5, 2024
88ec25e
add open when hidden to sse
HenryHengZJ Sep 7, 2024
f5a8939
Streaming errors
vinodkiran Sep 11, 2024
285ddaf
Streaming errors
vinodkiran Sep 11, 2024
1a00e94
add fix for showing error message
HenryHengZJ Sep 11, 2024
8a53556
Merge branch 'feature/sse' into feature/E2B
HenryHengZJ Sep 12, 2024
530cbb2
add code interpreter
HenryHengZJ Sep 13, 2024
a2cf396
add artifacts to view message dialog
HenryHengZJ Sep 15, 2024
348f762
Merge branch 'main' into feature/E2B
HenryHengZJ Sep 17, 2024
8dceb43
Update pnpm-lock.yaml
HenryHengZJ Sep 17, 2024
68de814
uncomment e2b credential
HenryHengZJ Sep 17, 2024
555406a
Merge branch 'main' into feature/E2B
HenryHengZJ Sep 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
additional internal end point for streaming and associated changes
  • Loading branch information
vinodkiran committed Sep 2, 2024
commit 57d0a754e999d06fc7cc5315e01f189f9dd108a1
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,16 @@ import { ChatPromptTemplate, MessagesPlaceholder, HumanMessagePromptTemplate, Pr
import { formatToOpenAIToolMessages } from 'langchain/agents/format_scratchpad/openai_tools'
import { getBaseClasses } from '../../../src/utils'
import { type ToolsAgentStep } from 'langchain/agents/openai/output_parser'
import { FlowiseMemory, ICommonObject, INode, INodeData, INodeParams, IUsedTool, IVisionChatModal } from '../../../src/Interface'
import {
FlowiseMemory,
ICommonObject,
INode,
INodeData,
INodeParams,
IServerSideEventStreamer,
IUsedTool,
IVisionChatModal
} from "../../../src/Interface";
import { ConsoleCallbackHandler, CustomChainHandler, additionalCallbacks } from '../../../src/handler'
import { AgentExecutor, ToolCallingAgentOutputParser } from '../../../src/agents'
import { Moderation, checkInputs, streamResponse } from '../../moderation/Moderation'
Expand Down Expand Up @@ -104,16 +113,19 @@ class ConversationalRetrievalToolAgent_Agents implements INode {
const memory = nodeData.inputs?.memory as FlowiseMemory
const moderations = nodeData.inputs?.inputModeration as Moderation[]

const isStreamable = options.socketIO && options.socketIOClientId
const shouldStreamResponse = options.shouldStreamResponse
const sseStreamer: IServerSideEventStreamer = options.sseStreamer as IServerSideEventStreamer
const chatId = options.chatId

if (moderations && moderations.length > 0) {
try {
// Use the output of the moderation chain as input for the OpenAI Function Agent
input = await checkInputs(moderations, input)
} catch (e) {
await new Promise((resolve) => setTimeout(resolve, 500))
if (isStreamable)
streamResponse(options.socketIO && options.socketIOClientId, e.message, options.socketIO, options.socketIOClientId)
if (shouldStreamResponse) {
streamResponse(sseStreamer, chatId, e.message)
}
return formatResponse(e.message)
}
}
Expand All @@ -127,15 +139,15 @@ class ConversationalRetrievalToolAgent_Agents implements INode {
let sourceDocuments: ICommonObject[] = []
let usedTools: IUsedTool[] = []

if (isStreamable) {
const handler = new CustomChainHandler(options.socketIO, options.socketIOClientId)
if (shouldStreamResponse) {
const handler = new CustomChainHandler(sseStreamer, chatId)
res = await executor.invoke({ input }, { callbacks: [loggerHandler, handler, ...callbacks] })
if (res.sourceDocuments) {
options.socketIO.to(options.socketIOClientId).emit('sourceDocuments', flatten(res.sourceDocuments))
sseStreamer.streamSourceDocumentsEvent(chatId, JSON.stringify(flatten(res.sourceDocuments)))
sourceDocuments = res.sourceDocuments
}
if (res.usedTools) {
options.socketIO.to(options.socketIOClientId).emit('usedTools', res.usedTools)
sseStreamer.streamUsedToolsEvent(chatId, JSON.stringify(res.usedTools))
usedTools = res.usedTools
}
} else {
Expand Down
58 changes: 35 additions & 23 deletions packages/server/src/controllers/internal-predictions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,47 @@ import { getRunningExpressApp } from '../../utils/getRunningExpressApp'
// Send input message and get prediction result (Internal)
const createInternalPrediction = async (req: Request, res: Response, next: NextFunction) => {
    try {
        // Non-streaming internal path: build the chatflow response and return it
        // as a single JSON payload. (Streaming callers use
        // createAndStreamInternalPrediction instead.)
        const apiResponse = await utilBuildChatflow(req, true)
        return res.json(apiResponse)
    } catch (error) {
        // Delegate to the Express error-handling middleware
        next(error)
    }
}

// Send input message and stream prediction result using SSE (Internal)
const createAndStreamInternalPrediction = async (req: Request, res: Response, next: NextFunction) => {
try {
const chatId = req.body.chatId
getRunningExpressApp().sseStreamer.addClient(chatId, res)
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
res.flushHeaders()

const apiResponse = await utilBuildChatflow(req, true)
if (apiResponse.isStreamValid) {
const sseStreamer = getRunningExpressApp().sseStreamer
if (apiResponse.chatId) {
sseStreamer.streamCustomEvent(apiResponse.chatId, 'chatId', apiResponse.chatId)
}
if (apiResponse.chatMessageId) {
sseStreamer.streamCustomEvent(apiResponse.chatId, 'chatMessageId', apiResponse.chatMessageId)
}
if (apiResponse.question) {
sseStreamer.streamCustomEvent(apiResponse.chatId, 'question', apiResponse.question)
}
if (apiResponse.sessionId) {
sseStreamer.streamCustomEvent(apiResponse.chatId, 'sessionId', apiResponse.sessionId)
}
if (apiResponse.memoryType) {
sseStreamer.streamCustomEvent(apiResponse.chatId, 'memoryType', apiResponse.memoryType)
}
sseStreamer.removeClient(apiResponse.chatId)
const sseStreamer = getRunningExpressApp().sseStreamer
if (apiResponse.chatId) {
sseStreamer.streamCustomEvent(apiResponse.chatId, 'chatId', apiResponse.chatId)
}

return res.json(apiResponse)
if (apiResponse.chatMessageId) {
sseStreamer.streamCustomEvent(apiResponse.chatId, 'chatMessageId', apiResponse.chatMessageId)
}
if (apiResponse.question) {
sseStreamer.streamCustomEvent(apiResponse.chatId, 'question', apiResponse.question)
}
if (apiResponse.sessionId) {
sseStreamer.streamCustomEvent(apiResponse.chatId, 'sessionId', apiResponse.sessionId)
}
if (apiResponse.memoryType) {
sseStreamer.streamCustomEvent(apiResponse.chatId, 'memoryType', apiResponse.memoryType)
}
sseStreamer.removeClient(apiResponse.chatId)
return
} catch (error) {
next(error)
}
}
export default {
createInternalPrediction
createInternalPrediction,
createAndStreamInternalPrediction
}
22 changes: 13 additions & 9 deletions packages/server/src/controllers/predictions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,32 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
const apiResponse = await predictionsServices.buildChatflow(req)
if (streamable?.isStreaming && req.body.streaming === 'true') {
const sseStreamer = getRunningExpressApp().sseStreamer
const metadataJson: any = {}
if (apiResponse.chatId) {
sseStreamer.streamCustomEvent(apiResponse.chatId, 'chatId', apiResponse.chatId)
metadataJson['chatId'] = apiResponse.chatId
//sseStreamer.streamCustomEvent(apiResponse.chatId, 'chatId', apiResponse.chatId)
}
if (apiResponse.chatMessageId) {
sseStreamer.streamCustomEvent(apiResponse.chatId, 'chatMessageId', apiResponse.chatMessageId)
metadataJson['chatMessageId'] = apiResponse.chatMessageId
//sseStreamer.streamCustomEvent(apiResponse.chatId, 'chatMessageId', apiResponse.chatMessageId)
}
if (apiResponse.question) {
sseStreamer.streamCustomEvent(apiResponse.chatId, 'question', apiResponse.question)
metadataJson['question'] = apiResponse.question
//sseStreamer.streamCustomEvent(apiResponse.chatId, 'question', apiResponse.question)
}
if (apiResponse.sessionId) {
sseStreamer.streamCustomEvent(apiResponse.chatId, 'sessionId', apiResponse.sessionId)
metadataJson['sessionId'] = apiResponse.sessionId
//sseStreamer.streamCustomEvent(apiResponse.chatId, 'sessionId', apiResponse.sessionId)
}
if (apiResponse.memoryType) {
sseStreamer.streamCustomEvent(apiResponse.chatId, 'memoryType', apiResponse.memoryType)
metadataJson['memoryType'] = apiResponse.memoryType
//sseStreamer.streamCustomEvent(apiResponse.chatId, 'memoryType', apiResponse.memoryType)
}
sseStreamer.streamCustomEvent(apiResponse.chatId, 'metadata', JSON.stringify(metadataJson))
sseStreamer.removeClient(apiResponse.chatId)
}
if (req.body.streaming === 'true') {
return
} else {
return res.json(apiResponse)
}
return res.json(apiResponse)
} else {
throw new InternalFlowiseError(StatusCodes.UNAUTHORIZED, `This site is not allowed to access this chatbot`)
}
Expand Down
7 changes: 3 additions & 4 deletions packages/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,9 @@ export class App {
'/api/v1/leads',
'/api/v1/get-upload-file',
'/api/v1/ip',
'/api/v1/ping',
'/api/v1/events'
]
const URL_CASE_INSENSITIVE_REGEX: RegExp = /\/api\/v1\//i
'/api/v1/ping'
]
const URL_CASE_INSENSITIVE_REGEX: RegExp = /\/api\/v1\//i
const URL_CASE_SENSITIVE_REGEX: RegExp = /\/api\/v1\//

if (process.env.FLOWISE_USERNAME && process.env.FLOWISE_PASSWORD) {
Expand Down Expand Up @@ -203,7 +203,6 @@ export class App {

this.app.use('/api/v1', flowiseApiV1Router)
this.sseStreamer = new SSEStreamer(this.app)
this.sseStreamer.setupSSEEndpoint()

// ----------------------------------------
// Configure number of proxies in Host Environment
Expand Down
1 change: 1 addition & 0 deletions packages/server/src/routes/internal-predictions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ const router = express.Router()

// CREATE (non-streaming)
router.post(['/', '/:id'], internalPredictionsController.createInternalPrediction)
// CREATE + STREAM via SSE — mounted under '/stream' so it cannot collide with
// the non-streaming '/' route above (Express dispatches to the first matching
// handler and neither handler calls next(), so a duplicate '/' entry here
// would be unreachable)
router.post(['/stream', '/stream/:id'], internalPredictionsController.createAndStreamInternalPrediction)

export default router
Loading
Loading