Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-65: Add DAG versioning support #42913

Open
wants to merge 13 commits into
base: main
Choose a base branch
from

Conversation

ephraimbuddy
Copy link
Contributor

@ephraimbuddy ephraimbuddy commented Oct 10, 2024

This commit introduces versioning for DAGs.

Changes:

  • Introduced DagVersion model to handle versioning of DAGs.
  • Added version_name field to DAG for use in tracking the dagversion by users
  • Added support for version retrieval in the get_dag_source API endpoint
  • Modified DAG execution logic to reference dag_version_id instead of the dag_hash to ensure DAG runs are linked to specific versions.

Closes: #42333, #42334, #42336, #42335

@boring-cyborg boring-cyborg bot added area:API Airflow's REST/HTTP API area:CLI area:db-migrations PRs with DB migration area:dev-tools area:Scheduler including HA (high availability) scheduler area:serialization area:UI Related to UI/UX. For Frontend Developers. area:webserver Webserver related Issues kind:documentation labels Oct 10, 2024
version_number=version_number,
dag_code=dag_code,
serialized_dag=serialized_dag,
version_name=version_name or cls._generate_unique_random_string(session),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should augment the version_name we store when a DAG author provides is to us. Otherwise, we'd have the same version_name for all versions until they change it, which isn't great.

e.g. if I have version_name=foo in my DAG, and I add a new task but don't update the version. So maybe something like foo-{version_number} or something? Or maybe we toss on an int to signify when it hasn't changed, but the DAG has, like foo-2.

We also only stamp a new version when the serdag actually changes. I wonder if we'd be better off combing the user provided version and the serdag hash to determine when to create a new version?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should augment the version_name we store when a DAG author provides is to us. Otherwise, we'd have the same version_name for all versions until they change it, which isn't great.

e.g. if I have version_name=foo in my DAG, and I add a new task but don't update the version. So maybe something like foo-{version_number} or something? Or maybe we toss on an int to signify when it hasn't changed, but the DAG has, like foo-2.

That's my plan, but I want to have it in a method. The version_number increments, while the version_name remains the same until changed. However, a method named version would return version_name-version_number (foo-2), which would be used on the UI. I think having them separate is much better than using Python logic, which might go wrong.

We also only stamp a new version when the serdag actually changes. I wonder if we'd be better off combing the user provided version and the serdag hash to determine when to create a new version?

The serdag is 1:1 to dag_version. When it changes, the version also changes and there's a link between the two tables which allows us to access the hash e.g :

version = session.query(DagVersion).get(1)
print(version.serialized_dag.dag_hash)

I feel we shouldn't combine the hash and version since a change in the user-provided version name would trigger a new serialized dag version, which also means a new dagversion.

The dag_hash can remain an internal property that we use to know that the version has changed

@ephraimbuddy ephraimbuddy force-pushed the versioned-dag2 branch 5 times, most recently from ad4f57c to dd272b0 Compare October 16, 2024 12:27
@ephraimbuddy ephraimbuddy added the legacy api Whether legacy API changes should be allowed in PR label Oct 16, 2024
@ephraimbuddy ephraimbuddy force-pushed the versioned-dag2 branch 3 times, most recently from eb13cdd to 1b81f2f Compare October 16, 2024 13:27
@ephraimbuddy ephraimbuddy marked this pull request as ready for review October 16, 2024 13:27
@@ -2463,7 +2471,7 @@ def create_dagrun(
conf: dict | None = None,
run_type: DagRunType | None = None,
session: Session = NEW_SESSION,
dag_hash: str | None = None,
dag_version_id: int | None = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure that we want to have (only) an integer value for the DAG version? Who woul dbe responsible to increment?

I am thinking about a backup/restore, how can we be sure durch such cases the IDs will not overwrite each other?

Besides the question about int: If it is an integer then it is not really an ID, we should rather call it dag_version if it represents a numeric value (I think)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dag_version_id is a foreignkey from dag_version table to dagrun and auto increments based on how sqlalchemy works. However, it will eventually be refactored to be a UUID.

Copy link
Contributor Author

@ephraimbuddy ephraimbuddy Oct 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking about a backup/restore, how can we be sure durch such cases the IDs will not overwrite each other?

Since it's a foreign key, I don't see how it will overwrite. The above code is just for creating the dag run. Maybe we should change it to the dag_version object itself and not the ID?

sa.Column("dag_code_id", sa.Integer(), nullable=True),
sa.Column("serialized_dag_id", sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(("dag_id",), ["dag.dag_id"], name=op.f("dag_version_dag_id_fkey")),
sa.PrimaryKeyConstraint("id", name=op.f("dag_version_pkey")),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the ID the only primary key w/o considering the dag_id contained in primary key? So the ID is a global ID irrespective of the DAG linked?
Which code part/logic is responsible to auto-increment this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think to second question I found the answer... which relates to the other (follow-up) comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the ID the only primary key w/o considering the dag_id contained in primary key? So the ID is a global ID irrespective of the DAG linked?

Yes. I don't see any issues with that, and it's more straightforward. Do you have a case where using only the ID would be an issue?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not directly ... but in cases if (for whatever reason) data from DBs need to be merged INT values easily can generate clashes. So maybe as in the other comments... I'd feel a bit more comfortable with a UUID... but might be a matter of taste.

version_number = 1

if existing_dag_version:
version_number = existing_dag_version.version_number + 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code smells to be non-thread safe. If multiple instances run concurrently it could generate a clash in app logic for ID generation. If this should represent an auto-increment... this should be rather pushed-down to database engine (e.g. identity column)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will look into this

@ephraimbuddy ephraimbuddy force-pushed the versioned-dag2 branch 2 times, most recently from 21569f4 to 55e1257 Compare October 18, 2024 00:09
This commit introduces versioning for DAGs

Changes:
- Introduced DagVersion model to handle versioning of DAGs.
- Added version_name field to DAG for use in tracking the dagversion by users
- Added support for version retrieval in the get_dag_source API endpoint
- Modified DAG execution logic to reference dag_version_id instead of the
dag_hash to ensure DAG runs are linked to specific versions.

Fix tests

revert RESTAPI changes
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:API Airflow's REST/HTTP API area:CLI area:db-migrations PRs with DB migration area:dev-tools area:Scheduler including HA (high availability) scheduler area:serialization area:UI Related to UI/UX. For Frontend Developers. area:webserver Webserver related Issues kind:documentation legacy api Whether legacy API changes should be allowed in PR
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Calculate and track DAG version
3 participants