[Feature] ClusterExecutor: Store file errors in output files #918

jan-janssen merged 1 commit into `main`
Conversation
📝 Walkthrough

Two backend execution files are modified to relocate error-handling logic into try/except blocks.
Estimated code review effort: 🎯 2 (Simple) | ⏱️ ~10 minutes
🚥 Pre-merge checks: ✅ 4 passed
Caution: Some comments are outside the diff and can't be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/executorlib/task_scheduler/file/backend.py (1)
71-90: ⚠️ Potential issue | 🔴 Critical

`NameError` if `backend_load_file` raises: both `apply_dict` and `time_start` are unbound.

If `backend_load_file` (line 72) throws, `apply_dict` is never assigned, so line 83 (`apply_dict=apply_dict`) raises a `NameError` inside the except handler. Similarly, `time_start` (line 73) is never set, so line 89 (`time.time() - time_start`) also raises a `NameError`.

Note how `cache_parallel.py` avoids this by initializing both before the try block (lines 40–41). Apply the same pattern here.

🐛 Proposed fix — initialize before the try block:
```diff
 def backend_execute_task_in_file(file_name: str) -> None:
     ...
+    apply_dict = {}
+    time_start = time.time()
     try:
         apply_dict = backend_load_file(file_name=file_name)
-        time_start = time.time()
         result = {
             "result": apply_dict["fn"].__call__(
                 *apply_dict["args"], **apply_dict["kwargs"]
             )
         }
     except Exception as error:
         result = {"error": error}
         backend_write_error_file(
             error=error,
             apply_dict=apply_dict,
         )
     backend_write_file(
         file_name=file_name,
         output=result,
         runtime=time.time() - time_start,
     )
```

src/executorlib/backend/cache_parallel.py (1)
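The unbound-name failure described for `backend.py` above can be reproduced in a few lines. In this sketch, `load` is a hypothetical stand-in for `backend_load_file`; the helpers `run_buggy` and `run_fixed` are illustrative, not part of the codebase:

```python
# Minimal reproduction of the unbound-name failure: a name first bound
# inside `try` is referenced in `except` after the binding statement raised.
# `load` is a hypothetical stand-in for backend_load_file.


def load(fail: bool) -> dict:
    if fail:
        raise OSError("cannot read input file")
    return {"fn": lambda x: x + 1, "args": (1,), "kwargs": {}}


def run_buggy(fail: bool) -> dict:
    try:
        apply_dict = load(fail)  # if this raises, apply_dict is never bound
        result = {"result": apply_dict["fn"](*apply_dict["args"], **apply_dict["kwargs"])}
    except Exception as error:
        # referencing apply_dict here raises NameError when load() failed
        result = {"error": error, "apply_dict": apply_dict}
    return result


def run_fixed(fail: bool) -> dict:
    apply_dict: dict = {}  # bound before the try block, so always defined
    try:
        apply_dict = load(fail)
        result = {"result": apply_dict["fn"](*apply_dict["args"], **apply_dict["kwargs"])}
    except Exception as error:
        result = {"error": error, "apply_dict": apply_dict}
    return result
```

Calling `run_buggy(True)` escalates the original `OSError` into a `NameError` raised from inside the handler, while `run_fixed(True)` reports the real error.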
42-68: ⚠️ Potential issue | 🔴 Critical

Deadlock: if rank 0 throws before `bcast`, all other ranks hang forever.

If `backend_load_file` (line 44) raises an exception on rank 0, execution jumps to the `except` block. Meanwhile every other rank is blocked in `MPI.COMM_WORLD.bcast` (line 45) waiting for rank 0, which never arrives. The `Barrier` at line 68 does not help because non-zero ranks never reach it.

The same asymmetry applies to `gather` (line 48): if rank 0 throws during `__call__` but other ranks complete successfully, they block on `gather` while rank 0 is in the `except` block.

Before this PR, load + bcast lived outside the try block, so any failure would propagate as an unhandled exception and MPI would abort all processes — no deadlock. Moving them inside the try block silently catches the error on rank 0 only.
One approach: broadcast a success/failure flag after the `__call__` so all ranks agree on the error path. For the load, keep it (and the bcast) before the try block so a load failure still crashes cleanly.

💡 Sketch: keep load+bcast outside try, protect only the execution:
```diff
     time_start = time.time()
-    apply_dict = {}
+    apply_dict = None
+    if mpi_rank_zero:
+        apply_dict = backend_load_file(file_name=file_name)
+    apply_dict = MPI.COMM_WORLD.bcast(apply_dict, root=0)
     try:
-        if mpi_rank_zero:
-            apply_dict = backend_load_file(file_name=file_name)
-        apply_dict = MPI.COMM_WORLD.bcast(apply_dict, root=0)
         output = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"])
         result = (
             MPI.COMM_WORLD.gather(output, root=0) if mpi_size_larger_one else output
         )
     except Exception as error:
+        # All ranks hit the same exception, so gather a sentinel to avoid deadlock
+        if mpi_size_larger_one:
+            MPI.COMM_WORLD.gather(None, root=0)
         if mpi_rank_zero:
             backend_write_file(
                 file_name=file_name,
                 output={"error": error},
                 runtime=time.time() - time_start,
             )
             backend_write_error_file(
                 error=error,
                 apply_dict=apply_dict,
             )
     else:
         if mpi_rank_zero:
             backend_write_file(
                 file_name=file_name,
                 output={"result": result},
                 runtime=time.time() - time_start,
             )
     MPI.COMM_WORLD.Barrier()
```

Note: this sketch still assumes all ranks throw/succeed together. If asymmetric failures are possible, a coordinated error flag (`allreduce` on a boolean) is needed before branching.
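The coordinated-flag idea can be demonstrated without MPI. The sketch below uses threads as stand-in ranks and a toy `allreduce(AND)` so that every rank branches identically even when only one of them fails; all names here (`FlagReduce`, `run`, `N_RANKS`) are hypothetical and only emulate the pattern:

```python
# Pure-Python emulation of the coordinated error flag: every "rank" reduces
# a local success flag before branching, so all ranks take the same path and
# none is left waiting in a collective that its peers never enter.

import threading

N_RANKS = 4


class FlagReduce:
    """Toy allreduce(AND): collect one boolean per rank, return the AND to all."""

    def __init__(self, n: int) -> None:
        self._n = n
        self._flags = []
        self._cond = threading.Condition()
        self._result = None

    def allreduce_and(self, flag: bool) -> bool:
        with self._cond:
            self._flags.append(flag)
            if len(self._flags) == self._n:
                self._result = all(self._flags)
                self._cond.notify_all()
            else:
                self._cond.wait_for(lambda: self._result is not None, timeout=5)
            return bool(self._result)


def run(failing_rank):
    """Run N_RANKS emulated ranks; return the branch each rank took."""
    reduce_op = FlagReduce(N_RANKS)
    outcomes = [""] * N_RANKS

    def rank(i: int) -> None:
        try:
            if i == failing_rank:
                raise RuntimeError("task failed on this rank")
            ok = True
        except Exception:
            ok = False
        # every rank participates in the reduction, then branches identically
        outcomes[i] = "result" if reduce_op.allreduce_and(ok) else "error"

    threads = [threading.Thread(target=rank, args=(i,)) for i in range(N_RANKS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return outcomes
```

With no failing rank every rank writes a result; with one failing rank all ranks agree on the error branch, which is exactly the agreement the real MPI version would obtain from `allreduce` on a boolean.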
Codecov Report

❌ Patch coverage is

Additional details and impacted files:

```
@@           Coverage Diff           @@
##             main     #918   +/-   ##
=======================================
  Coverage   93.78%   93.78%
=======================================
  Files          38       38
  Lines        1947     1947
=======================================
  Hits         1826     1826
  Misses        121      121
```

☔ View full report in Codecov by Sentry.