Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add semaphore to wait for streams to become available. #38

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 31 additions & 16 deletions httpcore/_async/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
from h2.exceptions import NoAvailableStreamIDError
from h2.settings import SettingCodes, Settings

from .._backends.auto import AsyncLock, AsyncSocketStream, AutoBackend
from .._exceptions import ProtocolError
from .._backends.auto import AsyncLock, AsyncSemaphore, AsyncSocketStream, AutoBackend
from .._exceptions import PoolTimeout, ProtocolError
from .base import (
AsyncByteStream,
AsyncHTTPTransport,
Expand Down Expand Up @@ -72,6 +72,17 @@ def read_lock(self) -> AsyncLock:
self._read_lock = self.backend.create_lock()
return self._read_lock

@property
def streams_semaphore(self) -> AsyncSemaphore:
# We do this lazily, to make sure backend autodetection always
# runs within an async context.
if not hasattr(self, "_streams_semaphore"):
semaphore_count = self.h2_state.remote_settings.max_concurrent_streams
self._streams_semaphore = self.backend.create_semaphore(
semaphore_count, PoolTimeout
)
return self._streams_semaphore

async def start_tls(
self, hostname: bytes, timeout: Dict[str, Optional[float]] = None
):
Expand All @@ -91,20 +102,24 @@ async def request(
) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]], AsyncByteStream]:
timeout = {} if timeout is None else timeout

async with self.init_lock:
if not self.sent_connection_init:
# The very first stream is responsible for initiating the connection.
self.state = ConnectionState.ACTIVE
await self.send_connection_init(timeout)
self.sent_connection_init = True

try:
stream_id = self.h2_state.get_next_available_stream_id()
except NoAvailableStreamIDError:
self.state = ConnectionState.FULL
raise NewConnectionRequired()
else:
self.state = ConnectionState.ACTIVE
await self.streams_semaphore.acquire()
try:
async with self.init_lock:
if not self.sent_connection_init:
# The very first stream is responsible for initiating the connection.
self.state = ConnectionState.ACTIVE
await self.send_connection_init(timeout)
self.sent_connection_init = True

try:
stream_id = self.h2_state.get_next_available_stream_id()
except NoAvailableStreamIDError:
self.state = ConnectionState.FULL
raise NewConnectionRequired()
else:
self.state = ConnectionState.ACTIVE
finally:
self.streams_semaphore.release()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, so I think the wrapping here isn't quite correct. It'd make sense to instead wrap up the portion of this method after the connection init, so...

# Wrap this up inside the stream semaphore
h2_stream = AsyncHTTP2Stream(stream_id=stream_id, connection=self)
self.streams[stream_id] = h2_stream
self.events[stream_id] = []
return await h2_stream.request(method, url, headers, stream, timeout)

Since that'd ensure that we hold the semaphore for the duration of a request/response.

In fact, it's actually a bit more awkward than that, though. We don't want to release the semaphore until the client has closed the response we've returned. So really we want to release the semaphore either on an exception occurring within this block, or on the response being closed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, something like...

async def request(...):
    ...
    await self.streams_semaphore.acquire()
    try:
        h2_stream = AsyncHTTP2Stream(stream_id=stream_id, connection=self)
        self.streams[stream_id] = h2_stream
        self.events[stream_id] = []
        return await h2_stream.request(method, url, headers, stream, timeout)
    except:
        self.streams_semaphore.release()
        raise

...
async def close_stream(self, stream_id: int) -> None:
    self.streams_semaphore.release()
    ...

Alternatively, we could pass the semaphore to AsyncHTTP2Stream, and wrap up the behaviour there instead. (That approach might feel a bit more tightly constrained.)


h2_stream = AsyncHTTP2Stream(stream_id=stream_id, connection=self)
self.streams[stream_id] = h2_stream
Expand Down
47 changes: 31 additions & 16 deletions httpcore/_sync/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
from h2.exceptions import NoAvailableStreamIDError
from h2.settings import SettingCodes, Settings

from .._backends.auto import SyncLock, SyncSocketStream, SyncBackend
from .._exceptions import ProtocolError
from .._backends.auto import SyncLock, SyncSemaphore, SyncSocketStream, SyncBackend
from .._exceptions import PoolTimeout, ProtocolError
from .base import (
SyncByteStream,
SyncHTTPTransport,
Expand Down Expand Up @@ -72,6 +72,17 @@ def read_lock(self) -> SyncLock:
self._read_lock = self.backend.create_lock()
return self._read_lock

@property
def streams_semaphore(self) -> SyncSemaphore:
# We do this lazily, to make sure backend autodetection always
# runs within an async context.
if not hasattr(self, "_streams_semaphore"):
semaphore_count = self.h2_state.remote_settings.max_concurrent_streams
self._streams_semaphore = self.backend.create_semaphore(
semaphore_count, PoolTimeout
)
return self._streams_semaphore

def start_tls(
self, hostname: bytes, timeout: Dict[str, Optional[float]] = None
):
Expand All @@ -91,20 +102,24 @@ def request(
) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]], SyncByteStream]:
timeout = {} if timeout is None else timeout

with self.init_lock:
if not self.sent_connection_init:
# The very first stream is responsible for initiating the connection.
self.state = ConnectionState.ACTIVE
self.send_connection_init(timeout)
self.sent_connection_init = True

try:
stream_id = self.h2_state.get_next_available_stream_id()
except NoAvailableStreamIDError:
self.state = ConnectionState.FULL
raise NewConnectionRequired()
else:
self.state = ConnectionState.ACTIVE
self.streams_semaphore.acquire()
try:
with self.init_lock:
if not self.sent_connection_init:
# The very first stream is responsible for initiating the connection.
self.state = ConnectionState.ACTIVE
self.send_connection_init(timeout)
self.sent_connection_init = True

try:
stream_id = self.h2_state.get_next_available_stream_id()
except NoAvailableStreamIDError:
self.state = ConnectionState.FULL
raise NewConnectionRequired()
else:
self.state = ConnectionState.ACTIVE
finally:
self.streams_semaphore.release()

h2_stream = SyncHTTP2Stream(stream_id=stream_id, connection=self)
self.streams[stream_id] = h2_stream
Expand Down