-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Description
Apache Airflow version
Other Airflow 2 version (please specify below)
What happened
Apache Airflow version: 2.6.1
When multiple runs for a dag executing simultaneously, K8 executor fails with the following MySQL exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1407, in _run_raw_task
self._execute_task_with_callbacks(context, test_mode)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1534, in _execute_task_with_callbacks
RenderedTaskInstanceFields.write(rtif)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 75, in wrapper
with create_session() as session:
File "/usr/local/lib/python3.10/contextlib.py", line 142, in exit
next(self.gen)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 37, in create_session
session.commit()
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1454, in commit
self._transaction.commit(_to_root=self.future)
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 832, in commit
self._prepare_impl()
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 811, in _prepare_impl
self.session.flush()
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
self.flush(objects)
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3588, in flush
with util.safe_reraise():
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in exit
compat.raise(
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise
raise exception
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
flush_context.execute()
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
rec.execute(self)
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
util.preloaded.orm_persistence.save_obj(
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
_emit_update_statements(
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1001, in _emit_update_statements
c = connection._execute_20(
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
return connection._execute_clauseelement(
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
ret = self._execute_context(
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1948, in _execute_context
self.handle_dbapi_exception(
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2129, in handle_dbapi_exception
util.raise(
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise
raise exception
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1905, in _execute_context
self.dialect.do_execute(
File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
File "/home/airflow/.local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 206, in execute
res = self._query(query)
File "/home/airflow/.local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 319, in _query
db.query(q)
File "/home/airflow/.local/lib/python3.10/site-packages/MySQLdb/connections.py", line 254, in query
_mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
SQL: UPDATE rendered_task_instance_fields SET k8s_pod_yaml=%s WHERE rendered_task_instance_fields.dag_id = %s AND rendered_task_instance_fields.task_id = %s AND rendered_task_instance_fields.run_id = %s AND rendered_task_instance_fields.map_index = %s
What you think should happen instead
K8 executor should complete processing successfully without error
How to reproduce
Trigger multiple runs of the same dag simultaneously so that tasks under the dag get executed around the same time
Operating System
Airflow docker image tag 2.6.1-python3.10
Versions of Apache Airflow Providers
No response
Deployment
Other 3rd-party Helm chart
Deployment details
User community airflow-helm chart
https://github.com/airflow-helm
Anything else
This occurs consistently. If multiple runs for the dag are executed with a delay of few minutes, K8 executor completes successfully
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct