Skip to content

Commit 02144df

Browse files
bpiwowarclaude
andcommitted
feat(scheduler): properly track SCHEDULED state for batch jobs
- Add _set_job_state_from_process() to check process state and set job state to SCHEDULED or RUNNING accordingly - Add aio_wait_until_running() to Process base class with polling default - SLURM: implement event-driven aio_wait_until_running() using watcher - Add clock icon (🕐) for SCHEDULED state in TUI - Show informative message when trying to view logs for SCHEDULED jobs This enables proper tracking of jobs that are submitted to batch systems like SLURM but not yet executing. The TUI now shows a distinct icon and prevents attempts to view logs that don't exist yet. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 43134f7 commit 02144df

File tree

5 files changed

+120
-5
lines changed

5 files changed

+120
-5
lines changed

src/experimaestro/connectors/__init__.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,28 @@ async def aio_isrunning(self):
138138
"""True is the process is truly running (I/O)"""
139139
return (await self.aio_state()).running
140140

141+
async def aio_wait_until_running(
142+
self, poll_interval: float = 5.0
143+
) -> "ProcessState":
144+
"""Wait until the process transitions from SCHEDULED to RUNNING (or finishes).
145+
146+
Subclasses (e.g., SLURM) may override with event-driven implementation.
147+
Default implementation polls aio_state() at regular intervals.
148+
149+
Args:
150+
poll_interval: How often to poll for state changes (seconds)
151+
152+
Returns:
153+
The new ProcessState (RUNNING or a finished state)
154+
"""
155+
import asyncio
156+
157+
while True:
158+
state = await self.aio_state()
159+
if state != ProcessState.SCHEDULED:
160+
return state
161+
await asyncio.sleep(poll_interval)
162+
141163
async def aio_code(self) -> Optional[int]:
142164
"""Returns a future containing the returned code
143165

src/experimaestro/launchers/slurm/base.py

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,10 +221,14 @@ def __init__(self, launcher: "SlurmLauncher"):
221221
self.fetched_event = threading.Event()
222222
self.updating_jobs = threading.Lock()
223223

224-
# Async waiters: jobid -> list of (asyncio.Future, event_loop)
224+
# Async waiters for job completion: jobid -> list of (asyncio.Future, event_loop)
225225
self.async_waiters: Dict[
226226
str, List[Tuple[asyncio.Future, asyncio.AbstractEventLoop]]
227227
] = {}
228+
# Async waiters for state change: jobid -> list of (asyncio.Future, event_loop)
229+
self.async_state_waiters: Dict[
230+
str, List[Tuple[asyncio.Future, asyncio.AbstractEventLoop]]
231+
] = {}
228232
self.async_waiters_lock = threading.Lock()
229233

230234
# Async waiters for first fetch: list of (asyncio.Future, event_loop)
@@ -346,9 +350,24 @@ def register_async_waiter(
346350
self.async_waiters[jobid].append((event, loop))
347351
return event
348352

353+
def register_async_state_waiter(
354+
self, jobid: str, loop: asyncio.AbstractEventLoop
355+
) -> asyncio.Future:
356+
"""Register an async waiter for job state change.
357+
358+
Returns an asyncio.Future that will be set on next state update.
359+
"""
360+
future = loop.create_future()
361+
with self.async_waiters_lock:
362+
if jobid not in self.async_state_waiters:
363+
self.async_state_waiters[jobid] = []
364+
self.async_state_waiters[jobid].append((future, loop))
365+
return future
366+
349367
def _notify_async_waiters(self):
350-
"""Notify async waiters for finished jobs"""
368+
"""Notify async waiters for finished jobs and state changes"""
351369
with self.async_waiters_lock:
370+
# Notify completion waiters
352371
finished_jobs = []
353372
for jobid, waiters in self.async_waiters.items():
354373
state = self.jobs.get(jobid)
@@ -361,6 +380,13 @@ def _notify_async_waiters(self):
361380
for jobid in finished_jobs:
362381
del self.async_waiters[jobid]
363382

383+
# Notify state change waiters (always notify with current state)
384+
for jobid, waiters in list(self.async_state_waiters.items()):
385+
state = self.jobs.get(jobid)
386+
for future, loop in waiters:
387+
loop.call_soon_threadsafe(future.set_result, state)
388+
self.async_state_waiters.clear()
389+
364390
def _notify_async_fetched_waiters(self):
365391
"""Notify async waiters waiting for the first fetch"""
366392
with self.async_waiters_lock:
@@ -524,6 +550,29 @@ async def aio_state(self, timeout: float | None = None) -> ProcessState:
524550
if release_after:
525551
SlurmProcessWatcher.release(watcher)
526552

553+
async def aio_wait_until_running(self) -> ProcessState:
554+
"""Wait until the job transitions from SCHEDULED to RUNNING (or finishes).
555+
556+
Uses the SLURM watcher's event-driven notification instead of polling.
557+
Returns the new ProcessState (RUNNING or a finished state).
558+
"""
559+
loop = asyncio.get_running_loop()
560+
561+
with SlurmProcessWatcher.get(self.launcher) as watcher:
562+
while True:
563+
# Check current state
564+
jobinfo = await watcher.aio_getjob(self.jobid)
565+
if jobinfo:
566+
if jobinfo.state != ProcessState.SCHEDULED:
567+
return jobinfo.state
568+
else:
569+
# Job not found yet, wait for next update
570+
pass
571+
572+
# Register for state change notification and wait
573+
future = watcher.register_async_state_waiter(self.jobid, loop)
574+
await future
575+
527576
def kill(self):
528577
logger.warning("Killing slurm job %s", self.jobid)
529578
builder = self.launcher.connector.processbuilder()

src/experimaestro/scheduler/base.py

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -853,16 +853,45 @@ async def aio_final_state(self, job: Job) -> JobState:
853853
# Wait for done handler to complete and notify exit condition
854854
return job.state
855855

856+
async def _set_job_state_from_process(self, job: Job, process: "Process") -> None:
857+
"""Set job state based on process state and wait until running.
858+
859+
Checks the process state and sets job to SCHEDULED or RUNNING accordingly.
860+
If SCHEDULED, waits until the process starts running (or finishes).
861+
862+
Args:
863+
job: The job to update
864+
process: The process to check state on
865+
"""
866+
from experimaestro.connectors import ProcessState
867+
868+
# Check initial process state
869+
state = await process.aio_state()
870+
if state == ProcessState.SCHEDULED:
871+
job.set_state(JobState.SCHEDULED)
872+
self.notify_job_state(job)
873+
874+
# Wait until running or finished (uses event-driven for SLURM)
875+
state = await process.aio_wait_until_running()
876+
if state == ProcessState.RUNNING:
877+
job.set_state(JobState.RUNNING)
878+
self.notify_job_state(job)
879+
logger.info("Job %s started running", job.identifier[:8])
880+
# If finished, state will be set later from marker files
881+
else:
882+
# Process is already running (or finished)
883+
job.set_state(JobState.RUNNING)
884+
self.notify_job_state(job)
885+
856886
async def _wait_for_job_process(self, job: Job, process: "Process") -> None:
857887
"""Wait for a running job process to complete and update state.
858888
859889
Args:
860890
job: The job with a running process
861891
process: The process to wait for
862892
"""
863-
# Notify listeners that job is running
864-
job.set_state(JobState.RUNNING)
865-
self.notify_job_state(job)
893+
# Set initial state (SCHEDULED or RUNNING) and wait until running
894+
await self._set_job_state_from_process(job, process)
866895

867896
# And now, we wait...
868897
code = await process.aio_code()
@@ -1066,6 +1095,9 @@ async def aio_start(self, job: Job) -> Optional[JobState]: # noqa: C901
10661095
logger.warning("Error while starting job", exc_info=True)
10671096
return JobState.ERROR
10681097

1098+
# Set initial state (SCHEDULED or RUNNING) and wait until running
1099+
await self._set_job_state_from_process(job, process)
1100+
10691101
# Wait for job to complete while holding locks
10701102
try:
10711103
logger.debug("Waiting for job %s process to end", job)

src/experimaestro/tui/app.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,16 @@ def on_view_job_logs(self, message: ViewJobLogs) -> None:
621621
For remote monitoring, switches to log viewer immediately with loading state,
622622
then starts adaptive sync in background.
623623
"""
624+
from experimaestro.scheduler.interfaces import JobState
625+
626+
# Check if job is scheduled (not yet running) - no logs available
627+
if message.job_state == JobState.SCHEDULED:
628+
self.notify(
629+
"Job is scheduled but not yet running - logs not available",
630+
severity="information",
631+
)
632+
return
633+
624634
job_path = Path(message.job_path)
625635
job_id = job_path.name
626636

src/experimaestro/tui/utils.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ def get_status_icon(status: str, failure_reason=None, transient=None):
4444
return "❌"
4545
elif status == "running":
4646
return "▶"
47+
elif status == "scheduled":
48+
return "🕐" # Scheduled (e.g., in SLURM queue)
4749
elif status == "waiting":
4850
return "⌛" # Waiting for dependencies
4951
elif status == "unscheduled" and transient is not None and transient.is_transient:

0 commit comments

Comments
 (0)