Apache Airflow version
3.0.5
If "Other Airflow 2 version" selected, which one?
No response
What happened?
In Airflow 3.0.3, PR #51699 introduced new logic to serialize objects, which caused multiple failures in Google provider triggers.
For now there are 7 triggers that fail because of "too complicated" objects returned from the triggerer at the step where it tries to serialize the object before passing it to the worker. One example is airflow.providers.google.cloud.triggers.dataplex.DataplexDataProfileJobTrigger, and the error returned from the trigger is:
triggerer | TriggerRunner().run()
triggerer | File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 845, in run
triggerer | asyncio.run(self.arun())
triggerer | File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
triggerer | return loop.run_until_complete(main)
triggerer | File "/usr/local/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
triggerer | return future.result()
triggerer | File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 869, in arun
triggerer | await self.sync_state_to_supervisor(finished_ids)
triggerer | File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 1055, in sync_state_to_supervisor
triggerer | resp = await self.comms_decoder.asend(msg)
triggerer | File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 790, in asend
triggerer | bytes = frame.as_bytes()
triggerer | File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/comms.py", line 148, in as_bytes
triggerer | self.req_encoder.encode_into(self, buffer, 4)
triggerer | File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/comms.py", line 125, in _msgpack_enc_hook
triggerer | raise NotImplementedError(f"Objects of type {type(obj)} are not supported")
triggerer | NotImplementedError: Objects of type <enum 'State'> are not supported
triggerer | 2025-08-21 12:39:09 [warning ] Process exited abnormally [airflow.jobs.triggerer_job_runner] exit_code=126
triggerer | 2025-08-21 12:39:09 [error ] Trigger runner process has died! Exiting. [airflow.jobs.triggerer_job_runner]
triggerer | [2025-08-21T12:39:09.349+0000] {triggerer_job_runner.py:177} INFO - Waiting for triggers to clean up
triggerer | [2025-08-21T12:39:09.350+0000] {triggerer_job_runner.py:182} INFO - Exited trigger loop
After that the triggerer dies and does nothing further; the task eventually fails with a trigger timeout.
Not sure if we already have an issue for that, but the whole list looks like this:
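For context, here is a minimal, self-contained sketch of the failure mode (the stand-in class below is hypothetical, not the real Google type): msgspec's msgpack encoder calls enc_hook for any type it cannot encode natively, and the Task SDK hook raises NotImplementedError for everything it does not explicitly handle:

import msgspec

class TransferOperation:
    # Hypothetical stand-in for google.cloud.storage_transfer_v1.types.TransferOperation
    pass

def enc_hook(obj):
    # Mirrors the fallthrough behaviour of _msgpack_enc_hook
    raise NotImplementedError(f"Objects of type {type(obj)} are not supported")

encoder = msgspec.msgpack.Encoder(enc_hook=enc_hook)
encoder.encode({"operation": TransferOperation()})
# NotImplementedError: Objects of type <class '__main__.TransferOperation'> are not supported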
- dataplex_data_profile: kills the triggerer (NotImplementedError: Objects of type <enum 'State'> are not supported)
- dataplex_data_quality: kills the triggerer (NotImplementedError: Objects of type <enum 'State'> are not supported)
- dataproc_batch_deferrable: kills the triggerer (NotImplementedError: Objects of type <enum 'State'> are not supported)
- dataproc_spark_deferrable: kills the triggerer (NotImplementedError: Objects of type <enum 'State'> are not supported)
- example_gcp_transfer: kills the triggerer (NotImplementedError: Objects of type <class 'google.cloud.storage_transfer_v1.types.transfer_types.TransferOperation'> are not supported)
- example_transfer_gcs_to_gcs: kills the triggerer (NotImplementedError: Objects of type <class 'google.cloud.storage_transfer_v1.types.transfer_types.TransferOperation'> are not supported)
- example_gcp_transfer_aws: kills the triggerer (NotImplementedError: Objects of type <class 'google.cloud.storage_transfer_v1.types.transfer_types.TransferOperation'> are not supported)
For all of those system tests we observe the same error, and I think this should be covered here by extending the type logic (introduced in #51699):
# Imports needed by the snippet, as in task-sdk/src/airflow/sdk/execution_time/comms.py
from datetime import datetime
from pathlib import Path
from typing import Any

from pydantic import BaseModel

def _msgpack_enc_hook(obj: Any) -> Any:
    import pendulum

    if isinstance(obj, pendulum.DateTime):
        # convert the pendulum DateTime subclass into a raw datetime so that msgspec can use its native
        # encoding
        return datetime(
            obj.year, obj.month, obj.day, obj.hour, obj.minute, obj.second, obj.microsecond, tzinfo=obj.tzinfo
        )
    if isinstance(obj, Path):
        return str(obj)
    if isinstance(obj, BaseModel):
        return obj.model_dump(exclude_unset=True)
    # Raise a NotImplementedError for other types
    raise NotImplementedError(f"Objects of type {type(obj)} are not supported")
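A rough sketch of how the hook could be extended to cover the failing Google types (the function name is hypothetical, and I am assuming proto-plus's proto.Message base class and its to_dict classmethod; the exact conversions would need review):

import enum
from typing import Any

def _msgpack_enc_hook_extended(obj: Any) -> Any:
    # Plain Enum members (e.g. the Dataplex job State enum) carry everything
    # the worker needs in their underlying value
    if isinstance(obj, enum.Enum):
        return obj.value
    # proto-plus messages such as
    # google.cloud.storage_transfer_v1.types.TransferOperation can be dumped
    # to a plain dict; to_dict is a classmethod on proto-plus Message classes
    try:
        import proto  # proto-plus; only present when google libraries are installed

        if isinstance(obj, proto.Message):
            return type(obj).to_dict(obj)
    except ImportError:
        pass
    raise NotImplementedError(f"Objects of type {type(obj)} are not supported")

Whether the resulting dict round-trips back into the original proto type on the worker side is a separate question, but at least the triggerer process would no longer die mid-serialization.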
What do you think should happen instead?
No response
How to reproduce
- Run example_dataplex_dp.py in breeze
- Wait until the task run_data_scan_def starts
- Observe the breeze CLI to see the actual error in the triggerer. It should look like this:
triggerer | NotImplementedError: Objects of type <enum 'State'> are not supported
The task will then die after some time with a trigger timeout.
Operating System
Debian GNU/Linux 12 (bookworm)
Versions of Apache Airflow Providers
This is a problem in Airflow core; it was introduced in version 3.0.3 by this PR: #51699
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct