Skip to content

Commit f107fd7

Browse files
bpiwowarclaude
andcommitted
fix(state): cache jobs to receive progress events in TUI
Jobs loaded via get_jobs() were not being cached, so when JobProgressEvent arrived, there was no job in _job_cache to apply it to. Progress updates were forwarded to TUI but the underlying job data wasn't updated. Add _get_or_load_job() method that caches jobs when loaded, ensuring progress events can be applied to cached jobs between get_jobs() calls. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 75c92d5 commit f107fd7

File tree

2 files changed

+45
-36
lines changed

2 files changed

+45
-36
lines changed

src/experimaestro/scheduler/remote/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,7 @@ def get_current_run(self, experiment_id: str) -> Optional[str]:
576576
exp = self.get_experiment(experiment_id)
577577
if exp is None:
578578
return None
579-
return exp.current_run_id
579+
return exp.run_id
580580

581581
def get_jobs(
582582
self,

src/experimaestro/scheduler/workspace_state_provider.py

Lines changed: 44 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,47 @@ def _clear_experiment_cache(self, experiment_id: str) -> None:
271271
for key in keys_to_remove:
272272
del self._experiment_cache[key]
273273

274+
def _get_or_load_job(
275+
self, job_id: str, task_id: str, submit_time: float | None
276+
) -> MockJob:
277+
"""Get job from cache or load from disk and cache it.
278+
279+
This ensures that job events (progress, state changes) can be applied
280+
to cached jobs, keeping them up to date between get_jobs() calls.
281+
282+
Args:
283+
job_id: Job identifier
284+
task_id: Task identifier (for job path)
285+
submit_time: Submit timestamp (fallback if job directory doesn't exist)
286+
287+
Returns:
288+
MockJob from cache or freshly loaded from disk
289+
"""
290+
with self._job_cache_lock:
291+
if job_id in self._job_cache:
292+
return self._job_cache[job_id]
293+
294+
# Load from disk
295+
job_path = self.workspace_path / "jobs" / task_id / job_id
296+
if job_path.exists():
297+
job = self._create_mock_job_from_path(job_path, task_id, job_id)
298+
else:
299+
# Job directory doesn't exist - create minimal MockJob
300+
job = MockJob(
301+
identifier=job_id,
302+
task_id=task_id,
303+
path=job_path,
304+
state="unscheduled",
305+
submittime=submit_time,
306+
starttime=None,
307+
endtime=None,
308+
progress=[],
309+
updated_at="",
310+
)
311+
312+
self._job_cache[job_id] = job
313+
return job
314+
274315
# =========================================================================
275316
# Experiment methods
276317
# =========================================================================
@@ -588,25 +629,8 @@ def get_jobs(
588629
if not all(job_info.tags.get(k) == v for k, v in tags.items()):
589630
continue
590631

591-
# Load full job data from job directory
592-
job_path = self.workspace_path / "jobs" / job_info.task_id / job_id
593-
if job_path.exists():
594-
job = self._create_mock_job_from_path(
595-
job_path, job_info.task_id, job_id
596-
)
597-
else:
598-
# Job directory doesn't exist - create minimal MockJob
599-
job = MockJob(
600-
identifier=job_id,
601-
task_id=job_info.task_id,
602-
path=job_path,
603-
state="unscheduled",
604-
submittime=job_info.timestamp,
605-
starttime=None,
606-
endtime=None,
607-
progress=[],
608-
updated_at="",
609-
)
632+
# Get job from cache or load from disk
633+
job = self._get_or_load_job(job_id, job_info.task_id, job_info.timestamp)
610634

611635
# Apply state filter on loaded job
612636
if state:
@@ -639,22 +663,7 @@ def get_job(
639663
if job_info is None:
640664
return None
641665

642-
job_path = self.workspace_path / "jobs" / job_info.task_id / job_id
643-
if job_path.exists():
644-
return self._create_mock_job_from_path(job_path, job_info.task_id, job_id)
645-
else:
646-
# Job directory doesn't exist - create minimal MockJob
647-
return MockJob(
648-
identifier=job_id,
649-
task_id=job_info.task_id,
650-
path=job_path,
651-
state="unscheduled",
652-
submittime=job_info.timestamp,
653-
starttime=None,
654-
endtime=None,
655-
progress=[],
656-
updated_at="",
657-
)
666+
return self._get_or_load_job(job_id, job_info.task_id, job_info.timestamp)
658667

659668
def get_all_jobs(
660669
self,

0 commit comments

Comments
 (0)