Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions task-sdk/src/airflow/sdk/execution_time/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,28 @@ def handle_requests(self, log: FilteringBoundLogger) -> Generator[None, _Request
),
request_id=request.id,
)
except Exception as e:
# Generic exception handling so a transient network error (httpx.ConnectError /
# httpx.TimeoutException) or any other exception
# doesn't crash this generator and crash the IPC communication between supervisor and task.
log.exception(
"Unhandled exception while handling task request",
request_id=request.id,
exc_info=e,
)
Comment thread
jason810496 marked this conversation as resolved.
with suppress(Exception):
self.send_msg(
msg=None,
error=ErrorResponse(
error=ErrorType.API_SERVER_ERROR,
detail={
"status_code": None,
"message": str(e),
"exception_type": type(e).__name__,
},
),
request_id=request.id,
)
finally:
if token is not None:
otel_context.detach(token)
Expand Down
40 changes: 40 additions & 0 deletions task-sdk/tests/task_sdk/execution_time/test_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3023,6 +3023,46 @@ def test_handle_requests_api_server_error(self, watched_subprocess, mocker):
"detail": error.response.json(),
}

def test_handle_requests_network_exception_does_not_crash_loop(self, watched_subprocess, mocker):
"""A transient network error must not crash the IPC generator.

Without the catch-all in handle_requests, an httpx.ConnectError would
propagate, the generator would terminate, the task subprocess would
get EOFError on every subsequent send, and the worker would be stuck.
Verify that the error is reported back to the task as an
API_SERVER_ERROR ErrorResponse and that the loop stays alive for the
next request.
"""
watched_subprocess, read_socket = watched_subprocess

# First request raises a network exception, second succeeds.
first_call = httpx.ConnectError("connection refused")
watched_subprocess.client.task_instances.succeed = mocker.Mock(side_effect=[first_call, None])

generator = watched_subprocess.handle_requests(log=mocker.Mock())
next(generator)

# First request — should produce an ErrorResponse, not crash the generator.
msg1 = SucceedTask(end_date=timezone.parse("2024-10-31T12:00:00Z"))
req1 = _RequestFrame(id=randint(1, 2**32 - 1), body=msg1.model_dump())
generator.send(req1)

read_socket.settimeout(0.5)
frame_len = int.from_bytes(read_socket.recv(4), "big")
bytes_ = read_socket.recv(frame_len)
frame = msgspec.msgpack.Decoder(_ResponseFrame).decode(bytes_)

assert frame.id == req1.id
assert frame.error is not None
assert frame.error["error"] == "API_SERVER_ERROR"
assert frame.error["detail"]["exception_type"] == "ConnectError"

# Second request — generator must still be alive and process it normally.
msg2 = SucceedTask(end_date=timezone.parse("2024-10-31T12:01:00Z"))
req2 = _RequestFrame(id=randint(1, 2**32 - 1), body=msg2.model_dump())
# Should not raise StopIteration (which would mean the loop crashed).
generator.send(req2)


class TestSetSupervisorComms:
class DummyComms:
Expand Down
Loading