Skip to content

Commit

Permalink
Add dedicated ConcurrencyError exception.
Browse files Browse the repository at this point in the history
Previously, a generic RuntimeError was used.

Fix #1499.
  • Loading branch information
aaugustin committed Sep 9, 2024
1 parent f9cea9c commit 070ff1a
Show file tree
Hide file tree
Showing 13 changed files with 140 additions and 87 deletions.
4 changes: 4 additions & 0 deletions docs/project/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ Improvements

* Improved reporting of errors during the opening handshake.

* Raised :exc:`~exceptions.ConcurrencyError` on unsupported concurrent calls.
Previously, :exc:`RuntimeError` was raised. For backwards compatibility,
:exc:`~exceptions.ConcurrencyError` is a subclass of :exc:`RuntimeError`.

13.0.1
------

Expand Down
5 changes: 5 additions & 0 deletions docs/reference/exceptions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ translated to :exc:`ConnectionClosedError` in the other implementations.

.. autoexception:: InvalidState

Miscellaneous exceptions
------------------------

.. autoexception:: ConcurrencyError

Legacy exceptions
-----------------

Expand Down
3 changes: 3 additions & 0 deletions src/websockets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"HeadersLike",
"MultipleValuesError",
# .exceptions
"ConcurrencyError",
"ConnectionClosed",
"ConnectionClosedError",
"ConnectionClosedOK",
Expand Down Expand Up @@ -72,6 +73,7 @@
from .client import ClientProtocol
from .datastructures import Headers, HeadersLike, MultipleValuesError
from .exceptions import (
ConcurrencyError,
ConnectionClosed,
ConnectionClosedError,
ConnectionClosedOK,
Expand Down Expand Up @@ -134,6 +136,7 @@
"HeadersLike": ".datastructures",
"MultipleValuesError": ".datastructures",
# .exceptions
"ConcurrencyError": ".exceptions",
"ConnectionClosed": ".exceptions",
"ConnectionClosedError": ".exceptions",
"ConnectionClosedOK": ".exceptions",
Expand Down
36 changes: 22 additions & 14 deletions src/websockets/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@
cast,
)

from ..exceptions import ConnectionClosed, ConnectionClosedOK, ProtocolError
from ..exceptions import (
ConcurrencyError,
ConnectionClosed,
ConnectionClosedOK,
ProtocolError,
)
from ..frames import DATA_OPCODES, BytesLike, CloseCode, Frame, Opcode
from ..http11 import Request, Response
from ..protocol import CLOSED, OPEN, Event, Protocol, State
Expand Down Expand Up @@ -262,16 +267,16 @@ async def recv(self, decode: bool | None = None) -> Data:
Raises:
ConnectionClosed: When the connection is closed.
RuntimeError: If two coroutines call :meth:`recv` or
ConcurrencyError: If two coroutines call :meth:`recv` or
:meth:`recv_streaming` concurrently.
"""
try:
return await self.recv_messages.get(decode)
except EOFError:
raise self.protocol.close_exc from self.recv_exc
except RuntimeError:
raise RuntimeError(
except ConcurrencyError:
raise ConcurrencyError(
"cannot call recv while another coroutine "
"is already running recv or recv_streaming"
) from None
Expand All @@ -283,8 +288,9 @@ async def recv_streaming(self, decode: bool | None = None) -> AsyncIterator[Data
This method is designed for receiving fragmented messages. It returns an
asynchronous iterator that yields each fragment as it is received. This
iterator must be fully consumed. Else, future calls to :meth:`recv` or
:meth:`recv_streaming` will raise :exc:`RuntimeError`, making the
connection unusable.
:meth:`recv_streaming` will raise
:exc:`~websockets.exceptions.ConcurrencyError`, making the connection
unusable.
:meth:`recv_streaming` raises the same exceptions as :meth:`recv`.
Expand Down Expand Up @@ -315,7 +321,7 @@ async def recv_streaming(self, decode: bool | None = None) -> AsyncIterator[Data
Raises:
ConnectionClosed: When the connection is closed.
RuntimeError: If two coroutines call :meth:`recv` or
ConcurrencyError: If two coroutines call :meth:`recv` or
:meth:`recv_streaming` concurrently.
"""
Expand All @@ -324,8 +330,8 @@ async def recv_streaming(self, decode: bool | None = None) -> AsyncIterator[Data
yield frame
except EOFError:
raise self.protocol.close_exc from self.recv_exc
except RuntimeError:
raise RuntimeError(
except ConcurrencyError:
raise ConcurrencyError(
"cannot call recv_streaming while another coroutine "
"is already running recv or recv_streaming"
) from None
Expand Down Expand Up @@ -593,7 +599,7 @@ async def ping(self, data: Data | None = None) -> Awaitable[float]:
Raises:
ConnectionClosed: When the connection is closed.
RuntimeError: If another ping was sent with the same data and
ConcurrencyError: If another ping was sent with the same data and
the corresponding pong wasn't received yet.
"""
Expand All @@ -607,7 +613,7 @@ async def ping(self, data: Data | None = None) -> Awaitable[float]:
async with self.send_context():
# Protect against duplicates if a payload is explicitly set.
if data in self.pong_waiters:
raise RuntimeError("already waiting for a pong with the same data")
raise ConcurrencyError("already waiting for a pong with the same data")

# Generate a unique random payload otherwise.
while data is None or data in self.pong_waiters:
Expand Down Expand Up @@ -793,7 +799,7 @@ async def send_context(
# Let the caller interact with the protocol.
try:
yield
except (ProtocolError, RuntimeError):
except (ProtocolError, ConcurrencyError):
# The protocol state wasn't changed. Exit immediately.
raise
except Exception as exc:
Expand Down Expand Up @@ -1092,15 +1098,17 @@ def broadcast(
if raise_exceptions:
if sys.version_info[:2] < (3, 11): # pragma: no cover
raise ValueError("raise_exceptions requires at least Python 3.11")
exceptions = []
exceptions: list[Exception] = []

for connection in connections:
exception: Exception

if connection.protocol.state is not OPEN:
continue

if connection.fragmented_send_waiter is not None:
if raise_exceptions:
exception = RuntimeError("sending a fragmented message")
exception = ConcurrencyError("sending a fragmented message")
exceptions.append(exception)
else:
connection.logger.warning(
Expand Down
19 changes: 10 additions & 9 deletions src/websockets/asyncio/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
TypeVar,
)

from ..exceptions import ConcurrencyError
from ..frames import OP_BINARY, OP_CONT, OP_TEXT, Frame
from ..typing import Data

Expand Down Expand Up @@ -49,7 +50,7 @@ async def get(self) -> T:
"""Remove and return an item from the queue, waiting if necessary."""
if not self.queue:
if self.get_waiter is not None:
raise RuntimeError("get is already running")
raise ConcurrencyError("get is already running")
self.get_waiter = self.loop.create_future()
try:
await self.get_waiter
Expand Down Expand Up @@ -135,15 +136,15 @@ async def get(self, decode: bool | None = None) -> Data:
Raises:
EOFError: If the stream of frames has ended.
RuntimeError: If two coroutines run :meth:`get` or :meth:`get_iter`
concurrently.
ConcurrencyError: If two coroutines run :meth:`get` or
:meth:`get_iter` concurrently.
"""
if self.closed:
raise EOFError("stream of frames ended")

if self.get_in_progress:
raise RuntimeError("get() or get_iter() is already running")
raise ConcurrencyError("get() or get_iter() is already running")

# Locking with get_in_progress ensures only one coroutine can get here.
self.get_in_progress = True
Expand Down Expand Up @@ -190,7 +191,7 @@ async def get_iter(self, decode: bool | None = None) -> AsyncIterator[Data]:
:class:`str` or :class:`bytes` for each frame in the message.
The iterator must be fully consumed before calling :meth:`get_iter` or
:meth:`get` again. Else, :exc:`RuntimeError` is raised.
:meth:`get` again. Else, :exc:`ConcurrencyError` is raised.
This method only makes sense for fragmented messages. If messages aren't
fragmented, use :meth:`get` instead.
Expand All @@ -202,15 +203,15 @@ async def get_iter(self, decode: bool | None = None) -> AsyncIterator[Data]:
Raises:
EOFError: If the stream of frames has ended.
RuntimeError: If two coroutines run :meth:`get` or :meth:`get_iter`
concurrently.
ConcurrencyError: If two coroutines run :meth:`get` or
:meth:`get_iter` concurrently.
"""
if self.closed:
raise EOFError("stream of frames ended")

if self.get_in_progress:
raise RuntimeError("get() or get_iter() is already running")
raise ConcurrencyError("get() or get_iter() is already running")

# Locking with get_in_progress ensures only one coroutine can get here.
self.get_in_progress = True
Expand All @@ -236,7 +237,7 @@ async def get_iter(self, decode: bool | None = None) -> AsyncIterator[Data]:
# We cannot handle asyncio.CancelledError because we don't buffer
# previous fragments — we're streaming them. Canceling get_iter()
# here will leave the assembler in a stuck state. Future calls to
# get() or get_iter() will raise RuntimeError.
# get() or get_iter() will raise ConcurrencyError.
frame = await self.frames.get()
self.maybe_resume()
assert frame.opcode is OP_CONT
Expand Down
12 changes: 12 additions & 0 deletions src/websockets/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* :exc:`ProtocolError` (Sans-I/O)
* :exc:`PayloadTooBig` (Sans-I/O)
* :exc:`InvalidState` (Sans-I/O)
* :exc:`ConcurrencyError`
"""

Expand Down Expand Up @@ -62,6 +63,7 @@
"WebSocketProtocolError",
"PayloadTooBig",
"InvalidState",
"ConcurrencyError",
]


Expand Down Expand Up @@ -354,6 +356,16 @@ class InvalidState(WebSocketException, AssertionError):
"""


class ConcurrencyError(WebSocketException, RuntimeError):
"""
Raised when receiving or sending messages concurrently.
WebSocket is a connection-oriented protocol. Reads must be serialized; so
must be writes. However, reading and writing concurrently is possible.
"""


# When type checking, import non-deprecated aliases eagerly. Else, import on demand.
if typing.TYPE_CHECKING:
from .legacy.exceptions import (
Expand Down
Loading

0 comments on commit 070ff1a

Please sign in to comment.