diff --git a/kubernetes_asyncio/watch/watch.py b/kubernetes_asyncio/watch/watch.py index a8195a8ab..7c30aa483 100644 --- a/kubernetes_asyncio/watch/watch.py +++ b/kubernetes_asyncio/watch/watch.py @@ -36,13 +36,6 @@ def _find_return_type(func): return "" -async def iter_resp_lines(resp): - line = await resp.content.readline() - if isinstance(line, bytes): - line = line.decode('utf8') - return line - - class Stream(object): def __init__(self, func, *args, **kwargs): @@ -102,15 +95,26 @@ async def __anext__(self): return await self.next() async def next(self): + # Set the response object to the user supplied function (eg + # `list_namespaced_pods`) if this is the first iteration. if self.resp is None: self.resp = await self.func() + # Abort at the current iteration if the user has called `stop` on this + # stream instance. if self._stop: raise StopAsyncIteration - ret = await iter_resp_lines(self.resp) - ret = self.unmarshal_event(ret, self.return_type) - return ret + # Fetch the next K8s response. + line = await self.resp.content.readline() + line = line.decode('utf8') + + # Stop the iterator if K8s sends an empty response. This happens when + # eg the supplied timeout has expired. + if line == '': + raise StopAsyncIteration + + return self.unmarshal_event(line, self.return_type) def stream(self, func, *args, **kwargs): """Watch an API resource and stream the result back via a generator. diff --git a/kubernetes_asyncio/watch/watch_test.py b/kubernetes_asyncio/watch/watch_test.py index daa15c36f..c9fc4cd5d 100644 --- a/kubernetes_asyncio/watch/watch_test.py +++ b/kubernetes_asyncio/watch/watch_test.py @@ -14,6 +14,7 @@ from asynctest import CoroutineMock, Mock, TestCase, patch +import json import kubernetes_asyncio from kubernetes_asyncio.watch import Watch @@ -23,30 +24,60 @@ class WatchTest(TestCase): async def test_watch_with_decode(self): fake_resp = CoroutineMock() fake_resp.content.readline = CoroutineMock() - fake_resp.content.readline.side_effect = [ - '{"type": "ADDED", "object": {"metadata": {"name": "test1"},"spec": {}, "status": {}}}', - '{"type": "ADDED", "object": {"metadata": {"name": "test2"},"spec": {}, "status": {}}}', - '{"type": "ADDED", "object": {"metadata": {"name": "test3"},"spec": {}, "status": {}}}', - 'should_not_happened'] + side_effects = [ + {"type": "ADDED", "object": {"metadata": {"name": "test0"}, "spec": {}, "status": {}}}, + {"type": "ADDED", "object": {"metadata": {"name": "test1"}, "spec": {}, "status": {}}}, + {"type": "ADDED", "object": {"metadata": {"name": "test2"}, "spec": {}, "status": {}}}, + ] + side_effects = [json.dumps(_).encode('utf8') for _ in side_effects] + side_effects = side_effects + [AssertionError('Should not have been called')] + fake_resp.content.readline.side_effect = side_effects fake_api = Mock() fake_api.get_namespaces = CoroutineMock(return_value=fake_resp) fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList' watch = kubernetes_asyncio.watch.Watch() - count = 1 - async for e in watch.stream(fake_api.get_namespaces): + count = 0 + async for e in watch.stream(fake_api.get_namespaces, resource_version='123'): self.assertEqual("ADDED", e['type']) # make sure decoder worked and we got a model with the right name self.assertEqual("test%d" % count, e['object'].metadata.name) + + # Stop the watch. This must not return the next event which would + # be an AssertionError exception. count += 1 - # make sure we can stop the watch and the last event with won't be - # returned - if count == 4: + if count == len(side_effects) - 1: watch.stop() fake_api.get_namespaces.assert_called_once_with( _preload_content=False, watch=True) + async def test_watch_k8s_empty_response(self): + """Stop the iterator when the response is empty. + + This typically happens when the user supplied timeout expires. + + """ + # Mock the readline return value to first return a valid response + # followed by an empty response. + fake_resp = CoroutineMock() + fake_resp.content.readline = CoroutineMock() + side_effects = [ + {"type": "ADDED", "object": {"metadata": {"name": "test0"}, "spec": {}, "status": {}}}, + {"type": "ADDED", "object": {"metadata": {"name": "test1"}, "spec": {}, "status": {}}}, + ] + side_effects = [json.dumps(_).encode('utf8') for _ in side_effects] + fake_resp.content.readline.side_effect = side_effects + [b''] + + # Fake the K8s resource object to watch. + fake_api = Mock() + fake_api.get_namespaces = CoroutineMock(return_value=fake_resp) + fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList' + + # Iteration must cease after all valid responses were received. + watch = kubernetes_asyncio.watch.Watch() + responses = [_ async for _ in watch.stream(fake_api.get_namespaces)] + assert len(responses) == len(side_effects) def test_unmarshal_with_float_object(self): w = Watch()