Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dag re-parsing request endpoint #39138

Merged
merged 44 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
5e1b315
Add dag re-parsing request endpoint
utkarsharma2 Apr 19, 2024
c7aede7
Fix migration files
utkarsharma2 Apr 22, 2024
d1b2e9e
Fix test_file_paths_in_queue_sorted_by_priority test
utkarsharma2 Apr 22, 2024
7a0ce40
Fix database cleanup issue
utkarsharma2 Apr 23, 2024
c0a7be8
Change http method to post
utkarsharma2 Apr 23, 2024
24e9f0f
Fix static checks
utkarsharma2 Apr 23, 2024
90703da
Remove created object from response
utkarsharma2 Apr 23, 2024
7ffedc1
Merge branch 'main' into DagReparsing
utkarsharma2 Apr 23, 2024
49771cf
Fix testcases
utkarsharma2 Apr 24, 2024
120672f
Merge branch 'main' into DagReparsing
utkarsharma2 Apr 24, 2024
8a4b8e3
Fix testcases
utkarsharma2 Apr 24, 2024
043d139
Merge branch 'main' into DagReparsing
utkarsharma2 Apr 24, 2024
f0840da
Change http response status for duplicate request
utkarsharma2 Apr 25, 2024
f927201
Fix tests
utkarsharma2 Apr 25, 2024
5a372e7
Changed the airflow version in migration file
utkarsharma2 Apr 25, 2024
e93c8f2
Merge branch 'main' into DagReparsing
utkarsharma2 Apr 25, 2024
0f128cc
Update airflow/api_connexion/openapi/v1.yaml
utkarsharma2 May 3, 2024
15a38ad
Update airflow/api_connexion/openapi/v1.yaml
utkarsharma2 May 3, 2024
f450105
Update airflow/models/dagbag.py
utkarsharma2 May 3, 2024
801b4cd
Update airflow/dag_processing/manager.py
utkarsharma2 May 3, 2024
aa09cd5
Better exception handling
utkarsharma2 May 3, 2024
f7b5807
Addressed PR comments
utkarsharma2 May 3, 2024
1279ff5
Merge branch 'main' into DagReparsing
utkarsharma2 May 3, 2024
24eeb97
Merge branch 'main' into DagReparsing
utkarsharma2 May 3, 2024
7fa0282
Fix tests
utkarsharma2 May 3, 2024
689c616
Update airflow/api_connexion/openapi/v1.yaml
utkarsharma2 May 3, 2024
0e8f5d5
Update airflow/api_connexion/endpoints/dag_parsing.py
utkarsharma2 May 3, 2024
0241212
Update airflow/api_connexion/openapi/v1.yaml
utkarsharma2 May 3, 2024
e080295
Update airflow/models/dagbag.py
utkarsharma2 May 3, 2024
f024421
Apply the suggestion from PR comments
utkarsharma2 May 7, 2024
5177618
Merge branch 'main' into DagReparsing
utkarsharma2 May 7, 2024
a670b09
Merge branch 'main' into DagReparsing
utkarsharma2 May 8, 2024
e0d7770
Remove __eq__ magic method
utkarsharma2 May 8, 2024
9bd67fb
Remove get_requests() method
utkarsharma2 May 8, 2024
14ca397
Optimize database request
uranusjr May 10, 2024
7b89cfc
Update airflow/dag_processing/manager.py
utkarsharma2 May 10, 2024
1ab6a81
Fix test
utkarsharma2 May 10, 2024
5e4fb51
Change return status code
utkarsharma2 May 10, 2024
6240d87
Fix test
utkarsharma2 May 10, 2024
015e8d3
Merge branch 'main' into DagReparsing
utkarsharma2 May 10, 2024
5105e22
Merge branch 'main' into DagReparsing
utkarsharma2 May 10, 2024
0754769
Merge branch 'main' into DagReparsing
utkarsharma2 May 14, 2024
f00c263
Fix static checks
utkarsharma2 May 10, 2024
da7ffe4
Merge branch 'main' into DagReparsing
utkarsharma2 May 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions airflow/api_connexion/endpoints/dag_parsing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from http import HTTPStatus
from typing import TYPE_CHECKING, Sequence

from flask import Response, current_app
from itsdangerous import BadSignature, URLSafeSerializer
from sqlalchemy import exc

from airflow.api_connexion import security
from airflow.api_connexion.exceptions import NotFound, PermissionDenied
from airflow.auth.managers.models.resource_details import DagDetails
from airflow.models.dag import DagModel
from airflow.models.dagbag import DagPriorityParsingRequest
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.www.extensions.init_auth_manager import get_auth_manager

if TYPE_CHECKING:
from sqlalchemy.orm import Session

from airflow.auth.managers.models.batch_apis import IsAuthorizedDagRequest


@security.requires_access_dag("PUT")
@provide_session
def reparse_dag_file(*, file_token: str, session: Session = NEW_SESSION) -> Response:
"""Request re-parsing a DAG file."""
secret_key = current_app.config["SECRET_KEY"]
auth_s = URLSafeSerializer(secret_key)
try:
path = auth_s.loads(file_token)
dag_ids = session.query(DagModel.dag_id).filter(DagModel.fileloc == path).all()
if len(dag_ids) == 0:
raise FileNotFoundError
except (BadSignature, FileNotFoundError):
raise NotFound("File not found")

requests: Sequence[IsAuthorizedDagRequest] = [
{
"method": "PUT",
"details": DagDetails(id=dag_id[0]),
}
for dag_id in dag_ids
]
# Check if user has read access to all the DAGs defined in the file
if not get_auth_manager().batch_is_authorized_dag(requests):
raise PermissionDenied()

parsing_request = DagPriorityParsingRequest(fileloc=path)
session.add(parsing_request)
try:
session.commit()
except exc.IntegrityError:
session.rollback()
return Response("Duplicate request", HTTPStatus.CONFLICT)
utkarsharma2 marked this conversation as resolved.
Show resolved Hide resolved

return Response(status=HTTPStatus.CREATED)
24 changes: 24 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,29 @@ paths:
"404":
$ref: "#/components/responses/NotFound"

/parseDagFile/{file_token}:
parameters:
- $ref: "#/components/parameters/FileToken"

put:
summary: Request re-parsing of a DAG file
description: >
Request re-parsing of existing DAG files using a file token.
x-openapi-router-controller: airflow.api_connexion.endpoints.dag_parsing
operationId: reparse_dag_file
tags: [ DAG ]
responses:
"201":
description: Success.
"401":
$ref: "#/components/responses/Unauthenticated"
"403":
$ref: "#/components/responses/PermissionDenied"
"404":
$ref: "#/components/responses/NotFound"
"409":
$ref: "#/components/responses/AlreadyExists"

/datasets/queuedEvent/{uri}:
parameters:
- $ref: "#/components/parameters/DatasetURI"
Expand Down Expand Up @@ -3159,6 +3182,7 @@ components:
*New in version 2.5.0*
nullable: true


UpdateDagRunState:
type: object
description: |
Expand Down
20 changes: 20 additions & 0 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from airflow.configuration import conf
from airflow.dag_processing.processor import DagFileProcessorProcess
from airflow.models.dag import DagModel
from airflow.models.dagbag import DagPriorityParsingRequest
from airflow.models.dagwarning import DagWarning
from airflow.models.db_callback_request import DbCallbackRequest
from airflow.models.errors import ParseImportError
Expand Down Expand Up @@ -616,6 +617,7 @@ def _run_parsing_loop(self):
elif refreshed_dag_dir:
self.add_new_file_path_to_queue()

self._refresh_requested_filelocs()
self.start_new_processes()

# Update number of loop iteration.
Expand Down Expand Up @@ -728,6 +730,24 @@ def _add_callback_to_queue(self, request: CallbackRequest):
self._add_paths_to_queue([request.full_filepath], True)
Stats.incr("dag_processing.other_callback_count")

@provide_session
def _refresh_requested_filelocs(self, session=NEW_SESSION) -> None:
"""Refresh filepaths from dag dir as requested by users via APIs."""
# Get values from DB table
requests = session.scalars(select(DagPriorityParsingRequest)).all()
utkarsharma2 marked this conversation as resolved.
Show resolved Hide resolved
for request in requests:
# Check if fileloc is in valid file paths. Parsing any
# filepaths can be a security issue.
utkarsharma2 marked this conversation as resolved.
Show resolved Hide resolved
if request.fileloc in self._file_paths:
# Try removing the fileloc if already present
try:
self._file_path_queue.remove(request.fileloc)
except ValueError:
pass
# enqueue fileloc to the start of the queue.
self._file_path_queue.appendleft(request.fileloc)
session.delete(request)

def _refresh_dag_dir(self) -> bool:
"""Refresh file paths from dag dir if we haven't done it for too long."""
now = timezone.utcnow()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Added DagPriorityParsingRequest table.

Revision ID: c4602ba06b4b
Revises: 88344c1d9134
Create Date: 2024-04-17 17:12:05.473889

"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "c4602ba06b4b"
down_revision = "677fdbb7fc54"
branch_labels = None
depends_on = None
airflow_version = "2.10.0"


def upgrade():
"""Apply Added DagPriorityParsingRequest table."""
op.create_table(
"dag_priority_parsing_request",
sa.Column("id", sa.String(length=32), nullable=False),
sa.Column("fileloc", sa.String(length=2000), nullable=False),
sa.PrimaryKeyConstraint("id", name=op.f("dag_priority_parsing_request_pkey")),
)


def downgrade():
"""Unapply Added DagPriorityParsingRequest table."""
op.drop_table("dag_priority_parsing_request")
35 changes: 35 additions & 0 deletions airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations

import hashlib
import importlib
import importlib.machinery
import importlib.util
Expand All @@ -30,6 +31,10 @@
from pathlib import Path
from typing import TYPE_CHECKING, NamedTuple

from sqlalchemy import (
Column,
String,
)
from sqlalchemy.exc import OperationalError
from tabulate import tabulate

Expand All @@ -43,6 +48,7 @@
AirflowDagDuplicatedIdException,
RemovedInAirflow3Warning,
)
from airflow.models.base import Base
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.dag_cycle_tester import check_cycle
Expand Down Expand Up @@ -727,3 +733,32 @@ def _sync_perm_for_dag(cls, dag: DAG, session: Session = NEW_SESSION):

security_manager = ApplessAirflowSecurityManager(session=session)
security_manager.sync_perm_for_dag(root_dag_id, dag.access_control)


def generate_md5_hash(context):
fileloc = context.get_current_parameters()["fileloc"]
return hashlib.md5(fileloc.encode()).hexdigest()


class DagPriorityParsingRequest(Base):
"""Model to store the dag parsing requests that will be prioritized when parsing files."""

__tablename__ = "dag_priority_parsing_request"

# Adding a unique constraint to fileloc results in the creation of an index and we have a limitation
# on the size of the string we can use in the index for MySQL DB. We also have to keep the fileloc
# size consistent with other tables. This is a workaround to enforce the unique constraint.
id = Column(String(32), primary_key=True, default=generate_md5_hash, onupdate=generate_md5_hash)

# The location of the file containing the DAG object
# Note: Do not depend on fileloc pointing to a file; in the case of a
# packaged DAG, it will point to the subpath of the DAG within the
# associated zip.
fileloc = Column(String(2000), nullable=False)
utkarsharma2 marked this conversation as resolved.
Show resolved Hide resolved

def __init__(self, fileloc: str) -> None:
super().__init__()
self.fileloc = fileloc

def __repr__(self) -> str:
return f"<DagPriorityParsingRequest: fileloc={self.fileloc}>"
2 changes: 1 addition & 1 deletion airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
"2.8.1": "88344c1d9134",
"2.9.0": "1949afb29106",
"2.9.2": "686269002441",
"2.10.0": "677fdbb7fc54",
"2.10.0": "c4602ba06b4b",
}


Expand Down
38 changes: 38 additions & 0 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,20 @@ export interface paths {
};
};
};
"/parseDagFile/{file_token}": {
/** Request re-parsing of existing DAG files using a file token. */
put: operations["reparse_dag_file"];
parameters: {
path: {
/**
* The key containing the encrypted path to the file. Encryption and decryption take place only on
* the server. This prevents the client from reading an non-DAG file. This also ensures API
* extensibility, because the format of encrypted data may change.
*/
file_token: components["parameters"]["FileToken"];
};
};
};
"/datasets/queuedEvent/{uri}": {
/**
* Get queued Dataset events for a Dataset
Expand Down Expand Up @@ -3438,6 +3452,27 @@ export interface operations {
404: components["responses"]["NotFound"];
};
};
/** Request re-parsing of existing DAG files using a file token. */
reparse_dag_file: {
parameters: {
path: {
/**
* The key containing the encrypted path to the file. Encryption and decryption take place only on
* the server. This prevents the client from reading an non-DAG file. This also ensures API
* extensibility, because the format of encrypted data may change.
*/
file_token: components["parameters"]["FileToken"];
};
};
responses: {
/** Success. */
201: unknown;
401: components["responses"]["Unauthenticated"];
403: components["responses"]["PermissionDenied"];
404: components["responses"]["NotFound"];
409: components["responses"]["AlreadyExists"];
};
};
/**
* Get queued Dataset events for a Dataset
*
Expand Down Expand Up @@ -5388,6 +5423,9 @@ export type DeleteDagDatasetQueuedEventsVariables = CamelCasedPropertiesDeep<
operations["delete_dag_dataset_queued_events"]["parameters"]["path"] &
operations["delete_dag_dataset_queued_events"]["parameters"]["query"]
>;
export type ReparseDagFileVariables = CamelCasedPropertiesDeep<
operations["reparse_dag_file"]["parameters"]["path"]
>;
export type GetDatasetQueuedEventsVariables = CamelCasedPropertiesDeep<
operations["get_dataset_queued_events"]["parameters"]["path"] &
operations["get_dataset_queued_events"]["parameters"]["query"]
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ec3c33d964d9afc51f0fbbee9bb2ef8c8dc496675a6ef96891a30d0403bb6823
e28117d14c6646a5ff9ce134070b69ea6d7efd310b969df90cb21698585a56ee
Loading
Loading