From d00cb53a3e21786cd47a67330a207b795297543f Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Tue, 12 May 2020 16:18:43 +0100 Subject: [PATCH 1/7] Use wait_closed with asyncio, with socket unwrapping workaround. --- httpcore/_backends/asyncio.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/httpcore/_backends/asyncio.py b/httpcore/_backends/asyncio.py index c97001a2..cff0e363 100644 --- a/httpcore/_backends/asyncio.py +++ b/httpcore/_backends/asyncio.py @@ -1,5 +1,5 @@ import asyncio -from ssl import SSLContext +from ssl import SSLContext, SSLWantReadError from typing import Optional from .._exceptions import ( @@ -158,6 +158,14 @@ async def aclose(self) -> None: async with self.write_lock: with map_exceptions({OSError: CloseError}): self.stream_writer.close() + try: + ssl_object = self.stream_writer.get_extra_info("ssl_object") + if ssl_object is not None: + ssl_object.unwrap() + except SSLWantReadError: + pass + else: + await self.stream_writer.wait_closed() def is_connection_dropped(self) -> bool: # Counter-intuitively, what we really want to know here is whether the socket is From 11f6d477b9d1482caacd050b53e671cde379c56b Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Tue, 12 May 2020 16:27:19 +0100 Subject: [PATCH 2/7] Fix for Python 3.6, and comments --- httpcore/_backends/asyncio.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/httpcore/_backends/asyncio.py b/httpcore/_backends/asyncio.py index cff0e363..27a8c3c9 100644 --- a/httpcore/_backends/asyncio.py +++ b/httpcore/_backends/asyncio.py @@ -158,6 +158,8 @@ async def aclose(self) -> None: async with self.write_lock: with map_exceptions({OSError: CloseError}): self.stream_writer.close() + # Unwrap the SSL socket, ignoring want-read errors. + # Refs https://bugs.python.org/issue39758 try: ssl_object = self.stream_writer.get_extra_info("ssl_object") if ssl_object is not None: @@ -165,7 +167,9 @@ async def aclose(self) -> None: except SSLWantReadError: pass else: - await self.stream_writer.wait_closed() + if hasattr(self.stream_writer, "wait_closed"): + # Python 3.7+ + await self.stream_writer.wait_closed() def is_connection_dropped(self) -> bool: # Counter-intuitively, what we really want to know here is whether the socket is From 60ec95b43298afbd349b0c0ffe280ca0e1e3943d Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Tue, 12 May 2020 16:30:00 +0100 Subject: [PATCH 3/7] Add type: ignore for Python 3.6 --- httpcore/_backends/asyncio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/httpcore/_backends/asyncio.py b/httpcore/_backends/asyncio.py index 27a8c3c9..9b770874 100644 --- a/httpcore/_backends/asyncio.py +++ b/httpcore/_backends/asyncio.py @@ -169,7 +169,7 @@ async def aclose(self) -> None: else: if hasattr(self.stream_writer, "wait_closed"): # Python 3.7+ - await self.stream_writer.wait_closed() + await self.stream_writer.wait_closed() # type: ignore def is_connection_dropped(self) -> bool: # Counter-intuitively, what we really want to know here is whether the socket is From 05cae9f02959d299d30643baa997508cffd2252a Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Wed, 13 May 2020 15:23:40 +0100 Subject: [PATCH 4/7] Honor MAX_CONCURRENT_STREAMS --- httpcore/_async/http2.py | 41 ++++++++++++++++++++++++++++------------ httpcore/_sync/http2.py | 41 ++++++++++++++++++++++++++++------------ 2 files changed, 58 insertions(+), 24 deletions(-) diff --git a/httpcore/_async/http2.py b/httpcore/_async/http2.py index 9b3ce2d8..d9f41c88 100644 --- a/httpcore/_async/http2.py +++ b/httpcore/_async/http2.py @@ -8,8 +8,8 @@ from h2.exceptions import NoAvailableStreamIDError from h2.settings import SettingCodes, Settings -from .._backends.auto import AsyncLock, AsyncSocketStream, AutoBackend -from .._exceptions import ProtocolError +from .._backends.auto import AsyncLock, AsyncSemaphore, AsyncSocketStream, AutoBackend +from .._exceptions import PoolTimeout, ProtocolError from .._types import URL, Headers, TimeoutDict from .._utils import get_logger from .base import ( @@ -67,6 +67,17 @@ def read_lock(self) -> AsyncLock: self._read_lock = self.backend.create_lock() return self._read_lock + @property + def max_streams_semaphore(self) -> AsyncSemaphore: + # We do this lazily, to make sure backend autodetection always + # runs within an async context. + if not hasattr(self, "_max_streams_semaphore"): + max_streams = self.h2_state.remote_settings.max_concurrent_streams + self._max_streams_semaphore = self.backend.create_semaphore( + max_streams, exc_class=PoolTimeout + ) + return self._max_streams_semaphore + async def start_tls(self, hostname: bytes, timeout: TimeoutDict = None) -> None: pass @@ -265,16 +276,21 @@ async def request( b"content-length" in seen_headers or b"transfer-encoding" in seen_headers ) - await self.send_headers(method, url, headers, has_body, timeout) - if has_body: - await self.send_body(stream, timeout) - - # Receive the response. - status_code, headers = await self.receive_response(timeout) - reason_phrase = get_reason_phrase(status_code) - stream = AsyncByteStream( - aiterator=self.body_iter(timeout), aclose_func=self._response_closed - ) + await self.connection.max_streams_semaphore.acquire() + try: + await self.send_headers(method, url, headers, has_body, timeout) + if has_body: + await self.send_body(stream, timeout) + + # Receive the response. + status_code, headers = await self.receive_response(timeout) + reason_phrase = get_reason_phrase(status_code) + stream = AsyncByteStream( + aiterator=self.body_iter(timeout), aclose_func=self._response_closed + ) + except: + self.connection.max_streams_semaphore.release() + raise return (b"HTTP/2", status_code, reason_phrase, headers, stream) @@ -346,4 +362,5 @@ async def body_iter(self, timeout: TimeoutDict) -> AsyncIterator[bytes]: break async def _response_closed(self) -> None: + self.connection.max_streams_semaphore.release() await self.connection.close_stream(self.stream_id) diff --git a/httpcore/_sync/http2.py b/httpcore/_sync/http2.py index 47039235..d63e7905 100644 --- a/httpcore/_sync/http2.py +++ b/httpcore/_sync/http2.py @@ -8,8 +8,8 @@ from h2.exceptions import NoAvailableStreamIDError from h2.settings import SettingCodes, Settings -from .._backends.auto import SyncLock, SyncSocketStream, SyncBackend -from .._exceptions import ProtocolError +from .._backends.auto import SyncLock, SyncSemaphore, SyncSocketStream, SyncBackend +from .._exceptions import PoolTimeout, ProtocolError from .._types import URL, Headers, TimeoutDict from .._utils import get_logger from .base import ( @@ -67,6 +67,17 @@ def read_lock(self) -> SyncLock: self._read_lock = self.backend.create_lock() return self._read_lock + @property + def max_streams_semaphore(self) -> SyncSemaphore: + # We do this lazily, to make sure backend autodetection always + # runs within an async context. + if not hasattr(self, "_max_streams_semaphore"): + max_streams = self.h2_state.remote_settings.max_concurrent_streams + self._max_streams_semaphore = self.backend.create_semaphore( + max_streams, exc_class=PoolTimeout + ) + return self._max_streams_semaphore + def start_tls(self, hostname: bytes, timeout: TimeoutDict = None) -> None: pass @@ -265,16 +276,21 @@ def request( b"content-length" in seen_headers or b"transfer-encoding" in seen_headers ) - self.send_headers(method, url, headers, has_body, timeout) - if has_body: - self.send_body(stream, timeout) - - # Receive the response. - status_code, headers = self.receive_response(timeout) - reason_phrase = get_reason_phrase(status_code) - stream = SyncByteStream( - iterator=self.body_iter(timeout), close_func=self._response_closed - ) + self.connection.max_streams_semaphore.acquire() + try: + self.send_headers(method, url, headers, has_body, timeout) + if has_body: + self.send_body(stream, timeout) + + # Receive the response. + status_code, headers = self.receive_response(timeout) + reason_phrase = get_reason_phrase(status_code) + stream = SyncByteStream( + iterator=self.body_iter(timeout), close_func=self._response_closed + ) + except: + self.connection.max_streams_semaphore.release() + raise return (b"HTTP/2", status_code, reason_phrase, headers, stream) @@ -346,4 +362,5 @@ def body_iter(self, timeout: TimeoutDict) -> Iterator[bytes]: break def _response_closed(self) -> None: + self.connection.max_streams_semaphore.release() self.connection.close_stream(self.stream_id) From 0d84a652aaf4f8e3d5c3375e6211df4dbebb4cb8 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Thu, 14 May 2020 15:00:47 +0100 Subject: [PATCH 5/7] Drop erronous commit --- httpcore/_backends/asyncio.py | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/httpcore/_backends/asyncio.py b/httpcore/_backends/asyncio.py index 9b770874..c97001a2 100644 --- a/httpcore/_backends/asyncio.py +++ b/httpcore/_backends/asyncio.py @@ -1,5 +1,5 @@ import asyncio -from ssl import SSLContext, SSLWantReadError +from ssl import SSLContext from typing import Optional from .._exceptions import ( @@ -158,18 +158,6 @@ async def aclose(self) -> None: async with self.write_lock: with map_exceptions({OSError: CloseError}): self.stream_writer.close() - # Unwrap the SSL socket, ignoring want-read errors. - # Refs https://bugs.python.org/issue39758 - try: - ssl_object = self.stream_writer.get_extra_info("ssl_object") - if ssl_object is not None: - ssl_object.unwrap() - except SSLWantReadError: - pass - else: - if hasattr(self.stream_writer, "wait_closed"): - # Python 3.7+ - await self.stream_writer.wait_closed() # type: ignore def is_connection_dropped(self) -> bool: # Counter-intuitively, what we really want to know here is whether the socket is From 0e2ccca73820b147ac801f4285275bdc3a5bdde9 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Thu, 14 May 2020 15:05:41 +0100 Subject: [PATCH 6/7] Don't release stream concurrency semaphore until *after* network closing the stream --- httpcore/_async/http2.py | 6 ++++-- httpcore/_sync/http2.py | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/httpcore/_async/http2.py b/httpcore/_async/http2.py index d9f41c88..ffe6515a 100644 --- a/httpcore/_async/http2.py +++ b/httpcore/_async/http2.py @@ -362,5 +362,7 @@ async def body_iter(self, timeout: TimeoutDict) -> AsyncIterator[bytes]: break async def _response_closed(self) -> None: - self.connection.max_streams_semaphore.release() - await self.connection.close_stream(self.stream_id) + try: + await self.connection.close_stream(self.stream_id) + finally: + self.connection.max_streams_semaphore.release() diff --git a/httpcore/_sync/http2.py b/httpcore/_sync/http2.py index d63e7905..88bcf6f8 100644 --- a/httpcore/_sync/http2.py +++ b/httpcore/_sync/http2.py @@ -362,5 +362,7 @@ def body_iter(self, timeout: TimeoutDict) -> Iterator[bytes]: break def _response_closed(self) -> None: - self.connection.max_streams_semaphore.release() - self.connection.close_stream(self.stream_id) + try: + self.connection.close_stream(self.stream_id) + finally: + self.connection.max_streams_semaphore.release() From 2c085741911156ffeb90ec8f0861cf515d3f48da Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Thu, 14 May 2020 15:19:25 +0100 Subject: [PATCH 7/7] Don't use bare except --- httpcore/_async/http2.py | 2 +- httpcore/_sync/http2.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/httpcore/_async/http2.py b/httpcore/_async/http2.py index ffe6515a..c1b0a5ce 100644 --- a/httpcore/_async/http2.py +++ b/httpcore/_async/http2.py @@ -288,7 +288,7 @@ async def request( stream = AsyncByteStream( aiterator=self.body_iter(timeout), aclose_func=self._response_closed ) - except: + except Exception: self.connection.max_streams_semaphore.release() raise diff --git a/httpcore/_sync/http2.py b/httpcore/_sync/http2.py index 88bcf6f8..35213600 100644 --- a/httpcore/_sync/http2.py +++ b/httpcore/_sync/http2.py @@ -288,7 +288,7 @@ def request( stream = SyncByteStream( iterator=self.body_iter(timeout), close_func=self._response_closed ) - except: + except Exception: self.connection.max_streams_semaphore.release() raise