Skip to content

Commit

Permalink
Add dag re-parsing request endpoint (#39138)
Browse files Browse the repository at this point in the history
* Add dag re-parsing request endpoint

* Fix migration files

* Fix test_file_paths_in_queue_sorted_by_priority test

* Fix database cleanup issue

* Change http method to post

* Fix static checks

* Remove created object from response

* Fix testcases

* Fix testcases

* Change http response status for duplicate request

* Fix tests

* Changed the airflow version in migration file

* Update airflow/api_connexion/openapi/v1.yaml

Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>

* Update airflow/api_connexion/openapi/v1.yaml

Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>

* Update airflow/models/dagbag.py

Co-authored-by: Wei Lee <weilee.rx@gmail.com>

* Update airflow/dag_processing/manager.py

Co-authored-by: Wei Lee <weilee.rx@gmail.com>

* Better exception handling

* Addressed PR comments

* Fix tests

* Update airflow/api_connexion/openapi/v1.yaml

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>

* Update airflow/api_connexion/endpoints/dag_parsing.py

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>

* Update airflow/api_connexion/openapi/v1.yaml

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>

* Update airflow/models/dagbag.py

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>

* Apply the suggestion from PR comments

* Remove __eq__ magic method

* Remove get_requests() method

* Optimize database request

* Update airflow/dag_processing/manager.py

Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>

* Fix test

* Change return status code

* Fix test

* Fix static checks

---------

Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
Co-authored-by: Wei Lee <weilee.rx@gmail.com>
Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
  • Loading branch information
4 people authored May 14, 2024
1 parent af3e2cf commit 6bcec90
Show file tree
Hide file tree
Showing 14 changed files with 1,685 additions and 1,266 deletions.
69 changes: 69 additions & 0 deletions airflow/api_connexion/endpoints/dag_parsing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from http import HTTPStatus
from typing import TYPE_CHECKING, Sequence

from flask import Response, current_app
from itsdangerous import BadSignature, URLSafeSerializer
from sqlalchemy import exc, select

from airflow.api_connexion import security
from airflow.api_connexion.exceptions import NotFound, PermissionDenied
from airflow.auth.managers.models.resource_details import DagDetails
from airflow.models.dag import DagModel
from airflow.models.dagbag import DagPriorityParsingRequest
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.www.extensions.init_auth_manager import get_auth_manager

if TYPE_CHECKING:
from sqlalchemy.orm import Session

from airflow.auth.managers.models.batch_apis import IsAuthorizedDagRequest


@security.requires_access_dag("PUT")
@provide_session
def reparse_dag_file(*, file_token: str, session: Session = NEW_SESSION) -> Response:
"""Request re-parsing a DAG file."""
secret_key = current_app.config["SECRET_KEY"]
auth_s = URLSafeSerializer(secret_key)
try:
path = auth_s.loads(file_token)
except BadSignature:
raise NotFound("File not found")

requests: Sequence[IsAuthorizedDagRequest] = [
{"method": "PUT", "details": DagDetails(id=dag_id)}
for dag_id in session.scalars(select(DagModel.dag_id).where(DagModel.fileloc == path))
]
if not requests:
raise NotFound("File not found")

# Check if user has read access to all the DAGs defined in the file
if not get_auth_manager().batch_is_authorized_dag(requests):
raise PermissionDenied()

parsing_request = DagPriorityParsingRequest(fileloc=path)
session.add(parsing_request)
try:
session.commit()
except exc.IntegrityError:
session.rollback()
return Response("Duplicate request", HTTPStatus.CREATED)
return Response(status=HTTPStatus.CREATED)
22 changes: 22 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,27 @@ paths:
"404":
$ref: "#/components/responses/NotFound"

/parseDagFile/{file_token}:
parameters:
- $ref: "#/components/parameters/FileToken"

put:
summary: Request re-parsing of a DAG file
description: >
Request re-parsing of existing DAG files using a file token.
x-openapi-router-controller: airflow.api_connexion.endpoints.dag_parsing
operationId: reparse_dag_file
tags: [ DAG ]
responses:
"201":
description: Success.
"401":
$ref: "#/components/responses/Unauthenticated"
"403":
$ref: "#/components/responses/PermissionDenied"
"404":
$ref: "#/components/responses/NotFound"

/datasets/queuedEvent/{uri}:
parameters:
- $ref: "#/components/parameters/DatasetURI"
Expand Down Expand Up @@ -3159,6 +3180,7 @@ components:
*New in version 2.5.0*
nullable: true


UpdateDagRunState:
type: object
description: |
Expand Down
20 changes: 20 additions & 0 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from airflow.configuration import conf
from airflow.dag_processing.processor import DagFileProcessorProcess
from airflow.models.dag import DagModel
from airflow.models.dagbag import DagPriorityParsingRequest
from airflow.models.dagwarning import DagWarning
from airflow.models.db_callback_request import DbCallbackRequest
from airflow.models.errors import ParseImportError
Expand Down Expand Up @@ -616,6 +617,7 @@ def _run_parsing_loop(self):
elif refreshed_dag_dir:
self.add_new_file_path_to_queue()

self._refresh_requested_filelocs()
self.start_new_processes()

# Update number of loop iteration.
Expand Down Expand Up @@ -728,6 +730,24 @@ def _add_callback_to_queue(self, request: CallbackRequest):
self._add_paths_to_queue([request.full_filepath], True)
Stats.incr("dag_processing.other_callback_count")

@provide_session
def _refresh_requested_filelocs(self, session=NEW_SESSION) -> None:
"""Refresh filepaths from dag dir as requested by users via APIs."""
# Get values from DB table
requests = session.scalars(select(DagPriorityParsingRequest))
for request in requests:
# Check if fileloc is in valid file paths. Parsing any
# filepaths can be a security issue.
if request.fileloc in self._file_paths:
# Try removing the fileloc if already present
try:
self._file_path_queue.remove(request.fileloc)
except ValueError:
pass
# enqueue fileloc to the start of the queue.
self._file_path_queue.appendleft(request.fileloc)
session.delete(request)

def _refresh_dag_dir(self) -> bool:
"""Refresh file paths from dag dir if we haven't done it for too long."""
now = timezone.utcnow()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Added DagPriorityParsingRequest table.
Revision ID: c4602ba06b4b
Revises: 677fdbb7fc54
Create Date: 2024-04-17 17:12:05.473889
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "c4602ba06b4b"
down_revision = "677fdbb7fc54"
branch_labels = None
depends_on = None
airflow_version = "2.10.0"


def upgrade():
"""Apply Added DagPriorityParsingRequest table."""
op.create_table(
"dag_priority_parsing_request",
sa.Column("id", sa.String(length=32), nullable=False),
sa.Column("fileloc", sa.String(length=2000), nullable=False),
sa.PrimaryKeyConstraint("id", name=op.f("dag_priority_parsing_request_pkey")),
)


def downgrade():
"""Unapply Added DagPriorityParsingRequest table."""
op.drop_table("dag_priority_parsing_request")
35 changes: 35 additions & 0 deletions airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations

import hashlib
import importlib
import importlib.machinery
import importlib.util
Expand All @@ -30,6 +31,10 @@
from pathlib import Path
from typing import TYPE_CHECKING, NamedTuple

from sqlalchemy import (
Column,
String,
)
from sqlalchemy.exc import OperationalError
from tabulate import tabulate

Expand All @@ -43,6 +48,7 @@
AirflowDagDuplicatedIdException,
RemovedInAirflow3Warning,
)
from airflow.models.base import Base
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.dag_cycle_tester import check_cycle
Expand Down Expand Up @@ -727,3 +733,32 @@ def _sync_perm_for_dag(cls, dag: DAG, session: Session = NEW_SESSION):

security_manager = ApplessAirflowSecurityManager(session=session)
security_manager.sync_perm_for_dag(root_dag_id, dag.access_control)


def generate_md5_hash(context):
fileloc = context.get_current_parameters()["fileloc"]
return hashlib.md5(fileloc.encode()).hexdigest()


class DagPriorityParsingRequest(Base):
"""Model to store the dag parsing requests that will be prioritized when parsing files."""

__tablename__ = "dag_priority_parsing_request"

# Adding a unique constraint to fileloc results in the creation of an index and we have a limitation
# on the size of the string we can use in the index for MySQL DB. We also have to keep the fileloc
# size consistent with other tables. This is a workaround to enforce the unique constraint.
id = Column(String(32), primary_key=True, default=generate_md5_hash, onupdate=generate_md5_hash)

# The location of the file containing the DAG object
# Note: Do not depend on fileloc pointing to a file; in the case of a
# packaged DAG, it will point to the subpath of the DAG within the
# associated zip.
fileloc = Column(String(2000), nullable=False)

def __init__(self, fileloc: str) -> None:
super().__init__()
self.fileloc = fileloc

def __repr__(self) -> str:
return f"<DagPriorityParsingRequest: fileloc={self.fileloc}>"
2 changes: 1 addition & 1 deletion airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class MappedClassProtocol(Protocol):
"2.8.1": "88344c1d9134",
"2.9.0": "1949afb29106",
"2.9.2": "686269002441",
"2.10.0": "677fdbb7fc54",
"2.10.0": "c4602ba06b4b",
}


Expand Down
37 changes: 37 additions & 0 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,20 @@ export interface paths {
};
};
};
"/parseDagFile/{file_token}": {
/** Request re-parsing of existing DAG files using a file token. */
put: operations["reparse_dag_file"];
parameters: {
path: {
/**
* The key containing the encrypted path to the file. Encryption and decryption take place only on
* the server. This prevents the client from reading an non-DAG file. This also ensures API
* extensibility, because the format of encrypted data may change.
*/
file_token: components["parameters"]["FileToken"];
};
};
};
"/datasets/queuedEvent/{uri}": {
/**
* Get queued Dataset events for a Dataset
Expand Down Expand Up @@ -3438,6 +3452,26 @@ export interface operations {
404: components["responses"]["NotFound"];
};
};
/** Request re-parsing of existing DAG files using a file token. */
reparse_dag_file: {
parameters: {
path: {
/**
* The key containing the encrypted path to the file. Encryption and decryption take place only on
* the server. This prevents the client from reading an non-DAG file. This also ensures API
* extensibility, because the format of encrypted data may change.
*/
file_token: components["parameters"]["FileToken"];
};
};
responses: {
/** Success. */
201: unknown;
401: components["responses"]["Unauthenticated"];
403: components["responses"]["PermissionDenied"];
404: components["responses"]["NotFound"];
};
};
/**
* Get queued Dataset events for a Dataset
*
Expand Down Expand Up @@ -5388,6 +5422,9 @@ export type DeleteDagDatasetQueuedEventsVariables = CamelCasedPropertiesDeep<
operations["delete_dag_dataset_queued_events"]["parameters"]["path"] &
operations["delete_dag_dataset_queued_events"]["parameters"]["query"]
>;
export type ReparseDagFileVariables = CamelCasedPropertiesDeep<
operations["reparse_dag_file"]["parameters"]["path"]
>;
export type GetDatasetQueuedEventsVariables = CamelCasedPropertiesDeep<
operations["get_dataset_queued_events"]["parameters"]["path"] &
operations["get_dataset_queued_events"]["parameters"]["query"]
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
468c1db106e059c4a97f07b9f8be7edfa487099113e4611c74f61f17c0ea0d82
6ae5e112d66c30d36fbc27a608355ffd66853e34d7538223f69a71e2eba54b59
Loading

0 comments on commit 6bcec90

Please sign in to comment.