import enum
from types import TracebackType
from typing import AsyncIterator, Tuple, Type
from .._types import URL, Headers, T
class NewConnectionRequired(Exception):
pass
class ConnectionState(enum.IntEnum):
"""
PENDING READY
| | ^
v V |
ACTIVE |
| | |
| V |
V IDLE-+
FULL |
| |
V V
CLOSED
"""
PENDING = 0 # Connection not yet acquired.
READY = 1 # Re-acquired from pool, about to send a request.
ACTIVE = 2 # Active requests.
FULL = 3 # Active requests, no more stream IDs available.
IDLE = 4 # No active requests.
CLOSED = 5 # Connection closed.
[docs]class AsyncByteStream:
"""
The base interface for request and response bodies.
Concrete implementations should subclass this class, and implement
the :meth:`__aiter__` method, and optionally the :meth:`aclose` method.
"""
[docs] async def __aiter__(self) -> AsyncIterator[bytes]:
"""
Yield bytes representing the request or response body.
"""
yield b"" # pragma: nocover
[docs] async def aclose(self) -> None:
"""
Must be called by the client to indicate that the stream has been closed.
"""
pass # pragma: nocover
[docs]class AsyncHTTPTransport:
"""
The base interface for sending HTTP requests.
Concrete implementations should subclass this class, and implement
the :meth:`arequest` method, and optionally the :meth:`aclose` method.
"""
[docs] async def arequest(
self,
method: bytes,
url: URL,
headers: Headers = None,
stream: AsyncByteStream = None,
ext: dict = None,
) -> Tuple[int, Headers, AsyncByteStream, dict]:
"""
The interface for sending a single HTTP request, and returning a response.
Parameters
----------
method:
The HTTP method, such as ``b'GET'``.
url:
The URL as a 4-tuple of (scheme, host, port, path).
headers:
Any HTTP headers to send with the request.
stream:
The body of the HTTP request.
ext:
A dictionary of optional extensions.
Returns
-------
status_code:
The HTTP status code, such as ``200``.
headers:
Any HTTP headers included on the response.
stream:
The body of the HTTP response.
ext:
A dictionary of optional extensions.
"""
raise NotImplementedError() # pragma: nocover
[docs] async def aclose(self) -> None:
"""
Close the implementation, which should close any outstanding response streams,
and any keep alive connections.
"""
async def __aenter__(self: T) -> T:
return self
async def __aexit__(
self,
exc_type: Type[BaseException] = None,
exc_value: BaseException = None,
traceback: TracebackType = None,
) -> None:
await self.aclose()