Deferred Glue Operators seem to always be verbose #43620

@jimwbaldwin

Description

Apache Airflow Provider(s)

amazon

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.28.0

Apache Airflow version

2.10.1

Operating System

Amazon Linux 2023

Deployment

Amazon (AWS) MWAA

Deployment details

MWAA in production and mwaa-local-runner for local testing. Bug exists on both.

What happened

Deferred GlueJobOperator tasks are failing due to rate limiting when fetching logs from CloudWatch, even though we have verbose set to False. The traceback shows a call to print_job_logs, which should only happen when verbose is true. Additionally, when I watch the task logs in real time, they are definitely pulling the Glue logs from CloudWatch into Airflow.

Traceback (most recent call last):

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 558, in cleanup_finished_triggers
    result = details["task"].result()
             ^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 630, in run_trigger
    async for event in trigger.run():

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/triggers/glue.py", line 73, in run
    await hook.async_job_completion(self.job_name, self.run_id, self.verbose)

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 315, in async_job_completion
    ret = self._handle_state(job_run_state, job_name, run_id, verbose, next_log_tokens)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 334, in _handle_state
    self.print_job_logs(

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 278, in print_job_logs
    continuation_tokens.output_stream_continuation = display_logs_from(
                                                     ^^^^^^^^^^^^^^^^^^

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 245, in display_logs_from
    for response in paginator.paginate(

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/paginate.py", line 269, in __iter__
    response = self._make_request(current_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/paginate.py", line 357, in _make_request
    return self._method(**current_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/client.py", line 569, in _api_call
    return self._make_api_call(operation_name, kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/client.py", line 1023, in _make_api_call
    raise error_class(parsed_response, operation_name)

botocore.errorfactory.ThrottlingException: An error occurred (ThrottlingException) when calling the FilterLogEvents operation (reached max retries: 4): Rate exceeded

A colleague and I read through the operator code and it all looks correct, so we suspect the issue occurs when the task is serialized, as that specifically converts the bool to a string. The string "False" is truthy and would cause this code to run. I'm guessing that the tasks are serialized when they are deferred.

Serialization:

def serialize(self) -> tuple[str, dict[str, Any]]:
    return (
        # dynamically generate the fully qualified name of the class
        self.__class__.__module__ + "." + self.__class__.__qualname__,
        {
            "job_name": self.job_name,
            "run_id": self.run_id,
            "verbose": str(self.verbose),
            "aws_conn_id": self.aws_conn_id,
            "job_poll_interval": self.job_poll_interval,
        },
    )
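To illustrate the suspected failure mode, here is a minimal self-contained sketch; serialize_kwargs and handle_state are illustrative stand-ins for the trigger's serialize and the hook's verbose check, not the actual provider code:

```python
# Sketch of the suspected bug: the trigger serializes verbose with
# str(), and the deserialized value is later used directly in a
# truthiness check. Names here are illustrative placeholders.

def serialize_kwargs(verbose: bool) -> dict:
    # Mirrors the trigger's serialize(): bool -> "True"/"False"
    return {"verbose": str(verbose)}

def handle_state(verbose) -> bool:
    # Mirrors the hook's `if verbose:` check; returns whether
    # logs would be fetched from CloudWatch
    return bool(verbose)

original = False
roundtripped = serialize_kwargs(original)["verbose"]  # the string "False"

print(handle_state(original))      # False: log fetching correctly skipped
print(handle_state(roundtripped))  # True: "False" is a non-empty string
```

This reproduces the symptom exactly: the value that arrives in _handle_state after the serialize round trip is a non-empty string, so the `if verbose:` guard passes even when the user set verbose=False.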

Code which gets called even when verbose is False:

if verbose:
    self.print_job_logs(
        job_name=job_name,
        run_id=run_id,
        continuation_tokens=next_log_tokens,
    )

Example Python session showing the bool serialization issue:

Python 3.11.7 (main, Oct 26 2024, 04:00:37) [GCC 11.4.1 20230605 (Red Hat 11.4.1-2)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> a = False
>>> str(a)
'False'
>>> bool(str(a))
True
>>>

What you think should happen instead

CloudWatch logs should not be read into Airflow when verbose is False.

If the serialization process is the issue, then either the serialization should be changed (so the bool survives the round trip) or the deserialization should be changed to understand "True" and "False".
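Both possible fixes can be sketched in a few lines; these are illustrative stand-ins, not the actual provider code:

```python
# Option 1: don't stringify the bool at all. Trigger kwargs are
# JSON-serializable and bools are valid JSON, so this round-trips cleanly.
def serialize_fixed(verbose: bool) -> dict:
    return {"verbose": verbose}

# Option 2: parse "True"/"False" strings defensively on the consuming
# side, so already-serialized triggers also behave correctly.
def parse_bool(value) -> bool:
    if isinstance(value, str):
        return value.strip().lower() == "true"
    return bool(value)

assert serialize_fixed(False)["verbose"] is False
assert parse_bool("False") is False
assert parse_bool("True") is True
assert parse_bool(False) is False
```

Option 1 fixes newly deferred tasks; option 2 would also cover triggers that were serialized before the fix was deployed.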

How to reproduce

  1. Create a GlueJobOperator task.
  2. Set deferrable=True and verbose=False
  3. Run the task and watch the Airflow logs while it is in a deferred state.
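A hypothetical minimal DAG for the steps above (job_name is a placeholder for an existing Glue job; all other values are illustrative):

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

with DAG(
    dag_id="glue_verbose_repro",  # illustrative name
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    run_glue_job = GlueJobOperator(
        task_id="run_glue_job",
        job_name="my-existing-glue-job",  # placeholder: an existing Glue job
        deferrable=True,   # task defers; the trigger serializes its kwargs
        verbose=False,     # logs should NOT be fetched from CloudWatch
    )
```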

Anything else

Seems to happen every time if I watch the tasks. Completed tasks (success or failed) do not show the logs, but deferred tasks do, which to me backs up some kind of serialization bug in the operator.

The traceback shows that this code is being called at runtime even though it is not shown in the task log.

I would be willing to submit a PR if someone could assist.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
