Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions api/routers/generation.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import json
import threading
import time
import traceback
import uuid
from typing import Dict
Expand All @@ -16,6 +17,19 @@
# In-memory registry of generation jobs, keyed by job id.
_jobs: Dict[str, JobStatus] = {}
# Job ids treated as cancelled. The generation error handler checks membership
# here and returns without recording an error status.
# NOTE(review): presumably populated by the cancel endpoint -- confirm.
_cancelled: set = set()
# One threading.Event per job, created alongside the job record and set when
# cancellation is requested.
_cancel_events: Dict[str, threading.Event] = {}
# time.monotonic() reading taken when a job reached a terminal status
# ("done", "cancelled" or "error"), keyed by job id.
_completed_at: Dict[str, float] = {}

# Retention window in seconds, compared against the time.monotonic() readings
# stored in _completed_at.
_JOB_TTL = 1800 # purge terminal jobs after 30 minutes


def _purge_old_jobs() -> None:
    """Drop all bookkeeping for jobs that finished more than ``_JOB_TTL`` seconds ago.

    A job is considered stale when its ``_completed_at`` timestamp (a
    ``time.monotonic()`` reading) is older than ``now - _JOB_TTL``. For every
    stale job id the entries in ``_jobs``, ``_cancelled``, ``_cancel_events``
    and ``_completed_at`` are removed. Jobs without a ``_completed_at`` entry
    are never touched.
    """
    cutoff = time.monotonic() - _JOB_TTL

    # Scan a snapshot rather than the live view: other code in this module
    # inserts into _completed_at (apparently from worker threads), and adding
    # a key while a dict view is being iterated raises
    # "RuntimeError: dictionary changed size during iteration".
    finished_jobs = list(_completed_at.items())
    stale = [jid for jid, finished_at in finished_jobs if finished_at < cutoff]

    for jid in stale:
        # pop(..., None) / discard tolerate ids already missing from a table.
        _jobs.pop(jid, None)
        _cancelled.discard(jid)
        _cancel_events.pop(jid, None)
        _completed_at.pop(jid, None)


@router.post("/from-image")
Expand Down Expand Up @@ -63,6 +77,8 @@ async def generate_from_image(
**model_params,
}

_purge_old_jobs()

job = JobStatus(job_id=job_id, status="pending", progress=0)
_jobs[job_id] = job
_cancel_events[job_id] = threading.Event()
Expand Down Expand Up @@ -91,6 +107,7 @@ async def cancel_job(job_id: str):
_cancel_events[job_id].set()
if job.status in ("pending", "running"):
job.status = "cancelled"
_completed_at[job_id] = time.monotonic()
# Kill the active generator subprocess immediately so inference stops now.
# _run_generation will catch the resulting exception, see job_id in _cancelled,
# and return cleanly without setting an error status.
Expand Down Expand Up @@ -162,6 +179,7 @@ def progress_cb(pct: int, step: str = "") -> None:

job.status = "done"
job.progress = 100
_completed_at[job_id] = time.monotonic()
try:
rel = output_path.relative_to(WORKSPACE_DIR)
job.output_url = f"/workspace/{rel.as_posix()}"
Expand All @@ -170,6 +188,7 @@ def progress_cb(pct: int, step: str = "") -> None:

except GenerationCancelled:
job.status = "cancelled"
_completed_at[job_id] = time.monotonic()
except Exception as exc:
if job_id in _cancelled:
return
Expand All @@ -181,3 +200,4 @@ def progress_cb(pct: int, step: str = "") -> None:
print(msg.encode("ascii", errors="replace").decode("ascii"))
job.status = "error"
job.error = tb.strip()
_completed_at[job_id] = time.monotonic()