Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions snakemake_executor_plugin_aws_batch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,16 @@ async def check_active_jobs(
for job in active_jobs:
async with self.status_rate_limiter:
status_code, msg = self._get_job_status(job)

if status_code is not None:
if status_code == 0:
self.report_job_success(job)
elif status_code is not None:
else:
message = f"AWS Batch job failed. Code: {status_code}, Msg: {msg}."
self.report_job_error(job, msg=message)
else:
yield job
self.cleanup_job_resources(job)
else:
yield job

def _get_job_status(self, job: SubmittedJobInfo) -> tuple[int, Optional[str]]:
"""
Expand Down Expand Up @@ -240,9 +243,8 @@ def _terminate_job(self, job: SubmittedJobInfo):
reason="terminated by snakemake",
)
except Exception as e:
job_spec = job.aux["job_params"]
self.logger.info(
f"failed to terminate Batch job definition: {job_spec} with error: {e}"
f"failed to terminate Batch job: {job.external_jobid} with error: {e}"
)

def _deregister_job(self, job: SubmittedJobInfo):
Expand All @@ -260,6 +262,11 @@ def _deregister_job(self, job: SubmittedJobInfo):
f"{job_def_arn} with error {e}"
)

def cleanup_job_resources(self, job: SubmittedJobInfo):
"""Terminate and deregister job resources"""
self._terminate_job(job)
self._deregister_job(job)

def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
# Cancel all active jobs.
# This method is called when Snakemake is interrupted.
Expand All @@ -268,5 +275,4 @@ def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
self.logger.info("shutting down...")
# cleanup jobs
for j in active_jobs:
self._terminate_job(j)
self._deregister_job(j)
self.cleanup_job_resources(j)