Skip to content

Commit

Permalink
An empty K8s response now terminates the iterator.
Browse files Browse the repository at this point in the history
  • Loading branch information
olitheolix committed Jun 20, 2018
1 parent fb2f64e commit 61852ea
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 20 deletions.
24 changes: 14 additions & 10 deletions kubernetes_asyncio/watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,6 @@ def _find_return_type(func):
return ""


async def iter_resp_lines(resp):
line = await resp.content.readline()
if isinstance(line, bytes):
line = line.decode('utf8')
return line


class Stream(object):

def __init__(self, func, *args, **kwargs):
Expand Down Expand Up @@ -102,15 +95,26 @@ async def __anext__(self):
return await self.next()

async def next(self):
# Set the response object to the user supplied function (eg
# `list_namespaced_pods`) if this is the first iteration.
if self.resp is None:
self.resp = await self.func()

# Abort at the current iteration if the user has called `stop` on this
# stream instance.
if self._stop:
raise StopAsyncIteration

ret = await iter_resp_lines(self.resp)
ret = self.unmarshal_event(ret, self.return_type)
return ret
# Fetch the next K8s response.
line = await self.resp.content.readline()
line = line.decode('utf8')

# Stop the iterator if K8s sends an empty response. This happens when
# eg the supplied timeout has expired.
if line == '':
raise StopAsyncIteration

return self.unmarshal_event(line, self.return_type)

def stream(self, func, *args, **kwargs):
"""Watch an API resource and stream the result back via a generator.
Expand Down
51 changes: 41 additions & 10 deletions kubernetes_asyncio/watch/watch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from asynctest import CoroutineMock, Mock, TestCase, patch

import json
import kubernetes_asyncio
from kubernetes_asyncio.watch import Watch

Expand All @@ -23,30 +24,60 @@ class WatchTest(TestCase):
async def test_watch_with_decode(self):
fake_resp = CoroutineMock()
fake_resp.content.readline = CoroutineMock()
fake_resp.content.readline.side_effect = [
'{"type": "ADDED", "object": {"metadata": {"name": "test1"},"spec": {}, "status": {}}}',
'{"type": "ADDED", "object": {"metadata": {"name": "test2"},"spec": {}, "status": {}}}',
'{"type": "ADDED", "object": {"metadata": {"name": "test3"},"spec": {}, "status": {}}}',
'should_not_happened']
side_effects = [
{"type": "ADDED", "object": {"metadata": {"name": "test0"}, "spec": {}, "status": {}}},
{"type": "ADDED", "object": {"metadata": {"name": "test1"}, "spec": {}, "status": {}}},
{"type": "ADDED", "object": {"metadata": {"name": "test2"}, "spec": {}, "status": {}}},
]
side_effects = [json.dumps(_).encode('utf8') for _ in side_effects]
side_effects = side_effects + [AssertionError('Should not have been called')]
fake_resp.content.readline.side_effect = side_effects

fake_api = Mock()
fake_api.get_namespaces = CoroutineMock(return_value=fake_resp)
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'

watch = kubernetes_asyncio.watch.Watch()
count = 1
async for e in watch.stream(fake_api.get_namespaces):
count = 0
async for e in watch.stream(fake_api.get_namespaces, resource_version='123'):
self.assertEqual("ADDED", e['type'])
# make sure decoder worked and we got a model with the right name
self.assertEqual("test%d" % count, e['object'].metadata.name)

# Stop the watch. This must not return the next event which would
# be an AssertionError exception.
count += 1
# make sure we can stop the watch and the last event with won't be
# returned
if count == 4:
if count == len(side_effects) - 1:
watch.stop()

fake_api.get_namespaces.assert_called_once_with(
_preload_content=False, watch=True)
async def test_watch_k8s_empty_response(self):
"""Stop the iterator when the response is empty.
This typically happens when the user supplied timeout expires.
"""
# Mock the readline return value to first return a valid response
# followed by an empty response.
fake_resp = CoroutineMock()
fake_resp.content.readline = CoroutineMock()
side_effects = [
{"type": "ADDED", "object": {"metadata": {"name": "test0"}, "spec": {}, "status": {}}},
{"type": "ADDED", "object": {"metadata": {"name": "test1"}, "spec": {}, "status": {}}},
]
side_effects = [json.dumps(_).encode('utf8') for _ in side_effects]
fake_resp.content.readline.side_effect = side_effects + [b'']

# Fake the K8s resource object to watch.
fake_api = Mock()
fake_api.get_namespaces = CoroutineMock(return_value=fake_resp)
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'

# Iteration must cease after all valid responses were received.
watch = kubernetes_asyncio.watch.Watch()
responses = [_ async for _ in watch.stream(fake_api.get_namespaces)]
assert len(responses) == len(side_effects)

def test_unmarshal_with_float_object(self):
w = Watch()
Expand Down

0 comments on commit 61852ea

Please sign in to comment.