Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-72 Add Task Scheduling Metadata to TaskInstance #45008

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

jscheffl
Copy link
Contributor

This is a sugar topping to PR #44982:
It adds support for scheduling relevant task instance fields "pool slots", "priority weight" and "queue".

Note: Only the last commit of this PR is relevant, the branch builds on top of #44982 - will rebase once the other PR is merged.

@jscheffl jscheffl added AIP-69 Edge Executor area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK provider:edge Edge Executor / Worker (AIP-69) labels Dec 17, 2024
@boring-cyborg boring-cyborg bot added area:Executors-core LocalExecutor & SequentialExecutor area:providers area:Scheduler including HA (high availability) scheduler kind:documentation labels Dec 17, 2024
@jscheffl jscheffl requested review from Copilot, ashb and kaxil and removed request for ashb, XD-DENG, pierrejeambrun, hussein-awala and o-nikolas December 17, 2024 20:41
@jscheffl jscheffl changed the title Feature/aip 72 add task scheduling metadata to taskinstance AIP-72 Add Task Scheduling Metadata to TaskInstance Dec 17, 2024
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot reviewed 5 out of 17 changed files in this pull request and generated 4 comments.

Files not reviewed (12)
  • docs/apache-airflow-providers-edge/edge_executor.rst: Language not supported
  • providers/src/airflow/providers/edge/CHANGELOG.rst: Language not supported
  • airflow/jobs/scheduler_job_runner.py: Evaluated as low risk
  • providers/src/airflow/providers/edge/worker_api/routes/jobs.py: Evaluated as low risk
  • airflow/executors/base_executor.py: Evaluated as low risk
  • airflow/executors/local_executor.py: Evaluated as low risk
  • airflow/executors/workloads.py: Evaluated as low risk
  • providers/tests/edge/plugins/test_edge_executor_plugin.py: Evaluated as low risk
  • providers/src/airflow/providers/edge/worker_api/datamodels.py: Evaluated as low risk
  • providers/src/airflow/providers/edge/init.py: Evaluated as low risk
  • providers/src/airflow/providers/edge/provider.yaml: Evaluated as low risk
  • providers/src/airflow/providers/edge/worker_api/routes/_v2_compat.py: Evaluated as low risk
Comments suppressed due to low confidence (3)

providers/src/airflow/providers/edge/cli/edge_command.py:145

  • The method poll should be called on the Popen process to update the returncode before checking it. Without calling poll, the returncode may not be updated correctly.
return self.process.returncode is None

providers/tests/edge/cli/test_edge_command.py:222

  • The attribute 'generated_returncode' does not exist on the process object. This will cause an AttributeError during test execution.
job.process.generated_returncode = 0  # type: ignore[union-attr]

providers/src/airflow/providers/edge/executors/edge_executor.py:141

  • The type check for 'workloads.ExecuteTask' should be done using 'issubclass' instead of 'isinstance' to properly handle cases where 'workload' might be a subclass of 'ExecuteTask'.
if not isinstance(workload, workloads.ExecuteTask):

providers/src/airflow/providers/edge/cli/edge_command.py Outdated Show resolved Hide resolved
providers/tests/edge/cli/test_edge_command.py Outdated Show resolved Hide resolved
providers/tests/edge/cli/test_edge_command.py Outdated Show resolved Hide resolved
providers/src/airflow/providers/edge/worker_api/auth.py Outdated Show resolved Hide resolved
@jscheffl jscheffl marked this pull request as draft December 17, 2024 21:59
@jscheffl jscheffl force-pushed the feature/aip-72-add-task-scheduling-metadata-to-taskinstance branch from 33b8205 to 3754c26 Compare December 18, 2024 13:58
Comment on lines +53 to +55
pool_slots: int
queue: str
priority_weight: int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kaxil Should this be here, or in the context returned from the /run endpoint?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least that are attributes on TaskInstance in the ORM model. Just adding these three lines filled it over. No objection moving it somewhere else but as here it is consistent with the backend.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will be needed for the executor to route to correct pool before the task is marked as queued

task_instance = workload.ti
key = task_instance.key
session.add(
EdgeJobModel(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is referring to a TI it should also have the UUID column from task_instance.id

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to switch the model - but rather in a later PR. As atm this is not critical and Airflow 2 does not have the UUID. But once we can drop Airflow 2 support that totally makes sense.

Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I have only skimmed the surface of this PR)

@ashb
Copy link
Member

ashb commented Dec 18, 2024

Oh just notices this is a chained PR, I'll look at the base one instead.

@jscheffl jscheffl force-pushed the feature/aip-72-add-task-scheduling-metadata-to-taskinstance branch from 44def12 to a37cd63 Compare January 8, 2025 21:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
AIP-69 Edge Executor area:Executors-core LocalExecutor & SequentialExecutor area:providers area:Scheduler including HA (high availability) scheduler area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK kind:documentation provider:edge Edge Executor / Worker (AIP-69)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants