Skip to content

Commit

Permalink
chore: use cache instead of re-querying node record during workflow e…
Browse files Browse the repository at this point in the history
…xecution (langgenius#9280)
  • Loading branch information
takatost authored Oct 12, 2024
1 parent d9773c9 commit 29188e0
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 13 deletions.
3 changes: 3 additions & 0 deletions api/core/app/apps/advanced_chat/generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
from models.model import Conversation, EndUser, Message
from models.workflow import (
Workflow,
WorkflowNodeExecution,
WorkflowRunStatus,
)

Expand All @@ -72,6 +73,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
_workflow: Workflow
_user: Union[Account, EndUser]
_workflow_system_variables: dict[SystemVariableKey, Any]
_wip_workflow_node_executions: dict[str, WorkflowNodeExecution]

def __init__(
self,
Expand Down Expand Up @@ -115,6 +117,7 @@ def __init__(
}

self._task_state = WorkflowTaskState()
self._wip_workflow_node_executions = {}

self._conversation_name_generate_thread = None

Expand Down
3 changes: 3 additions & 0 deletions api/core/app/apps/workflow/generate_task_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
Workflow,
WorkflowAppLog,
WorkflowAppLogCreatedFrom,
WorkflowNodeExecution,
WorkflowRun,
WorkflowRunStatus,
)
Expand All @@ -69,6 +70,7 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
_task_state: WorkflowTaskState
_application_generate_entity: WorkflowAppGenerateEntity
_workflow_system_variables: dict[SystemVariableKey, Any]
_wip_workflow_node_executions: dict[str, WorkflowNodeExecution]

def __init__(
self,
Expand Down Expand Up @@ -103,6 +105,7 @@ def __init__(
}

self._task_state = WorkflowTaskState()
self._wip_workflow_node_executions = {}

def process(self) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]:
"""
Expand Down
21 changes: 8 additions & 13 deletions api/core/app/task_pipeline/workflow_cycle_manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class WorkflowCycleManage:
_user: Union[Account, EndUser]
_task_state: WorkflowTaskState
_workflow_system_variables: dict[SystemVariableKey, Any]
_wip_workflow_node_executions: dict[str, WorkflowNodeExecution]

def _handle_workflow_run_start(self) -> WorkflowRun:
max_sequence = (
Expand Down Expand Up @@ -251,6 +252,8 @@ def _handle_node_execution_start(
db.session.refresh(workflow_node_execution)
db.session.close()

self._wip_workflow_node_executions[workflow_node_execution.node_execution_id] = workflow_node_execution

return workflow_node_execution

def _handle_workflow_node_execution_success(self, event: QueueNodeSucceededEvent) -> WorkflowNodeExecution:
Expand All @@ -275,9 +278,10 @@ def _handle_workflow_node_execution_success(self, event: QueueNodeSucceededEvent
workflow_node_execution.elapsed_time = (workflow_node_execution.finished_at - event.start_at).total_seconds()

db.session.commit()
db.session.refresh(workflow_node_execution)
db.session.close()

self._wip_workflow_node_executions.pop(workflow_node_execution.node_execution_id)

return workflow_node_execution

def _handle_workflow_node_execution_failed(self, event: QueueNodeFailedEvent) -> WorkflowNodeExecution:
Expand All @@ -300,9 +304,10 @@ def _handle_workflow_node_execution_failed(self, event: QueueNodeFailedEvent) ->
workflow_node_execution.elapsed_time = (workflow_node_execution.finished_at - event.start_at).total_seconds()

db.session.commit()
db.session.refresh(workflow_node_execution)
db.session.close()

self._wip_workflow_node_executions.pop(workflow_node_execution.node_execution_id)

return workflow_node_execution

#################################################
Expand Down Expand Up @@ -678,17 +683,7 @@ def _refetch_workflow_node_execution(self, node_execution_id: str) -> WorkflowNo
:param node_execution_id: workflow node execution id
:return:
"""
workflow_node_execution = (
db.session.query(WorkflowNodeExecution)
.filter(
WorkflowNodeExecution.tenant_id == self._application_generate_entity.app_config.tenant_id,
WorkflowNodeExecution.app_id == self._application_generate_entity.app_config.app_id,
WorkflowNodeExecution.workflow_id == self._workflow.id,
WorkflowNodeExecution.triggered_from == WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value,
WorkflowNodeExecution.node_execution_id == node_execution_id,
)
.first()
)
workflow_node_execution = self._wip_workflow_node_executions.get(node_execution_id)

if not workflow_node_execution:
raise Exception(f"Workflow node execution not found: {node_execution_id}")
Expand Down

0 comments on commit 29188e0

Please sign in to comment.