Source code for httpcore._bytestreams

from typing import AsyncIterator, Callable, Iterator

from ._async.base import AsyncByteStream
from ._sync.base import SyncByteStream


[docs]class PlainByteStream(AsyncByteStream, SyncByteStream): """ A concrete implementation for either sync or async byte streams. Example:: stream = httpcore.PlainByteStream(b"123") Parameters ---------- content: A plain byte string used as the content of the stream. """ def __init__(self, content: bytes) -> None: self._content = content def __iter__(self) -> Iterator[bytes]: yield self._content async def __aiter__(self) -> AsyncIterator[bytes]: yield self._content
[docs]class IteratorByteStream(SyncByteStream): """ A concrete implementation for sync byte streams. Example:: def generate_content(): yield b"Hello, world!" ... stream = httpcore.IteratorByteStream(generate_content()) Parameters ---------- iterator: A sync byte iterator, used as the content of the stream. close_func: An optional function called when closing the stream. """ def __init__(self, iterator: Iterator[bytes], close_func: Callable = None) -> None: self._iterator = iterator self._close_func = close_func def __iter__(self) -> Iterator[bytes]: for chunk in self._iterator: yield chunk def close(self) -> None: if self._close_func is not None: self._close_func()
[docs]class AsyncIteratorByteStream(AsyncByteStream): """ A concrete implementation for async byte streams. Example:: async def generate_content(): yield b"Hello, world!" ... stream = httpcore.AsyncIteratorByteStream(generate_content()) Parameters ---------- aiterator: An async byte iterator, used as the content of the stream. aclose_func: An optional async function called when closing the stream. """ def __init__( self, aiterator: AsyncIterator[bytes], aclose_func: Callable = None ) -> None: self._aiterator = aiterator self._aclose_func = aclose_func async def __aiter__(self) -> AsyncIterator[bytes]: async for chunk in self._aiterator: yield chunk async def aclose(self) -> None: if self._aclose_func is not None: await self._aclose_func()