-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Fix NoneType error when updating serialized DAG #56422
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix NoneType error when updating serialized DAG #56422
Conversation
|
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
|
|
Hi ,I’ve fixed a stability issue in SerializedDagModel.write_dag for dynamic DAGs (#56306). |
7178d7c to
b5f07b5
Compare
ephraimbuddy
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like bad rebase, please update and have only the query change
e55a5c9 to
b5f07b5
Compare
b5f07b5 to
8ddadaf
Compare
|
Hi @ephraimbuddy, Thanks for reviewing! Sorry about the earlier confusion — the bad rebase included extra changes by mistake. Appreciate your patience and review! |
|
Hi @ephraimbuddy, Thanks for the approval! I see the CI tests are failing with the I believe I can fix both issues at once. The plan is:
dag_version = session.scalars(
select(DagVersion)
.where(
DagVersion.dag_id == dag.dag_id,
)
.options(joinedload(DagVersion.task_instances))
.options(joinedload(DagVersion.serialized_dag))
.order_by(DagVersion.created_at.desc())
.limit(1)
).first() Should I go ahead and push this combined fix? Thanks! |
|
Maybe: dag_version = session.scalar(
select(DagVersion)
.where(
DagVersion.dag_id == dag.dag_id,
DagVersion.bundle_name == bundle_name
)
.options(joinedload(DagVersion.task_instances))
.options(joinedload(DagVersion.serialized_dag))
.order_by(DagVersion.created_at.desc())
.limit(1)
)In the above, I used |
83ad489 to
98a1300
Compare
|
Hi @ephraimbuddy, thanks! I've updated the code to use session.scalar() as you suggested |
You can do that. Then use |
98a1300 to
1d729ca
Compare
|
Hi @ephraimbuddy, I've rebased and pushed the changes. I used So, I've kept only I also added the This should hopefully fix all the CI failures now. Thanks! |
It was wrong to load the serdag and not use it. The initial idea was to use the serdag at line 437 but was omitted. Thinking about it now, it will be faster to only load serdag when there's a TI associated with the dag version
It was wrong to load the serdag and not use it. The initial idea was to use the serdag at line 437 but was omitted. Thinking about it now, it will be faster to only load serdag when there's a TI associated with the dag version
It was wrong to load the serdag and not use it. The initial idea was to use the serdag at line 437 but was omitted. Thinking about it now, it will be faster to only load serdag when there's a TI associated with the dag version (cherry picked from commit e5a88cc)
Issue: #56306
FIX: Prevent AttributeError when updating SerializedDagModel for dynamic DAGs
This PR fixes a stability issue in
SerializedDagModel.write_dagby adding a null check before updating an existing record for dynamic DAGs.Context and Problem
In the section of
write_dagdedicated to updating dynamic DAGs (i.e., whendag_versionexists but has no task instances), the code retrieves the latestSerializedDagModelinstance usingcls.get(dag.dag_id, session=session).If
cls.get(dag.dag_id, session=session)returnsNone(for example, due to a race condition where the record was just deleted or not found in the current session), the subsequent code attempts to modify attributes:latest_ser_dag._data = new_serialized_dag._dataThis would raise an
AttributeErrorbecauselatest_ser_dagisNone.Solution
latest_ser_dagexists before updating its attributes.AttributeErrorand improves the stability of DAG serialization for dynamic DAGs.^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.