Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Fix k8s pod.execute randomly stuck indefinitely by logs consu… #23656

Merged
merged 1 commit into from
May 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 5 additions & 42 deletions airflow/providers/cncf/kubernetes/utils/pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
# specific language governing permissions and limitations
# under the License.
"""Launches PODs"""
import asyncio
import concurrent
import json
import math
import time
Expand Down Expand Up @@ -195,40 +193,6 @@ def follow_container_logs(self, pod: V1Pod, container_name: str) -> PodLoggingSt
)
return self.fetch_container_logs(pod=pod, container_name=container_name, follow=True)

def log_iterable(self, logs: Iterable[bytes]) -> Optional[DateTime]:
timestamp = None
for line in logs:
timestamp, message = self.parse_log_line(line.decode('utf-8', errors="backslashreplace"))
self.log.info(message)
return timestamp

def consume_container_logs_stream(
self, pod: V1Pod, container_name: str, stream: Iterable[bytes]
) -> Optional[DateTime]:
async def async_await_container_completion() -> None:
await asyncio.sleep(1)
while self.container_is_running(pod=pod, container_name=container_name):
await asyncio.sleep(1)

loop = asyncio.get_event_loop()
await_container_completion = loop.create_task(async_await_container_completion())
log_stream = asyncio.ensure_future(loop.run_in_executor(None, self.log_iterable, stream))
tasks: Iterable[asyncio.Task] = {await_container_completion, log_stream}
loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
if log_stream.done():
return log_stream.result()

log_stream.cancel()
try:
loop.run_until_complete(log_stream)
except concurrent.futures.CancelledError:
self.log.warning(
"Container %s log read was interrupted at some point caused by log rotation "
"see https://github.com/apache/airflow/issues/23497 for reference.",
container_name,
)
return None

def fetch_container_logs(
self, pod: V1Pod, container_name: str, *, follow=False, since_time: Optional[DateTime] = None
) -> PodLoggingStatus:
Expand Down Expand Up @@ -256,11 +220,10 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True)
),
follow=follow,
)
if follow:
timestamp = self.consume_container_logs_stream(pod, container_name, logs)
else:
timestamp = self.log_iterable(logs)

for raw_line in logs:
line = raw_line.decode('utf-8', errors="backslashreplace")
timestamp, message = self.parse_log_line(line)
self.log.info(message)
except BaseHTTPError as e:
self.log.warning(
"Reading of logs interrupted with error %r; will retry. "
Expand Down Expand Up @@ -293,7 +256,7 @@ def consume_logs(*, since_time: Optional[DateTime] = None, follow: bool = True)
time.sleep(1)

def await_container_completion(self, pod: V1Pod, container_name: str) -> None:
while self.container_is_running(pod=pod, container_name=container_name):
while not self.container_is_running(pod=pod, container_name=container_name):
time.sleep(1)

def await_pod_completion(self, pod: V1Pod) -> V1Pod:
Expand Down
28 changes: 2 additions & 26 deletions tests/providers/cncf/kubernetes/utils/test_pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
# specific language governing permissions and limitations
# under the License.
import logging
import time
from typing import Generator
from unittest import mock
from unittest.mock import MagicMock

Expand Down Expand Up @@ -314,7 +312,7 @@ def test_fetch_container_since_time(self, container_running, mock_now):
args, kwargs = self.mock_kube_client.read_namespaced_pod_log.call_args_list[0]
assert kwargs['since_seconds'] == 5

@pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 4, False), (False, 1, True)])
@pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 3, False), (False, 1, True)])
@mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running')
def test_fetch_container_running_follow(
self, container_running_mock, follow, is_running_calls, exp_running
Expand All @@ -324,35 +322,13 @@ def test_fetch_container_running_follow(
When called with follow=False, should return immediately even though still running.
"""
mock_pod = MagicMock()
container_running_mock.side_effect = [True, False, False, False] # called once when follow=False
container_running_mock.side_effect = [True, True, False] # only will be called once
self.mock_kube_client.read_namespaced_pod_log.return_value = [b'2021-01-01 hi']
ret = self.pod_manager.fetch_container_logs(pod=mock_pod, container_name='base', follow=follow)
assert len(container_running_mock.call_args_list) == is_running_calls
assert ret.last_log_time == DateTime(2021, 1, 1, tzinfo=Timezone('UTC'))
assert ret.running is exp_running

@pytest.mark.parametrize('follow, is_running_calls, exp_running', [(True, 3, False)])
@mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running')
def test_fetch_container_running_follow_when_kube_api_hangs(
self, container_running_mock, follow, is_running_calls, exp_running
):
"""
When called with follow, should keep looping even after disconnections, if pod still running.
"""
mock_pod = MagicMock()
container_running_mock.side_effect = [True, False, False]

def stream_logs() -> Generator:
while True:
time.sleep(1) # this is intentional: urllib3.response.stream() is not async
yield b'2021-01-01 hi'

self.mock_kube_client.read_namespaced_pod_log.return_value = stream_logs()
ret = self.pod_manager.fetch_container_logs(pod=mock_pod, container_name='base', follow=follow)
assert len(container_running_mock.call_args_list) == is_running_calls
assert ret.running is exp_running
assert ret.last_log_time is None


def params_for_test_container_is_running():
"""The `container_is_running` method is designed to handle an assortment of bad objects
Expand Down