-
Notifications
You must be signed in to change notification settings - Fork 9k
/
Copy pathworkflow_run_service.py
136 lines (110 loc) · 4.66 KB
/
workflow_run_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from extensions.ext_database import db
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from models.model import App
from models.workflow import (
WorkflowNodeExecution,
WorkflowNodeExecutionTriggeredFrom,
WorkflowRun,
WorkflowRunTriggeredFrom,
)
class WorkflowRunService:
def get_paginate_advanced_chat_workflow_runs(self, app_model: App, args: dict) -> InfiniteScrollPagination:
"""
Get advanced chat app workflow run list
Only return triggered_from == advanced_chat
:param app_model: app model
:param args: request args
"""
class WorkflowWithMessage:
message_id: str
conversation_id: str
def __init__(self, workflow_run: WorkflowRun):
self._workflow_run = workflow_run
def __getattr__(self, item):
return getattr(self._workflow_run, item)
pagination = self.get_paginate_workflow_runs(app_model, args)
with_message_workflow_runs = []
for workflow_run in pagination.data:
message = workflow_run.message
with_message_workflow_run = WorkflowWithMessage(workflow_run=workflow_run)
if message:
with_message_workflow_run.message_id = message.id
with_message_workflow_run.conversation_id = message.conversation_id
with_message_workflow_runs.append(with_message_workflow_run)
pagination.data = with_message_workflow_runs
return pagination
def get_paginate_workflow_runs(self, app_model: App, args: dict) -> InfiniteScrollPagination:
"""
Get debug workflow run list
Only return triggered_from == debugging
:param app_model: app model
:param args: request args
"""
limit = int(args.get("limit", 20))
base_query = db.session.query(WorkflowRun).filter(
WorkflowRun.tenant_id == app_model.tenant_id,
WorkflowRun.app_id == app_model.id,
WorkflowRun.triggered_from == WorkflowRunTriggeredFrom.DEBUGGING.value,
)
if args.get("last_id"):
last_workflow_run = base_query.filter(
WorkflowRun.id == args.get("last_id"),
).first()
if not last_workflow_run:
raise ValueError("Last workflow run not exists")
workflow_runs = (
base_query.filter(
WorkflowRun.created_at < last_workflow_run.created_at, WorkflowRun.id != last_workflow_run.id
)
.order_by(WorkflowRun.created_at.desc())
.limit(limit)
.all()
)
else:
workflow_runs = base_query.order_by(WorkflowRun.created_at.desc()).limit(limit).all()
has_more = False
if len(workflow_runs) == limit:
current_page_first_workflow_run = workflow_runs[-1]
rest_count = base_query.filter(
WorkflowRun.created_at < current_page_first_workflow_run.created_at,
WorkflowRun.id != current_page_first_workflow_run.id,
).count()
if rest_count > 0:
has_more = True
return InfiniteScrollPagination(data=workflow_runs, limit=limit, has_more=has_more)
def get_workflow_run(self, app_model: App, run_id: str) -> WorkflowRun:
"""
Get workflow run detail
:param app_model: app model
:param run_id: workflow run id
"""
workflow_run = (
db.session.query(WorkflowRun)
.filter(
WorkflowRun.tenant_id == app_model.tenant_id,
WorkflowRun.app_id == app_model.id,
WorkflowRun.id == run_id,
)
.first()
)
return workflow_run
def get_workflow_run_node_executions(self, app_model: App, run_id: str) -> list[WorkflowNodeExecution]:
"""
Get workflow run node execution list
"""
workflow_run = self.get_workflow_run(app_model, run_id)
if not workflow_run:
return []
node_executions = (
db.session.query(WorkflowNodeExecution)
.filter(
WorkflowNodeExecution.tenant_id == app_model.tenant_id,
WorkflowNodeExecution.app_id == app_model.id,
WorkflowNodeExecution.workflow_id == workflow_run.workflow_id,
WorkflowNodeExecution.triggered_from == WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value,
WorkflowNodeExecution.workflow_run_id == run_id,
)
.order_by(WorkflowNodeExecution.index.desc())
.all()
)
return node_executions