Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions providers/src/airflow/providers/edge/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@
Changelog
---------

0.8.1pre0
.........

Misc
~~~~

* ``Migrate worker log calls to FastAPI.``

0.8.0pre0
.........

Expand Down
2 changes: 1 addition & 1 deletion providers/src/airflow/providers/edge/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

__all__ = ["__version__"]

__version__ = "0.8.0pre0"
__version__ = "0.8.1pre0"

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.10.0"
Expand Down
31 changes: 27 additions & 4 deletions providers/src/airflow/providers/edge/cli/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.edge.worker_api.auth import jwt_signer
from airflow.providers.edge.worker_api.datamodels import WorkerStateBody
from airflow.providers.edge.worker_api.datamodels import PushLogsBody, WorkerStateBody

if TYPE_CHECKING:
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.edge.models.edge_worker import EdgeWorkerState

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -64,7 +65,7 @@ def _is_retryable_exception(exception: BaseException) -> bool:
retry=tenacity.retry_if_exception(_is_retryable_exception),
before_sleep=tenacity.before_log(logger, logging.WARNING),
)
def _make_generic_request(method: str, rest_path: str, data: str) -> Any:
def _make_generic_request(method: str, rest_path: str, data: str | None = None) -> Any:
signer = jwt_signer()
api_url = conf.get("edge", "api_url")
path = urlparse(api_url).path.replace("/rpcapi", "")
Expand Down Expand Up @@ -104,11 +105,33 @@ def worker_set_state(
hostname: str, state: EdgeWorkerState, jobs_active: int, queues: list[str] | None, sysinfo: dict
) -> list[str] | None:
"""Register worker with the Edge API."""
result = _make_generic_request(
return _make_generic_request(
"PATCH",
f"worker/{quote(hostname)}",
WorkerStateBody(state=state, jobs_active=jobs_active, queues=queues, sysinfo=sysinfo).model_dump_json(
exclude_unset=True
),
)
return result


def logs_logfile_path(task: TaskInstanceKey) -> Path:
"""Elaborate the path and filename to expect from task execution."""
result = _make_generic_request(
"GET",
f"logs/logfile_path/{task.dag_id}/{task.task_id}/{task.run_id}/{task.try_number}/{task.map_index}",
)
base_log_folder = conf.get("logging", "base_log_folder", fallback="NOT AVAILABLE")
return Path(base_log_folder, result)


def logs_push(
task: TaskInstanceKey,
log_chunk_time: datetime,
log_chunk_data: str,
) -> None:
"""Push an incremental log chunk from Edge Worker to central site."""
_make_generic_request(
"POST",
f"logs/push/{task.dag_id}/{task.task_id}/{task.run_id}/{task.try_number}/{task.map_index}",
PushLogsBody(log_chunk_time=log_chunk_time, log_chunk_data=log_chunk_data).model_dump_json(),
)
16 changes: 10 additions & 6 deletions providers/src/airflow/providers/edge/cli/edge_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.edge import __version__ as edge_provider_version
from airflow.providers.edge.cli.api_client import worker_register, worker_set_state
from airflow.providers.edge.cli.api_client import (
logs_logfile_path,
logs_push,
worker_register,
worker_set_state,
)
from airflow.providers.edge.models.edge_job import EdgeJob
from airflow.providers.edge.models.edge_logs import EdgeLogs
from airflow.providers.edge.models.edge_worker import EdgeWorkerState, EdgeWorkerVersionException
from airflow.utils import cli as cli_utils
from airflow.utils import cli as cli_utils, timezone
from airflow.utils.platform import IS_WINDOWS
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.state import TaskInstanceState
Expand Down Expand Up @@ -246,7 +250,7 @@ def fetch_job(self) -> bool:
env["AIRFLOW__CORE__INTERNAL_API_URL"] = conf.get("edge", "api_url")
env["_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK"] = "1"
process = Popen(edge_job.command, close_fds=True, env=env, start_new_session=True)
logfile = EdgeLogs.logfile_path(edge_job.key)
logfile = logs_logfile_path(edge_job.key)
self.jobs.append(_Job(edge_job, process, logfile, 0))
EdgeJob.set_state(edge_job.key, TaskInstanceState.RUNNING)
return True
Expand Down Expand Up @@ -285,9 +289,9 @@ def check_running_jobs(self) -> None:
if not chunk_data:
break

EdgeLogs.push_logs(
logs_push(
task=job.edge_job.key,
log_chunk_time=datetime.now(),
log_chunk_time=timezone.utcnow(),
log_chunk_data=chunk_data,
)

Expand Down
2 changes: 1 addition & 1 deletion providers/src/airflow/providers/edge/models/edge_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def __init__(


class EdgeLogs(BaseModel, LoggingMixin):
"""Accessor for Edge Worker instances as logical model."""
"""Deprecated Internal API for Edge Worker instances as logical model."""

dag_id: str
task_id: str
Expand Down
197 changes: 197 additions & 0 deletions providers/src/airflow/providers/edge/openapi/edge_worker_api_v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,186 @@ paths:
summary: Register
tags:
- Worker
/logs/logfile_path/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}:
get:
description: Elaborate the path and filename to expect from task execution.
x-openapi-router-controller: airflow.providers.edge.worker_api.routes._v2_routes
operationId: logfile_path_v2
parameters:
- description: Identifier of the DAG to which the task belongs.
in: path
name: dag_id
required: true
schema:
description: Identifier of the DAG to which the task belongs.
title: Dag ID
type: string
- description: Task name in the DAG.
in: path
name: task_id
required: true
schema:
description: Task name in the DAG.
title: Task ID
type: string
- description: Run ID of the DAG execution.
in: path
name: run_id
required: true
schema:
description: Run ID of the DAG execution.
title: Run ID
type: string
- description: The number of attempt to execute this task.
in: path
name: try_number
required: true
schema:
description: The number of attempt to execute this task.
title: Try Number
type: integer
- description: For dynamically mapped tasks the mapping number, -1 if the task
is not mapped.
in: path
name: map_index
required: true
schema:
description: For dynamically mapped tasks the mapping number, -1 if the
task is not mapped.
title: Map Index
type: string # This should be integer, but Connexion/Flask do not support negative integers in path parameters
- description: JWT Authorization Token
in: header
name: authorization
required: true
schema:
description: JWT Authorization Token
title: Authorization
type: string
responses:
'200':
content:
application/json:
schema:
title: Response Logfile Path
type: string
description: Successful Response
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'422':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
description: Validation Error
summary: Logfile Path
tags:
- Logs
/logs/push/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}:
post:
description: Push an incremental log chunk from Edge Worker to central site.
x-openapi-router-controller: airflow.providers.edge.worker_api.routes._v2_routes
operationId: push_logs_v2
parameters:
- description: Identifier of the DAG to which the task belongs.
in: path
name: dag_id
required: true
schema:
description: Identifier of the DAG to which the task belongs.
title: Dag ID
type: string
- description: Task name in the DAG.
in: path
name: task_id
required: true
schema:
description: Task name in the DAG.
title: Task ID
type: string
- description: Run ID of the DAG execution.
in: path
name: run_id
required: true
schema:
description: Run ID of the DAG execution.
title: Run ID
type: string
- description: The number of attempt to execute this task.
in: path
name: try_number
required: true
schema:
description: The number of attempt to execute this task.
title: Try Number
type: integer
- description: For dynamically mapped tasks the mapping number, -1 if the task
is not mapped.
in: path
name: map_index
required: true
schema:
description: For dynamically mapped tasks the mapping number, -1 if the
task is not mapped.
title: Map Index
type: string # This should be integer, but Connexion/Flask do not support negative integers in path parameters
- description: JWT Authorization Token
in: header
name: authorization
required: true
schema:
description: JWT Authorization Token
title: Authorization
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/PushLogsBody'
description: The worker remote has no access to log sink and with this
can send log chunks to the central site.
title: Log data chunks
required: true
responses:
'200':
content:
application/json:
schema:
title: Response Push Logs
type: object
nullable: true
description: Successful Response
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'422':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
description: Validation Error
summary: Push Logs
tags:
- Logs
/rpcapi:
post:
deprecated: false
Expand Down Expand Up @@ -284,6 +464,23 @@ components:
title: Sysinfo
type: object
title: WorkerStateBody
PushLogsBody:
description: Incremental new log content from worker.
properties:
log_chunk_data:
description: Log chunk data as incremental log text.
title: Log Chunk Data
type: string
log_chunk_time:
description: Time of the log chunk at point of sending.
format: date-time
title: Log Chunk Time
type: string
required:
- log_chunk_time
- log_chunk_data
title: PushLogsBody
type: object
HTTPExceptionResponse:
description: HTTPException Model used for error response.
properties:
Expand Down
2 changes: 1 addition & 1 deletion providers/src/airflow/providers/edge/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ source-date-epoch: 1729683247

# note that those versions are maintained by release manager - do not update them manually
versions:
- 0.8.0pre0
- 0.8.1pre0

dependencies:
- apache-airflow>=2.10.0
Expand Down
2 changes: 2 additions & 0 deletions providers/src/airflow/providers/edge/worker_api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from fastapi import FastAPI

from airflow.providers.edge.worker_api.routes.health import health_router
from airflow.providers.edge.worker_api.routes.logs import logs_router
from airflow.providers.edge.worker_api.routes.worker import worker_router


Expand All @@ -35,5 +36,6 @@ def create_edge_worker_api_app() -> FastAPI:
)

edge_worker_api_app.include_router(health_router)
edge_worker_api_app.include_router(logs_router)
edge_worker_api_app.include_router(worker_router)
return edge_worker_api_app
Loading