Skip to content

Commit

Permalink
Add regression tests and fixes for issue redis#1128
Browse files Browse the repository at this point in the history
  • Loading branch information
kristjanvalur committed Dec 15, 2022
1 parent 74c251a commit 7231d15
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 4 deletions.
13 changes: 11 additions & 2 deletions redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,17 @@ async def _send_command_parse_response(self, conn, command_name, *args, **option
"""
Send a command and parse the response
"""
await conn.send_command(*args)
return await self.parse_response(conn, command_name, **options)
try:
await conn.send_command(*args)
return await self.parse_response(conn, command_name, **options)
except BaseException:
# An exception while reading or writing leaves the connection in
# an unknown state. There may be un-written or un-read data
# so we cannot re-use it for a subsequent command/response transaction.
# This may be a TimeoutError or any other error not handled by the
# Connection object itself.
await conn.disconnect(nowait=True)
raise

async def _disconnect_raise(self, conn: Connection, error: Exception):
"""
Expand Down
14 changes: 12 additions & 2 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1231,8 +1231,18 @@ def _send_command_parse_response(self, conn, command_name, *args, **options):
"""
Send a command and parse the response
"""
conn.send_command(*args)
return self.parse_response(conn, command_name, **options)
try:
conn.send_command(*args)
return self.parse_response(conn, command_name, **options)
except BaseException:
# An exception while reading or writing leaves the connection in
# an unknown state. There may be un-written or un-read data
# so we cannot re-use it for a subsequent command/response transaction.
# This can be any error not handled by the Connection itself, such
# as BaseExceptions may have been used to interrupt IO, when using
# gevent.
conn.disconnect()
raise

def _disconnect_raise(self, conn, error):
"""
Expand Down
32 changes: 32 additions & 0 deletions tests/test_asyncio/test_commands.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""
Tests async overrides of commands from their mixins
"""
import asyncio
import binascii
import datetime
import re
from string import ascii_letters
import async_timeout

import pytest
import pytest_asyncio
Expand Down Expand Up @@ -2999,6 +3001,36 @@ async def test_module_list(self, r: redis.Redis):
for x in await r.module_list():
assert isinstance(x, dict)

@pytest.mark.onlynoncluster
async def test_interrupted_command(self, r: redis.Redis):
"""
Regression test for issue #1128: An Un-handled BaseException
will leave the socket with un-read response to a previous
command.
"""
ready = asyncio.Event()

async def helper():
with pytest.raises(asyncio.CancelledError):
# blocking pop
ready.set()
await r.brpop(["nonexist"])
# if all is well, we can continue. The following should not hang.
await r.set("status", "down")
return "done"

task = asyncio.create_task(helper())
await ready.wait()
await asyncio.sleep(0.01)
# the task is now sleeping, lets send it an exception
task.cancel()
try:
async with async_timeout.timeout(0.1):
assert await task == "done"
except asyncio.TimeoutError:
task.cancel()
await task


@pytest.mark.onlynoncluster
class TestBinarySave:
Expand Down
32 changes: 32 additions & 0 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import binascii
import datetime
import re
import threading
import time
from string import ascii_letters
from asyncio import CancelledError
from unittest import mock
from unittest.mock import patch
import socket

import pytest

Expand Down Expand Up @@ -4719,6 +4723,34 @@ def test_psync(self, r):
res = r2.psync(r2.client_id(), 1)
assert b"FULLRESYNC" in res

@pytest.mark.onlynoncluster
def test_interrupted_command(self, r: redis.Redis):
"""
Regression test for issue #1128: An Un-handled BaseException
will leave the socket with un-read response to a previous
command.
"""
print("start")

def helper():
try:
# blocking pop
with patch.object(
socket.socket, "recv_into", side_effect=CancelledError
) as mock_recv:
r.brpop(["nonexist"])
except CancelledError:
print("canc")
pass # we got some BaseException.
# if all is well, we can continue.
r.set("status", "down") # should not hang
return "done"

thread = threading.Thread(target=helper)
thread.start()
thread.join(0.1)
assert not thread.is_alive()


@pytest.mark.onlynoncluster
class TestBinarySave:
Expand Down

0 comments on commit 7231d15

Please sign in to comment.