
Conversation

@ashb
Member

@ashb ashb commented Jan 29, 2025

Closes #45426

Some points of note about this PR:

  • Logging is changed in Celery, but only for Airflow 3

    Celery does its own "capture stdout" logging, which conflicts with the
    capture we do in the TaskSDK, so we disable it, but only on Airflow 3 so
    that nothing changes for Airflow 2.

  • Simplify task SDK logging redirection

    While discovering that Celery captures stdout/stderr itself (and before
    disabling that), I found a simpler way to re-open stdin/stdout/stderr so
    that the implementation needs little to no special-casing (see the sketch
    after this list).

  • Make JSON task logs more readable by giving them a consistent/useful order

    We re-order (by re-creating) the event_dict so that timestamp, level, and
    then event are always the first items in the dict (see the sketch below).

  • Make the CeleryExecutor understand the concept of "workloads" instead of a
    command tuple.

    This change isn't done in the best way, but until the Kubernetes executor
    is swapped over (and possibly the other in-tree executors, such as ECS) we
    need to support both styles concurrently (see the sketch after the
    screenshots below).

    The change is done in such a way that the provider still works with
    Airflow v2, if it's running on that version.

  • Upgrade Celery

    This turned out not to be 100% necessary, but it does fix some deprecation
    warnings when running on Python 3.12.

  • Ensure that the forked process in TaskSDK never ever exits

    Again, this usually shouldn't be possible, but when the setup step of
    _fork_main died it didn't call os._exit(); the exception was caught
    further up, which meant the process stayed alive as it never closed the
    sockets properly. We put an extra safety try/except block in place to
    catch that (see the sketch right below).
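A minimal sketch of that safety net, assuming a hypothetical `run()` callable standing in for the real child setup and task body (this is not the Task SDK's actual code):

```python
import os
from collections.abc import Callable


def _fork_main_safe(run: Callable[[], None]) -> None:
    # Guard the entire child body: whatever the setup or task code raises,
    # the forked process must terminate via os._exit() rather than unwind
    # back into the parent's stack, where it would linger with the
    # supervisor sockets still open.
    exit_code = 1
    try:
        run()
        exit_code = 0
    except BaseException:
        # Catch absolutely everything, including SystemExit and
        # KeyboardInterrupt; nothing may escape the forked child.
        pass
    finally:
        os._exit(exit_code)
```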

I have not yet included a newsfragment for changing the executor interface as
the old style is currently still supported.
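For the logging-redirection bullet, here is a rough sketch of the simpler re-opening approach; the fd parameters are hypothetical, not the Task SDK's real interface:

```python
import os
import sys


def reopen_std_streams(stdout_fd: int, stderr_fd: int) -> None:
    # Point the process-level file descriptors 0/1/2 at /dev/null and the
    # supervisor's pipes, then rebuild the Python-level sys.* streams on
    # top of them so that later code needs no special-casing.
    devnull = os.open(os.devnull, os.O_RDONLY)
    os.dup2(devnull, 0)    # stdin: a task process has nothing to read
    os.dup2(stdout_fd, 1)  # stdout -> supervisor pipe
    os.dup2(stderr_fd, 2)  # stderr -> supervisor pipe
    os.close(devnull)

    sys.stdin = os.fdopen(0, "r", closefd=False)
    sys.stdout = os.fdopen(1, "w", buffering=1, closefd=False)
    sys.stderr = os.fdopen(2, "w", buffering=1, closefd=False)
```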

Testing: I ran airflow scheduler, airflow fastapi-api, and airflow celery worker, triggered some DAGs, viewed logs, etc.
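And for the JSON log ordering, a sketch of the kind of structlog processor involved; reorder_event_dict is a made-up name, though the structlog APIs used are real:

```python
import structlog


def reorder_event_dict(logger, method_name, event_dict):
    # Re-create the dict so timestamp, level and event always come first;
    # all remaining keys keep their existing relative order.
    reordered = {
        key: event_dict.pop(key)
        for key in ("timestamp", "level", "event")
        if key in event_dict
    }
    reordered.update(event_dict)
    return reordered


structlog.configure(
    processors=[
        structlog.processors.add_log_level,  # adds the "level" key
        structlog.processors.TimeStamper(fmt="iso", key="timestamp"),
        reorder_event_dict,  # must run after the keys above exist
        structlog.processors.JSONRenderer(),
    ]
)
```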

And since a picture makes all PRs better:

[Screenshot 2025-01-29 at 22:49:49]

Celery window showing debug logs from the supervisor itself (task output goes to file)

[Screenshot 2025-01-29 at 22:50:56]
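On the dual-format executor point, a sketch of the shape of the dispatch; send_command and send_workload are hypothetical stand-ins for the real Celery app.send_task calls the provider makes:

```python
from collections.abc import Sequence
from typing import Any


def send_command(task_key: str, command: list[str]) -> None:
    # Stand-in for queuing the legacy Airflow 2 CLI command on Celery.
    print(f"queue command for {task_key}: {command}")


def send_workload(task_key: str, workload: Any) -> None:
    # Stand-in for queuing the serialised Task SDK workload on Celery.
    print(f"queue workload for {task_key}: {workload!r}")


def dispatch(task_key: str, payload: Any) -> None:
    # Support both styles concurrently until the remaining executors migrate:
    # Airflow 2 hands the executor a command tuple, Airflow 3 a workload.
    if isinstance(payload, Sequence) and not isinstance(payload, (str, bytes)):
        send_command(task_key, list(payload))
    else:
        send_workload(task_key, payload)
```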

@boring-cyborg boring-cyborg bot added area:Executors-core LocalExecutor & SequentialExecutor area:Scheduler including HA (high availability) scheduler area:task-sdk provider:celery labels Jan 29, 2025
@ashb ashb force-pushed the swap-celery-exec-to-tasksdk branch from 315d736 to 8263c94 Compare January 29, 2025 22:59
Contributor

@amoghrajesh amoghrajesh left a comment


Thanks for working on this @ashb!
I have a few comments, nothing serious, just general comments/nits; I am mostly OK with the PR.

@amoghrajesh

This comment was marked as resolved.

@ashb ashb force-pushed the swap-celery-exec-to-tasksdk branch 3 times, most recently from 2038dc1 to 8e2d7cc Compare January 30, 2025 13:36
@ashb ashb requested a review from eladkal as a code owner January 30, 2025 13:36
@ashb ashb force-pushed the swap-celery-exec-to-tasksdk branch 2 times, most recently from ef0a8b9 to fd30378 Compare January 30, 2025 15:29
@ashb ashb force-pushed the swap-celery-exec-to-tasksdk branch from fd30378 to a7635c9 Compare January 30, 2025 16:34
@ashb
Member Author

ashb commented Jan 30, 2025

Right, I think this is good; the failing tests are fixed in main by Jarek's hard work.

One failure I wasn't happy with, so I've rebased.

@ashb ashb force-pushed the swap-celery-exec-to-tasksdk branch from a7635c9 to 15aec32 Compare January 30, 2025 17:33
Contributor

@o-nikolas o-nikolas left a comment


Changes to base look good to me to support both for now. I think it's called out in a comment, but we should migrate all in-tree executors (K8s, ECS, Batch, Edge, etc.) and add a new min Airflow version for those packages.

@ashb
Member Author

ashb commented Jan 30, 2025

Changes to base look good to me to support both for now. I think it's called out in a comment, but we should migrate all in-tree executors (K8s, ECS, Batch, Edge, etc.) and add a new min Airflow version for those packages.

👍 Yup, or we can make them work with 2+3, similar to how I've done for Celery. The ECS etc. changes right now are very minimal in this one.

Contributor

@jscheffl jscheffl left a comment


Wow, I would have expected a larger PR given the challenges you faced. Looks pretty solid.

Did a regression test with EdgeExecutor and this is still working as well. Just one detail: when I executed my "Integration Test" DAG from the Edge examples, the BranchOperator fails in EdgeWorker, while it seems to work in Celery. So it might be that I need a bit of ENV fixing. The exception is:
{"timestamp":"2025-01-30T22:01:35.193579","level":"error","event":"Task failed with exception","logger":"task","error_detail":[{"exc_type":"OperationalError","exc_value":"(sqlite3.OperationalError) no such table: dag_run\n[SQL: SELECT dag_run.state AS dag_run_state, dag_run.id AS dag_run_id, dag_run.dag_id AS dag_run_dag_id, dag_run.queued_at AS dag_run_queued_at, dag_run.logical_date AS dag_run_logical_date, dag_run.start_date AS dag_run_start_date, dag_run.end_date AS dag_run_end_date, dag_run.run_id AS dag_run_run_id, dag_run.creating_job_id AS dag_run_creating_job_id, dag_run.external_trigger AS dag_run_external_trigger, dag_run.run_type AS dag_run_run_type, dag_run.triggered_by AS dag_run_triggered_by, dag_run.conf AS dag_run_conf, dag_run.data_interval_start AS dag_run_data_interval_start, dag_run.data_interval_end AS dag_run_data_interval_end, dag_run.last_scheduling_decision AS dag_run_last_scheduling_decision, dag_run.log_template_id AS dag_run_log_template_id, dag_run.updated_at AS dag_run_updated_at, dag_run.clear_number AS dag_run_clear_number, dag_run.backfill_id AS dag_run_backfill_id, dag_run.dag_version_id AS dag_run_dag_version_id, dag_run.bundle_version AS dag_run_bundle_version \nFROM dag_run \nWHERE dag_run.dag_id = ? AND dag_run.run_id = ?]\n[parameters: ('integration_test', 'manual__2025-01-30T23:01:24+01:00')]\n(Background on this error at: [https://sqlalche.me/e/14/e3q8)","syntax_error":null,"is_cause":false,"frames":%22,%22syntax_error%22:null,%22is_cause%22:false,%22frames%22:[){"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":527,"name":"run"},{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":626,"name":"_execute_task"},{"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":173,"name":"wrapper"},{"filename":"/opt/airflow/airflow/decorators/base.py","lineno":252,"name":"execute"},{"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":173,"name":"wrapper"},{"filename":"/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py","lineno":240,"name":"execute"},{"filename":"/opt/airflow/airflow/operators/branch.py","lineno":41,"name":"do_branch"},{"filename":"/opt/airflow/airflow/models/skipmixin.py","lineno":212,"name":"skip_all_except"},{"filename":"/opt/airflow/airflow/utils/session.py","lineno":98,"name":"wrapper"},{"filename":"/opt/airflow/airflow/models/taskinstance.py","lineno":2480,"name":"get_dagrun"},{"filename":"/opt/airflow/airflow/models/taskinstance.py","lineno":2461,"name":"_get_dagrun"},{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/query.py","lineno":2870,"name":"one"},{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/query.py","lineno":2916,"name":"_iter"},{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py","lineno":1717,"name":"execute"},{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py","lineno":1710,"name":"_execute_20"},{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py","lineno":334,"name":"_execute_on_connection"},{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py","lineno":1577,"name":"_execute_clauseelement"},{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py","lineno":1953,"name":"_execute_context"},{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py","lineno":2134,"name":"handle_dbapi_exception"},{"filename":"
/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py","lineno":211,"name":"raise"},{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py","lineno":1910,"name":"_execute_context"},{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/default.py","lineno":736,"name":"do_execute"}]},{"exc_type":"OperationalError","exc_value":"no such table: dag_run","syntax_error":null,"is_cause":true,"frames":[{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py","lineno":1910,"name":"_execute_context"},{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/default.py","lineno":736,"name":"do_execute"}]}]}

...but anyway, LGTM!

Contributor

@amoghrajesh amoghrajesh left a comment


Thanks for addressing the comments, LGTM +1

@ashb
Member Author

ashb commented Jan 31, 2025

no such table: dag_run

That seems to be the issue @jens; likely the task is still trying to use the DB, as not everything has been ported over yet?

@ashb ashb force-pushed the swap-celery-exec-to-tasksdk branch 2 times, most recently from 5eb3b8a to 03b610d Compare January 31, 2025 13:06
@ashb ashb requested a review from potiuk as a code owner January 31, 2025 13:06
@ashb ashb force-pushed the swap-celery-exec-to-tasksdk branch from 03b610d to 2b87ab7 Compare January 31, 2025 14:15
@ashb
Member Author

ashb commented Jan 31, 2025

K, finally. One (🤞🏻) unrelated Beam downgrade test is failing, but that's happening on other PRs too.

@ashb ashb merged commit 984c61d into apache:main Jan 31, 2025
155 of 156 checks passed
@ashb ashb deleted the swap-celery-exec-to-tasksdk branch January 31, 2025 15:24
Prab-27 pushed a commit to Prab-27/airflow that referenced this pull request Jan 31, 2025
@jscheffl
Contributor

jscheffl commented Jan 31, 2025

no such table: dag_run

That seems to be the issue @jens; likely the task is still trying to use the DB, as not everything has been ported over yet?

Yeah, I'm not sure. In Edge the DB connection gets dropped; maybe this is a side effect of it still being defined in the ENV for Celery. In 2.10 all was clean; maybe some refactoring re-introduced this, or the cleanup of AIP-44 leftovers. So no blame on this PR :-D just noticed... LGTM! ...oops, it was already :-D

amoghrajesh pushed a commit to astronomer/airflow that referenced this pull request Feb 3, 2025
dabla pushed a commit to dabla/airflow that referenced this pull request Feb 3, 2025
niklasr22 pushed a commit to niklasr22/airflow that referenced this pull request Feb 8, 2025
ambika-garg pushed a commit to ambika-garg/airflow that referenced this pull request Feb 17, 2025

Development

Successfully merging this pull request may close these issues.

Change Celery executor to accept new "Activity" payload format and update it to run tasks via task SDK supervisor instead of LocalTaskJob
