Skip to content

Conversation

@tanvn
Copy link
Contributor

@tanvn tanvn commented May 4, 2024

Closes: #39088

As described in #39088 (comment),
when an OperationalError happens (in my case, it was MySQLdb.OperationalError),
the method executor.try_adopt_task_instances(tis_to_adopt_or_reset) is retried,
and in the first attempt, all running pods have been adopted successfully, so adopt_launched_task will not be called even once which leads to a situation that tis_to_flush_by_key in the second attempt contains TIs that are already adopted.
The TIs then are flushed unnecessarily, and could be annoying as some heavy tasks get terminated and retried.

Notes

Please note that this PR only fixes the issue happening with kubernetes executor.
(It might happen with other executors as well but I am only using kubernetes executor at the present and not familiar with the others)


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@boring-cyborg boring-cyborg bot added area:providers provider:cncf-kubernetes Kubernetes (k8s) provider related issues labels May 4, 2024
@tanvn tanvn changed the title Avoid resetting adopted task instances when retrying Avoid resetting adopted task instances when retrying for kubernetes executor May 4, 2024
@tanvn
Copy link
Contributor Author

tanvn commented May 4, 2024

Tested on my environment, confirmed that in the second attempt, the task instances whose pods have been adopted are not flushed anymore

[2024-05-04T13:03:12.681+0000] {scheduler_job_runner.py:935} INFO - Starting the scheduler loop
[2024-05-04T13:03:12.681+0000] {scheduler_job_runner.py:1612} INFO - Adopting or resetting orphaned tasks for active dag runs
[2024-05-04T13:03:12.682+0000] {scheduler_job_runner.py:1617} INFO - Running SchedulerJob.adopt_or_reset_orphaned_tasks with retries. Try 1 of 3
....
[2024-05-04T13:03:18.691+0000] {before_sleep.py:65} INFO - Retrying <unknown> in 0.31128888434353585 seconds as it raised OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to MySQL server during query')
[SQL: UPDATE task_instance SET queued_by_job_id=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s]
[parameters: ((10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689778), 'test__orphaned_test_dag', 'select_0', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689792), 'test__orphaned_test_dag', 'select_1', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689797), 'test__orphaned_test_dag', 'select_10', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689801), 'test__orphaned_test_dag', 'select_11', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689805), 'test__orphaned_test_dag', 'select_12', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689808), 'test__orphaned_test_dag', 'select_21', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689812), 'test__orphaned_test_dag', 'select_22', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689815), 'test__orphaned_test_dag', 'select_23', 'scheduled__2024-05-04T10:30:00+00:00', -1)  ... displaying 10 of 32 total bound parameter sets ...  (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689894), 'test__orphaned_test_dag', 'select_52', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689897), 'test__orphaned_test_dag', 'select_53', 'scheduled__2024-05-04T10:30:00+00:00', -1))]
(Background on this error at: https://sqlalche.me/e/14/e3q8).
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
    self.dialect.do_executemany(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 183, in do_executemany
    rowcount = cursor.executemany(statement, parameters)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in executemany
    self.rowcount = sum(self.execute(query, arg) for arg in args)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in <genexpr>
    self.rowcount = sum(self.execute(query, arg) for arg in args)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 179, in execute
    res = self._query(mogrified_query)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 330, in _query
    db.query(q)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/connections.py", line 261, in query
    _mysql.connection.query(self, query)
MySQLdb.OperationalError: (2013, 'Lost connection to MySQL server during query')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/airflow_src/airflow/jobs/scheduler_job_runner.py", line 1684, in adopt_or_reset_orphaned_tasks
    session.flush()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
    _emit_update_statements(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1001, in _emit_update_statements
    c = connection._execute_20(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
    self.dialect.do_executemany(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 183, in do_executemany
    rowcount = cursor.executemany(statement, parameters)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in executemany
    self.rowcount = sum(self.execute(query, arg) for arg in args)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in <genexpr>
    self.rowcount = sum(self.execute(query, arg) for arg in args)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 179, in execute
    res = self._query(mogrified_query)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 330, in _query
    db.query(q)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/connections.py", line 261, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to MySQL server during query')
[SQL: UPDATE task_instance SET queued_by_job_id=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s]
[parameters: ((10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689778), 'test__orphaned_test_dag', 'select_0', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689792), 'test__orphaned_test_dag', 'select_1', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689797), 'test__orphaned_test_dag', 'select_10', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689801), 'test__orphaned_test_dag', 'select_11', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689805), 'test__orphaned_test_dag', 'select_12', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689808), 'test__orphaned_test_dag', 'select_21', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689812), 'test__orphaned_test_dag', 'select_22', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689815), 'test__orphaned_test_dag', 'select_23', 'scheduled__2024-05-04T10:30:00+00:00', -1)  ... displaying 10 of 32 total bound parameter sets ...  (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689894), 'test__orphaned_test_dag', 'select_52', 'scheduled__2024-05-04T10:30:00+00:00', -1), (10463, datetime.datetime(2024, 5, 4, 13, 3, 18, 689897), 'test__orphaned_test_dag', 'select_53', 'scheduled__2024-05-04T10:30:00+00:00', -1))]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
[2024-05-04T13:03:19.007+0000] {scheduler_job_runner.py:1617} INFO - Running SchedulerJob.adopt_or_reset_orphaned_tasks with retries. Try 2 of 3
...

[2024-05-04T13:03:19.397+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_0', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-05-04T13:03:19.397+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_1', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-05-04T13:03:19.397+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_10', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-05-04T13:03:19.397+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_11', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-05-04T13:03:19.398+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_12', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-05-04T13:03:19.398+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_21', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-05-04T13:03:19.398+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_22', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-05-04T13:03:19.398+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_23', run_id='scheduled__2024-05-04T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
...

@tanvn
Copy link
Contributor Author

tanvn commented May 5, 2024

@jedcunningham @hussein-awala
Please take a look at your convenience.

@tanvn
Copy link
Contributor Author

tanvn commented May 8, 2024

@jedcunningham @hussein-awala @potiuk @eladkal
Hi, we are running a service based on Airflow and there are thousands of tasks running hourly.
Many of them are heavy tasks and it is very annoying that every time our service is re-deployed, many of them get terminated and reset.
(The details are described in #39088)
Could you please take a look at this and give your opinions? I would be more than happy to provide further information if needed 🙇

@tanvn
Copy link
Contributor Author

tanvn commented May 21, 2024

@jedcunningham @hussein-awala ping

@eladkal eladkal force-pushed the avoid-resetting-adopted-tis-when-retrying branch from dc5da55 to 862d30e Compare June 5, 2024 06:05
Copy link
Contributor

@eladkal eladkal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM but I would prefer someone else to take a look on this

Copy link
Contributor

@romsharon98 romsharon98 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good

Copy link
Member

@Lee-W Lee-W left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible for us to add a test to it?

@tanvn
Copy link
Contributor Author

tanvn commented Jun 5, 2024

Thank you all for your reviews!
I have updated and tested manually again (on source code of version 2.8.4 with some debug logs)
Confirmed that it is working as expected

[2024-06-05T11:57:46.698+0000] {before_sleep.py:65} INFO - Retrying <unknown> in 0.10147814315789988 seconds as it raised OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to MySQL server during query')
[SQL: UPDATE task_instance SET queued_by_job_id=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s]
[parameters: ((12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696354), 'test__orphaned_test_dag', 'select_70', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696370), 'test__orphaned_test_dag', 'select_71', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696376), 'test__orphaned_test_dag', 'select_72', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696379), 'test__orphaned_test_dag', 'select_73', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696383), 'test__orphaned_test_dag', 'select_74', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696386), 'test__orphaned_test_dag', 'select_75', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696390), 'test__orphaned_test_dag', 'select_76', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696393), 'test__orphaned_test_dag', 'select_77', 'scheduled__2024-06-05T10:30:00+00:00', -1)  ... displaying 10 of 32 total bound parameter sets ...  (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696473), 'test__orphaned_test_dag', 'select_98', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696477), 'test__orphaned_test_dag', 'select_99', 'scheduled__2024-06-05T10:30:00+00:00', -1))]
(Background on this error at: https://sqlalche.me/e/14/e3q8).
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
    self.dialect.do_executemany(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 183, in do_executemany
    rowcount = cursor.executemany(statement, parameters)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in executemany
    self.rowcount = sum(self.execute(query, arg) for arg in args)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in <genexpr>
    self.rowcount = sum(self.execute(query, arg) for arg in args)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 179, in execute
    res = self._query(mogrified_query)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 330, in _query
    db.query(q)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/connections.py", line 261, in query
    _mysql.connection.query(self, query)
MySQLdb.OperationalError: (2013, 'Lost connection to MySQL server during query')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/airflow_src/airflow/jobs/scheduler_job_runner.py", line 1684, in adopt_or_reset_orphaned_tasks
    session.flush()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
    _emit_update_statements(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1001, in _emit_update_statements
    c = connection._execute_20(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
    self.dialect.do_executemany(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 183, in do_executemany
    rowcount = cursor.executemany(statement, parameters)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in executemany
    self.rowcount = sum(self.execute(query, arg) for arg in args)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in <genexpr>
    self.rowcount = sum(self.execute(query, arg) for arg in args)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 179, in execute
    res = self._query(mogrified_query)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 330, in _query
    db.query(q)
  File "/usr/local/lib/python3.10/site-packages/MySQLdb/connections.py", line 261, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to MySQL server during query')
[SQL: UPDATE task_instance SET queued_by_job_id=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s]
[parameters: ((12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696354), 'test__orphaned_test_dag', 'select_70', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696370), 'test__orphaned_test_dag', 'select_71', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696376), 'test__orphaned_test_dag', 'select_72', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696379), 'test__orphaned_test_dag', 'select_73', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696383), 'test__orphaned_test_dag', 'select_74', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696386), 'test__orphaned_test_dag', 'select_75', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696390), 'test__orphaned_test_dag', 'select_76', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696393), 'test__orphaned_test_dag', 'select_77', 'scheduled__2024-06-05T10:30:00+00:00', -1)  ... displaying 10 of 32 total bound parameter sets ...  (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696473), 'test__orphaned_test_dag', 'select_98', 'scheduled__2024-06-05T10:30:00+00:00', -1), (12681, datetime.datetime(2024, 6, 5, 11, 57, 46, 696477), 'test__orphaned_test_dag', 'select_99', 'scheduled__2024-06-05T10:30:00+00:00', -1))]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
[2024-06-05T11:57:46.805+0000] {scheduler_job_runner.py:1617} INFO - Running SchedulerJob.adopt_or_reset_orphaned_tasks with retries. Try 2 of 3
[2024-06-05T11:57:46.805+0000] {scheduler_job_runner.py:1622} INFO - Calling SchedulerJob.adopt_or_reset_orphaned_tasks method, with attempt <tenacity.AttemptManager object at 0x7f6b86100af0>
[2024-06-05T11:57:46.918+0000] {kubernetes_executor.py:554} INFO - tis_to_flush: []

....

[2024-06-05T11:57:47.207+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_70', run_id='scheduled__2024-06-05T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-06-05T11:57:47.208+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_71', run_id='scheduled__2024-06-05T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-06-05T11:57:47.208+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_72', run_id='scheduled__2024-06-05T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-06-05T11:57:47.208+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_73', run_id='scheduled__2024-06-05T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
[2024-06-05T11:57:47.208+0000] {kubernetes_executor.py:588} INFO - TaskInstanceKey(dag_id='test__orphaned_test_dag', task_id='select_74', run_id='scheduled__2024-06-05T10:30:00+00:00', try_number=1, map_index=-1) is already adopted, no need to flush.
...

@eladkal
Copy link
Contributor

eladkal commented Jun 5, 2024

@tanvn is it possible to cover this fix with unit tests to avoid regression?

@tanvn
Copy link
Contributor Author

tanvn commented Jun 6, 2024

is it possible to cover this fix with unit tests to avoid regression?

@eladkal Let me take a look!

@tanvn
Copy link
Contributor Author

tanvn commented Jun 7, 2024

@eladkal @Lee-W
Unit test added! PTAL.

@eladkal eladkal merged commit 8daa53e into apache:main Jun 7, 2024
@tanvn tanvn deleted the avoid-resetting-adopted-tis-when-retrying branch June 7, 2024 08:16
romsharon98 pushed a commit to romsharon98/airflow that referenced this pull request Jul 26, 2024
…xecutor (apache#39406)

* Avoid resetting adopted task instances when retrying

* Stop using f-string when logging

* Address comment

* Remove return type of generator func

* Add unit test

* Add comment and fix linter error
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:providers provider:cncf-kubernetes Kubernetes (k8s) provider related issues

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Many running task instances are cleared by the new scheduler when an old scheduler is terminated and its health check server is periodically requested

6 participants