Dag Processor Error #50530

@aa-matthias

Apache Airflow version

3.0.0

If "Other Airflow 2 version" selected, which one?

No response

What happened?

When running the DAG processor, the following errors are written to stderr and DAGs are not parsed successfully.

Note: The same DAG seems to parse successfully when running airflow dags reserialize.

--- Supervised process Last chance exception handler ---
Traceback (most recent call last):
  File "./virtualenvs/airflow/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py", line 368, in _fork_main
    target()
  File "./virtualenvs/airflow/lib/python3.12/site-packages/airflow/dag_processing/processor.py", line 82, in _parse_file_entrypoint
    msg = comms_decoder.get_message()
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "./virtualenvs/airflow/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 632, in get_message
    msg = self.decoder.validate_json(line)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "./virtualenvs/airflow/lib/python3.12/site-packages/pydantic/type_adapter.py", line 468, in validate_json
    return self.validator.validate_json(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "./virtualenvs/airflow/lib/python3.12/site-packages/pydantic/_internal/_mock_val_ser.py", line 100, in __getattr__
    raise PydanticUserError(self._error_message, code=self._code)
pydantic.errors.PydanticUserError: `TypeAdapter[typing.Annotated[typing.Union[ForwardRef('DagFileParseRequest'), airflow.sdk.execution_time.comms.ConnectionResult, airflow.sdk.execution_time.comms.VariableResult, airflow.sdk.execution_time.comms.ErrorResponse, airflow.sdk.execution_time.comms.OKResponse], FieldInfo(annotation=NoneType, required=True, discriminator='type')]]` is not fully defined; you should define `typing.Annotated[typing.Union[ForwardRef('DagFileParseRequest'), airflow.sdk.execution_time.comms.ConnectionResult, airflow.sdk.execution_time.comms.VariableResult, airflow.sdk.execution_time.comms.ErrorResponse, airflow.sdk.execution_time.comms.OKResponse], FieldInfo(annotation=NoneType, required=True, discriminator='type')]` and all referenced types, then call `.rebuild()` on the instance.

For further information visit https://errors.pydantic.dev/2.11/u/class-not-fully-defined
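
The error message above names an unresolved ForwardRef('DagFileParseRequest') and suggests defining all referenced types before calling .rebuild(). As a rough illustration of that general mechanism only (this is not Airflow's or Pydantic's actual code path, and it says nothing about the root cause here), the standard-library sketch below shows a forward reference that cannot be resolved until the referenced class is present in the lookup namespace. The names Envelope, OKResponse, ParseRequest, and try_resolve are hypothetical and chosen for the example.

```python
from typing import Union, get_type_hints


class OKResponse:
    """Hypothetical stand-in for a message type that is already defined."""


class Envelope:
    # "ParseRequest" does not exist yet, so typing stores it as a ForwardRef.
    body: Union["ParseRequest", OKResponse]


def try_resolve(known_types: dict) -> str:
    """Try to resolve Envelope's annotations using the given extra namespace."""
    try:
        get_type_hints(Envelope, localns=known_types)
    except NameError as exc:
        # The forward reference points at a name that cannot be looked up.
        return f"unresolved: {exc}"
    return "resolved"


# Resolution fails while the referenced class is unknown.
before = try_resolve({})


class ParseRequest:
    """Hypothetical type that the forward reference points at."""


# Once the class is available in the namespace, resolution succeeds.
after = try_resolve({"ParseRequest": ParseRequest})

print(before)
print(after)
```

The point of the sketch is that whether a forward reference resolves depends on what is importable or defined at the moment of resolution, which is why the same annotation can work in one entry point and fail in another.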

What you think should happen instead?

No response

How to reproduce

Upgraded from 2.10.5: migrated the database and config, adapted the DAGs, and added a new supervisor program for the dag-processor.

Operating System

Ubuntu 24.04.2 LTS

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
