
Task fails to execute in EdgeExecutor if it is deferred and re-scheduled #53610

@jscheffl

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

While testing PR #53035, it came to light that deferred tasks (which is what the HITL tasks in this test are) have problems in EdgeExecutor when they are re-scheduled.

In the tested case, the task was initially scheduled on EdgeExecutor and was then deferred to the triggerer. However, the HITL task in the test was configured with a timeout. To process the timeout, the task was scheduled a second time, which caused a unique constraint violation on the edge_job table in the DB with the following error:

root@f2921a52480b:/opt/airflow# airflow scheduler
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
/opt/airflow/providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py:44 DeprecationWarning: The `airflow.utils.task_group.TaskGroup` attribute is deprecated. Please use `'airflow.sdk.definitions.taskgroup.TaskGroup'`.
[2025-07-21T19:30:51.013+0000] {scheduler_job_runner.py:978} INFO - Starting the scheduler
[2025-07-21T19:30:51.014+0000] {executor_loader.py:269} INFO - Loaded executor: ::airflow.providers.edge3.executors.edge_executor.EdgeExecutor
[2025-07-21T19:30:51.046+0000] {scheduler_job_runner.py:2137} INFO - Adopting or resetting orphaned tasks for active dag runs
[2025-07-21T19:30:51.139+0000] {scheduler_job_runner.py:454} INFO - 3 tasks up for execution:
        <TaskInstance: example_hitl_operator.wait_for_input manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>
        <TaskInstance: example_hitl_operator.wait_for_option manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>
        <TaskInstance: example_hitl_operator.valid_input_and_options manual__2025-07-21T18:55:40.109637+00:00 [scheduled]>
[2025-07-21T19:30:51.139+0000] {scheduler_job_runner.py:526} INFO - DAG example_hitl_operator has 0/16 running and queued tasks
[2025-07-21T19:30:51.140+0000] {scheduler_job_runner.py:526} INFO - DAG example_hitl_operator has 1/16 running and queued tasks
[2025-07-21T19:30:51.140+0000] {scheduler_job_runner.py:526} INFO - DAG example_hitl_operator has 0/16 running and queued tasks
[2025-07-21T19:30:51.140+0000] {scheduler_job_runner.py:665} INFO - Setting the following tasks to queued state:
        <TaskInstance: example_hitl_operator.wait_for_input manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>
        <TaskInstance: example_hitl_operator.wait_for_option manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>
        <TaskInstance: example_hitl_operator.valid_input_and_options manual__2025-07-21T18:55:40.109637+00:00 [scheduled]>
[2025-07-21T19:30:51.142+0000] {scheduler_job_runner.py:750} INFO - Trying to enqueue tasks: [<TaskInstance: example_hitl_operator.wait_for_input manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>, <TaskInstance: example_hitl_operator.wait_for_option manual__2025-07-21T19:28:32.128452+00:00 [scheduled]>, <TaskInstance: example_hitl_operator.valid_input_and_options manual__2025-07-21T18:55:40.109637+00:00 [scheduled]>] for executor: EdgeExecutor(parallelism=32)
[2025-07-21T19:30:51.149+0000] {scheduler_job_runner.py:1006} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
    self.dialect.do_executemany(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 982, in do_executemany
    context._psycopg2_fetched_rows = xtras.execute_values(
                                     ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 1299, in execute_values
    cur.execute(b''.join(parts))
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "edge_job_pkey"
DETAIL:  Key (dag_id, task_id, run_id, map_index, try_number)=(example_hitl_operator, valid_input_and_options, manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists.


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1002, in _execute
    self._run_scheduler_loop()
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1283, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1430, in _do_scheduling
    guard.commit()
  File "/opt/airflow/airflow-core/src/airflow/utils/sqlalchemy.py", line 401, in commit
    self.session.commit()
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 1454, in commit
    self._transaction.commit(_to_root=self.future)
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 832, in commit
    self._prepare_impl()
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 811, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
         ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
    _emit_insert_statements(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 1097, in _emit_insert_statements
    c = connection._execute_20(
        ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
    self.dialect.do_executemany(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 982, in do_executemany
    context._psycopg2_fetched_rows = xtras.execute_values(
                                     ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 1299, in execute_values
    cur.execute(b''.join(parts))
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "edge_job_pkey"
DETAIL:  Key (dag_id, task_id, run_id, map_index, try_number)=(example_hitl_operator, valid_input_and_options, manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists.

[SQL: INSERT INTO edge_job (dag_id, task_id, run_id, map_index, try_number, state, queue, concurrency_slots, command, queued_dttm, edge_worker, last_update) VALUES (%(dag_id)s, %(task_id)s, %(run_id)s, %(map_index)s, %(try_number)s, %(state)s, %(queue)s, %(concurrency_slots)s, %(command)s, %(queued_dttm)s, %(edge_worker)s, %(last_update)s)]
[parameters: ({'dag_id': 'example_hitl_operator', 'task_id': 'wait_for_input', 'run_id': 'manual__2025-07-21T19:28:32.128452+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhlLTdmZWYtYWYzNC0zNzM1NTIyMmFlMWYiLCJqdGkiOiIxNzFiNGU3ZmI3ZmY0MjQyOTFmZWFhM2 ... (675 characters truncated) ... ,"log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_input/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145801, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 'example_hitl_operator', 'task_id': 'wait_for_option', 'run_id': 'manual__2025-07-21T19:28:32.128452+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhmLTc3NTItOThhOC1jOWE2MTdmYjhiZDEiLCJqdGkiOiI4ZTIxMmQzZTU2NWM0MzRlYTE2Mzc2ND ... (677 characters truncated) ... "log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_option/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145998, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 'example_hitl_operator', 'task_id': 'valid_input_and_options', 'run_id': 'manual__2025-07-21T18:55:40.109637+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU1Ny03NDY0LTc4OTgtOGRlNC1mNjRkNzliMWQzYzAiLCJqdGkiOiI4YTAxZmRmYzJiYzI0MWUxOGJjMTA3Nj ... (691 characters truncated) ... 
h":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T18-55-40.109637+00-00/task_id=valid_input_and_options/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 146113, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None})]
(Background on this error at: https://sqlalche.me/e/14/gkpj)
[2025-07-21T19:30:51.155+0000] {edge_executor.py:357} INFO - Shutting down EdgeExecutor
[2025-07-21T19:30:51.156+0000] {scheduler_job_runner.py:1018} INFO - Exited execute loop
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
    self.dialect.do_executemany(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 982, in do_executemany
    context._psycopg2_fetched_rows = xtras.execute_values(
                                     ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 1299, in execute_values
    cur.execute(b''.join(parts))
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "edge_job_pkey"
DETAIL:  Key (dag_id, task_id, run_id, map_index, try_number)=(example_hitl_operator, valid_input_and_options, manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists.


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 10, in <module>
    sys.exit(main())
             ^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/__main__.py", line 55, in main
    args.func(args)
  File "/opt/airflow/airflow-core/src/airflow/cli/cli_config.py", line 48, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/utils/cli.py", line 113, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 52, in scheduler
    run_command_with_daemon_option(
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 55, in <lambda>
    callback=lambda: _run_scheduler_job(args),
                     ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/cli/commands/scheduler_command.py", line 43, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 100, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 355, in run_job
    return execute_job(job, execute_callable=execute_callable)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/jobs/job.py", line 384, in execute_job
    ret = execute_callable()
          ^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1002, in _execute
    self._run_scheduler_loop()
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1283, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py", line 1430, in _do_scheduling
    guard.commit()
  File "/opt/airflow/airflow-core/src/airflow/utils/sqlalchemy.py", line 401, in commit
    self.session.commit()
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 1454, in commit
    self._transaction.commit(_to_root=self.future)
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 832, in commit
    self._prepare_impl()
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 811, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
         ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
    _emit_insert_statements(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 1097, in _emit_insert_statements
    c = connection._execute_20(
        ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
    self.dialect.do_executemany(
  File "/usr/local/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 982, in do_executemany
    context._psycopg2_fetched_rows = xtras.execute_values(
                                     ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/psycopg2/extras.py", line 1299, in execute_values
    cur.execute(b''.join(parts))
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "edge_job_pkey"
DETAIL:  Key (dag_id, task_id, run_id, map_index, try_number)=(example_hitl_operator, valid_input_and_options, manual__2025-07-21T18:55:40.109637+00:00, -1, 1) already exists.

[SQL: INSERT INTO edge_job (dag_id, task_id, run_id, map_index, try_number, state, queue, concurrency_slots, command, queued_dttm, edge_worker, last_update) VALUES (%(dag_id)s, %(task_id)s, %(run_id)s, %(map_index)s, %(try_number)s, %(state)s, %(queue)s, %(concurrency_slots)s, %(command)s, %(queued_dttm)s, %(edge_worker)s, %(last_update)s)]
[parameters: ({'dag_id': 'example_hitl_operator', 'task_id': 'wait_for_input', 'run_id': 'manual__2025-07-21T19:28:32.128452+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhlLTdmZWYtYWYzNC0zNzM1NTIyMmFlMWYiLCJqdGkiOiIxNzFiNGU3ZmI3ZmY0MjQyOTFmZWFhM2 ... (675 characters truncated) ... ,"log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_input/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145801, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 'example_hitl_operator', 'task_id': 'wait_for_option', 'run_id': 'manual__2025-07-21T19:28:32.128452+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU3NS04YjhmLTc3NTItOThhOC1jOWE2MTdmYjhiZDEiLCJqdGkiOiI4ZTIxMmQzZTU2NWM0MzRlYTE2Mzc2ND ... (677 characters truncated) ... "log_path":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T19-28-32.128452+00-00/task_id=wait_for_option/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 145998, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None}, {'dag_id': 'example_hitl_operator', 'task_id': 'valid_input_and_options', 'run_id': 'manual__2025-07-21T18:55:40.109637+00:00', 'map_index': -1, 'try_number': 1, 'state': <TaskInstanceState.QUEUED: 'queued'>, 'queue': 'default', 'concurrency_slots': 1, 'command': '{"token":"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwMTk4MmU1Ny03NDY0LTc4OTgtOGRlNC1mNjRkNzliMWQzYzAiLCJqdGkiOiI4YTAxZmRmYzJiYzI0MWUxOGJjMTA3Nj ... (691 characters truncated) ... 
h":"dag_id=example_hitl_operator/run_id=manual__2025-07-21T18-55-40.109637+00-00/task_id=valid_input_and_options/attempt=1.log","type":"ExecuteTask"}', 'queued_dttm': datetime.datetime(2025, 7, 21, 19, 30, 51, 146113, tzinfo=Timezone('UTC')), 'edge_worker': None, 'last_update': None})]
(Background on this error at: https://sqlalche.me/e/14/gkpj)

Restarting the scheduler did not fix the problem either.
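The collision itself can be shown in isolation. Below is a minimal sketch, not EdgeExecutor code: it uses an in-memory SQLite database instead of PostgreSQL, a reduced stand-in for the edge_job table that only has the primary-key columns from the DETAIL line plus state, and a hypothetical helper `queue_job`. It assumes that the row from the first scheduling is still present and that try_number is unchanged when the deferred task is scheduled again, which is what the error message suggests.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Reduced stand-in for edge_job: same composite primary key as in the
# error message, all other columns except state left out (assumption).
conn.execute(
    """
    CREATE TABLE edge_job (
        dag_id TEXT NOT NULL,
        task_id TEXT NOT NULL,
        run_id TEXT NOT NULL,
        map_index INTEGER NOT NULL,
        try_number INTEGER NOT NULL,
        state TEXT NOT NULL,
        PRIMARY KEY (dag_id, task_id, run_id, map_index, try_number)
    )
    """
)

# Key values copied from the DETAIL line of the traceback.
KEY = (
    "example_hitl_operator",
    "valid_input_and_options",
    "manual__2025-07-21T18:55:40.109637+00:00",
    -1,
    1,
)


def queue_job(key, state="queued"):
    """Hypothetical helper: a plain INSERT without any existence check."""
    conn.execute("INSERT INTO edge_job VALUES (?, ?, ?, ?, ?, ?)", (*key, state))


queue_job(KEY)  # first scheduling, before the task is deferred

try:
    queue_job(KEY)  # second scheduling to process the timeout, same try_number
    collided = False
except sqlite3.IntegrityError as exc:
    collided = True
    print(f"IntegrityError: {exc}")
```

Because the old row stays in the table, every new attempt to enqueue the same key hits the same violation, which would be consistent with the restart not helping.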

What you think should happen instead?

No failure. Or, if deferred tasks do not work on EdgeExecutor in general, they should be refused up front. Better, of course, would be to handle the re-scheduling of the same task.
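One possible way to handle the re-scheduling is to re-use the existing row instead of always inserting a new one. The following is only a sketch of that idea under stated assumptions, not the actual fix: it runs against SQLite with a reduced edge_job table, `create_edge_job_table` and `queue_or_requeue` are hypothetical helpers, and the state 'success' for the leftover row of the first run is an assumption made for the demo.

```python
import sqlite3

PK_COLUMNS = ("dag_id", "task_id", "run_id", "map_index", "try_number")
PK_WHERE = " AND ".join(f"{col} = ?" for col in PK_COLUMNS)


def create_edge_job_table(conn):
    """Hypothetical helper: reduced stand-in for the edge_job table."""
    conn.execute(
        """
        CREATE TABLE edge_job (
            dag_id TEXT NOT NULL,
            task_id TEXT NOT NULL,
            run_id TEXT NOT NULL,
            map_index INTEGER NOT NULL,
            try_number INTEGER NOT NULL,
            state TEXT NOT NULL,
            command TEXT NOT NULL,
            edge_worker TEXT,
            PRIMARY KEY (dag_id, task_id, run_id, map_index, try_number)
        )
        """
    )


def queue_or_requeue(conn, key, command):
    """Hypothetical helper: reset an existing row, else insert a new one."""
    existing = conn.execute(
        f"SELECT 1 FROM edge_job WHERE {PK_WHERE}", key
    ).fetchone()
    if existing:
        # Same key was queued before (e.g. before deferral): reset it.
        conn.execute(
            "UPDATE edge_job SET state = 'queued', command = ?, "
            f"edge_worker = NULL WHERE {PK_WHERE}",
            (command, *key),
        )
        return "requeued"
    conn.execute(
        "INSERT INTO edge_job (dag_id, task_id, run_id, map_index, "
        "try_number, state, command, edge_worker) "
        "VALUES (?, ?, ?, ?, ?, 'queued', ?, NULL)",
        (*key, command),
    )
    return "inserted"


conn = sqlite3.connect(":memory:")
create_edge_job_table(conn)
key = (
    "example_hitl_operator",
    "valid_input_and_options",
    "manual__2025-07-21T18:55:40.109637+00:00",
    -1,
    1,
)

first = queue_or_requeue(conn, key, "first-command")
# Simulate the first run having been picked up and finished (assumed state).
conn.execute(
    f"UPDATE edge_job SET state = 'success', edge_worker = 'worker-1' WHERE {PK_WHERE}",
    key,
)
second = queue_or_requeue(conn, key, "second-command")
rows = conn.execute("SELECT state, command, edge_worker FROM edge_job").fetchall()
print(first, second, rows)
```

A select-then-write like this is not safe against concurrent schedulers on its own; a real implementation would probably need row locking or a database-level upsert.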

How to reproduce

Run the HITL Dag (if the UI is not merged yet, use PR #53035 to have a UI) and wait for the timeout of task valid_input_and_options, which has a timeout of 1 minute. The scheduler will crash.

Operating System

Ubuntu 24.04

Versions of Apache Airflow Providers

Edge3 from main, but probably all released versions of the Edge provider have this problem.

Deployment

Other

Deployment details

Started from main using breeze. Used command line:
breeze start-airflow --python 3.12 --backend postgres --executor EdgeExecutor --load-example-dags

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Labels

area:core, kind:bug (This is clearly a bug), provider:edge (Edge Executor / Worker (AIP-69) / edge3)
