Skip to content
This repository has been archived by the owner on Mar 13, 2022. It is now read-only.

Commit

Permalink
Fixes kubernetes-client/python issue 1047 "ResponseNotChunked from wa…
Browse files Browse the repository at this point in the history
…tch"

In recent versions of K8S (>1.16?), when a `Watch.stream()` call uses a
resource_version which is too old the resulting 410 error is wrapped in JSON
and returned in a non-chunked 200 response. Using `resp.stream()` instead of
`resp.read_chunked()` automatically handles the response being either chunked or
non-chunked.
  • Loading branch information
dhague committed Apr 8, 2021
1 parent fb425a3 commit 9039966
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 20 deletions.
2 changes: 1 addition & 1 deletion watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def _find_return_type(func):

def iter_resp_lines(resp):
prev = ""
for seg in resp.read_chunked(decode_content=False):
for seg in resp.stream(amt=None, decode_content=False):
if isinstance(seg, bytes):
seg = seg.decode('utf8')
seg = prev + seg
Expand Down
44 changes: 25 additions & 19 deletions watch/watch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def test_watch_with_decode(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
return_value=[
'{"type": "ADDED", "object": {"metadata": {"name": "test1",'
'"resourceVersion": "1"}, "spec": {}, "status": {}}}\n',
Expand Down Expand Up @@ -63,15 +63,16 @@ def test_watch_with_decode(self):

fake_api.get_namespaces.assert_called_once_with(
_preload_content=False, watch=True)
fake_resp.read_chunked.assert_called_once_with(decode_content=False)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()

def test_watch_for_follow(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
return_value=[
'log_line_1\n',
'log_line_2\n'])
Expand All @@ -92,7 +93,8 @@ def test_watch_for_follow(self):

fake_api.read_namespaced_pod_log.assert_called_once_with(
_preload_content=False, follow=True)
fake_resp.read_chunked.assert_called_once_with(decode_content=False)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()

Expand All @@ -112,6 +114,7 @@ def test_watch_resource_version_set(self):
'{"type": "ADDED", "object": {"metadata": {"name": "test3",'
'"resourceVersion": "3"}, "spec": {}, "status": {}}}\n'
]

# return nothing on the first call and values on the second
# this emulates a watch from a rv that returns nothing in the first k8s
# watch reset and values later
Expand All @@ -123,7 +126,7 @@ def get_values(*args, **kwargs):
else:
return values

fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
side_effect=get_values)

fake_api = Mock()
Expand Down Expand Up @@ -170,7 +173,7 @@ def test_watch_stream_twice(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
return_value=['{"type": "ADDED", "object": 1}\n'] * 4)

fake_api = Mock()
Expand All @@ -186,8 +189,8 @@ def test_watch_stream_twice(self):
self.assertEqual(count, 3)
fake_api.get_namespaces.assert_called_once_with(
_preload_content=False, watch=True)
fake_resp.read_chunked.assert_called_once_with(
decode_content=False)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()

Expand All @@ -197,7 +200,7 @@ def test_watch_stream_loop(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
return_value=['{"type": "ADDED", "object": 1}\n'])

fake_api = Mock()
Expand All @@ -219,7 +222,7 @@ def test_watch_stream_loop(self):

self.assertEqual(count, 2)
self.assertEqual(fake_api.get_namespaces.call_count, 2)
self.assertEqual(fake_resp.read_chunked.call_count, 2)
self.assertEqual(fake_resp.stream.call_count, 2)
self.assertEqual(fake_resp.close.call_count, 2)
self.assertEqual(fake_resp.release_conn.call_count, 2)

Expand Down Expand Up @@ -256,7 +259,7 @@ def test_watch_with_exception(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(side_effect=KeyError('expected'))
fake_resp.stream = Mock(side_effect=KeyError('expected'))

fake_api = Mock()
fake_api.get_thing = Mock(return_value=fake_resp)
Expand All @@ -271,15 +274,16 @@ def test_watch_with_exception(self):

fake_api.get_thing.assert_called_once_with(
_preload_content=False, watch=True)
fake_resp.read_chunked.assert_called_once_with(decode_content=False)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()

def test_watch_with_error_event(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
return_value=[
'{"type": "ERROR", "object": {"code": 410, '
'"reason": "Gone", "message": "error message"}}\n'])
Expand All @@ -294,15 +298,16 @@ def test_watch_with_error_event(self):

fake_api.get_thing.assert_called_once_with(
_preload_content=False, watch=True)
fake_resp.read_chunked.assert_called_once_with(decode_content=False)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()

def test_watch_retries_on_error_event(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
return_value=[
'{"type": "ERROR", "object": {"code": 410, '
'"reason": "Gone", "message": "error message"}}\n'])
Expand All @@ -320,16 +325,16 @@ def test_watch_retries_on_error_event(self):
# Two calls should be expected during a retry
fake_api.get_thing.assert_has_calls(
[call(resource_version=0, _preload_content=False, watch=True)] * 2)
fake_resp.read_chunked.assert_has_calls(
[call(decode_content=False)] * 2)
fake_resp.stream.assert_has_calls(
[call(amt=None, decode_content=False)] * 2)
assert fake_resp.close.call_count == 2
assert fake_resp.release_conn.call_count == 2

def test_watch_with_error_event_and_timeout_param(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
return_value=[
'{"type": "ERROR", "object": {"code": 410, '
'"reason": "Gone", "message": "error message"}}\n'])
Expand All @@ -346,7 +351,8 @@ def test_watch_with_error_event_and_timeout_param(self):

fake_api.get_thing.assert_called_once_with(
_preload_content=False, watch=True, timeout_seconds=10)
fake_resp.read_chunked.assert_called_once_with(decode_content=False)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()

Expand Down

0 comments on commit 9039966

Please sign in to comment.