
Serialization problem in Google provider triggers that raises NotImplementedError when a non-primitive type is returned from the triggerer #54781

@VladaZakharova

Description

Apache Airflow version

3.0.5

If "Other Airflow 2 version" selected, which one?

No response

What happened?

In Airflow 3.0.3, PR #51699 introduced new object-serialization logic that caused multiple failures in Google provider triggers.
There are currently 7 triggers that fail because the objects they return are "too complicated" to serialize at the step where the triggerer encodes the payload before passing it to the worker. One example is "airflow.providers.google.cloud.triggers.dataplex.DataplexDataProfileJobTrigger", and the error raised in the triggerer is:

triggerer  | TriggerRunner().run()
triggerer  | File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 845, in run
triggerer  | asyncio.run(self.arun())
triggerer  | File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
triggerer  | return loop.run_until_complete(main)
triggerer  | File "/usr/local/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
triggerer  | return future.result()
triggerer  | File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 869, in arun
triggerer  | await self.sync_state_to_supervisor(finished_ids)
triggerer  | File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 1055, in sync_state_to_supervisor
triggerer  | resp = await self.comms_decoder.asend(msg)
triggerer  | File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 790, in asend
triggerer  | bytes = frame.as_bytes()
triggerer  | File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/comms.py", line 148, in as_bytes
triggerer  | self.req_encoder.encode_into(self, buffer, 4)
triggerer  | File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/comms.py", line 125, in _msgpack_enc_hook
triggerer  | raise NotImplementedError(f"Objects of type {type(obj)} are not supported")
triggerer  | NotImplementedError: Objects of type <enum 'State'> are not supported
triggerer  | 2025-08-21 12:39:09 [warning  ] Process exited abnormally      [airflow.jobs.triggerer_job_runner] exit_code=126
triggerer  | 2025-08-21 12:39:09 [error    ] Trigger runner process has died! Exiting. [airflow.jobs.triggerer_job_runner]
triggerer  | [2025-08-21T12:39:09.349+0000] {triggerer_job_runner.py:177} INFO - Waiting for triggers to clean up
triggerer  | [2025-08-21T12:39:09.350+0000] {triggerer_job_runner.py:182} INFO - Exited trigger loop
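
For context, the failure mode can be reproduced standalone with a few lines. This is a minimal sketch, assuming msgspec's msgpack Encoder with an enc_hook that mirrors the one in task-sdk comms.py; the NotSerializable class is a hypothetical stand-in for the proto message or enum member returned by the trigger:

from typing import Any

import msgspec.msgpack


class NotSerializable:
    """Hypothetical stand-in for a non-primitive trigger payload,
    e.g. a proto message or an enum member."""


def _enc_hook(obj: Any) -> Any:
    # Mirrors the hook in airflow/sdk/execution_time/comms.py: anything
    # msgspec cannot encode natively lands here and is rejected.
    raise NotImplementedError(f"Objects of type {type(obj)} are not supported")


encoder = msgspec.msgpack.Encoder(enc_hook=_enc_hook)
encoder.encode({"operation": NotSerializable()})
# NotImplementedError: Objects of type <class '__main__.NotSerializable'> are not supported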

After that the trigger dies and stops doing anything, and the task fails with a trigger timeout.
I am not sure whether we already have an issue tracking this, but the full list looks like this:

- dataplex_data_profile: kills the triggerer (NotImplementedError: Objects of type <enum 'State'> are not supported)
- dataplex_data_quality: kills the triggerer (NotImplementedError: Objects of type <enum 'State'> are not supported)
- dataproc_batch_deferrable: kills the triggerer (NotImplementedError: Objects of type <enum 'State'> are not supported)
- dataproc_spark_deferrable: kills the triggerer (NotImplementedError: Objects of type <enum 'State'> are not supported)
- example_gcp_transfer: kills the triggerer (NotImplementedError: Objects of type <class 'google.cloud.storage_transfer_v1.types.transfer_types.TransferOperation'> are not supported)
- example_transfer_gcs_to_gcs: kills the triggerer (NotImplementedError: Objects of type <class 'google.cloud.storage_transfer_v1.types.transfer_types.TransferOperation'> are not supported)
- example_gcp_transfer_aws: kills the triggerer (NotImplementedError: Objects of type <class 'google.cloud.storage_transfer_v1.types.transfer_types.TransferOperation'> are not supported)

For all of those system tests we observe the same error, and I think the fix belongs here: the type-handling logic (introduced in #51699) should be extended.

from datetime import datetime
from pathlib import Path
from typing import Any

from pydantic import BaseModel


def _msgpack_enc_hook(obj: Any) -> Any:
    import pendulum

    if isinstance(obj, pendulum.DateTime):
        # Convert the pendulum DateTime subclass into a raw datetime so that
        # msgspec can use its native encoding
        return datetime(
            obj.year, obj.month, obj.day, obj.hour, obj.minute, obj.second, obj.microsecond, tzinfo=obj.tzinfo
        )
    if isinstance(obj, Path):
        return str(obj)
    if isinstance(obj, BaseModel):
        return obj.model_dump(exclude_unset=True)

    # Raise a NotImplementedError for any other type
    raise NotImplementedError(f"Objects of type {type(obj)} are not supported")
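
One possible extension (a minimal, untested sketch, not a confirmed fix) would be to unwrap enum members to their underlying values and to convert proto-plus messages such as TransferOperation via the to_dict() helper that proto.Message classes expose:

import enum
from typing import Any


def _msgpack_enc_hook(obj: Any) -> Any:
    # ... existing pendulum / Path / BaseModel branches ...

    # Sketch: encode enum members (e.g. the Dataplex State enum) by value.
    if isinstance(obj, enum.Enum):
        return obj.value

    # Sketch: proto-plus messages (e.g. the storage transfer TransferOperation)
    # can be converted to a plain dict via the helper on the message class.
    try:
        import proto

        if isinstance(obj, proto.Message):
            return type(obj).to_dict(obj)
    except ImportError:
        pass

    raise NotImplementedError(f"Objects of type {type(obj)} are not supported")

Note that with this approach the worker would receive plain values and dicts rather than the original types, so the deferrable operators consuming these events may also need to tolerate that shape.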

What do you think should happen instead?

No response

How to reproduce

  1. Run example_dataplex_dp.py in Breeze.
  2. Wait until the run_data_scan_def task starts.
  3. Observe the Breeze CLI to see the actual error in the triggerer. It should look like this:
    triggerer | NotImplementedError: Objects of type <enum 'State'> are not supported
  4. After some time the task dies with a trigger timeout.

Operating System

Debian GNU/Linux 12 (bookworm)

Versions of Apache Airflow Providers

This is a problem in Airflow core; it was introduced in version 3.0.3 by this PR: #51699

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
