Custom Timetable Import Error #19869

@covaliov

Apache Airflow version

2.2.2 (latest released)

Operating System

Darwin Kernel Version 21.1.0 RELEASE_ARM64_T8101 arm64

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

python_version | 3.9.7 (default, Sep 16 2021, 23:53:23) [Clang 12.0.0 ]

What happened

The following error is displayed in the web UI:

Broken DAG: [<EDITED>/scripts/airflow/dags/master/sample_dag/sample_dag.py] Traceback (most recent call last):
  File "<EDITED>/miniconda3/envs/dev-airflow/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 271, in serialize_to_json
    serialized_object[key] = _encode_timetable(value)
  File "<EDITED>/miniconda3/envs/dev-airflow/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 152, in _encode_timetable
    raise _TimetableNotRegistered(importable_string)
airflow.serialization.serialized_objects._TimetableNotRegistered: Timetable class 'workday.AfterWorkdayTimetable' is not registered

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<EDITED>/miniconda3/envs/dev-airflow/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 937, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "<EDITED>/miniconda3/envs/dev-airflow/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 849, in serialize_dag
    raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
airflow.exceptions.SerializationError: Failed to serialize DAG 'learning_example_workday_timetable': Timetable class 'workday.AfterWorkdayTimetable' is not registered

What you expected to happen

The custom timetable should be registered through the plugin and used by the DAG without a serialization error.

How to reproduce

I followed the instructions in Custom DAG Scheduling with Timetables and added the following new DAG:

import datetime

from airflow import DAG
from airflow import plugins_manager
from airflow.operators.dummy import DummyOperator

from workday import AfterWorkdayTimetable

with DAG(
    dag_id="learning_example_workday_timetable",
    start_date=datetime.datetime(2021, 11, 20),
    timetable=AfterWorkdayTimetable(),
    tags=["example", "learning", "timetable"],
) as dag:
    DummyOperator(task_id="run_this")

Anything else

I have tried digging through the code and believe the issue is in this line:

if _get_registered_timetable(importable_string) != timetable_class:

Perhaps the Custom DAG Scheduling with Timetables guide expects an __eq__ to be implemented on the AfterWorkdayTimetable class. However, it appears that the AfterWorkdayTimetable class imported by the DAG and the AfterWorkdayTimetable class loaded through plugins_manager are different objects with different id() values. The plugin-side class is loaded by:

plugins_manager.initialize_timetables_plugins()
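The identity mismatch described above can be reproduced without Airflow. The following is a minimal sketch, not Airflow's code: it assumes only that the same source file ends up being executed twice as two separate module objects. The `load_fresh` helper and the temporary `workday.py` file are hypothetical stand-ins. It shows that the two classes share the same importable string but still compare as unequal, because class comparison falls back to identity.

```python
import importlib.util
import tempfile
from pathlib import Path

# Stand-in for the real timetable class; only the class object matters here.
SOURCE = "class AfterWorkdayTimetable:\n    pass\n"


def load_fresh(path, name="workday"):
    """Execute the file at `path` as a brand-new module object."""
    spec = importlib.util.spec_from_file_location(name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "workday.py"
    path.write_text(SOURCE)
    # Two independent executions of the same file.
    first = load_fresh(path).AfterWorkdayTimetable
    second = load_fresh(path).AfterWorkdayTimetable

# Both classes resolve to the same dotted name...
same_name = (
    f"{first.__module__}.{first.__qualname__}"
    == f"{second.__module__}.{second.__qualname__}"
)
print(same_name)        # True
# ...but they are distinct objects, so an inequality check succeeds.
print(first is second)  # False
print(first != second)  # True
```

If this is what happens in the reported case, defining __eq__ on AfterWorkdayTimetable would not change the result, since __eq__ defined in a class body applies to its instances, and the check compares the class objects themselves.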

The only way I could get the DAG to import successfully was to order the import statements as follows, since _get_registered_timetable uses a lazy import:

import datetime

from airflow import DAG
from airflow import plugins_manager
from airflow.operators.dummy import DummyOperator

plugins_manager.initialize_timetables_plugins()
from workday import AfterWorkdayTimetable
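One possible explanation for why the ordering above matters is sketched below. It rests on an assumption that is not confirmed here: that the plugin loader executes the plugin file as a new module and publishes it in `sys.modules` under the same name the DAG later imports. The `plugin_style_load` and `dag_style_import` helpers and the `workday_demo` module name are hypothetical and only model that behaviour.

```python
import importlib
import importlib.util
import sys
import tempfile
from pathlib import Path

MODULE_NAME = "workday_demo"  # hypothetical name, avoids clashing with a real module


def plugin_style_load(path):
    """Execute the file as a new module and publish it in sys.modules."""
    spec = importlib.util.spec_from_file_location(MODULE_NAME, path)
    module = importlib.util.module_from_spec(spec)
    sys.modules[MODULE_NAME] = module
    spec.loader.exec_module(module)
    return module.AfterWorkdayTimetable


def dag_style_import():
    """Equivalent of `from workday_demo import AfterWorkdayTimetable`."""
    return importlib.import_module(MODULE_NAME).AfterWorkdayTimetable


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / f"{MODULE_NAME}.py"
    path.write_text("class AfterWorkdayTimetable:\n    pass\n")
    sys.path.insert(0, tmp)
    try:
        # Order 1: the DAG imports first, the loader runs afterwards.
        sys.modules.pop(MODULE_NAME, None)
        importlib.invalidate_caches()
        from_dag = dag_style_import()
        registered = plugin_style_load(path)
        dag_first_matches = from_dag is registered

        # Order 2: the loader runs first, the DAG imports afterwards.
        sys.modules.pop(MODULE_NAME, None)
        registered = plugin_style_load(path)
        from_dag = dag_style_import()
        loader_first_matches = from_dag is registered
    finally:
        # Undo the changes to the interpreter's import state.
        sys.path.remove(tmp)
        sys.modules.pop(MODULE_NAME, None)

print(dag_first_matches)     # False
print(loader_first_matches)  # True
```

Under that assumption, running the loader first means the later import statement is served from `sys.modules` and returns the already registered class, which would match the behaviour of the workaround.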

I also restarted the webserver and scheduler, and confirmed via the CLI that the plugin is detected:

airflow plugins
name                     | source
=========================+===========================
workday_timetable_plugin | $PLUGINS_FOLDER/workday.py

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
