-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Description
Apache Airflow version
2.2.2 (latest released)
Operating System
Darwin Kernel Version 21.1.0 RELEASE_ARM64_T8101 arm64
Versions of Apache Airflow Providers
No response
Deployment
Virtualenv installation
Deployment details
python_version | 3.9.7 (default, Sep 16 2021, 23:53:23) [Clang 12.0.0 ]
What happened
The following error is displayed in Web UI:
Broken DAG: [<EDITED>/scripts/airflow/dags/master/sample_dag/sample_dag.py] Traceback (most recent call last):
File "<EDITED>/miniconda3/envs/dev-airflow/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 271, in serialize_to_json
serialized_object[key] = _encode_timetable(value)
File "<EDITED>/miniconda3/envs/dev-airflow/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 152, in _encode_timetable
raise _TimetableNotRegistered(importable_string)
airflow.serialization.serialized_objects._TimetableNotRegistered: Timetable class 'workday.AfterWorkdayTimetable' is not registered
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "<EDITED>/miniconda3/envs/dev-airflow/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 937, in to_dict
json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
File "<EDITED>/miniconda3/envs/dev-airflow/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 849, in serialize_dag
raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
airflow.exceptions.SerializationError: Failed to serialize DAG 'learning_example_workday_timetable': Timetable class 'workday.AfterWorkdayTimetable' is not registered
What you expected to happen
For the custom timetable to be implemented and used by DAG.
How to reproduce
Following instructions from Custom DAG Scheduling with Timetables with following new DAG to implement:
import datetime
from airflow import DAG
from airflow import plugins_manager
from airflow.operators.dummy import DummyOperator
from workday import AfterWorkdayTimetable
with DAG(
dag_id="learning_example_workday_timetable",
start_date=datetime.datetime(2021,11,20),
timetable=AfterWorkdayTimetable(),
tags=["example","learning","timetable"],
) as dag:
DummyOperator(task_id="run_this")Anything else
I have tried digging through the code and believe the issue is in this line:
| if _get_registered_timetable(importable_string) != timetable_class: |
Perhaps the Custom DAG Scheduling with Timetables expects an __eq__ implemented in the AfterWorkdayTimetable class but it would appear that the AfterWorkdayTimetable class imported through the DAG and the AfterWorkdayTimetable class imported through plugin_manager have different id()'s:
| plugins_manager.initialize_timetables_plugins() |
The only way I could get it to import successfully was via the following sequence of import statements since _get_registered_timetable uses a lazy import:
import datetime
from airflow import DAG
from airflow import plugins_manager
from airflow.operators.dummy import DummyOperator
plugins_manager.initialize_timetables_plugins()
from workday import AfterWorkdayTimetableI also had the webserver and scheduler restarted and confirmed the plugin is seen via cli:
airflow plugins ✔ dev-airflow 19:01:43
name | source
=========================+===========================
workday_timetable_plugin | $PLUGINS_FOLDER/workday.pyAre you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct