Description
Apache Airflow version
main (development)
If "Other Airflow 2 version" selected, which one?
No response
What happened?
During testing of PR #53035 it came to light that deferred tasks (which the HITL tasks under test are) fail on EdgeExecutor when they are re-scheduled.
In the tested case the task was initially scheduled on EdgeExecutor and then deferred to the triggerer. The HITL task in the test was configured with a timeout. To process the timeout, the task was scheduled a second time, which caused a unique constraint violation on the edge_job table in the DB with the following error:
root@f2921a52480b:/opt/airflow# airflow scheduler
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
/opt/airflow/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py:44 DeprecationWarning: The `airflow.utils.task_group.TaskGroup` attribute is deprecated. Please use `'airflow.sdk.definitions.taskgroup.TaskGroup'`.
[2025-07-21T19:30:51.013+0000] {scheduler_job_runner.py:978} INFO - Starting the scheduler
[2025-07-21T19:30:51.014+0000] {executor_loader.py:269} INFO - Loaded executor: ::airflow.providers.edge3.executors.edge_executor.EdgeExecutor
[2025-07-21T19:30:51.046+0000] {scheduler_job_runner.py:2137} INFO - Adopting or resetting orphaned tasks for active dag runs
[2025-07-21T19:30:51.139+0000] {scheduler_job_runner.py:454} INFO - 3 tasks up for execution:
<TaskInstance: example_hitl_operator.wait_for_input manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>
<TaskInstance: example_hitl_operator.wait_for_option manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>
<TaskInstance: example_hitl_operator.valid_input_and_options manual__2025-07-21T18:55:40.109637+00:00 [scheduled]>
[2025-07-21T19:30:51.139+0000] {scheduler_job_runner.py:526} INFO - DAG example_hitl_operator has 0/16 running and queued tasks
[2025-07-21T19:30:51.140+0000] {scheduler_job_runner.py:526} INFO - DAG example_hitl_operator has 1/16 running and queued tasks
[2025-07-21T19:30:51.140+0000] {scheduler_job_runner.py:526} INFO - DAG example_hitl_operator has 0/16 running and queued tasks
[2025-07-21T19:30:51.140+0000] {scheduler_job_runner.py:665} INFO - Setting the following tasks to queued state:
<TaskInstance: example_hitl_operator.wait_for_input manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>
<TaskInstance: example_hitl_operator.wait_for_option manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>
<TaskInstance: example_hitl_operator.valid_input_and_options manual__2025-07-21T18:55:40.109637+00:00 [scheduled]>
[2025-07-21T19:30:51.142+0000] {scheduler_job_runner.py:750} INFO - Trying to enqueue tasks: [<TaskInstance: example_hitl_operator.wait_for_input manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>, <TaskInstance: example_hitl_operator.wait_for_option manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>, <TaskInstance: example_hitl_operator.valid_input_and_options manual__2025-07-21T18:55:40.109637+00:00 [scheduled]>] for executor: EdgeExecutor(parallelism=32)
[2025-07-21T19:30:51.149+0000] {scheduler_job_runner.py:1006} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
self.dialect.do_executemany(
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 982, in do_executemany
context._psycopg2_fetched_rows = xtras.execute_values(
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 1299, in execute_values
cur.execute(b''.join(parts))
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "edge_job_pkey"
DETAIL: Key (dag_id, task_id, run_id, map_index, try_number)=(example_hitl_operator, valid_input_and_options, manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1002, in _execute
self._run_scheduler_loop()
File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1283, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1430, in _do_scheduling
guard.commit()
File "/opt/airflow/airflow-core/src/airflow/utils/sqlalchemy.py", line 401, in commit
self.session.commit()
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 1454, in commit
self._transaction.commit(_to_root=self.future)
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 832, in commit
self._prepare_impl()
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 811, in _prepare_impl
self.session.flush()
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
self._flush(objects)
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
with util.safe_reraise():
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
compat.raise_(
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
flush_context.execute()
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
rec.execute(self)
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
util.preloaded.orm_persistence.save_obj(
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
_emit_insert_statements(
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 1097, in _emit_insert_statements
c = connection._execute_20(
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
return connection._execute_clauseelement(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
ret = self._execute_context(
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
self._handle_dbapi_exception(
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
util.raise_(
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
self.dialect.do_executemany(
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 982, in do_executemany
context._psycopg2_fetched_rows = xtras.execute_values(
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 1299, in execute_values
cur.execute(b''.join(parts))
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "edge_job_pkey"
DETAIL: Key (dag_id, task_id, run_id, map_index, try_number)=(example_hitl_operator, valid_input_and_options, manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists.
[SQL: INSERT INTO edge_job (dag_id, task_id, run_id, map_index, try_number, state, queue, concurrency_slots, command, queued_dttm, edge_worker, last_update) VALUES (%(dag_id)s, %(task_id)s, %(run_id)s, %(map_index)s, %(try_number)s, %(state)s, %(queue)s, %(concurrency_slots)s, %(command)s, %(queued_dttm)s, %(edge_worker)s, %(last_update)s)]
[parameters: ({'dag_id': 'example_hitl_operator', 'task_id': 'wait_for_input', 'run_id': 'manual__2025-07-21T19:28:32.128452+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhlLTdmZWYtYWYzNC0zNzM1NTIyMmFlMWYiLCJqdGkiOiIxNzFiNGU3ZmI3ZmY0MjQyOTFmZWFhM2 ... (675 characters truncated) ... ,"log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_input/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145801, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 'example_hitl_operator', 'task_id': 'wait_for_option', 'run_id': 'manual__2025-07-21T19:28:32.128452+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhmLTc3NTItOThhOC1jOWE2MTdmYjhiZDEiLCJqdGkiOiI4ZTIxMmQzZTU2NWM0MzRlYTE2Mzc2ND ... (677 characters truncated) ... "log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_option/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145998, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 'example_hitl_operator', 'task_id': 'valid_input_and_options', 'run_id': 'manual__2025-07-21T18:55:40.109637+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU1Ny03NDY0LTc4OTgtOGRlNC1mNjRkNzliMWQzYzAiLCJqdGkiOiI4YTAxZmRmYzJiYzI0MWUxOGJjMTA3Nj ... (691 characters truncated) ... 
h":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T18-55-40.109637+00-00/task_id=valid_input_and_options/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 146113, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None})]
(Background on this error at: https://sqlalche.me/e/14/gkpj)
[2025-07-21T19:30:51.155+0000] {edge_executor.py:357} INFO - Shutting down EdgeExecutor
[2025-07-21T19:30:51.156+0000] {scheduler_job_runner.py:1018} INFO - Exited execute loop
Traceback (most recent call last):
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
self.dialect.do_executemany(
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 982, in do_executemany
context._psycopg2_fetched_rows = xtras.execute_values(
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 1299, in execute_values
cur.execute(b''.join(parts))
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "edge_job_pkey"
DETAIL: Key (dag_id, task_id, run_id, map_index, try_number)=(example_hitl_operator, valid_input_and_options, manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 10, in <module>
sys.exit(main())
^^^^^^
File "/opt/airflow/airflow-core/src/airflow/__main__.py", line 55, in main
args.func(args)
File "/opt/airflow/airflow-core/src/airflow/cli/cli_config.py", line 48, in command
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow-core/src/airflow/utils/cli.py", line 113, in wrapper
return f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow-core/src/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 52, in scheduler
run_command_with_daemon_option(
File "/opt/airflow/airflow-core/src/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
callback()
File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 55, in <lambda>
callback=lambda: _run_scheduler_job(args),
^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 43, in _run_scheduler_job
run_job(job=job_runner.job, execute_callable=job_runner._execute)
File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 100, in wrapper
return func(*args, session=session, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 355, in run_job
return execute_job(job, execute_callable=execute_callable)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 384, in execute_job
ret = execute_callable()
^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1002, in _execute
self._run_scheduler_loop()
File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1283, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1430, in _do_scheduling
guard.commit()
File "/opt/airflow/airflow-core/src/airflow/utils/sqlalchemy.py", line 401, in commit
self.session.commit()
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 1454, in commit
self._transaction.commit(_to_root=self.future)
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 832, in commit
self._prepare_impl()
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 811, in _prepare_impl
self.session.flush()
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
self._flush(objects)
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
with util.safe_reraise():
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
compat.raise_(
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
flush_context.execute()
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
rec.execute(self)
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
util.preloaded.orm_persistence.save_obj(
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
_emit_insert_statements(
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 1097, in _emit_insert_statements
c = connection._execute_20(
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
return connection._execute_clauseelement(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
ret = self._execute_context(
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
self._handle_dbapi_exception(
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
util.raise_(
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
self.dialect.do_executemany(
File "/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 982, in do_executemany
context._psycopg2_fetched_rows = xtras.execute_values(
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 1299, in execute_values
cur.execute(b''.join(parts))
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "edge_job_pkey"
DETAIL: Key (dag_id, task_id, run_id, map_index, try_number)=(example_hitl_operator, valid_input_and_options, manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists.
[SQL: INSERT INTO edge_job (dag_id, task_id, run_id, map_index, try_number, state, queue, concurrency_slots, command, queued_dttm, edge_worker, last_update) VALUES (%(dag_id)s, %(task_id)s, %(run_id)s, %(map_index)s, %(try_number)s, %(state)s, %(queue)s, %(concurrency_slots)s, %(command)s, %(queued_dttm)s, %(edge_worker)s, %(last_update)s)]
[parameters: ({'dag_id': 'example_hitl_operator', 'task_id': 'wait_for_input', 'run_id': 'manual__2025-07-21T19:28:32.128452+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhlLTdmZWYtYWYzNC0zNzM1NTIyMmFlMWYiLCJqdGkiOiIxNzFiNGU3ZmI3ZmY0MjQyOTFmZWFhM2 ... (675 characters truncated) ... ,"log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_input/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145801, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 'example_hitl_operator', 'task_id': 'wait_for_option', 'run_id': 'manual__2025-07-21T19:28:32.128452+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhmLTc3NTItOThhOC1jOWE2MTdmYjhiZDEiLCJqdGkiOiI4ZTIxMmQzZTU2NWM0MzRlYTE2Mzc2ND ... (677 characters truncated) ... "log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_option/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145998, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 'example_hitl_operator', 'task_id': 'valid_input_and_options', 'run_id': 'manual__2025-07-21T18:55:40.109637+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU1Ny03NDY0LTc4OTgtOGRlNC1mNjRkNzliMWQzYzAiLCJqdGkiOiI4YTAxZmRmYzJiYzI0MWUxOGJjMTA3Nj ... (691 characters truncated) ... 
h":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T18-55-40.109637+00-00/task_id=valid_input_and_options/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 146113, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None})]
(Background on this error at: https://sqlalche.me/e/14/gkpj)
Restarting the scheduler did not fix the problem either.
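The collision can be shown in miniature outside of Airflow. The following is only a sketch using an in-memory SQLite table, not the Edge provider's code: the table is reduced to the primary key columns named in the error plus `state`, and the helper `requeue` is hypothetical. It assumes the row from the first scheduling is still present when the task is queued again with the same `try_number`.

```python
import sqlite3

# Simplified stand-in for the edge_job table: same composite primary key
# as reported in the error, all other columns except "state" omitted.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE edge_job (
        dag_id TEXT NOT NULL,
        task_id TEXT NOT NULL,
        run_id TEXT NOT NULL,
        map_index INTEGER NOT NULL,
        try_number INTEGER NOT NULL,
        state TEXT NOT NULL,
        PRIMARY KEY (dag_id, task_id, run_id, map_index, try_number)
    )
    """
)

INSERT = "INSERT INTO edge_job VALUES (?, ?, ?, ?, ?, ?)"
key = (
    "example_hitl_operator",
    "valid_input_and_options",
    "manual__2025-07-21T18:55:40.109637+00:00",
    -1,
    1,
)

# First scheduling: the job row is created.
conn.execute(INSERT, (*key, "queued"))
conn.commit()


def requeue(conn, key):
    """Hypothetical helper: plain INSERT for a task that is queued again."""
    try:
        conn.execute(INSERT, (*key, "queued"))
        conn.commit()
        return "inserted"
    except sqlite3.IntegrityError as exc:
        conn.rollback()
        return f"rejected: {exc}"


# Second scheduling after the deferral: same key, same try_number.
result = requeue(conn, key)
row_count = conn.execute("SELECT COUNT(*) FROM edge_job").fetchone()[0]
print(result, row_count)
```

Because the old row persists in the DB, the same insert is attempted again after a scheduler restart, which would match the observation that a restart does not help.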
What you think should happen instead?
No failure. Or, if deferred tasks in general do not work on EdgeExecutor, they should be refused up front. Better, of course, would be to handle the re-scheduling of the same task.
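One possible way to handle the re-scheduling is to reset an existing job row instead of inserting a second one. The sketch below illustrates this with an upsert on an in-memory SQLite table (requires SQLite 3.24 or newer for `ON CONFLICT ... DO UPDATE`). It is an assumption about a possible fix, not the actual change in the Edge provider: the reduced table, the helper `queue_job`, and the intermediate `success` state and `worker-1` name are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE edge_job (
        dag_id TEXT NOT NULL,
        task_id TEXT NOT NULL,
        run_id TEXT NOT NULL,
        map_index INTEGER NOT NULL,
        try_number INTEGER NOT NULL,
        state TEXT NOT NULL,
        edge_worker TEXT,
        PRIMARY KEY (dag_id, task_id, run_id, map_index, try_number)
    )
    """
)

# Insert a new job, or reset the existing one if the key is already taken.
UPSERT = """
INSERT INTO edge_job
    (dag_id, task_id, run_id, map_index, try_number, state, edge_worker)
VALUES (?, ?, ?, ?, ?, 'queued', NULL)
ON CONFLICT (dag_id, task_id, run_id, map_index, try_number)
DO UPDATE SET state = 'queued', edge_worker = NULL
"""


def queue_job(conn, key):
    """Hypothetical helper: queue a job, tolerating an existing row."""
    conn.execute(UPSERT, key)
    conn.commit()


key = (
    "example_hitl_operator",
    "valid_input_and_options",
    "manual__2025-07-21T18:55:40.109637+00:00",
    -1,
    1,
)

queue_job(conn, key)  # first scheduling

# Assumed intermediate state: a worker picked the job up before the deferral.
conn.execute("UPDATE edge_job SET state = 'success', edge_worker = 'worker-1'")
conn.commit()

queue_job(conn, key)  # re-scheduling after the timeout, same try_number

rows = conn.execute("SELECT state, edge_worker FROM edge_job").fetchall()
print(rows)
```

PostgreSQL supports the same `ON CONFLICT` clause, so the idea carries over to the backend used in this report; whether resetting the row is the right semantics for the executor is a separate design question.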
How to reproduce
Run the HITL Dag (if the UI is not merged yet, use PR #53035 to have a UI) and wait for the timeout of task valid_input_and_options, which has a timeout of 1 minute. The scheduler will crash.
Operating System
Ubuntu 24.04
Versions of Apache Airflow Providers
Edge3 from main, but probably all released versions of the Edge provider have this problem
Deployment
Other
Deployment details
Started from main using breeze. Used command line:
breeze start-airflow --python 3.12 --backend postgres --executor EdgeExecutor --load-example-dags
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct