Description
Apache Airflow Provider(s)
amazon
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==8.28.0
Apache Airflow version
2.10.1
Operating System
Amazon Linux 2023
Deployment
Amazon (AWS) MWAA
Deployment details
MWAA in production and mwaa-local-runner for local testing. Bug exists on both.
What happened
Deferred GlueJobOperator tasks are failing because of rate limiting when fetching logs from CloudWatch, even though we have verbose set to False. The traceback shows a call to print_job_logs, which should only be called when verbose is True. Additionally, when I watch the task logs in real time, they are definitely pulling the Glue logs from CloudWatch into Airflow.
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 558, in cleanup_finished_triggers
    result = details["task"].result()
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 630, in run_trigger
    async for event in trigger.run():
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/triggers/glue.py", line 73, in run
    await hook.async_job_completion(self.job_name, self.run_id, self.verbose)
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 315, in async_job_completion
    ret = self._handle_state(job_run_state, job_name, run_id, verbose, next_log_tokens)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 334, in _handle_state
    self.print_job_logs(
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 278, in print_job_logs
    continuation_tokens.output_stream_continuation = display_logs_from(
                                                     ^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 245, in display_logs_from
    for response in paginator.paginate(
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/paginate.py", line 269, in __iter__
    response = self._make_request(current_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/paginate.py", line 357, in _make_request
    return self._method(**current_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/client.py", line 569, in _api_call
    return self._make_api_call(operation_name, kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/client.py", line 1023, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.errorfactory.ThrottlingException: An error occurred (ThrottlingException) when calling the FilterLogEvents operation (reached max retries: 4): Rate exceeded

A colleague and I read through the operator code and it all looks correct. We suspect that the issue occurs when the trigger is serialized, because serialization explicitly converts the bool to a string. The string "False" is truthy and would cause this code to run. I'm guessing that this serialization happens when the task is deferred.
Serialization:
airflow/providers/src/airflow/providers/amazon/aws/triggers/glue.py
Lines 58 to 69 in ff6038b
def serialize(self) -> tuple[str, dict[str, Any]]:
    return (
        # dynamically generate the fully qualified name of the class
        self.__class__.__module__ + "." + self.__class__.__qualname__,
        {
            "job_name": self.job_name,
            "run_id": self.run_id,
            "verbose": str(self.verbose),
            "aws_conn_id": self.aws_conn_id,
            "job_poll_interval": self.job_poll_interval,
        },
    )
Code that is being called even though verbose is False:
airflow/providers/src/airflow/providers/amazon/aws/hooks/glue.py
Lines 333 to 338 in ff6038b
if verbose:
    self.print_job_logs(
        job_name=job_name,
        run_id=run_id,
        continuation_tokens=next_log_tokens,
    )
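The round trip described above can be reproduced without Airflow. This is a minimal sketch under one assumption: the triggerer rebuilds a trigger by calling its class with the serialized kwargs (`cls(**kwargs)`). `SketchGlueTrigger` is a hypothetical stand-in that copies only the `verbose` handling from the two excerpts. It is not the provider's real trigger class.

```python
from typing import Any


class SketchGlueTrigger:
    """Hypothetical stand-in for the Glue trigger; not the real Airflow class."""

    def __init__(self, job_name: str, run_id: str, verbose: bool = False) -> None:
        self.job_name = job_name
        self.run_id = run_id
        self.verbose = verbose

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # Mirrors the quoted serialize(): verbose is converted with str().
        return (
            self.__class__.__module__ + "." + self.__class__.__qualname__,
            {
                "job_name": self.job_name,
                "run_id": self.run_id,
                "verbose": str(self.verbose),
            },
        )

    def would_fetch_logs(self) -> bool:
        # Mirrors the `if verbose:` check in _handle_state.
        return bool(self.verbose)


original = SketchGlueTrigger("my-job", "jr_123", verbose=False)
_, kwargs = original.serialize()
# Assumption: the triggerer re-creates the trigger as cls(**kwargs).
restored = SketchGlueTrigger(**kwargs)

print(repr(restored.verbose))       # 'False'  (a str, not a bool)
print(original.would_fetch_logs())  # False
print(restored.would_fetch_logs())  # True: the non-empty string is truthy
```

The object built in the DAG behaves correctly. Only the copy rebuilt from the serialized kwargs takes the log-fetching branch, which matches the observation that only deferred tasks pull the Glue logs.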
Example Python session showing the bool serialization issue:
Python 3.11.7 (main, Oct 26 2024, 04:00:37) [GCC 11.4.1 20230605 (Red Hat 11.4.1-2)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> a = False
>>> str(a)
'False'
>>> bool(str(a))
True
>>>
What you think should happen instead
CloudWatch logs should not be read into Airflow when verbose is False.
If serialization is the cause, then either the trigger should serialize verbose differently (for example, as a bool), or deserialization should be changed to interpret "True" and "False" correctly.
How to reproduce
- Create a GlueJobOperator task.
- Set deferrable=True and verbose=False
- Run the task and watch the Airflow logs while it is in a deferred state.
Anything else
This seems to happen every time I watch the tasks. Completed tasks (success or failed) do not show the logs, but deferred tasks do, which to me points to some kind of serialization bug in the operator.
The traceback shows that this code is being called during the run, even though it is not shown in the log.
I would be willing to submit a PR if someone could assist.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct