Skip to content

Commit 1ca89b8

Browse files
ric-yuKosinkadinkguill
authored
Add unified jobs API with /api/jobs endpoints (comfyanonymous#11054)
* feat: create a /jobs api to return queue and history jobs * update unused vars * include priority * create jobs helper file * fix ruff * update how we set error message * include execution error in both responses * rename error -> failed, fix output shape * re-use queue and history functions * set workflow id * allow srot by exec duration * fix tests * send priority and remove error msg * use ws messages to get start and end times * revert main.py fully * refactor: move all /jobs business logic to jobs.py * fix failing test * remove some tests * fix non dict nodes * address comments * filter by workflow id and remove null fields * add clearer typing - remove get("..") or .. * refactor query params to top get_job(s) doc, add remove_sensitive_from_queue * add brief comment explaining why we skip animated * comment that format field is for frontend backward compatibility * fix whitespace --------- Co-authored-by: Jedrzej Kosinski <kosinkadink1@gmail.com> Co-authored-by: guill <jacob.e.segal@gmail.com>
1 parent bf7dc63 commit 1ca89b8

File tree

4 files changed

+918
-3
lines changed

4 files changed

+918
-3
lines changed

comfy_execution/jobs.py

Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
"""
2+
Job utilities for the /api/jobs endpoint.
3+
Provides normalization and helper functions for job status tracking.
4+
"""
5+
6+
from typing import Optional
7+
8+
from comfy_api.internal import prune_dict
9+
10+
11+
class JobStatus:
12+
"""Job status constants."""
13+
PENDING = 'pending'
14+
IN_PROGRESS = 'in_progress'
15+
COMPLETED = 'completed'
16+
FAILED = 'failed'
17+
18+
ALL = [PENDING, IN_PROGRESS, COMPLETED, FAILED]
19+
20+
21+
# Media types that can be previewed in the frontend
22+
PREVIEWABLE_MEDIA_TYPES = frozenset({'images', 'video', 'audio'})
23+
24+
# 3D file extensions for preview fallback (no dedicated media_type exists)
25+
THREE_D_EXTENSIONS = frozenset({'.obj', '.fbx', '.gltf', '.glb'})
26+
27+
28+
def _extract_job_metadata(extra_data: dict) -> tuple[Optional[int], Optional[str]]:
29+
"""Extract create_time and workflow_id from extra_data.
30+
31+
Returns:
32+
tuple: (create_time, workflow_id)
33+
"""
34+
create_time = extra_data.get('create_time')
35+
extra_pnginfo = extra_data.get('extra_pnginfo', {})
36+
workflow_id = extra_pnginfo.get('workflow', {}).get('id')
37+
return create_time, workflow_id
38+
39+
40+
def is_previewable(media_type: str, item: dict) -> bool:
41+
"""
42+
Check if an output item is previewable.
43+
Matches frontend logic in ComfyUI_frontend/src/stores/queueStore.ts
44+
Maintains backwards compatibility with existing logic.
45+
46+
Priority:
47+
1. media_type is 'images', 'video', or 'audio'
48+
2. format field starts with 'video/' or 'audio/'
49+
3. filename has a 3D extension (.obj, .fbx, .gltf, .glb)
50+
"""
51+
if media_type in PREVIEWABLE_MEDIA_TYPES:
52+
return True
53+
54+
# Check format field (MIME type).
55+
# Maintains backwards compatibility with how custom node outputs are handled in the frontend.
56+
fmt = item.get('format', '')
57+
if fmt and (fmt.startswith('video/') or fmt.startswith('audio/')):
58+
return True
59+
60+
# Check for 3D files by extension
61+
filename = item.get('filename', '').lower()
62+
if any(filename.endswith(ext) for ext in THREE_D_EXTENSIONS):
63+
return True
64+
65+
return False
66+
67+
68+
def normalize_queue_item(item: tuple, status: str) -> dict:
69+
"""Convert queue item tuple to unified job dict.
70+
71+
Expects item with sensitive data already removed (5 elements).
72+
"""
73+
priority, prompt_id, _, extra_data, _ = item
74+
create_time, workflow_id = _extract_job_metadata(extra_data)
75+
76+
return prune_dict({
77+
'id': prompt_id,
78+
'status': status,
79+
'priority': priority,
80+
'create_time': create_time,
81+
'outputs_count': 0,
82+
'workflow_id': workflow_id,
83+
})
84+
85+
86+
def normalize_history_item(prompt_id: str, history_item: dict, include_outputs: bool = False) -> dict:
87+
"""Convert history item dict to unified job dict.
88+
89+
History items have sensitive data already removed (prompt tuple has 5 elements).
90+
"""
91+
prompt_tuple = history_item['prompt']
92+
priority, _, prompt, extra_data, _ = prompt_tuple
93+
create_time, workflow_id = _extract_job_metadata(extra_data)
94+
95+
status_info = history_item.get('status', {})
96+
status_str = status_info.get('status_str') if status_info else None
97+
if status_str == 'success':
98+
status = JobStatus.COMPLETED
99+
elif status_str == 'error':
100+
status = JobStatus.FAILED
101+
else:
102+
status = JobStatus.COMPLETED
103+
104+
outputs = history_item.get('outputs', {})
105+
outputs_count, preview_output = get_outputs_summary(outputs)
106+
107+
execution_error = None
108+
execution_start_time = None
109+
execution_end_time = None
110+
if status_info:
111+
messages = status_info.get('messages', [])
112+
for entry in messages:
113+
if isinstance(entry, (list, tuple)) and len(entry) >= 2:
114+
event_name, event_data = entry[0], entry[1]
115+
if isinstance(event_data, dict):
116+
if event_name == 'execution_start':
117+
execution_start_time = event_data.get('timestamp')
118+
elif event_name in ('execution_success', 'execution_error', 'execution_interrupted'):
119+
execution_end_time = event_data.get('timestamp')
120+
if event_name == 'execution_error':
121+
execution_error = event_data
122+
123+
job = prune_dict({
124+
'id': prompt_id,
125+
'status': status,
126+
'priority': priority,
127+
'create_time': create_time,
128+
'execution_start_time': execution_start_time,
129+
'execution_end_time': execution_end_time,
130+
'execution_error': execution_error,
131+
'outputs_count': outputs_count,
132+
'preview_output': preview_output,
133+
'workflow_id': workflow_id,
134+
})
135+
136+
if include_outputs:
137+
job['outputs'] = outputs
138+
job['execution_status'] = status_info
139+
job['workflow'] = {
140+
'prompt': prompt,
141+
'extra_data': extra_data,
142+
}
143+
144+
return job
145+
146+
147+
def get_outputs_summary(outputs: dict) -> tuple[int, Optional[dict]]:
148+
"""
149+
Count outputs and find preview in a single pass.
150+
Returns (outputs_count, preview_output).
151+
152+
Preview priority (matching frontend):
153+
1. type="output" with previewable media
154+
2. Any previewable media
155+
"""
156+
count = 0
157+
preview_output = None
158+
fallback_preview = None
159+
160+
for node_id, node_outputs in outputs.items():
161+
if not isinstance(node_outputs, dict):
162+
continue
163+
for media_type, items in node_outputs.items():
164+
# 'animated' is a boolean flag, not actual output items
165+
if media_type == 'animated' or not isinstance(items, list):
166+
continue
167+
168+
for item in items:
169+
if not isinstance(item, dict):
170+
continue
171+
count += 1
172+
173+
if preview_output is None and is_previewable(media_type, item):
174+
enriched = {
175+
**item,
176+
'nodeId': node_id,
177+
'mediaType': media_type
178+
}
179+
if item.get('type') == 'output':
180+
preview_output = enriched
181+
elif fallback_preview is None:
182+
fallback_preview = enriched
183+
184+
return count, preview_output or fallback_preview
185+
186+
187+
def apply_sorting(jobs: list[dict], sort_by: str, sort_order: str) -> list[dict]:
188+
"""Sort jobs list by specified field and order."""
189+
reverse = (sort_order == 'desc')
190+
191+
if sort_by == 'execution_duration':
192+
def get_sort_key(job):
193+
start = job.get('execution_start_time', 0)
194+
end = job.get('execution_end_time', 0)
195+
return end - start if end and start else 0
196+
else:
197+
def get_sort_key(job):
198+
return job.get('create_time', 0)
199+
200+
return sorted(jobs, key=get_sort_key, reverse=reverse)
201+
202+
203+
def get_job(prompt_id: str, running: list, queued: list, history: dict) -> Optional[dict]:
204+
"""
205+
Get a single job by prompt_id from history or queue.
206+
207+
Args:
208+
prompt_id: The prompt ID to look up
209+
running: List of currently running queue items
210+
queued: List of pending queue items
211+
history: Dict of history items keyed by prompt_id
212+
213+
Returns:
214+
Job dict with full details, or None if not found
215+
"""
216+
if prompt_id in history:
217+
return normalize_history_item(prompt_id, history[prompt_id], include_outputs=True)
218+
219+
for item in running:
220+
if item[1] == prompt_id:
221+
return normalize_queue_item(item, JobStatus.IN_PROGRESS)
222+
223+
for item in queued:
224+
if item[1] == prompt_id:
225+
return normalize_queue_item(item, JobStatus.PENDING)
226+
227+
return None
228+
229+
230+
def get_all_jobs(
231+
running: list,
232+
queued: list,
233+
history: dict,
234+
status_filter: Optional[list[str]] = None,
235+
workflow_id: Optional[str] = None,
236+
sort_by: str = "created_at",
237+
sort_order: str = "desc",
238+
limit: Optional[int] = None,
239+
offset: int = 0
240+
) -> tuple[list[dict], int]:
241+
"""
242+
Get all jobs (running, pending, completed) with filtering and sorting.
243+
244+
Args:
245+
running: List of currently running queue items
246+
queued: List of pending queue items
247+
history: Dict of history items keyed by prompt_id
248+
status_filter: List of statuses to include (from JobStatus.ALL)
249+
workflow_id: Filter by workflow ID
250+
sort_by: Field to sort by ('created_at', 'execution_duration')
251+
sort_order: 'asc' or 'desc'
252+
limit: Maximum number of items to return
253+
offset: Number of items to skip
254+
255+
Returns:
256+
tuple: (jobs_list, total_count)
257+
"""
258+
jobs = []
259+
260+
if status_filter is None:
261+
status_filter = JobStatus.ALL
262+
263+
if JobStatus.IN_PROGRESS in status_filter:
264+
for item in running:
265+
jobs.append(normalize_queue_item(item, JobStatus.IN_PROGRESS))
266+
267+
if JobStatus.PENDING in status_filter:
268+
for item in queued:
269+
jobs.append(normalize_queue_item(item, JobStatus.PENDING))
270+
271+
include_completed = JobStatus.COMPLETED in status_filter
272+
include_failed = JobStatus.FAILED in status_filter
273+
if include_completed or include_failed:
274+
for prompt_id, history_item in history.items():
275+
is_failed = history_item.get('status', {}).get('status_str') == 'error'
276+
if (is_failed and include_failed) or (not is_failed and include_completed):
277+
jobs.append(normalize_history_item(prompt_id, history_item))
278+
279+
if workflow_id:
280+
jobs = [j for j in jobs if j.get('workflow_id') == workflow_id]
281+
282+
jobs = apply_sorting(jobs, sort_by, sort_order)
283+
284+
total_count = len(jobs)
285+
286+
if offset > 0:
287+
jobs = jobs[offset:]
288+
if limit is not None:
289+
jobs = jobs[:limit]
290+
291+
return (jobs, total_count)

0 commit comments

Comments
 (0)