Skip to content

Commit

Permalink
Update logic to allow retries in AWS Batch Client hook to be effective (
Browse files Browse the repository at this point in the history
#38998)

Co-authored-by: evgenyslab <e.nuger@gmail.com>
  • Loading branch information
shahar1 and evgenyslab authored Apr 22, 2024
1 parent 131a2e4 commit fcb2bee
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 6 deletions.
2 changes: 2 additions & 0 deletions airflow/providers/amazon/aws/hooks/batch_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,8 @@ def get_job_description(self, job_id: str) -> dict:
try:
response = self.get_conn().describe_jobs(jobs=[job_id])
return self.parse_job_description(job_id, response)
except AirflowException as err:
self.log.warning(err)
except botocore.exceptions.ClientError as err:
# Allow it to retry in case of exceeded quota limit of requests to AWS API
if err.response.get("Error", {}).get("Code") != "TooManyRequestsException":
Expand Down
13 changes: 7 additions & 6 deletions tests/providers/amazon/aws/hooks/test_batch_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,14 @@ def test_check_job_success_raises_unknown_status(self):
assert msg in str(ctx.value)
assert status in str(ctx.value)

def test_check_job_success_raises_without_jobs(self):
def test_check_job_success_raises_without_jobs(self, caplog):
self.client_mock.describe_jobs.return_value = {"jobs": []}
with pytest.raises(AirflowException) as ctx:
self.batch_client.check_job_success(JOB_ID)
self.client_mock.describe_jobs.assert_called_once_with(jobs=[JOB_ID])
msg = f"AWS Batch job ({JOB_ID}) description error"
assert msg in str(ctx.value)
with caplog.at_level(level=logging.WARNING):
with pytest.raises(AirflowException):
self.batch_client.check_job_success(JOB_ID)
self.client_mock.describe_jobs.assert_has_calls([mock.call(jobs=[JOB_ID])] * 3)
msg = f"AWS Batch job ({JOB_ID}) description error"
assert msg in caplog.messages[0]

def test_terminate_job(self):
self.client_mock.terminate_job.return_value = {}
Expand Down

0 comments on commit fcb2bee

Please sign in to comment.