Skip to content

Commit c1843ec

Browse files
authored
feat: cleanup job definitions (#20)
This PR cleans up job definitions on success and failure resolves #17 <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **Refactor** - Improved the process for cleaning up job resources during job completion or cancellation. - Centralized the cleanup steps to ensure consistent deallocation of resources, enhancing overall job management reliability. - Enhanced clarity in job status handling with refined control flow. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent cc4df2f commit c1843ec

File tree

1 file changed

+13
-7
lines changed

1 file changed

+13
-7
lines changed

snakemake_executor_plugin_aws_batch/__init__.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -185,13 +185,16 @@ async def check_active_jobs(
185185
for job in active_jobs:
186186
async with self.status_rate_limiter:
187187
status_code, msg = self._get_job_status(job)
188+
189+
if status_code is not None:
188190
if status_code == 0:
189191
self.report_job_success(job)
190-
elif status_code is not None:
192+
else:
191193
message = f"AWS Batch job failed. Code: {status_code}, Msg: {msg}."
192194
self.report_job_error(job, msg=message)
193-
else:
194-
yield job
195+
self.cleanup_job_resources(job)
196+
else:
197+
yield job
195198

196199
def _get_job_status(self, job: SubmittedJobInfo) -> tuple[int, Optional[str]]:
197200
"""
@@ -240,9 +243,8 @@ def _terminate_job(self, job: SubmittedJobInfo):
240243
reason="terminated by snakemake",
241244
)
242245
except Exception as e:
243-
job_spec = job.aux["job_params"]
244246
self.logger.info(
245-
f"failed to terminate Batch job definition: {job_spec} with error: {e}"
247+
f"failed to terminate Batch job: {job.external_jobid} with error: {e}"
246248
)
247249

248250
def _deregister_job(self, job: SubmittedJobInfo):
@@ -260,6 +262,11 @@ def _deregister_job(self, job: SubmittedJobInfo):
260262
f"{job_def_arn} with error {e}"
261263
)
262264

265+
def cleanup_job_resources(self, job: SubmittedJobInfo):
266+
"""Terminate and deregister job resources"""
267+
self._terminate_job(job)
268+
self._deregister_job(job)
269+
263270
def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
264271
# Cancel all active jobs.
265272
# This method is called when Snakemake is interrupted.
@@ -268,5 +275,4 @@ def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
268275
self.logger.info("shutting down...")
269276
# cleanup jobs
270277
for j in active_jobs:
271-
self._terminate_job(j)
272-
self._deregister_job(j)
278+
self.cleanup_job_resources(j)

0 commit comments

Comments
 (0)