Skip to content

Commit 54cc8a2

Browse files
amoghrajeshYour friendly bot
authored andcommitted
[v3-1-test] Use exc_info for task instance heartbeat failure exception logging (#57172)
closes: #57167 PR #52562 changed `_handle_heartbeat_failures()` to accept an exception parameter and log it as a structured field. Now due to this change, during the first failed heartbeat attempt, the _handle_heartbeat_failures function logs a message by calling log.warning(), which accepts an exception parameter that expects a string type object. However, in the source code, [an exception type object is passed](https://github.com/apache/airflow/blob/54bd5d8cd9f6f477cc83445737614dec81c4323c/task-sdk/src/airflow/sdk/execution_time/supervisor.py#L1126) instead of a string type object. This results in a TypeError (like below) which causes task supervision to fail. The error looked like this: ```python 2025-10-23T17:58:22.900129Z [error ] Task execute_workload[aac34f36-54e1-46e4-ba47-15dba8ba7149] raised unexpected: TypeError('can only concatenate str (not "ConnectError") to str') [celery.app.trace] loc=trace.py:267 Traceback (most recent call last): File "/usr/python/lib/python3.10/site-packages/httpx/_transports/default.py", line 101, in map_httpcore_exceptions yield File "/usr/python/lib/python3.10/site-packages/httpx/_transports/default.py", line 250, in handle_request resp = self._pool.handle_request(req) File "/usr/python/lib/python3.10/site-packages/httpcore/_sync/connection_pool.py", line 256, in handle_request raise exc from None File "/usr/python/lib/python3.10/site-packages/httpcore/_sync/connection_pool.py", line 236, in handle_request response = connection.handle_request( File "/usr/python/lib/python3.10/site-packages/httpcore/_sync/connection.py", line 101, in handle_request raise exc File "/usr/python/lib/python3.10/site-packages/httpcore/_sync/connection.py", line 78, in handle_request stream = self._connect(request) File "/usr/python/lib/python3.10/site-packages/httpcore/_sync/connection.py", line 124, in _connect stream = self._network_backend.connect_tcp(**kwargs) File "/usr/python/lib/python3.10/site-packages/httpcore/_backends/sync.py", line 207, in connect_tcp with map_exceptions(exc_map): File "/usr/python/lib/python3.10/contextlib.py", line 153, in __exit__ self.gen.throw(typ, value, traceback) File "/usr/python/lib/python3.10/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions raise to_exc(exc) from exc httpcore.ConnectError: [Errno 111] Connection refused ``` The change in #52562 was mainly made due to ruff upgrade reasons, so I am going back to using the standard Python logging pattern: pass exception to `exc_info` parameter. After changes, error looks like this: ```python 2025-10-23T18:24:39.303939Z [warning ] Starting call to 'airflow.sdk.api.client.Client.request', this is the 1st time calling it. [airflow.sdk.api.client] loc=before.py:42 2025-10-23T18:24:40.307404Z [warning ] Starting call to 'airflow.sdk.api.client.Client.request', this is the 2nd time calling it. [airflow.sdk.api.client] loc=before.py:42 2025-10-23T18:24:41.417523Z [warning ] Starting call to 'airflow.sdk.api.client.Client.request', this is the 3rd time calling it. [airflow.sdk.api.client] loc=before.py:42 2025-10-23T18:24:43.801145Z [warning ] Starting call to 'airflow.sdk.api.client.Client.request', this is the 4th time calling it. [airflow.sdk.api.client] loc=before.py:42 2025-10-23T18:24:46.690339Z [warning ] Failed to send heartbeat. Will be retried [supervisor] failed_heartbeats=3 loc=supervisor.py:1135 max_retries=3 ti_id=UUID('019a1250-48a0-756c-ab0b-9687289ef580') Traceback (most recent call last): File "/usr/python/lib/python3.10/site-packages/httpx/_transports/default.py", line 101, in map_httpcore_exceptions yield File "/usr/python/lib/python3.10/site-packages/httpx/_transports/default.py", line 250, in handle_request resp = self._pool.handle_request(req) File "/usr/python/lib/python3.10/site-packages/httpcore/_sync/connection_pool.py", line 256, in handle_request raise exc from None File "/usr/python/lib/python3.10/site-packages/httpcore/_sync/connection_pool.py", line 236, in handle_request response = connection.handle_request( File "/usr/python/lib/python3.10/site-packages/httpcore/_sync/connection.py", line 101, in handle_request raise exc File "/usr/python/lib/python3.10/site-packages/httpcore/_sync/connection.py", line 78, in handle_request stream = self._connect(request) File "/usr/python/lib/python3.10/site-packages/httpcore/_sync/connection.py", line 124, in _connect stream = self._network_backend.connect_tcp(**kwargs) File "/usr/python/lib/python3.10/site-packages/httpcore/_backends/sync.py", line 207, in connect_tcp with map_exceptions(exc_map): File "/usr/python/lib/python3.10/contextlib.py", line 153, in __exit__ self.gen.throw(typ, value, traceback) File "/usr/python/lib/python3.10/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions raise to_exc(exc) from exc httpcore.ConnectError: [Errno 111] Connection refused The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py", line 1105, in _send_heartbeat_if_needed self.client.task_instances.heartbeat(self.id, pid=self._process.pid) File "/opt/airflow/task-sdk/src/airflow/sdk/api/client.py", line 259, in heartbeat self.client.put(f"task-instances/{id}/heartbeat", content=body.model_dump_json()) File "/usr/python/lib/python3.10/site-packages/httpx/_client.py", line 1181, in put return self.request( File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 338, in wrapped_f return copy(f, *args, **kw) File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 477, in __call__ do = self.iter(retry_state=retry_state) File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 378, in iter result = action(retry_state) File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 420, in exc_check raise retry_exc.reraise() File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 187, in reraise raise self.last_attempt.result() File "/usr/python/lib/python3.10/concurrent/futures/_base.py", line 451, in result return self.__get_result() File "/usr/python/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result raise self._exception File "/usr/python/lib/python3.10/site-packages/tenacity/__init__.py", line 480, in __call__ result = fn(*args, **kwargs) File "/opt/airflow/task-sdk/src/airflow/sdk/api/client.py", line 894, in request return super().request(*args, **kwargs) File "/usr/python/lib/python3.10/site-packages/httpx/_client.py", line 825, in request return self.send(request, auth=auth, follow_redirects=follow_redirects) File "/usr/python/lib/python3.10/site-packages/httpx/_client.py", line 914, in send response = self._send_handling_auth( File "/usr/python/lib/python3.10/site-packages/httpx/_client.py", line 942, in _send_handling_auth response = self._send_handling_redirects( File "/usr/python/lib/python3.10/site-packages/httpx/_client.py", line 979, in _send_handling_redirects response = self._send_single_request(request) File "/usr/python/lib/python3.10/site-packages/httpx/_client.py", line 1014, in _send_single_request response = transport.handle_request(request) File "/usr/python/lib/python3.10/site-packages/httpx/_transports/default.py", line 249, in handle_request with map_httpcore_exceptions(): File "/usr/python/lib/python3.10/contextlib.py", line 153, in __exit__ self.gen.throw(typ, value, traceback) File "/usr/python/lib/python3.10/site-packages/httpx/_transports/default.py", line 118, in map_httpcore_exceptions raise mapped_exc(message) from exc httpx.ConnectError: [Errno 111] Connection refused ``` (cherry picked from commit 970d7da) Co-authored-by: Amogh Desai <amoghrajesh1999@gmail.com>
1 parent cff8386 commit 54cc8a2

File tree

2 files changed

+3
-3
lines changed

2 files changed

+3
-3
lines changed

task-sdk/src/airflow/sdk/execution_time/supervisor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1125,15 +1125,15 @@ def _send_heartbeat_if_needed(self):
11251125
except Exception as e:
11261126
self._handle_heartbeat_failures(e)
11271127

1128-
def _handle_heartbeat_failures(self, exc: Exception | None):
1128+
def _handle_heartbeat_failures(self, exc: Exception):
11291129
"""Increment the failed heartbeats counter and kill the process if too many failures."""
11301130
self.failed_heartbeats += 1
11311131
log.warning(
11321132
"Failed to send heartbeat. Will be retried",
11331133
failed_heartbeats=self.failed_heartbeats,
11341134
ti_id=self.id,
11351135
max_retries=MAX_FAILED_HEARTBEATS,
1136-
exception=exc,
1136+
exc_info=exc,
11371137
)
11381138
# If we've failed to heartbeat too many times, kill the process
11391139
if self.failed_heartbeats >= MAX_FAILED_HEARTBEATS:

task-sdk/tests/task_sdk/execution_time/test_supervisor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -857,7 +857,7 @@ def test_heartbeat_failures_handling(self, monkeypatch, mocker, captured_logs, t
857857
"level": "warning",
858858
"logger": "supervisor",
859859
"timestamp": mocker.ANY,
860-
"exception": mocker.ANY,
860+
"exc_info": mocker.ANY,
861861
"loc": mocker.ANY,
862862
}
863863

0 commit comments

Comments
 (0)