diff --git a/awscrt/__init__.py b/awscrt/__init__.py index b1a575120..ab3ee339f 100644 --- a/awscrt/__init__.py +++ b/awscrt/__init__.py @@ -7,6 +7,7 @@ 'auth', 'crypto', 'http', + 'aio.http', 'io', 'mqtt', 'mqtt5', diff --git a/awscrt/aio/http.py b/awscrt/aio/http.py new file mode 100644 index 000000000..d37bec401 --- /dev/null +++ b/awscrt/aio/http.py @@ -0,0 +1,423 @@ +""" +HTTP AsyncIO support + +This module provides asyncio wrappers around the awscrt.http module. +All network operations in `awscrt.aio.http` are asynchronous and use Python's asyncio framework. +""" + +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0. + +import _awscrt +import awscrt.exceptions +from awscrt.http import ( + HttpClientConnectionBase, HttpRequest, HttpClientStreamBase, HttpProxyOptions, + Http2Setting, HttpVersion +) +from awscrt.io import ( + ClientBootstrap, SocketOptions, TlsConnectionOptions, InputStream +) +import asyncio +from collections import deque +from io import BytesIO +from concurrent.futures import Future +from typing import List, Tuple, Optional, Callable, AsyncIterator + + +class AIOHttpClientConnectionUnified(HttpClientConnectionBase): + """ + An async unified HTTP client connection for either a HTTP/1 or HTTP/2 connection. + + Use `AIOHttpClientConnectionUnified.new()` to establish a new connection. + """ + + @classmethod + async def new(cls, + host_name: str, + port: int, + bootstrap: Optional[ClientBootstrap] = None, + socket_options: Optional[SocketOptions] = None, + tls_connection_options: Optional[TlsConnectionOptions] = None, + proxy_options: Optional[HttpProxyOptions] = None) -> "AIOHttpClientConnectionUnified": + """ + Asynchronously establish a new AIOHttpClientConnectionUnified. + + Args: + host_name (str): Connect to host. + + port (int): Connect to port. + + bootstrap (Optional [ClientBootstrap]): Client bootstrap to use when initiating socket connection. + If None is provided, the default singleton is used. + + socket_options (Optional[SocketOptions]): Optional socket options. + If None is provided, then default options are used. + + tls_connection_options (Optional[TlsConnectionOptions]): Optional TLS + connection options. If None is provided, then the connection will + be attempted over plain-text. + + proxy_options (Optional[HttpProxyOptions]): Optional proxy options. + If None is provided then a proxy is not used. + + Returns: + AIOHttpClientConnectionUnified: A new unified HTTP client connection. + """ + future = cls._generic_new( + host_name, + port, + bootstrap, + socket_options, + tls_connection_options, + proxy_options, + asyncio_connection=True) + return await asyncio.wrap_future(future) + + async def close(self) -> None: + """Close the connection asynchronously. + + Shutdown is asynchronous. This call has no effect if the connection is already + closing. + + Returns: + None: When shutdown is complete. + """ + _awscrt.http_connection_close(self._binding) + await asyncio.wrap_future(self.shutdown_future) + + def request(self, + request: 'HttpRequest', + request_body_generator: AsyncIterator[bytes] = None, + loop: Optional[asyncio.AbstractEventLoop] = None) -> 'AIOHttpClientStreamUnified': + """Create `AIOHttpClientStreamUnified` to carry out the request/response exchange. + + Args: + request (HttpRequest): Definition for outgoing request. + request_body_generator (AsyncIterator[bytes], optional): Async iterator providing chunks of the request body. + If provided, the body will be sent incrementally as chunks become available. + loop (Optional[asyncio.AbstractEventLoop]): Event loop to use for async operations. + If None, the current event loop is used. + + Returns: + AIOHttpClientStreamUnified: Stream for the HTTP request/response exchange. + """ + return AIOHttpClientStreamUnified(self, request, request_body_generator, loop) + + +class AIOHttpClientConnection(AIOHttpClientConnectionUnified): + """ + An async HTTP/1.1 only client connection. + + Use `AIOHttpClientConnection.new()` to establish a new connection. + """ + + @classmethod + async def new(cls, + host_name: str, + port: int, + bootstrap: Optional[ClientBootstrap] = None, + socket_options: Optional[SocketOptions] = None, + tls_connection_options: Optional[TlsConnectionOptions] = None, + proxy_options: Optional[HttpProxyOptions] = None) -> "AIOHttpClientConnection": + """ + Asynchronously establish a new AIOHttpClientConnection. + + Args: + host_name (str): Connect to host. + + port (int): Connect to port. + + bootstrap (Optional [ClientBootstrap]): Client bootstrap to use when initiating socket connection. + If None is provided, the default singleton is used. + + socket_options (Optional[SocketOptions]): Optional socket options. + If None is provided, then default options are used. + + tls_connection_options (Optional[TlsConnectionOptions]): Optional TLS + connection options. If None is provided, then the connection will + be attempted over plain-text. + + proxy_options (Optional[HttpProxyOptions]): Optional proxy options. + If None is provided then a proxy is not used. + + Returns: + AIOHttpClientConnection: A new HTTP client connection. + """ + future = cls._generic_new( + host_name, + port, + bootstrap, + socket_options, + tls_connection_options, + proxy_options, + expected_version=HttpVersion.Http1_1, + asyncio_connection=True) + return await asyncio.wrap_future(future) + + def request(self, + request: 'HttpRequest', + request_body_generator: AsyncIterator[bytes] = None, + loop: Optional[asyncio.AbstractEventLoop] = None) -> 'AIOHttpClientStream': + """Create `AIOHttpClientStream` to carry out the request/response exchange. + + Args: + request (HttpRequest): Definition for outgoing request. + request_body_generator (AsyncIterator[bytes], optional): Async iterator providing chunks of the request body. + Not supported for HTTP/1.1 connections yet, use the request's body_stream instead. + loop (Optional[asyncio.AbstractEventLoop]): Event loop to use for async operations. + If None, the current event loop is used. + + Returns: + AIOHttpClientStream: Stream for the HTTP request/response exchange. + """ + return AIOHttpClientStream(self, request, loop) + + +class AIOHttp2ClientConnection(AIOHttpClientConnectionUnified): + """ + An async HTTP/2 only client connection. + + Use `AIOHttp2ClientConnection.new()` to establish a new connection. + """ + + @classmethod + async def new(cls, + host_name: str, + port: int, + bootstrap: Optional[ClientBootstrap] = None, + socket_options: Optional[SocketOptions] = None, + tls_connection_options: Optional[TlsConnectionOptions] = None, + proxy_options: Optional[HttpProxyOptions] = None, + initial_settings: Optional[List[Http2Setting]] = None, + on_remote_settings_changed: Optional[Callable[[List[Http2Setting]], + None]] = None) -> "AIOHttp2ClientConnection": + """ + Asynchronously establish an HTTP/2 client connection. + Notes: to set up the connection, the server must support HTTP/2 and TlsConnectionOptions + + This class extends AIOHttpClientConnection with HTTP/2 specific functionality. + + HTTP/2 specific args: + initial_settings (List[Http2Setting]): The initial settings to change for the connection. + + on_remote_settings_changed: Optional callback invoked once the remote peer changes its settings. + And the settings are acknowledged by the local connection. + The function should take the following arguments and return nothing: + + * `settings` (List[Http2Setting]): List of settings that were changed. + """ + future = cls._generic_new( + host_name, + port, + bootstrap, + socket_options, + tls_connection_options, + proxy_options, + expected_version=HttpVersion.Http2, + initial_settings=initial_settings, + on_remote_settings_changed=on_remote_settings_changed, + asyncio_connection=True) + return await asyncio.wrap_future(future) + + def request(self, + request: 'HttpRequest', + request_body_generator: AsyncIterator[bytes] = None, + loop: Optional[asyncio.AbstractEventLoop] = None) -> 'AIOHttp2ClientStream': + """Create `AIOHttp2ClientStream` to carry out the request/response exchange. + + Args: + request (HttpRequest): Definition for outgoing request. + request_body_generator (AsyncIterator[bytes], optional): Async iterator providing chunks of the request body. + If provided, the body will be sent incrementally as chunks become available from the iterator. + loop (Optional[asyncio.AbstractEventLoop]): Event loop to use for async operations. + If None, the current event loop is used. + + Returns: + AIOHttp2ClientStream: Stream for the HTTP/2 request/response exchange. + """ + return AIOHttp2ClientStream(self, request, request_body_generator, loop) + + +class AIOHttpClientStreamUnified(HttpClientStreamBase): + __slots__ = ( + '_response_status_future', + '_response_headers_future', + '_chunk_futures', + '_received_chunks', + '_completion_future', + '_stream_completed', + '_status_code', + '_loop') + + def __init__(self, + connection: AIOHttpClientConnection, + request: HttpRequest, + request_body_generator: AsyncIterator[bytes] = None, + loop: Optional[asyncio.AbstractEventLoop] = None) -> None: + # Initialize the parent class + http2_manual_write = request_body_generator is not None and connection.version is HttpVersion.Http2 + super()._init_common(connection, request, http2_manual_write=http2_manual_write) + + # Attach the event loop for async operations + if loop is None: + # Use the current event loop if none is provided + loop = asyncio.get_event_loop() + elif not isinstance(loop, asyncio.AbstractEventLoop): + raise TypeError("loop must be an instance of asyncio.AbstractEventLoop") + self._loop = loop + + # deque is thread-safe for appending and popping, so that we don't need + # locks to handle the callbacks from the C thread + self._chunk_futures = deque() + self._received_chunks = deque() + self._stream_completed = False + + # Create futures for async operations + self._completion_future = Future() + self._response_status_future = Future() + self._response_headers_future = Future() + self._status_code = None + + self._request_body_generator = request_body_generator + if self._request_body_generator is not None: + self._writer = self._loop.create_task(self._set_request_body_generator(self._request_body_generator)) + + # Activate the stream immediately + _awscrt.http_client_stream_activate(self) + + def _on_response(self, status_code: int, name_value_pairs: List[Tuple[str, str]]) -> None: + self._status_code = status_code + # invoked from the C thread, so we need to schedule the result setting on the event loop + self._response_status_future.set_result(status_code) + self._response_headers_future.set_result(name_value_pairs) + + def _on_body(self, chunk: bytes) -> None: + """Process body chunk on the correct event loop thread.""" + if self._chunk_futures: + future = self._chunk_futures.popleft() + future.set_result(chunk) + else: + self._received_chunks.append(chunk) + + def _on_complete(self, error_code: int) -> None: + """Set the completion status of the stream.""" + if error_code == 0: + self._completion_future.set_result(self._status_code) + else: + self._completion_future.set_exception(awscrt.exceptions.from_code(error_code)) + + # Resolve all pending chunk futures with an empty string to indicate end of stream + while self._chunk_futures: + future = self._chunk_futures.popleft() + future.set_result("") + + async def _set_request_body_generator(self, body_iterator: AsyncIterator[bytes]): + ... + + async def get_response_status_code(self) -> int: + """Get the response status code asynchronously. + + Returns: + int: The response status code. + """ + return await asyncio.wrap_future(self._response_status_future, loop=self._loop) + + async def get_response_headers(self) -> List[Tuple[str, str]]: + """Get the response headers asynchronously. + + Returns: + List[Tuple[str, str]]: The response headers as a list of (name, value) tuples. + """ + return await asyncio.wrap_future(self._response_headers_future, loop=self._loop) + + async def get_next_response_chunk(self) -> bytes: + """Get the next chunk from the response body. + + Returns: + bytes: The next chunk of data from the response body. + Returns empty bytes when the stream is completed and no more chunks are left. + """ + if self._received_chunks: + return self._received_chunks.popleft() + elif self._completion_future.done(): + return b"" + else: + future = Future() + self._chunk_futures.append(future) + return await asyncio.wrap_future(future, loop=self._loop) + + async def wait_for_completion(self) -> int: + """Wait asynchronously for the stream to complete. + + Returns: + int: The response status code. + """ + return await asyncio.wrap_future(self._completion_future, loop=self._loop) + + +class AIOHttpClientStream(AIOHttpClientStreamUnified): + """Async HTTP stream that sends a request and receives a response. + + Create an AIOHttpClientStream with `AIOHttpClientConnection.request()`. + + Attributes: + connection (AIOHttpClientConnection): This stream's connection. + + completion_future (asyncio.Future): Future that will contain + the response status code (int) when the request/response exchange + completes. If the exchange fails to complete, the Future will + contain an exception indicating why it failed. + + Notes: + All async method on a stream (await stream.next(), etc.) must be performed in the + thread that owns the event loop used to create the stream + """ + + def __init__(self, connection: AIOHttpClientConnection, request: HttpRequest, + loop: Optional[asyncio.AbstractEventLoop] = None) -> None: + """Initialize an HTTP client stream. + + Args: + connection (AIOHttpClientConnection): The connection to send the request on. + request (HttpRequest): The HTTP request to send. + loop (Optional[asyncio.AbstractEventLoop]): Event loop to use for async operations. + If None, the current event loop is used. + """ + super().__init__(connection, request, loop=loop) + + +class AIOHttp2ClientStream(AIOHttpClientStreamUnified): + """HTTP/2 stream that sends a request and receives a response. + + Create an AIOHttp2ClientStream with `AIOHttp2ClientConnection.request()`. + """ + + def __init__(self, + connection: AIOHttpClientConnection, + request: HttpRequest, + request_body_generator: AsyncIterator[bytes] = None, + loop: Optional[asyncio.AbstractEventLoop] = None) -> None: + super().__init__(connection, request, request_body_generator=request_body_generator, loop=loop) + + async def _write_data(self, body, end_stream): + future = Future() + body_stream = InputStream.wrap(body, allow_none=True) + + def on_write_complete(error_code: int) -> None: + if future.cancelled(): + # the future was cancelled, so we don't need to set the result or exception + return + if error_code: + future.set_exception(awscrt.exceptions.from_code(error_code)) + else: + future.set_result(None) + + _awscrt.http2_client_stream_write_data(self, body_stream, end_stream, on_write_complete) + await asyncio.wrap_future(future, loop=self._loop) + + async def _set_request_body_generator(self, body_iterator: AsyncIterator[bytes]): + try: + async for chunk in body_iterator: + await self._write_data(BytesIO(chunk), False) + finally: + await self._write_data(None, True) diff --git a/awscrt/http.py b/awscrt/http.py index 85244b5a2..e8a9c2a73 100644 --- a/awscrt/http.py +++ b/awscrt/http.py @@ -107,19 +107,6 @@ def version(self) -> HttpVersion: """HttpVersion: Protocol used by this connection""" return self._version - def close(self) -> "concurrent.futures.Future": - """Close the connection. - - Shutdown is asynchronous. This call has no effect if the connection is already - closing. - - Returns: - concurrent.futures.Future: This connection's :attr:`shutdown_future`, - which completes when shutdown has finished. - """ - _awscrt.http_connection_close(self._binding) - return self.shutdown_future - def is_open(self) -> bool: """ Returns: @@ -130,56 +117,9 @@ def is_open(self) -> bool: return _awscrt.http_connection_is_open(self._binding) -class HttpClientConnection(HttpConnectionBase): - """ - An HTTP client connection. - - Use :meth:`HttpClientConnection.new()` to establish a new connection. - """ +class HttpClientConnectionBase(HttpConnectionBase): __slots__ = ('_host_name', '_port') - @classmethod - def new(cls, - host_name: str, - port: int, - bootstrap: Optional[ClientBootstrap] = None, - socket_options: Optional[SocketOptions] = None, - tls_connection_options: Optional[TlsConnectionOptions] = None, - proxy_options: Optional['HttpProxyOptions'] = None) -> "concurrent.futures.Future": - """ - Asynchronously establish a new HttpClientConnection. - - Args: - host_name (str): Connect to host. - - port (int): Connect to port. - - bootstrap (Optional [ClientBootstrap]): Client bootstrap to use when initiating socket connection. - If None is provided, the default singleton is used. - - socket_options (Optional[SocketOptions]): Optional socket options. - If None is provided, then default options are used. - - tls_connection_options (Optional[TlsConnectionOptions]): Optional TLS - connection options. If None is provided, then the connection will - be attempted over plain-text. - - proxy_options (Optional[HttpProxyOptions]): Optional proxy options. - If None is provided then a proxy is not used. - - Returns: - concurrent.futures.Future: A Future which completes when connection succeeds or fails. - If successful, the Future will contain a new :class:`HttpClientConnection`. - Otherwise, it will contain an exception. - """ - return HttpClientConnection._generic_new( - host_name, - port, - bootstrap, - socket_options, - tls_connection_options, - proxy_options) - @staticmethod def _generic_new( host_name: str, @@ -190,7 +130,8 @@ def _generic_new( proxy_options: Optional['HttpProxyOptions'] = None, expected_version: Optional[HttpVersion] = None, initial_settings: Optional[List[Http2Setting]] = None, - on_remote_settings_changed: Optional[Callable[[List[Http2Setting]], None]] = None) -> "concurrent.futures.Future": + on_remote_settings_changed: Optional[Callable[[List[Http2Setting]], None]] = None, + asyncio_connection=False) -> "concurrent.futures.Future": """ Initialize the generic part of the HttpClientConnection class. """ @@ -217,7 +158,8 @@ def _generic_new( tls_connection_options=tls_connection_options, connect_future=future, expected_version=expected_version, - on_remote_settings_changed=on_remote_settings_changed) + on_remote_settings_changed=on_remote_settings_changed, + asyncio_connection=asyncio_connection) _awscrt.http_client_connection_new( bootstrap, @@ -245,6 +187,57 @@ def port(self) -> int: """Remote port""" return self._port + +class HttpClientConnection(HttpClientConnectionBase): + """ + An HTTP client connection. + + Use :meth:`HttpClientConnection.new()` to establish a new connection. + """ + __slots__ = ('_host_name', '_port') + + @classmethod + def new(cls, + host_name: str, + port: int, + bootstrap: Optional[ClientBootstrap] = None, + socket_options: Optional[SocketOptions] = None, + tls_connection_options: Optional[TlsConnectionOptions] = None, + proxy_options: Optional['HttpProxyOptions'] = None) -> "concurrent.futures.Future": + """ + Asynchronously establish a new HttpClientConnection. + + Args: + host_name (str): Connect to host. + + port (int): Connect to port. + + bootstrap (Optional [ClientBootstrap]): Client bootstrap to use when initiating socket connection. + If None is provided, the default singleton is used. + + socket_options (Optional[SocketOptions]): Optional socket options. + If None is provided, then default options are used. + + tls_connection_options (Optional[TlsConnectionOptions]): Optional TLS + connection options. If None is provided, then the connection will + be attempted over plain-text. + + proxy_options (Optional[HttpProxyOptions]): Optional proxy options. + If None is provided then a proxy is not used. + + Returns: + concurrent.futures.Future: A Future which completes when connection succeeds or fails. + If successful, the Future will contain a new :class:`HttpClientConnection`. + Otherwise, it will contain an exception. + """ + return cls._generic_new( + host_name, + port, + bootstrap, + socket_options, + tls_connection_options, + proxy_options) + def request(self, request: 'HttpRequest', on_response: Optional[Callable[..., None]] = None, @@ -292,8 +285,21 @@ def request(self, """ return HttpClientStream(self, request, on_response, on_body) + def close(self) -> "concurrent.futures.Future": + """Close the connection. -class Http2ClientConnection(HttpClientConnection): + Shutdown is asynchronous. This call has no effect if the connection is already + closing. + + Returns: + concurrent.futures.Future: This connection's :attr:`shutdown_future`, + which completes when shutdown has finished. + """ + _awscrt.http_connection_close(self._binding) + return self.shutdown_future + + +class Http2ClientConnection(HttpClientConnectionBase): @classmethod def new(cls, @@ -321,7 +327,7 @@ def new(cls, * `settings` (List[Http2Setting]): List of settings that were changed. """ - return HttpClientConnection._generic_new( + return cls._generic_new( host_name, port, bootstrap, @@ -337,17 +343,74 @@ def request(self, on_response: Optional[Callable[..., None]] = None, on_body: Optional[Callable[..., None]] = None, manual_write: bool = False) -> 'Http2ClientStream': + """Create `Http2ClientStream` to carry out the request/response exchange. + + NOTE: The HTTP stream sends no data until `Http2ClientStream.activate()` + is called. Call activate() when you're ready for callbacks and events to fire. + + Args: + request (HttpRequest): Definition for outgoing request. + + on_response: Optional callback invoked once main response headers are received. + The function should take the following arguments and return nothing: + + * `http_stream` (`Http2ClientStream`): HTTP/2 stream carrying + out this request/response exchange. + + * `status_code` (int): Response status code. + + * `headers` (List[Tuple[str, str]]): Response headers as a + list of (name,value) pairs. + + * `**kwargs` (dict): Forward compatibility kwargs. + + on_body: Optional callback invoked 0+ times as response body data is received. + The function should take the following arguments and return nothing: + + * `http_stream` (`Http2ClientStream`): HTTP/2 stream carrying + out this request/response exchange. + + * `chunk` (buffer): Response body data (not necessarily + a whole "chunk" of chunked encoding). + + * `**kwargs` (dict): Forward-compatibility kwargs. + + manual_write (bool): If True, enables manual data writing on the stream. + This allows calling `write_data()` to stream the request body in chunks. + Note: In the asyncio version, this is replaced by the async_body parameter. + + Returns: + Http2ClientStream: Stream for the HTTP/2 request/response exchange. + """ return Http2ClientStream(self, request, on_response, on_body, manual_write) + def close(self) -> "concurrent.futures.Future": + """Close the connection. + + Shutdown is asynchronous. This call has no effect if the connection is already + closing. + + Returns: + concurrent.futures.Future: This connection's :attr:`shutdown_future`, + which completes when shutdown has finished. + """ + _awscrt.http_connection_close(self._binding) + return self.shutdown_future + class HttpStreamBase(NativeResource): - """Base for HTTP stream classes""" + """Base for HTTP stream classes. + + Attributes: + connection: The HTTP connection this stream belongs to. + completion_future: Future that completes when the operation finishes. + """ __slots__ = ('_connection', '_completion_future', '_on_body_cb') - def __init__(self, connection: HttpConnectionBase, on_body: Optional[Callable[..., None]] = None) -> None: + def __init__(self, connection, on_body: Optional[Callable[..., None]] = None) -> None: super().__init__() - self._connection: HttpConnectionBase = connection - self._completion_future: Future = Future() + self._connection = connection + self._completion_future = Future() self._on_body_cb: Optional[Callable[..., None]] = on_body @property @@ -363,38 +426,25 @@ def _on_body(self, chunk: bytes) -> None: self._on_body_cb(http_stream=self, chunk=chunk) -class HttpClientStream(HttpStreamBase): - """HTTP stream that sends a request and receives a response. - - Create an HttpClientStream with :meth:`HttpClientConnection.request()`. - - NOTE: The HTTP stream sends no data until :meth:`HttpClientStream.activate()` - is called. Call activate() when you're ready for callbacks and events to fire. +class HttpClientStreamBase(HttpStreamBase): + """Base for HTTP client stream classes. Attributes: - connection (HttpClientConnection): This stream's connection. + connection: This stream's connection. - completion_future (concurrent.futures.Future): Future that will contain - the response status code (int) when the request/response exchange - completes. If the exchange fails to complete, the Future will - contain an exception indicating why it failed. + completion_future: Future that completes when + the request/response exchange is finished. """ - __slots__ = ('_response_status_code', '_on_response_cb', '_on_body_cb', '_request', '_version') - def __init__(self, - connection: HttpClientConnection, - request: 'HttpRequest', - on_response: Optional[Callable[..., None]] = None, - on_body: Optional[Callable[..., None]] = None) -> None: - self._init_common(connection, request, on_response, on_body) + __slots__ = ('_response_status_code', '_on_response_cb', '_on_body_cb', '_request', '_version') def _init_common(self, - connection: HttpClientConnection, + connection: HttpClientConnectionBase, request: 'HttpRequest', on_response: Optional[Callable[..., None]] = None, on_body: Optional[Callable[..., None]] = None, http2_manual_write: bool = False) -> None: - assert isinstance(connection, HttpClientConnection) + assert isinstance(connection, HttpClientConnectionBase) assert isinstance(request, HttpRequest) assert callable(on_response) or on_response is None assert callable(on_body) or on_body is None @@ -405,9 +455,8 @@ def _init_common(self, self._response_status_code: Optional[int] = None # keep HttpRequest alive until stream completes - self._request: 'HttpRequest' = request - self._version: HttpVersion = connection.version - + self._request = request + self._version = connection.version self._binding = _awscrt.http_client_stream_new(self, connection, request, http2_manual_write) @property @@ -422,14 +471,6 @@ def response_status_code(self) -> Optional[int]: This is None until a response arrives.""" return self._response_status_code - def activate(self) -> None: - """Begin sending the request. - - The HTTP stream does nothing until this is called. Call activate() when you - are ready for its callbacks and events to fire. - """ - _awscrt.http_client_stream_activate(self) - def _on_response(self, status_code: int, name_value_pairs: List[Tuple[str, str]]) -> None: self._response_status_code = status_code @@ -446,22 +487,85 @@ def _on_complete(self, error_code: int) -> None: self._completion_future.set_exception(awscrt.exceptions.from_code(error_code)) -class Http2ClientStream(HttpClientStream): +class HttpClientStream(HttpClientStreamBase): + """HTTP stream that sends a request and receives a response. + + Create an HttpClientStream with :meth:`HttpClientConnection.request()`. + + NOTE: The HTTP stream sends no data until :meth:`HttpClientStream.activate()` + is called. Call activate() when you're ready for callbacks and events to fire. + + Attributes: + connection (HttpClientConnection): This stream's connection. + + completion_future (concurrent.futures.Future): Future that will contain + the response status code (int) when the request/response exchange + completes. If the exchange fails to complete, the Future will + contain an exception indicating why it failed. + """ + + def __init__(self, + connection: HttpClientConnection, + request: 'HttpRequest', + on_response: Optional[Callable[..., None]] = None, + on_body: Optional[Callable[..., None]] = None) -> None: + self._init_common(connection, request, on_response, on_body) + + def activate(self) -> None: + """Begin sending the request. + + The HTTP stream does nothing until this is called. Call activate() when you + are ready for its callbacks and events to fire. + """ + _awscrt.http_client_stream_activate(self) + + +class Http2ClientStream(HttpClientStreamBase): def __init__(self, connection: HttpClientConnection, request: 'HttpRequest', on_response: Optional[Callable[..., None]] = None, on_body: Optional[Callable[..., None]] = None, manual_write: bool = False) -> None: - super()._init_common(connection, request, on_response, on_body, manual_write) + self._init_common(connection, request, on_response, on_body, manual_write) + + def activate(self) -> None: + """Begin sending the request. + + The HTTP stream does nothing until this is called. Call activate() when you + are ready for its callbacks and events to fire. + """ + _awscrt.http_client_stream_activate(self) def write_data(self, data_stream: Union[InputStream, Any], end_stream: bool = False) -> "concurrent.futures.Future": - future: Future = Future() - body_stream: InputStream = InputStream.wrap(data_stream, allow_none=True) + """Write a chunk of data to the request body stream. + + This method is only available when the stream was created with + manual_write=True. This allows incremental writing of request data. + + Note: In the asyncio version, this is replaced by the request_body_generator parameter + which accepts an async generator. + + Args: + data_stream (Union[InputStream, Any]): Data to write. If not an InputStream, + it will be wrapped in one. Can be None to send an empty chunk. + + end_stream (bool): True to indicate this is the last chunk and no more data + will be sent. False if more chunks will follow. + + Returns: + concurrent.futures.Future: Future that completes when the write operation + is done. The future will contain None on success, or an exception on failure. + """ + future = Future() + body_stream = InputStream.wrap(data_stream, allow_none=True) def on_write_complete(error_code: int) -> None: + if future.cancelled(): + # the future was cancelled, so we don't need to set the result or exception + return if error_code: future.set_exception(awscrt.exceptions.from_code(error_code)) else: @@ -483,7 +587,7 @@ def __init__(self, binding: Any, headers: 'HttpHeaders', super().__init__() self._binding = binding - self._headers: HttpHeaders = headers + self._headers = headers self._body_stream: Optional[InputStream] = None if body_stream: @@ -787,13 +891,13 @@ def __init__(self, auth_username: Optional[str] = None, auth_password: Optional[str] = None, connection_type: HttpProxyConnectionType = HttpProxyConnectionType.Legacy) -> None: - self.host_name: str = host_name - self.port: int = port - self.tls_connection_options: Optional[TlsConnectionOptions] = tls_connection_options - self.auth_type: HttpProxyAuthenticationType = auth_type - self.auth_username: Optional[str] = auth_username - self.auth_password: Optional[str] = auth_password - self.connection_type: HttpProxyConnectionType = connection_type + self.host_name = host_name + self.port = port + self.tls_connection_options = tls_connection_options + self.auth_type = auth_type + self.auth_username = auth_username + self.auth_password = auth_password + self.connection_type = connection_type class _HttpClientConnectionCore: @@ -809,7 +913,8 @@ def __init__( tls_connection_options: Optional[TlsConnectionOptions] = None, connect_future: Optional[Future] = None, expected_version: Optional[HttpVersion] = None, - on_remote_settings_changed: Optional[Callable[[List[Http2Setting]], None]] = None) -> None: + on_remote_settings_changed: Optional[Callable[[List[Http2Setting]], None]] = None, + asyncio_connection=False) -> None: self._shutdown_future = None self._host_name = host_name self._port = port @@ -818,23 +923,31 @@ def __init__( self._connect_future = connect_future self._expected_version = expected_version self._on_remote_settings_changed_from_user = on_remote_settings_changed + self._asyncio_connection = asyncio_connection def _on_connection_setup(self, binding: Any, error_code: int, http_version: HttpVersion) -> None: if self._connect_future is None: return - + if error_code != 0: + self._connect_future.set_exception(awscrt.exceptions.from_code(error_code)) + return if self._expected_version and self._expected_version != http_version: # unexpected protocol version # AWS_ERROR_HTTP_UNSUPPORTED_PROTOCOL self._connect_future.set_exception(awscrt.exceptions.from_code(2060)) return - if error_code != 0: - self._connect_future.set_exception(awscrt.exceptions.from_code(error_code)) - return - if http_version == HttpVersion.Http2: - connection = Http2ClientConnection() + if self._asyncio_connection: + # Import is done here to avoid circular import issues + from awscrt.aio.http import AIOHttpClientConnection, AIOHttp2ClientConnection + if http_version == HttpVersion.Http2: + connection = AIOHttp2ClientConnection() + else: + connection = AIOHttpClientConnection() else: - connection = HttpClientConnection() + if http_version == HttpVersion.Http2: + connection = Http2ClientConnection() + else: + connection = HttpClientConnection() connection._host_name = self._host_name connection._port = self._port diff --git a/test/test_aiohttp_client.py b/test/test_aiohttp_client.py new file mode 100644 index 000000000..f15365d6b --- /dev/null +++ b/test/test_aiohttp_client.py @@ -0,0 +1,652 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0. + +import time +import socket +import sys +import asyncio +import unittest +import threading +import subprocess +import concurrent.futures +from urllib.parse import urlparse +from test import NativeResourceTest +import ssl +import os +from io import BytesIO +from http.server import HTTPServer, SimpleHTTPRequestHandler +from awscrt import io +from awscrt.io import ClientBootstrap, ClientTlsContext, DefaultHostResolver, EventLoopGroup, TlsContextOptions, TlsCipherPref +from awscrt.http import HttpHeaders, HttpRequest, HttpVersion, Http2Setting, Http2SettingID +from awscrt.aio.http import AIOHttpClientConnection, AIOHttp2ClientConnection +import threading + + +class Response: + """Holds contents of incoming response""" + + def __init__(self): + self.status_code = None + self.headers = None + self.body = bytearray() + + async def collect_response(self, stream): + """Collects complete response from a stream""" + # Get status code and headers + self.status_code = await stream.get_response_status_code() + headers_list = await stream.get_response_headers() + self.headers = HttpHeaders(headers_list) + # Collect body chunks + while True: + chunk = await stream.get_next_response_chunk() + if not chunk: + break + self.body.extend(chunk) + + # Return status code for convenience + return self.status_code + + +class TestRequestHandler(SimpleHTTPRequestHandler): + """Request handler for test server""" + + def do_PUT(self): + content_length = int(self.headers['Content-Length']) + # store put request on the server object + incoming_body_bytes = self.rfile.read(content_length) + self.server.put_requests[self.path] = incoming_body_bytes + self.send_response(200, 'OK') + self.end_headers() + + +class TestAsyncClient(NativeResourceTest): + hostname = 'localhost' + timeout = 5 # seconds + + def _start_server(self, secure, http_1_0=False): + # HTTP/1.0 closes the connection at the end of each request + # HTTP/1.1 will keep the connection alive + if http_1_0: + TestRequestHandler.protocol_version = "HTTP/1.0" + else: + TestRequestHandler.protocol_version = "HTTP/1.1" + + self.server = HTTPServer((self.hostname, 0), TestRequestHandler) + if secure: + context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + context.minimum_version = ssl.TLSVersion.TLSv1_2 + context.load_cert_chain(certfile='test/resources/unittest.crt', keyfile="test/resources/unittest.key") + self.server.socket = context.wrap_socket(self.server.socket, server_side=True) + self.port = self.server.server_address[1] + + # put requests are stored in this dict + self.server.put_requests = {} + + self.server_thread = threading.Thread(target=self.server.serve_forever, name='test_server') + self.server_thread.start() + + def _stop_server(self): + self.server.shutdown() + self.server.server_close() + self.server_thread.join() + + async def _new_client_connection(self, secure, proxy_options=None): + if secure: + tls_ctx_opt = TlsContextOptions() + tls_ctx_opt.verify_peer = False + tls_ctx = ClientTlsContext(tls_ctx_opt) + tls_conn_opt = tls_ctx.new_connection_options() + tls_conn_opt.set_server_name(self.hostname) + else: + tls_conn_opt = None + + event_loop_group = EventLoopGroup() + host_resolver = DefaultHostResolver(event_loop_group) + bootstrap = ClientBootstrap(event_loop_group, host_resolver) + return await AIOHttpClientConnection.new( + host_name=self.hostname, + port=self.port, + bootstrap=bootstrap, + tls_connection_options=tls_conn_opt, + proxy_options=proxy_options) + + async def _test_connect(self, secure): + self._start_server(secure) + try: + connection = await self._new_client_connection(secure) + + # close connection + await connection.close() + self.assertFalse(connection.is_open()) + + finally: + self._stop_server() + + async def _test_get(self, secure): + # GET request receives this very file from the server + self._start_server(secure) + try: + connection = await self._new_client_connection(secure) + self.assertTrue(connection.is_open()) + + test_asset_path = 'test/test_aiohttp_client.py' + + # Create request and get stream - stream is already activated + request = HttpRequest('GET', '/' + test_asset_path) + stream = connection.request(request) + + # Collect and process response + response = Response() + status_code = await response.collect_response(stream) + + # Verify results + self.assertEqual(200, status_code) + self.assertEqual(200, response.status_code) + + with open(test_asset_path, 'rb') as test_asset: + test_asset_bytes = test_asset.read() + self.assertEqual(test_asset_bytes, response.body) + + await connection.close() + + finally: + self._stop_server() + + async def _test_put(self, secure): + # PUT request sends this very file to the server + self._start_server(secure) + try: + connection = await self._new_client_connection(secure) + test_asset_path = 'test/test_aiohttp_client.py' + with open(test_asset_path, 'rb') as outgoing_body_stream: + outgoing_body_bytes = outgoing_body_stream.read() + headers = HttpHeaders([ + ('Content-Length', str(len(outgoing_body_bytes))), + ]) + + # seek back to start of stream before trying to send it + outgoing_body_stream.seek(0) + + # Create request and get stream - stream is already activated + request = HttpRequest('PUT', '/' + test_asset_path, headers, outgoing_body_stream) + stream = connection.request(request) + + # Collect and process response + response = Response() + status_code = await response.collect_response(stream) + + # Verify results + self.assertEqual(200, status_code) + self.assertEqual(200, response.status_code) + + # compare what we sent against what the server received + server_received = self.server.put_requests.get('/' + test_asset_path) + self.assertIsNotNone(server_received) + self.assertEqual(server_received, outgoing_body_bytes) + + await connection.close() + + finally: + self._stop_server() + + async def _test_shutdown_error(self, secure): + # Use HTTP/1.0 connection to force a connection close after request completes + self._start_server(secure, http_1_0=True) + try: + connection = await self._new_client_connection(secure) + + # Send request + request = HttpRequest('GET', '/') + stream = connection.request(request) + + # Collect response + response = Response() + await response.collect_response(stream) + + # With HTTP/1.0, the server should close the connection + # We'll wait a bit and verify the connection is closed + await asyncio.sleep(0.5) # Give time for the server to close connection + self.assertFalse(connection.is_open()) + + finally: + self._stop_server() + + async def _test_stream_lives_until_complete(self, secure): + # Ensure that stream and connection classes stay alive until work is complete + self._start_server(secure) + try: + connection = await self._new_client_connection(secure) + + request = HttpRequest('GET', '/test/test_aiohttp_client.py') + stream = connection.request(request) + + # Store stream but delete all local references + response = Response() + + # Schedule task to collect response but don't await it yet + collect_task = asyncio.create_task(response.collect_response(stream)) + + # Delete references to stream and connection + del stream + del connection + + # Now await the collection task - stream should still complete successfully + status_code = await collect_task + self.assertEqual(200, status_code) + + finally: + self._stop_server() + + async def _test_request_lives_until_stream_complete(self, secure): + # Ensure HttpRequest and body InputStream stay alive until HttpClientStream completes + self._start_server(secure) + try: + connection = await self._new_client_connection(secure) + + request = HttpRequest( + method='PUT', + path='/test/test_request_refcounts.txt', + headers=HttpHeaders([('Host', self.hostname), ('Content-Length', '5')]), + body_stream=BytesIO(b'hello')) + + # Create stream but delete the request + stream = connection.request(request) + del request + + # Now collect the response - should still work since the stream keeps the request alive + response = Response() + status_code = await response.collect_response(stream) + self.assertEqual(200, status_code) + + await connection.close() + + finally: + self._stop_server() + + async def _new_h2_client_connection(self, url): + event_loop_group = EventLoopGroup() + host_resolver = DefaultHostResolver(event_loop_group) + bootstrap = ClientBootstrap(event_loop_group, host_resolver) + + port = url.port + if port is None: + port = 443 + + tls_ctx_options = TlsContextOptions() + tls_ctx_options.verify_peer = False # Allow localhost + tls_ctx = ClientTlsContext(tls_ctx_options) + tls_conn_opt = tls_ctx.new_connection_options() + tls_conn_opt.set_server_name(url.hostname) + tls_conn_opt.set_alpn_list(["h2"]) + + connection = await AIOHttp2ClientConnection.new( + host_name=url.hostname, + port=port, + bootstrap=bootstrap, + tls_connection_options=tls_conn_opt) + + return connection + + async def _test_h2_client(self): + url = urlparse("https://d1cz66xoahf9cl.cloudfront.net/http_test_doc.txt") + connection = await self._new_h2_client_connection(url) + + # Check we set an h2 connection + self.assertEqual(connection.version, HttpVersion.Http2) + + request = HttpRequest('GET', url.path) + request.headers.add('host', url.hostname) + stream = connection.request(request) + + response = Response() + status_code = await response.collect_response(stream) + + # Check result + self.assertEqual(200, status_code) + self.assertEqual(200, response.status_code) + self.assertEqual(14428801, len(response.body)) + + await connection.close() + + async def _test_h2_manual_write_exception(self): + url = urlparse("https://d1cz66xoahf9cl.cloudfront.net/http_test_doc.txt") + connection = await self._new_h2_client_connection(url) + + # Check we set an h2 connection + self.assertEqual(connection.version, HttpVersion.Http2) + + request = HttpRequest('GET', url.path) + request.headers.add('host', url.hostname) + + # Create stream without using request_body_generator parameter + # (which would be needed to properly configure it for writing) + stream = connection.request(request) + + # The stream should have write_data attribute but using it should raise an exception + # since the stream isn't properly configured for manual writing + exception = None + try: + # Attempt to access internal write_data method which should raise an exception + # since the stream wasn't created with request_body_generator + await stream._write_data(BytesIO(b'hello'), False) + except (RuntimeError, AttributeError) as e: + exception = e + + self.assertIsNotNone(exception) + await connection.close() + + def test_connect_http(self): + asyncio.run(self._test_connect(secure=False)) + + def test_connect_https(self): + asyncio.run(self._test_connect(secure=True)) + + def test_get_http(self): + asyncio.run(self._test_get(secure=False)) + + def test_get_https(self): + asyncio.run(self._test_get(secure=True)) + + def test_put_http(self): + asyncio.run(self._test_put(secure=False)) + + def test_put_https(self): + asyncio.run(self._test_put(secure=True)) + + def test_shutdown_error_http(self): + asyncio.run(self._test_shutdown_error(secure=False)) + + def test_shutdown_error_https(self): + asyncio.run(self._test_shutdown_error(secure=True)) + + def test_stream_lives_until_complete_http(self): + asyncio.run(self._test_stream_lives_until_complete(secure=False)) + + def test_stream_lives_until_complete_https(self): + asyncio.run(self._test_stream_lives_until_complete(secure=True)) + + def test_request_lives_until_stream_complete_http(self): + asyncio.run(self._test_request_lives_until_stream_complete(secure=False)) + + def test_request_lives_until_stream_complete_https(self): + asyncio.run(self._test_request_lives_until_stream_complete(secure=True)) + + def test_h2_client(self): + asyncio.run(self._test_h2_client()) + + def test_h2_manual_write_exception(self): + asyncio.run(self._test_h2_manual_write_exception()) + + @unittest.skipIf(not TlsCipherPref.PQ_DEFAULT.is_supported(), "Cipher pref not supported") + def test_connect_pq_default(self): + async def _test(): + await self._test_connect(secure=True) + asyncio.run(_test()) + + async def _test_cross_thread_http_client(self, secure): + """Test using an HTTP client from a different thread/event loop.""" + self._start_server(secure) + try: + # Create connection in the main thread + connection = await self._new_client_connection(secure) + self.assertTrue(connection.is_open()) + + # Function to run in a different thread with a different event loop + async def thread_func(conn): + # Create new event loop for this thread + test_asset_path = 'test/test_aiohttp_client.py' + request = HttpRequest('GET', '/' + test_asset_path) + + # Use the connection but with the current thread's event loop + thread_loop = asyncio.get_event_loop() + stream = conn.request(request, loop=thread_loop) + + # Collect and process response + response = Response() + status_code = await response.collect_response(stream) + + # Verify results + assert status_code == 200 + + with open(test_asset_path, 'rb') as test_asset: + test_asset_bytes = test_asset.read() + assert test_asset_bytes == response.body + + return True + + # Run in executor to get a different thread + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit( + lambda: asyncio.run(thread_func(connection)) + ) + result = future.result() + self.assertTrue(result) + + await connection.close() + + finally: + self._stop_server() + + async def _test_cross_thread_http2_client(self): + """Test using an HTTP/2 client from a different thread/event loop.""" + url = urlparse("https://d1cz66xoahf9cl.cloudfront.net/http_test_doc.txt") + connection = await self._new_h2_client_connection(url) + + # Check we set an h2 connection + self.assertEqual(connection.version, HttpVersion.Http2) + + # Function to run in a different thread with a different event loop + async def thread_func(conn): + request = HttpRequest('GET', url.path) + request.headers.add('host', url.hostname) + + # Use the connection but with the current thread's event loop + thread_loop = asyncio.get_event_loop() + stream = conn.request(request, loop=thread_loop) + + response = Response() + status_code = await response.collect_response(stream) + # Check result + assert status_code == 200 + return len(response.body) + + # Run in executor to get a different thread + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit( + lambda: asyncio.run(thread_func(connection)) + ) + body_length = future.result() + self.assertEqual(14428801, body_length) + + await connection.close() + + def test_cross_thread_http_client(self): + asyncio.run(self._test_cross_thread_http_client(secure=False)) + + def test_cross_thread_https_client(self): + asyncio.run(self._test_cross_thread_http_client(secure=True)) + + def test_cross_thread_http2_client(self): + asyncio.run(self._test_cross_thread_http2_client()) + + +@unittest.skipUnless(os.environ.get('AWS_TEST_LOCALHOST'), 'set env var to run test: AWS_TEST_LOCALHOST') +class TestAsyncClientMockServer(NativeResourceTest): + timeout = 5 # seconds + p_server = None + mock_server_url = None + + def setUp(self): + super().setUp() + # Start the mock server from the aws-c-http + server_path = os.path.join( + os.path.dirname(__file__), + '..', + 'crt', + 'aws-c-http', + 'tests', + 'py_localhost', + 'server.py') + python_path = sys.executable + self.mock_server_url = urlparse("https://localhost:3443/upload_test") + self.p_server = subprocess.Popen([python_path, server_path]) + # Wait for server to be ready + self._wait_for_server_ready() + + def _wait_for_server_ready(self): + """Wait until server is accepting connections.""" + max_attempts = 20 + + for attempt in range(max_attempts): + try: + with socket.create_connection(("127.0.0.1", self.mock_server_url.port), timeout=1): + return # Server is ready + except (ConnectionRefusedError, socket.timeout): + time.sleep(0.5) + + # If we get here, server failed to start + stdout, stderr = self.p_server.communicate(timeout=0.5) + raise RuntimeError(f"Server failed to start after {max_attempts} attempts.\n" + f"STDOUT: {stdout.decode()}\nSTDERR: {stderr.decode()}") + + def tearDown(self): + self.p_server.terminate() + try: + self.p_server.wait(timeout=5) + except subprocess.TimeoutExpired: + self.p_server.kill() + super().tearDown() + + def _on_remote_settings_changed(self, settings): + # The mock server has the default settings with + # ENABLE_PUSH = 0 + # MAX_CONCURRENT_STREAMS = 100 + # MAX_HEADER_LIST_SIZE = 2**16 + # Check the settings here + self.assertEqual(len(settings), 3) + for i in settings: + if i.id == Http2SettingID.ENABLE_PUSH: + self.assertEqual(i.value, 0) + if i.id == Http2SettingID.MAX_CONCURRENT_STREAMS: + self.assertEqual(i.value, 100) + if i.id == Http2SettingID.MAX_HEADER_LIST_SIZE: + self.assertEqual(i.value, 2**16) + + async def _new_mock_connection(self, initial_settings=None): + event_loop_group = EventLoopGroup() + host_resolver = DefaultHostResolver(event_loop_group) + bootstrap = ClientBootstrap(event_loop_group, host_resolver) + + port = self.mock_server_url.port + # only test https + if port is None: + port = 443 + tls_ctx_options = TlsContextOptions() + tls_ctx_options.verify_peer = False # allow localhost + tls_ctx = ClientTlsContext(tls_ctx_options) + tls_conn_opt = tls_ctx.new_connection_options() + tls_conn_opt.set_server_name(self.mock_server_url.hostname) + tls_conn_opt.set_alpn_list(["h2"]) + + if initial_settings is None: + initial_settings = [Http2Setting(Http2SettingID.ENABLE_PUSH, 0)] + + connection = await AIOHttp2ClientConnection.new( + host_name=self.mock_server_url.hostname, + port=port, + bootstrap=bootstrap, + tls_connection_options=tls_conn_opt, + initial_settings=initial_settings, + on_remote_settings_changed=self._on_remote_settings_changed) + + return connection + + async def _test_h2_mock_server_manual_write(self): + connection = await self._new_mock_connection() + # check we set an h2 connection + self.assertEqual(connection.version, HttpVersion.Http2) + + request = HttpRequest('POST', self.mock_server_url.path) + request.headers.add('host', self.mock_server_url.hostname) + + # Create an async generator for the request body + body_chunks = [b'hello', b'he123123', b'', b'hello'] + total_length = 0 + for i in body_chunks: + total_length = total_length + len(i) + + async def body_generator(): + for i in body_chunks: + yield i + + stream = connection.request(request, request_body_generator=body_generator()) + + # Collect response + response = Response() + status_code = await response.collect_response(stream) + + # Check result + self.assertEqual(200, status_code) + self.assertEqual(200, response.status_code) + # mock server response the total length received, check if it matches what we sent + self.assertEqual(total_length, int(response.body.decode())) + await connection.close() + + class DelayStream: + def __init__(self, bad_read=False): + self._read = False + self.bad_read = bad_read + + def read(self, _len): + if self.bad_read: + # simulate a bad read that raises an exception + # this will cause the stream to fail + raise RuntimeError("bad read exception") + if self._read: + # return empty as EOS + return b'' + else: + self._read = True + return b'hello' + + async def _test_h2_mock_server_settings(self): + # Test with invalid settings - should throw an exception + exception = None + try: + # Invalid settings type + initial_settings = [100] + await self._new_mock_connection(initial_settings) + except Exception as e: + exception = e + self.assertIsNotNone(exception) + + # Test with valid settings + connection = await self._new_mock_connection() + self.assertEqual(connection.version, HttpVersion.Http2) + + request = HttpRequest('POST', self.mock_server_url.path) + request.headers.add('host', self.mock_server_url.hostname) + + # Create an async generator for the request body + async def body_generator(): + yield b'hello' + + stream = connection.request(request, request_body_generator=body_generator()) + + response = Response() + status_code = await response.collect_response(stream) + + self.assertEqual(200, status_code) + self.assertEqual(200, response.status_code) + + await connection.close() + + def test_h2_mock_server_manual_write(self): + asyncio.run(self._test_h2_mock_server_manual_write()) + + def test_h2_mock_server_settings(self): + asyncio.run(self._test_h2_mock_server_settings()) + + +if __name__ == '__main__': + unittest.main()