Skip to content

Commit 6cc299b

Browse files
committed
Migrate Edge calls for Worker to FastAPI 3 - Jobs route
1 parent a4e286d commit 6cc299b

File tree

11 files changed

+515
-33
lines changed

11 files changed

+515
-33
lines changed

providers/src/airflow/providers/edge/CHANGELOG.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,14 @@
2727
Changelog
2828
---------
2929

30+
0.8.2pre0
31+
.........
32+
33+
Misc
34+
~~~~
35+
36+
* ``Migrate worker job calls to FastAPI.``
37+
3038
0.8.1pre0
3139
.........
3240

providers/src/airflow/providers/edge/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929

3030
__all__ = ["__version__"]
3131

32-
__version__ = "0.8.1pre0"
32+
__version__ = "0.8.2pre0"
3333

3434
if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
3535
"2.10.0"

providers/src/airflow/providers/edge/cli/api_client.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,13 @@
3232
from airflow.configuration import conf
3333
from airflow.exceptions import AirflowException
3434
from airflow.providers.edge.worker_api.auth import jwt_signer
35-
from airflow.providers.edge.worker_api.datamodels import PushLogsBody, WorkerStateBody
35+
from airflow.providers.edge.worker_api.datamodels import (
36+
EdgeJobFetched,
37+
PushLogsBody,
38+
WorkerQueuesBody,
39+
WorkerStateBody,
40+
)
41+
from airflow.utils.state import TaskInstanceState # noqa: TC001
3642

3743
if TYPE_CHECKING:
3844
from airflow.models.taskinstancekey import TaskInstanceKey
@@ -120,6 +126,26 @@ def worker_set_state(
120126
)
121127

122128

129+
def jobs_fetch(hostname: str, queues: list[str] | None, free_concurrency: int) -> EdgeJobFetched | None:
130+
"""Fetch a job to execute on the edge worker."""
131+
result = _make_generic_request(
132+
"GET",
133+
f"jobs/fetch/{quote(hostname)}",
134+
WorkerQueuesBody(queues=queues, free_concurrency=free_concurrency).model_dump_json(),
135+
)
136+
if result:
137+
return EdgeJobFetched(**result)
138+
return None
139+
140+
141+
def jobs_set_state(key: TaskInstanceKey, state: TaskInstanceState) -> None:
142+
"""Set the state of a job."""
143+
_make_generic_request(
144+
"PATCH",
145+
f"jobs/state/{key.dag_id}/{key.task_id}/{key.run_id}/{key.try_number}/{key.map_index}/{state}",
146+
)
147+
148+
123149
def logs_logfile_path(task: TaskInstanceKey) -> Path:
124150
"""Elaborate the path and filename to expect from task execution."""
125151
result = _make_generic_request(

providers/src/airflow/providers/edge/cli/edge_command.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from pathlib import Path
2626
from subprocess import Popen
2727
from time import sleep
28+
from typing import TYPE_CHECKING
2829

2930
import psutil
3031
from lockfile.pidlockfile import read_pid_from_pidfile, remove_existing_pidfile, write_pid_to_pidfile
@@ -35,18 +36,22 @@
3536
from airflow.exceptions import AirflowException
3637
from airflow.providers.edge import __version__ as edge_provider_version
3738
from airflow.providers.edge.cli.api_client import (
39+
jobs_fetch,
40+
jobs_set_state,
3841
logs_logfile_path,
3942
logs_push,
4043
worker_register,
4144
worker_set_state,
4245
)
43-
from airflow.providers.edge.models.edge_job import EdgeJob
4446
from airflow.providers.edge.models.edge_worker import EdgeWorkerState, EdgeWorkerVersionException
4547
from airflow.utils import cli as cli_utils, timezone
4648
from airflow.utils.platform import IS_WINDOWS
4749
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
4850
from airflow.utils.state import TaskInstanceState
4951

52+
if TYPE_CHECKING:
53+
from airflow.providers.edge.worker_api.datamodels import EdgeJobFetched
54+
5055
logger = logging.getLogger(__name__)
5156
EDGE_WORKER_PROCESS_NAME = "edge-worker"
5257
EDGE_WORKER_HEADER = "\n".join(
@@ -100,7 +105,7 @@ def _write_pid_to_pidfile(pid_file_path: str):
100105
class _Job:
101106
"""Holds all information for a task/job to be executed as bundle."""
102107

103-
edge_job: EdgeJob
108+
edge_job: EdgeJobFetched
104109
process: Popen
105110
logfile: Path
106111
logsize: int
@@ -199,9 +204,7 @@ def loop(self):
199204
def fetch_job(self) -> bool:
200205
"""Fetch and start a new job from central site."""
201206
logger.debug("Attempting to fetch a new job...")
202-
edge_job = EdgeJob.reserve_task(
203-
worker_name=self.hostname, free_concurrency=self.free_concurrency, queues=self.queues
204-
)
207+
edge_job = jobs_fetch(self.hostname, self.queues, self.free_concurrency)
205208
if edge_job:
206209
logger.info("Received job: %s", edge_job)
207210
env = os.environ.copy()
@@ -211,7 +214,7 @@ def fetch_job(self) -> bool:
211214
process = Popen(edge_job.command, close_fds=True, env=env, start_new_session=True)
212215
logfile = logs_logfile_path(edge_job.key)
213216
self.jobs.append(_Job(edge_job, process, logfile, 0))
214-
EdgeJob.set_state(edge_job.key, TaskInstanceState.RUNNING)
217+
jobs_set_state(edge_job.key, TaskInstanceState.RUNNING)
215218
return True
216219

217220
logger.info("No new job to process%s", f", {len(self.jobs)} still running" if self.jobs else "")
@@ -227,10 +230,10 @@ def check_running_jobs(self) -> None:
227230
self.jobs.remove(job)
228231
if job.process.returncode == 0:
229232
logger.info("Job completed: %s", job.edge_job)
230-
EdgeJob.set_state(job.edge_job.key, TaskInstanceState.SUCCESS)
233+
jobs_set_state(job.edge_job.key, TaskInstanceState.SUCCESS)
231234
else:
232235
logger.error("Job failed: %s", job.edge_job)
233-
EdgeJob.set_state(job.edge_job.key, TaskInstanceState.FAILED)
236+
jobs_set_state(job.edge_job.key, TaskInstanceState.FAILED)
234237
else:
235238
used_concurrency += job.edge_job.concurrency_slots
236239

providers/src/airflow/providers/edge/openapi/edge_worker_api_v1.yaml

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,161 @@ paths:
178178
summary: Register
179179
tags:
180180
- Worker
181+
/jobs/fetch/{worker_name}:
182+
get:
183+
description: Fetch a job to execute on the edge worker.
184+
x-openapi-router-controller: airflow.providers.edge.worker_api.routes._v2_routes
185+
operationId: job_fetch_v2
186+
parameters:
187+
- in: path
188+
name: worker_name
189+
required: true
190+
schema:
191+
title: Worker Name
192+
type: string
193+
- description: JWT Authorization Token
194+
in: header
195+
name: authorization
196+
required: true
197+
schema:
198+
description: JWT Authorization Token
199+
title: Authorization
200+
type: string
201+
requestBody:
202+
content:
203+
application/json:
204+
schema:
205+
$ref: '#/components/schemas/WorkerQueuesBody'
206+
description: The worker remote has no access to log sink and with this
207+
can send log chunks to the central site.
208+
title: Log data chunks
209+
required: true
210+
responses:
211+
'200':
212+
content:
213+
application/json:
214+
schema:
215+
anyOf:
216+
- $ref: '#/components/schemas/EdgeJobFetched'
217+
- type: object
218+
nullable: true
219+
title: Response Fetch
220+
description: Successful Response
221+
'400':
222+
content:
223+
application/json:
224+
schema:
225+
$ref: '#/components/schemas/HTTPExceptionResponse'
226+
description: Bad Request
227+
'403':
228+
content:
229+
application/json:
230+
schema:
231+
$ref: '#/components/schemas/HTTPExceptionResponse'
232+
description: Forbidden
233+
'422':
234+
content:
235+
application/json:
236+
schema:
237+
$ref: '#/components/schemas/HTTPValidationError'
238+
description: Validation Error
239+
summary: Fetch
240+
tags:
241+
- Jobs
242+
/jobs/state/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}/{state}:
243+
patch:
244+
description: Update the state of a job running on the edge worker.
245+
x-openapi-router-controller: airflow.providers.edge.worker_api.routes._v2_routes
246+
operationId: job_state_v2
247+
parameters:
248+
- description: Identifier of the DAG to which the task belongs.
249+
in: path
250+
name: dag_id
251+
required: true
252+
schema:
253+
description: Identifier of the DAG to which the task belongs.
254+
title: Dag ID
255+
type: string
256+
- description: Task name in the DAG.
257+
in: path
258+
name: task_id
259+
required: true
260+
schema:
261+
description: Task name in the DAG.
262+
title: Task ID
263+
type: string
264+
- description: Run ID of the DAG execution.
265+
in: path
266+
name: run_id
267+
required: true
268+
schema:
269+
description: Run ID of the DAG execution.
270+
title: Run ID
271+
type: string
272+
- description: The number of attempt to execute this task.
273+
in: path
274+
name: try_number
275+
required: true
276+
schema:
277+
description: The number of attempt to execute this task.
278+
title: Try Number
279+
type: integer
280+
- description: For dynamically mapped tasks the mapping number, -1 if the task
281+
is not mapped.
282+
in: path
283+
name: map_index
284+
required: true
285+
schema:
286+
description: For dynamically mapped tasks the mapping number, -1 if the
287+
task is not mapped.
288+
title: Map Index
289+
type: string # This should be integer, but Connexion/Flask do not support negative integers in path parameters
290+
- description: State of the assigned task under execution.
291+
in: path
292+
name: state
293+
required: true
294+
schema:
295+
$ref: '#/components/schemas/TaskInstanceState'
296+
description: State of the assigned task under execution.
297+
title: Task State
298+
- description: JWT Authorization Token
299+
in: header
300+
name: authorization
301+
required: true
302+
schema:
303+
description: JWT Authorization Token
304+
title: Authorization
305+
type: string
306+
responses:
307+
'200':
308+
content:
309+
application/json:
310+
schema:
311+
title: Response State
312+
type: object
313+
nullable: true
314+
description: Successful Response
315+
'400':
316+
content:
317+
application/json:
318+
schema:
319+
$ref: '#/components/schemas/HTTPExceptionResponse'
320+
description: Bad Request
321+
'403':
322+
content:
323+
application/json:
324+
schema:
325+
$ref: '#/components/schemas/HTTPExceptionResponse'
326+
description: Forbidden
327+
'422':
328+
content:
329+
application/json:
330+
schema:
331+
$ref: '#/components/schemas/HTTPValidationError'
332+
description: Validation Error
333+
summary: State
334+
tags:
335+
- Jobs
181336
/logs/logfile_path/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}:
182337
get:
183338
description: Elaborate the path and filename to expect from task execution.
@@ -464,6 +619,91 @@ components:
464619
title: Sysinfo
465620
type: object
466621
title: WorkerStateBody
622+
WorkerQueuesBody:
623+
description: Queues that a worker supports to run jobs on.
624+
properties:
625+
queues:
626+
anyOf:
627+
- items:
628+
type: string
629+
type: array
630+
- type: object
631+
nullable: true
632+
description: List of queues the worker is pulling jobs from. If not provided,
633+
worker pulls from all queues.
634+
title: Queues
635+
free_concurrency:
636+
description: Number of free slots for running tasks.
637+
title: Free Concurrency
638+
type: integer
639+
required:
640+
- queues
641+
- free_concurrency
642+
title: WorkerQueuesBody
643+
type: object
644+
EdgeJobFetched:
645+
description: Job that is to be executed on the edge worker.
646+
properties:
647+
command:
648+
description: Command line to use to execute the job.
649+
items:
650+
type: string
651+
title: Command
652+
type: array
653+
concurrency_slots:
654+
description: Number of slots to use for the task.
655+
title: Concurrency Slots
656+
type: integer
657+
dag_id:
658+
description: Identifier of the DAG to which the task belongs.
659+
title: Dag ID
660+
type: string
661+
map_index:
662+
description: For dynamically mapped tasks the mapping number, -1 if the
663+
task is not mapped.
664+
title: Map Index
665+
type: integer
666+
run_id:
667+
description: Run ID of the DAG execution.
668+
title: Run ID
669+
type: string
670+
task_id:
671+
description: Task name in the DAG.
672+
title: Task ID
673+
type: string
674+
try_number:
675+
description: The number of attempt to execute this task.
676+
title: Try Number
677+
type: integer
678+
required:
679+
- dag_id
680+
- task_id
681+
- run_id
682+
- map_index
683+
- try_number
684+
- command
685+
title: EdgeJobFetched
686+
type: object
687+
TaskInstanceState:
688+
description: 'All possible states that a Task Instance can be in.
689+
690+
691+
Note that None is also allowed, so always use this in a type hint with Optional.'
692+
enum:
693+
- removed
694+
- scheduled
695+
- queued
696+
- running
697+
- success
698+
- restarting
699+
- failed
700+
- up_for_retry
701+
- up_for_reschedule
702+
- upstream_failed
703+
- skipped
704+
- deferred
705+
title: TaskInstanceState
706+
type: string
467707
PushLogsBody:
468708
description: Incremental new log content from worker.
469709
properties:

0 commit comments

Comments
 (0)