diff --git a/src/integrations/prefect-kubernetes/prefect_kubernetes/worker.py b/src/integrations/prefect-kubernetes/prefect_kubernetes/worker.py
index ef7ceb8b631d..0fde9375992b 100644
--- a/src/integrations/prefect-kubernetes/prefect_kubernetes/worker.py
+++ b/src/integrations/prefect-kubernetes/prefect_kubernetes/worker.py
@@ -1048,6 +1048,20 @@ async def _watch_job(
             if not first_container_status:
                 logger.error(f"Job {job_name!r}: No pods found for job.")
                 return -1
+
+            # In some cases, the pod will still be running at this point.
+            # We can assume that the job is still running and return 0 to prevent marking the flow run as crashed
+            elif first_container_status.state and (
+                first_container_status.state.running is not None
+                or first_container_status.state.waiting is not None
+            ):
+                logger.warning(
+                    f"The worker's watch for job {job_name!r} has exited early. Check the logs for more information."
+                    " The job is still running, but the worker will not wait for it to complete."
+                )
+                # Return 0 to prevent marking the flow run as crashed
+                return 0
+
             # In some cases, such as spot instance evictions, the pod will be forcibly
             # terminated and not report a status correctly.
             elif (
diff --git a/src/integrations/prefect-kubernetes/pyproject.toml b/src/integrations/prefect-kubernetes/pyproject.toml
index 3ba1acae82de..37245b5e0fec 100644
--- a/src/integrations/prefect-kubernetes/pyproject.toml
+++ b/src/integrations/prefect-kubernetes/pyproject.toml
@@ -47,6 +47,7 @@ dev = [
     "pytest-asyncio",
     "pytest",
     "pytest-env",
+    "pytest-timeout",
     "pytest-xdist",
 ]
@@ -80,3 +81,4 @@ show_missing = true
 [tool.pytest.ini_options]
 asyncio_mode = "auto"
 env = ["PREFECT_TEST_MODE=1"]
+timeout = 30
diff --git a/src/integrations/prefect-kubernetes/tests/test_worker.py b/src/integrations/prefect-kubernetes/tests/test_worker.py
index 37e661ed1523..0101264fa952 100644
--- a/src/integrations/prefect-kubernetes/tests/test_worker.py
+++ b/src/integrations/prefect-kubernetes/tests/test_worker.py
@@ -2430,6 +2430,17 @@ async def test_streaming_pod_logs_timeout_warns(
         mock_pod,
         mock_job,
     ):
+        job_pod = MagicMock(spec=kubernetes_asyncio.client.V1Pod)
+        mock_container_status = MagicMock(
+            spec=kubernetes_asyncio.client.V1ContainerStatus
+        )
+        mock_container_status.state.running = None
+        mock_container_status.state.waiting = None
+        job_pod.status.container_statuses = [mock_container_status]
+        mock_core_client.return_value.list_namespaced_pod.return_value.items = [
+            job_pod
+        ]
+
         async def mock_stream(*args, **kwargs):
             mock_job.status.completion_time = pendulum.now("utc").timestamp()
             stream = [
@@ -2516,6 +2527,16 @@ async def mock_log_stream(*args, **kwargs):
         mock_core_client.return_value.read_namespaced_pod_log.return_value.stream = mock_log_stream
         mock_watch.return_value.stream = mock.Mock(side_effect=mock_stream)

+        job_pod = MagicMock(spec=kubernetes_asyncio.client.V1Pod)
+        mock_container_status = MagicMock(
+            spec=kubernetes_asyncio.client.V1ContainerStatus
+        )
+        mock_container_status.state.running = None
+        mock_container_status.state.waiting = None
+        job_pod.status.container_statuses = [mock_container_status]
+        mock_core_client.return_value.list_namespaced_pod.return_value.items = [
+            job_pod
+        ]
         default_configuration.job_watch_timeout_seconds = 100

         async with KubernetesWorker(work_pool_name="test") as k8s_worker:
@@ -2595,6 +2616,8 @@ async def test_watch_stops_after_backoff_limit_reached(
             spec=kubernetes_asyncio.client.V1ContainerStatus
         )
         mock_container_status.state.terminated.exit_code = 137
+        mock_container_status.state.running = None
+        mock_container_status.state.waiting = None
         job_pod.status.container_statuses = [mock_container_status]
         mock_core_client.return_value.list_namespaced_pod.return_value.items = [
             job_pod
@@ -2679,6 +2702,8 @@ async def test_watch_handles_pod_without_exit_code(
         # The container may exist but because it has been forcefully terminated
         # it will not have an exit code.
         mock_container_status.state.terminated = None
+        mock_container_status.state.running = None
+        mock_container_status.state.waiting = None
         job_pod.status.container_statuses = [mock_container_status]
         mock_core_client.return_value.list_namespaced_pod.return_value.items = [
             job_pod
@@ -2761,6 +2786,44 @@ async def mock_stream(*args, **kwargs):
             ]
         )

+    async def test_watch_early_exit(
+        self,
+        default_configuration: KubernetesWorkerJobConfiguration,
+        flow_run,
+        mock_batch_client,
+        mock_core_client,
+        mock_watch,
+        mock_job,
+        mock_pod,
+    ):
+        mock_batch_client.return_value.read_namespaced_job.return_value.status.completion_time = None
+        job_pod = MagicMock(spec=kubernetes_asyncio.client.V1Pod)
+        job_pod.status.phase = "Running"
+        mock_container_status = MagicMock(
+            spec=kubernetes_asyncio.client.V1ContainerStatus
+        )
+        mock_container_status.state.running = MagicMock(
+            start_time=pendulum.now("utc")
+        )
+        job_pod.status.container_statuses = [mock_container_status]
+        mock_core_client.return_value.list_namespaced_pod.return_value.items = [
+            job_pod
+        ]
+
+        async def mock_stream(*args, **kwargs):
+            if kwargs["func"] == mock_core_client.return_value.list_namespaced_pod:
+                yield {"object": mock_pod, "type": "ADDED"}
+
+            if kwargs["func"] == mock_batch_client.return_value.list_namespaced_job:
+                raise Exception("This is a test exception")
+
+        mock_watch.return_value.stream = mock.Mock(side_effect=mock_stream)
+
+        async with KubernetesWorker(work_pool_name="test") as k8s_worker:
+            result = await k8s_worker.run(flow_run, default_configuration)
+
+        assert result.status_code == 0
+
     @pytest.fixture
     async def mock_events(self, mock_core_client):
         mock_core_client.return_value.list_namespaced_event.return_value = (
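
For context, here is a minimal, self-contained sketch of the exit-code decision order this diff establishes in `_watch_job`. It is not the worker's actual implementation: `classify_exit` is a hypothetical helper, and only the branch order mirrors the diff (no pod found → -1, container still running or waiting → 0, terminated without an exit code → -1, otherwise → the container's exit code). The `kubernetes_asyncio` models are the real ones the worker inspects.

```python
# Hypothetical helper illustrating the branch order added in _watch_job;
# a sketch for review, not the worker's actual code.
from typing import Optional

from kubernetes_asyncio.client import (
    V1ContainerState,
    V1ContainerStateRunning,
    V1ContainerStatus,
)


def classify_exit(first_container_status: Optional[V1ContainerStatus]) -> int:
    """Map a job pod's first container status to a worker status code."""
    if first_container_status is None:
        # No pods found for the job: report a crash.
        return -1
    state: Optional[V1ContainerState] = first_container_status.state
    if state and (state.running is not None or state.waiting is not None):
        # New branch from this diff: the watch exited early but the pod is
        # still alive, so return 0 instead of marking the flow run as crashed.
        return 0
    if state is None or state.terminated is None or state.terminated.exit_code is None:
        # e.g. a spot-instance eviction: forcibly terminated, no exit code reported.
        return -1
    return state.terminated.exit_code


# Usage: a container that is still running maps to status code 0,
# while a missing container status still maps to a crash (-1).
still_running = V1ContainerStatus(
    name="flow",
    image="example-image",  # placeholder values; only `state` matters here
    image_id="",
    ready=True,
    restart_count=0,
    state=V1ContainerState(running=V1ContainerStateRunning()),
)
assert classify_exit(still_running) == 0
assert classify_exit(None) == -1
```

Separately, the `timeout = 30` under `[tool.pytest.ini_options]` is the per-test limit enforced by the newly added `pytest-timeout` dev dependency, so a regression that re-hangs the watch fails the suite after 30 seconds instead of stalling it indefinitely.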