AIP-65: Add DAG versioning support #42913
8aab698 to aad084c
version_number=version_number,
dag_code=dag_code,
serialized_dag=serialized_dag,
version_name=version_name or cls._generate_unique_random_string(session),
I think we should augment the version_name we store when a DAG author provides it to us. Otherwise, we'd have the same version_name for all versions until they change it, which isn't great.
e.g. if I have version_name=foo in my DAG, and I add a new task but don't update the version. So maybe something like foo-{version_number} or something? Or maybe we toss on an int to signify when it hasn't changed, but the DAG has, like foo-2.
We also only stamp a new version when the serdag actually changes. I wonder if we'd be better off combining the user-provided version and the serdag hash to determine when to create a new version?
> I think we should augment the version_name we store when a DAG author provides it to us. Otherwise, we'd have the same version_name for all versions until they change it, which isn't great.
> e.g. if I have version_name=foo in my DAG, and I add a new task but don't update the version. So maybe something like foo-{version_number} or something? Or maybe we toss on an int to signify when it hasn't changed, but the DAG has, like foo-2.
That's my plan, but I want to have it in a method. The version_number increments, while the version_name remains the same until changed. However, a method named version would return version_name-version_number (foo-2), which would be used on the UI. I think having them separate is much better than using Python logic, which might go wrong.
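A minimal sketch of that idea, assuming a plain dataclass as a stand-in for the real model. The class name `DagVersionSketch` is hypothetical, and a property is used here where the comment says "method"; the PR's actual implementation may differ.

```python
from dataclasses import dataclass


@dataclass
class DagVersionSketch:
    """Hypothetical stand-in for the DagVersion model, not the PR's code."""

    version_name: str    # user-provided; unchanged until the DAG author edits it
    version_number: int  # increments each time the serialized DAG changes

    @property
    def version(self) -> str:
        # The display label is built on demand; the two fields stay separate in storage.
        return f"{self.version_name}-{self.version_number}"


v = DagVersionSketch(version_name="foo", version_number=2)
print(v.version)  # foo-2
```

Keeping the stored fields separate means the UI label can change format later without a data migration.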
> We also only stamp a new version when the serdag actually changes. I wonder if we'd be better off combining the user-provided version and the serdag hash to determine when to create a new version?
The serdag is 1:1 with dag_version. When it changes, the version also changes, and there's a link between the two tables which allows us to access the hash, e.g.:
version = session.query(DagVersion).get(1)
print(version.serialized_dag.dag_hash)
I feel we shouldn't combine the hash and version, since a change in the user-provided version name would trigger a new serialized DAG version, which also means a new DagVersion.
The dag_hash can remain an internal property that we use to know that the version has changed.
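To illustrate why the hash alone can drive versioning, here is a rough sketch assuming the version name is part of the serialized DAG, as the comment above implies. The helper names `dag_hash` and `next_version_number`, the dict layout, and the choice of SHA-256 are all assumptions for illustration, not Airflow's actual serialization or hashing code.

```python
import hashlib
import json
from typing import Optional


def dag_hash(serialized_dag: dict) -> str:
    # Hash a canonical JSON encoding so that key order does not affect the result.
    payload = json.dumps(serialized_dag, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


def next_version_number(latest_hash: Optional[str], latest_number: int, serialized_dag: dict) -> int:
    # No prior version: start at 1. Same hash: nothing changed, so keep the number.
    if latest_hash is None:
        return 1
    if dag_hash(serialized_dag) == latest_hash:
        return latest_number
    return latest_number + 1


dag_v1 = {"dag_id": "example", "version_name": "foo", "tasks": ["a"]}
dag_v2 = {"dag_id": "example", "version_name": "foo", "tasks": ["a", "b"]}  # task added, name unchanged
dag_v3 = {"dag_id": "example", "version_name": "bar", "tasks": ["a", "b"]}  # only the name changed

print(next_version_number(None, 0, dag_v1))              # 1
print(next_version_number(dag_hash(dag_v1), 1, dag_v1))  # 1
print(next_version_number(dag_hash(dag_v1), 1, dag_v2))  # 2
print(next_version_number(dag_hash(dag_v2), 2, dag_v3))  # 3
```

Under these assumptions, both a structural change and a rename alter the hash, so no separate comparison of the user-provided name is needed.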
ad4f57c to dd272b0
eb13cdd to 1b81f2f
1b81f2f to ef4120d
airflow/models/dag.py
Outdated
@@ -2463,7 +2471,7 @@ def create_dagrun(
        conf: dict | None = None,
        run_type: DagRunType | None = None,
        session: Session = NEW_SESSION,
        dag_hash: str | None = None,
        dag_version_id: int | None = None,
Are we sure that we want to have (only) an integer value for the DAG version? Who would be responsible for incrementing it?
I am thinking about backup/restore: how can we be sure that in such cases the IDs will not overwrite each other?
Besides the question about int: if it is an integer, then it is not really an ID; we should rather call it dag_version if it represents a numeric value (I think).
dag_version_id is a foreign key from the dag_version table to dagrun, and it auto-increments based on how SQLAlchemy works. However, it will eventually be refactored to be a UUID.
> I am thinking about backup/restore: how can we be sure that in such cases the IDs will not overwrite each other?
Since it's a foreign key, I don't see how it will overwrite. The above code is just for creating the dag run. Maybe we should change it to the dag_version object itself and not the ID?
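For readers unfamiliar with the relationship being described, here is a small sketch using the standard-library `sqlite3` module as a stand-in for SQLAlchemy. Only `dag_version`, `dag_version_id`, `dag_id`, and `version_number` come from the thread; the `dag_run` table layout and the sample values are assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces foreign keys only when asked
conn.executescript(
    """
    CREATE TABLE dag_version (
        id INTEGER PRIMARY KEY AUTOINCREMENT,  -- assigned by the database, not by app code
        dag_id TEXT NOT NULL,
        version_number INTEGER NOT NULL
    );
    CREATE TABLE dag_run (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        run_id TEXT NOT NULL,
        dag_version_id INTEGER REFERENCES dag_version (id)
    );
    """
)

cur = conn.execute(
    "INSERT INTO dag_version (dag_id, version_number) VALUES (?, ?)", ("example", 1)
)
version_id = cur.lastrowid  # the database picked this value
conn.execute(
    "INSERT INTO dag_run (run_id, dag_version_id) VALUES (?, ?)", ("run_1", version_id)
)

# A run cannot point at a version row that does not exist.
try:
    conn.execute(
        "INSERT INTO dag_run (run_id, dag_version_id) VALUES (?, ?)", ("run_2", 999)
    )
    fk_rejected = False
except sqlite3.IntegrityError:
    fk_rejected = True

print(version_id, fk_rejected)  # 1 True
```

The foreign key guarantees that a run references an existing version row within one database; it does not by itself address the backup/restore or merge scenario raised above.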
sa.Column("dag_code_id", sa.Integer(), nullable=True),
sa.Column("serialized_dag_id", sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(("dag_id",), ["dag.dag_id"], name=op.f("dag_version_dag_id_fkey")),
sa.PrimaryKeyConstraint("id", name=op.f("dag_version_pkey")),
Is the ID the only primary key, without dag_id being part of the primary key? So the ID is a global ID irrespective of the DAG linked?
Which part of the code/logic is responsible for auto-incrementing this?
I think I found the answer to the second question... which relates to the other (follow-up) comment.
> Is the ID the only primary key, without dag_id being part of the primary key? So the ID is a global ID irrespective of the DAG linked?
Yes. I don't see any issues with that, and it's more straightforward. Do you have a case where using only the ID would be an issue?
Not directly... but if (for whatever reason) data from multiple DBs needs to be merged, INT values can easily generate clashes. So, as in the other comments, I'd feel a bit more comfortable with a UUID... but it might be a matter of taste.
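The merge concern can be shown with a toy example. This sketch uses plain dictionaries as hypothetical stand-ins for two databases' dag_version tables; the row contents are made up for illustration.

```python
import uuid

# Two independent databases, each assigning integer IDs starting from 1.
db_a = {1: "dag_a v1", 2: "dag_a v2"}
db_b = {1: "dag_b v1", 2: "dag_b v2"}

merged_int = {**db_a, **db_b}  # rows from db_b silently overwrite rows from db_a
colliding_ids = sorted(db_a.keys() & db_b.keys())

# The same rows keyed by random UUIDs instead of integers.
uuid_a = {uuid.uuid4(): row for row in db_a.values()}
uuid_b = {uuid.uuid4(): row for row in db_b.values()}
merged_uuid = {**uuid_a, **uuid_b}

print(colliding_ids)     # [1, 2]
print(len(merged_int))   # 2
print(len(merged_uuid))  # 4
```

With integer keys, half of the rows are lost in the merge; with random UUIDs, all four rows survive because a collision is vanishingly unlikely.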
airflow/models/dag_version.py
Outdated
version_number = 1

if existing_dag_version:
    version_number = existing_dag_version.version_number + 1
This code smells non-thread-safe. If multiple instances run concurrently, it could generate a clash in the app logic for ID generation. If this should represent an auto-increment, it should rather be pushed down to the database engine (e.g. an identity column).
I will look into this
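One possible direction, sketched with the standard-library `sqlite3` module: compute the next per-DAG number inside a single INSERT and add a unique constraint as a safety net. This is a different technique from the identity column suggested above (an identity column gives a global counter, not a per-DAG one), and the table layout and the `add_version` helper are assumptions, not the PR's code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE dag_version (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        dag_id TEXT NOT NULL,
        version_number INTEGER NOT NULL,
        UNIQUE (dag_id, version_number)  -- a duplicate number fails loudly instead of clashing
    )
    """
)


def add_version(conn: sqlite3.Connection, dag_id: str) -> int:
    # The read-and-increment happens inside one INSERT statement rather than in Python.
    cur = conn.execute(
        """
        INSERT INTO dag_version (dag_id, version_number)
        SELECT ?, COALESCE(MAX(version_number), 0) + 1
        FROM dag_version
        WHERE dag_id = ?
        """,
        (dag_id, dag_id),
    )
    conn.commit()
    row = conn.execute(
        "SELECT version_number FROM dag_version WHERE id = ?", (cur.lastrowid,)
    ).fetchone()
    return row[0]


first = add_version(conn, "example_dag")
second = add_version(conn, "example_dag")
other = add_version(conn, "other_dag")
print(first, second, other)  # 1 2 1
```

Whether a single statement is enough under real concurrency depends on the database and its isolation level. The unique constraint is what actually prevents two writers from storing the same number; a caller would still need to catch the integrity error and retry.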
21569f4 to 55e1257
This commit introduces versioning for DAGs
Changes:
- Introduced DagVersion model to handle versioning of DAGs.
- Added version_name field to DAG for use in tracking the dagversion by users
- Added support for version retrieval in the get_dag_source API endpoint
- Modified DAG execution logic to reference dag_version_id instead of the dag_hash to ensure DAG runs are linked to specific versions.
Fix tests
revert RESTAPI changes
fcf2e40 to 9dc8ec6
This commit introduces versioning for DAGs.
Changes:
Closes: #42333, #42334, #42336, #42335