Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion providers/src/airflow/providers/amazon/aws/hooks/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,20 +282,24 @@ def display_logs_from(log_group: str, continuation_token: str | None) -> str | N
log_group_error, continuation_tokens.error_stream_continuation
)

def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> dict[str, str]:
def job_completion(
self, job_name: str, run_id: str, verbose: bool = False, sleep_before_return: int = 0
) -> dict[str, str]:
"""
Wait until Glue job with job_name finishes; return final state if finished or raises AirflowException.

:param job_name: unique job name per AWS account
:param run_id: The job-run ID of the predecessor job run
:param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs. (default: False)
:param sleep_before_return: time in seconds to wait before returning final status.
:return: Dict of JobRunState and JobRunId
"""
next_log_tokens = self.LogContinuationTokens()
while True:
job_run_state = self.get_job_state(job_name, run_id)
ret = self._handle_state(job_run_state, job_name, run_id, verbose, next_log_tokens)
if ret:
time.sleep(sleep_before_return)
return ret
else:
time.sleep(self.job_poll_interval)
Expand Down
11 changes: 10 additions & 1 deletion providers/src/airflow/providers/amazon/aws/operators/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ class GlueJobOperator(BaseOperator):
:param update_config: If True, Operator will update job configuration. (default: False)
:param replace_script_file: If True, the script file will be replaced in S3. (default: False)
:param stop_job_run_on_kill: If True, Operator will stop the job run when task is killed.
:param sleep_before_return: time in seconds to wait before returning final status. This is meaningful in case
of limiting concurrency, Glue needs 5-10 seconds to clean up resources.
Thus if status is returned immediately it might end up in case of more than 1 concurrent run.
It is recommended to set this parameter to 10 when you are using concurrency=1.
For more information see: https://repost.aws/questions/QUaKgpLBMPSGWO0iq2Fob_bw/glue-run-concurrent-jobs#ANFpCL2fRnQRqgDFuIU_rpvA
"""

template_fields: Sequence[str] = (
Expand Down Expand Up @@ -118,6 +123,7 @@ def __init__(
update_config: bool = False,
job_poll_interval: int | float = 6,
stop_job_run_on_kill: bool = False,
sleep_before_return: int = 0,
**kwargs,
):
super().__init__(**kwargs)
Expand Down Expand Up @@ -145,6 +151,7 @@ def __init__(
self.job_poll_interval = job_poll_interval
self.stop_job_run_on_kill = stop_job_run_on_kill
self._job_run_id: str | None = None
self.sleep_before_return: int = sleep_before_return

@cached_property
def glue_job_hook(self) -> GlueJobHook:
Expand Down Expand Up @@ -220,7 +227,9 @@ def execute(self, context: Context):
method_name="execute_complete",
)
elif self.wait_for_completion:
glue_job_run = self.glue_job_hook.job_completion(self.job_name, self._job_run_id, self.verbose)
glue_job_run = self.glue_job_hook.job_completion(
self.job_name, self._job_run_id, self.verbose, self.sleep_before_return
)
self.log.info(
"AWS Glue Job: %s status: %s. Run Id: %s",
self.job_name,
Expand Down