From aaf8c2a175ac891cdb373ef7155c26a4a972f39b Mon Sep 17 00:00:00 2001 From: Yoshihito Aso Date: Fri, 29 May 2026 08:38:58 +0900 Subject: [PATCH 1/2] Cache resolved web3 provider endpoints --- app/utils/web3_utils.py | 275 +++++++++++++------- tests/app/utils/web3_provider_cache_test.py | 169 ++++++++++++ 2 files changed, 356 insertions(+), 88 deletions(-) create mode 100644 tests/app/utils/web3_provider_cache_test.py diff --git a/app/utils/web3_utils.py b/app/utils/web3_utils.py index 2863a08f5..2b315ccca 100644 --- a/app/utils/web3_utils.py +++ b/app/utils/web3_utils.py @@ -20,6 +20,7 @@ import asyncio import threading import time +from dataclasses import dataclass from json.decoder import JSONDecodeError from typing import Any, cast from weakref import WeakKeyDictionary @@ -53,6 +54,12 @@ AsyncWeb3CacheMap = dict[AsyncWeb3TimeoutKey, AsyncWeb3[Any]] +@dataclass +class ResolvedEndpointCache: + endpoint_uri: URI + expires_at: float + + class Web3Wrapper: DEFAULT_TIMEOUT = 5 @@ -203,55 +210,101 @@ class FailOverHTTPProvider(HTTPProvider): def __init__(self, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) self.endpoint_uri: URI | None = None + self._resolved_endpoint_cache: ResolvedEndpointCache | None = None + self._resolved_endpoint_lock = threading.Lock() - def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse: + @staticmethod + def _get_cache_ttl() -> float: + return max(float(config.WEB3_REQUEST_WAIT_TIME), 1.0) + + def _get_cached_endpoint_uri(self) -> URI | None: + with self._resolved_endpoint_lock: + cache = self._resolved_endpoint_cache + if cache is None: + return None + if cache.expires_at <= time.monotonic(): + self._resolved_endpoint_cache = None + return None + return cache.endpoint_uri + + def _set_cached_endpoint_uri(self, endpoint_uri: URI) -> None: + with self._resolved_endpoint_lock: + self._resolved_endpoint_cache = ResolvedEndpointCache( + endpoint_uri=endpoint_uri, + expires_at=time.monotonic() + self._get_cache_ttl(), + ) + + def _clear_cached_endpoint_uri(self) -> None: + with self._resolved_endpoint_lock: + self._resolved_endpoint_cache = None + + def _resolve_endpoint_uri(self) -> URI | None: db_session = Session(autocommit=False, autoflush=True, bind=engine) try: - if FailOverHTTPProvider.fail_over_mode is True: - # If never running the block monitoring processor, - # use default(primary) node. - if db_session.scalars(select(Node).limit(1)).first() is None: - self.endpoint_uri = URI(config.WEB3_HTTP_PROVIDER) - return super().make_request(method, params) - else: - counter = 0 - while counter <= config.WEB3_REQUEST_RETRY_COUNT: - # Switch alive node - _node = db_session.scalars( - select(Node) - .where(Node.is_synced == True) - .order_by(Node.priority) - .order_by(Node.id) - .limit(1) - ).first() - if _node is None: - counter += 1 - if counter <= config.WEB3_REQUEST_RETRY_COUNT: - time.sleep(config.WEB3_REQUEST_WAIT_TIME) - continue - raise ServiceUnavailable("Block synchronization is down") - assert _node.endpoint_uri is not None - self.endpoint_uri = URI(_node.endpoint_uri) - try: - return super().make_request(method, params) - except ConnectionError, JSONDecodeError, HTTPError: - # NOTE: - # JSONDecodeError will be raised if a request is sent - # while Quorum is terminating. - LOG.notice( - f"Retry web3 request due to connection fail: method={method}, params={params}" - ) - counter += 1 - if counter <= config.WEB3_REQUEST_RETRY_COUNT: - time.sleep(config.WEB3_REQUEST_WAIT_TIME) - continue - raise ServiceUnavailable("Block synchronization is down") - else: # Use default provider - self.endpoint_uri = URI(config.WEB3_HTTP_PROVIDER) - return super().make_request(method, params) + if db_session.scalars(select(Node).limit(1)).first() is None: + endpoint_uri = URI(config.WEB3_HTTP_PROVIDER) + self._set_cached_endpoint_uri(endpoint_uri) + return endpoint_uri + + _node = db_session.scalars( + select(Node) + .where(Node.is_synced == True) + .order_by(Node.priority) + .order_by(Node.id) + .limit(1) + ).first() + if _node is None: + return None + assert _node.endpoint_uri is not None + endpoint_uri = URI(_node.endpoint_uri) + self._set_cached_endpoint_uri(endpoint_uri) + return endpoint_uri finally: db_session.close() + def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse: + if FailOverHTTPProvider.fail_over_mode is True: + cached_endpoint_uri = self._get_cached_endpoint_uri() + if cached_endpoint_uri is not None: + self.endpoint_uri = cached_endpoint_uri + try: + return super().make_request(method, params) + except ConnectionError, JSONDecodeError, HTTPError: + self._clear_cached_endpoint_uri() + LOG.notice( + f"Retry web3 request due to connection fail: method={method}, params={params}" + ) + + counter = 0 + while counter <= config.WEB3_REQUEST_RETRY_COUNT: + endpoint_uri = self._resolve_endpoint_uri() + if endpoint_uri is None: + counter += 1 + if counter <= config.WEB3_REQUEST_RETRY_COUNT: + time.sleep(config.WEB3_REQUEST_WAIT_TIME) + continue + raise ServiceUnavailable("Block synchronization is down") + + self.endpoint_uri = endpoint_uri + try: + return super().make_request(method, params) + except ConnectionError, JSONDecodeError, HTTPError: + # NOTE: + # JSONDecodeError will be raised if a request is sent + # while Quorum is terminating. + self._clear_cached_endpoint_uri() + LOG.notice( + f"Retry web3 request due to connection fail: method={method}, params={params}" + ) + counter += 1 + if counter <= config.WEB3_REQUEST_RETRY_COUNT: + time.sleep(config.WEB3_REQUEST_WAIT_TIME) + continue + raise ServiceUnavailable("Block synchronization is down") + + self.endpoint_uri = URI(config.WEB3_HTTP_PROVIDER) + return super().make_request(method, params) + @staticmethod def set_fail_over_mode(use_fail_over: bool): FailOverHTTPProvider.fail_over_mode = use_fail_over @@ -263,57 +316,103 @@ class AsyncFailOverHTTPProvider(AsyncHTTPProvider): def __init__(self, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) self.endpoint_uri: URI | None = None + self._resolved_endpoint_cache: ResolvedEndpointCache | None = None + self._resolved_endpoint_lock = asyncio.Lock() - async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse: + @staticmethod + def _get_cache_ttl() -> float: + return max(float(config.WEB3_REQUEST_WAIT_TIME), 1.0) + + async def _get_cached_endpoint_uri(self) -> URI | None: + async with self._resolved_endpoint_lock: + cache = self._resolved_endpoint_cache + if cache is None: + return None + if cache.expires_at <= time.monotonic(): + self._resolved_endpoint_cache = None + return None + return cache.endpoint_uri + + async def _set_cached_endpoint_uri(self, endpoint_uri: URI) -> None: + async with self._resolved_endpoint_lock: + self._resolved_endpoint_cache = ResolvedEndpointCache( + endpoint_uri=endpoint_uri, + expires_at=time.monotonic() + self._get_cache_ttl(), + ) + + async def _clear_cached_endpoint_uri(self) -> None: + async with self._resolved_endpoint_lock: + self._resolved_endpoint_cache = None + + async def _resolve_endpoint_uri(self) -> URI | None: db_session = AsyncSession(autocommit=False, autoflush=True, bind=async_engine) try: - if AsyncFailOverHTTPProvider.fail_over_mode is True: - # If never running the block monitoring processor, - # use default(primary) node. - if (await db_session.scalars(select(Node).limit(1))).first() is None: - self.endpoint_uri = URI(config.WEB3_HTTP_PROVIDER) - return await super().make_request(method, params) - else: - counter = 0 - while counter <= config.WEB3_REQUEST_RETRY_COUNT: - # Switch alive node - _node = ( - await db_session.scalars( - select(Node) - .where(Node.is_synced == True) - .order_by(Node.priority) - .order_by(Node.id) - .limit(1) - ) - ).first() - if _node is None: - counter += 1 - if counter <= config.WEB3_REQUEST_RETRY_COUNT: - await asyncio.sleep(config.WEB3_REQUEST_WAIT_TIME) - continue - raise ServiceUnavailable("Block synchronization is down") - assert _node.endpoint_uri is not None - self.endpoint_uri = URI(_node.endpoint_uri) - try: - return await super().make_request(method, params) - except ClientError, JSONDecodeError: - # NOTE: - # JSONDecodeError will be raised if a request is sent - # while Quorum is terminating. - LOG.notice( - f"Retry web3 request due to connection fail: method={method}, params={params}" - ) - counter += 1 - if counter <= config.WEB3_REQUEST_RETRY_COUNT: - await asyncio.sleep(config.WEB3_REQUEST_WAIT_TIME) - continue - raise ServiceUnavailable("Block synchronization is down") - else: # Use default provider - self.endpoint_uri = URI(config.WEB3_HTTP_PROVIDER) - return await super().make_request(method, params) + if (await db_session.scalars(select(Node).limit(1))).first() is None: + endpoint_uri = URI(config.WEB3_HTTP_PROVIDER) + await self._set_cached_endpoint_uri(endpoint_uri) + return endpoint_uri + + _node = ( + await db_session.scalars( + select(Node) + .where(Node.is_synced == True) + .order_by(Node.priority) + .order_by(Node.id) + .limit(1) + ) + ).first() + if _node is None: + return None + assert _node.endpoint_uri is not None + endpoint_uri = URI(_node.endpoint_uri) + await self._set_cached_endpoint_uri(endpoint_uri) + return endpoint_uri finally: await db_session.close() + async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse: + if AsyncFailOverHTTPProvider.fail_over_mode is True: + cached_endpoint_uri = await self._get_cached_endpoint_uri() + if cached_endpoint_uri is not None: + self.endpoint_uri = cached_endpoint_uri + try: + return await super().make_request(method, params) + except ClientError, JSONDecodeError: + await self._clear_cached_endpoint_uri() + LOG.notice( + f"Retry web3 request due to connection fail: method={method}, params={params}" + ) + + counter = 0 + while counter <= config.WEB3_REQUEST_RETRY_COUNT: + endpoint_uri = await self._resolve_endpoint_uri() + if endpoint_uri is None: + counter += 1 + if counter <= config.WEB3_REQUEST_RETRY_COUNT: + await asyncio.sleep(config.WEB3_REQUEST_WAIT_TIME) + continue + raise ServiceUnavailable("Block synchronization is down") + + self.endpoint_uri = endpoint_uri + try: + return await super().make_request(method, params) + except ClientError, JSONDecodeError: + # NOTE: + # JSONDecodeError will be raised if a request is sent + # while Quorum is terminating. + await self._clear_cached_endpoint_uri() + LOG.notice( + f"Retry web3 request due to connection fail: method={method}, params={params}" + ) + counter += 1 + if counter <= config.WEB3_REQUEST_RETRY_COUNT: + await asyncio.sleep(config.WEB3_REQUEST_WAIT_TIME) + continue + raise ServiceUnavailable("Block synchronization is down") + + self.endpoint_uri = URI(config.WEB3_HTTP_PROVIDER) + return await super().make_request(method, params) + @staticmethod def set_fail_over_mode(use_fail_over: bool): AsyncFailOverHTTPProvider.fail_over_mode = use_fail_over diff --git a/tests/app/utils/web3_provider_cache_test.py b/tests/app/utils/web3_provider_cache_test.py new file mode 100644 index 000000000..79e43cf29 --- /dev/null +++ b/tests/app/utils/web3_provider_cache_test.py @@ -0,0 +1,169 @@ +""" +Copyright BOOSTRY Co., Ltd. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +SPDX-License-Identifier: Apache-2.0 +""" + +from typing import cast + +import pytest +from aiohttp import ClientError +from eth_typing import URI +from requests.exceptions import ConnectionError +from web3 import AsyncHTTPProvider, HTTPProvider +from web3.types import RPCEndpoint, RPCResponse + +from app.utils import web3_utils + + +# Verify that the FailOverHTTPProvider reuses the cached endpoint URI for multiple requests +# when in fail-over mode and the cache is valid. +def test_failover_http_provider_reuses_cached_endpoint( + monkeypatch: pytest.MonkeyPatch, +): + provider = web3_utils.FailOverHTTPProvider("http://127.0.0.1:8545") + monkeypatch.setattr(web3_utils.FailOverHTTPProvider, "fail_over_mode", True) + monkeypatch.setattr(provider, "_get_cache_ttl", lambda: 60.0) + + resolve_calls: list[URI] = [] + + def fake_resolve() -> URI: + endpoint = URI("http://cached-node") + resolve_calls.append(endpoint) + provider._set_cached_endpoint_uri(endpoint) # type: ignore[attr-defined] + return endpoint + + def fake_make_request( + self: HTTPProvider, method: RPCEndpoint, params: object + ) -> RPCResponse: + return {"result": cast(str, provider.endpoint_uri)} + + monkeypatch.setattr(provider, "_resolve_endpoint_uri", fake_resolve) + monkeypatch.setattr(HTTPProvider, "make_request", fake_make_request) + + first = provider.make_request(RPCEndpoint("eth_chainId"), []) + second = provider.make_request(RPCEndpoint("eth_chainId"), []) + + assert first == {"result": "http://cached-node"} + assert second == {"result": "http://cached-node"} + assert len(resolve_calls) == 1 + + +# Verify that if the cached endpoint fails, +# the FailOverHTTPProvider will attempt to resolve a new endpoint and update the cache accordingly. +def test_failover_http_provider_invalidates_cache_on_request_error( + monkeypatch: pytest.MonkeyPatch, +): + provider = web3_utils.FailOverHTTPProvider("http://127.0.0.1:8545") + monkeypatch.setattr(web3_utils.FailOverHTTPProvider, "fail_over_mode", True) + monkeypatch.setattr(provider, "_get_cache_ttl", lambda: 60.0) + provider._set_cached_endpoint_uri(URI("http://stale-node")) # type: ignore[attr-defined] + + resolve_calls: list[URI] = [] + call_count = 0 + + def fake_resolve() -> URI: + endpoint = URI("http://fresh-node") + resolve_calls.append(endpoint) + provider._set_cached_endpoint_uri(endpoint) # type: ignore[attr-defined] + return endpoint + + def fake_make_request( + self: HTTPProvider, method: RPCEndpoint, params: object + ) -> RPCResponse: + nonlocal call_count + call_count += 1 + if call_count == 1: + raise ConnectionError("connection failed") + return {"result": cast(str, provider.endpoint_uri)} + + monkeypatch.setattr(provider, "_resolve_endpoint_uri", fake_resolve) + monkeypatch.setattr(HTTPProvider, "make_request", fake_make_request) + + result = provider.make_request(RPCEndpoint("eth_chainId"), []) + + assert result == {"result": "http://fresh-node"} + assert len(resolve_calls) == 1 + + +# Verify that the AsyncFailOverHTTPProvider reuses the cached endpoint URI for multiple requests +# when in fail-over mode and the cache is valid. +@pytest.mark.asyncio +async def test_async_failover_http_provider_reuses_cached_endpoint( + monkeypatch: pytest.MonkeyPatch, +): + provider = web3_utils.AsyncFailOverHTTPProvider("http://127.0.0.1:8545") + monkeypatch.setattr(web3_utils.AsyncFailOverHTTPProvider, "fail_over_mode", True) + monkeypatch.setattr(provider, "_get_cache_ttl", lambda: 60.0) + + resolve_calls: list[URI] = [] + + async def fake_resolve() -> URI: + endpoint = URI("http://cached-node") + resolve_calls.append(endpoint) + await provider._set_cached_endpoint_uri(endpoint) # type: ignore[attr-defined] + return endpoint + + async def fake_make_request( + self: AsyncHTTPProvider, method: RPCEndpoint, params: object + ) -> RPCResponse: + return {"result": cast(str, provider.endpoint_uri)} + + monkeypatch.setattr(provider, "_resolve_endpoint_uri", fake_resolve) + monkeypatch.setattr(AsyncHTTPProvider, "make_request", fake_make_request) + + first = await provider.make_request(RPCEndpoint("eth_chainId"), []) + second = await provider.make_request(RPCEndpoint("eth_chainId"), []) + + assert first == {"result": "http://cached-node"} + assert second == {"result": "http://cached-node"} + assert len(resolve_calls) == 1 + + +# Verify that if the cached endpoint fails, +# the AsyncFailOverHTTPProvider will attempt to resolve a new endpoint and update the cache accordingly. +@pytest.mark.asyncio +async def test_async_failover_http_provider_invalidates_cache_on_request_error( + monkeypatch: pytest.MonkeyPatch, +): + provider = web3_utils.AsyncFailOverHTTPProvider("http://127.0.0.1:8545") + monkeypatch.setattr(web3_utils.AsyncFailOverHTTPProvider, "fail_over_mode", True) + monkeypatch.setattr(provider, "_get_cache_ttl", lambda: 60.0) + await provider._set_cached_endpoint_uri(URI("http://stale-node")) # type: ignore[attr-defined] + + resolve_calls: list[URI] = [] + call_count = 0 + + async def fake_resolve() -> URI: + endpoint = URI("http://fresh-node") + resolve_calls.append(endpoint) + await provider._set_cached_endpoint_uri(endpoint) # type: ignore[attr-defined] + return endpoint + + async def fake_make_request( + self: AsyncHTTPProvider, method: RPCEndpoint, params: object + ) -> RPCResponse: + nonlocal call_count + call_count += 1 + if call_count == 1: + raise ClientError("connection failed") + return {"result": cast(str, provider.endpoint_uri)} + + monkeypatch.setattr(provider, "_resolve_endpoint_uri", fake_resolve) + monkeypatch.setattr(AsyncHTTPProvider, "make_request", fake_make_request) + + result = await provider.make_request(RPCEndpoint("eth_chainId"), []) + + assert result == {"result": "http://fresh-node"} + assert len(resolve_calls) == 1 From b48132f54e6f85fe66a56157fbc3dc5309d4739e Mon Sep 17 00:00:00 2001 From: Yoshihito Aso Date: Fri, 29 May 2026 11:21:48 +0900 Subject: [PATCH 2/2] Preserve HTTP keep-alive sessions for web3 --- app/utils/web3_utils.py | 107 +++++++++++++++++++- tests/app/utils/web3_provider_cache_test.py | 18 +++- 2 files changed, 123 insertions(+), 2 deletions(-) diff --git a/app/utils/web3_utils.py b/app/utils/web3_utils.py index 2b315ccca..447ce4a0c 100644 --- a/app/utils/web3_utils.py +++ b/app/utils/web3_utils.py @@ -25,13 +25,17 @@ from typing import Any, cast from weakref import WeakKeyDictionary -from aiohttp import ClientError, ClientTimeout +from aiohttp import ClientError, ClientSession, ClientTimeout, TCPConnector from eth_typing import URI from requests.exceptions import ConnectionError, HTTPError from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import Session from web3 import AsyncHTTPProvider, AsyncWeb3, HTTPProvider, Web3 +from web3._utils.async_caching import async_lock +from web3._utils.caching.caching_utils import generate_cache_key +from web3._utils.http import DEFAULT_HTTP_TIMEOUT +from web3._utils.http_session_manager import HTTPSessionManager from web3.eth import AsyncEth from web3.geth import AsyncGeth from web3.middleware import ExtraDataToPOAMiddleware @@ -60,6 +64,106 @@ class ResolvedEndpointCache: expires_at: float +class KeepAliveHTTPSessionManager(HTTPSessionManager): + @staticmethod + def _create_async_session() -> ClientSession: + return ClientSession( + raise_for_status=True, + connector=TCPConnector(), + ) + + async def async_cache_and_return_session( + self, + endpoint_uri: URI, + session: ClientSession | None = None, + request_timeout: ClientTimeout | None = None, + ) -> ClientSession: + # Preserve async HTTP keep-alive for RPC-heavy workloads. + cache_key = self._async_session_cache_key(endpoint_uri) + + evicted_items = None + cached_session: ClientSession | None = None + async with async_lock(self.session_pool, self._lock): + if cache_key not in self.session_cache: + if session is None: + session = self._create_async_session() + + cached_session, evicted_items = self.session_cache.cache( + cache_key, session + ) + self.logger.debug( + "Async session cached: %s, %s", endpoint_uri, cached_session + ) + + else: + cached_session = cast( + ClientSession, + self.session_cache.get_cache_entry(cache_key), + ) + session_is_closed = cached_session.closed + session_loop = getattr(cached_session, "_loop", None) + session_loop_is_closed = bool( + session_loop is not None and session_loop.is_closed() + ) + + warning = ( + "Async session was closed" + if session_is_closed + else ( + "Loop was closed for async session" + if session_loop_is_closed + else None + ) + ) + if warning: + self.logger.debug( + "%s: %s, %s. Creating and caching a new async session for uri.", + warning, + endpoint_uri, + cached_session, + ) + + self.session_cache.pop(cache_key) + if not session_is_closed: + await cached_session.close() + self.logger.debug( + "Async session closed and evicted from cache: %s", + cached_session, + ) + + replacement_session = self._create_async_session() + cached_session, evicted_items = self.session_cache.cache( + cache_key, replacement_session + ) + self.logger.debug( + "Async session cached: %s, %s", endpoint_uri, cached_session + ) + + if evicted_items is not None: + evicted_sessions = list(evicted_items.values()) + for evicted_session in evicted_sessions: + self.logger.debug( + "Async session cache full. Session evicted from cache: %s", + evicted_session, + ) + timeout_total = DEFAULT_HTTP_TIMEOUT + if request_timeout is not None and request_timeout.total is not None: + timeout_total = request_timeout.total + asyncio.create_task( + self._async_close_evicted_sessions( + timeout_total + 0.1, + evicted_sessions, + ) + ) + + assert cached_session is not None + return cached_session + + @staticmethod + def _async_session_cache_key(endpoint_uri: URI) -> str: + return generate_cache_key(f"{id(asyncio.get_event_loop())}:{endpoint_uri}") + + class Web3Wrapper: DEFAULT_TIMEOUT = 5 @@ -315,6 +419,7 @@ class AsyncFailOverHTTPProvider(AsyncHTTPProvider): def __init__(self, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) + self._request_session_manager = KeepAliveHTTPSessionManager() self.endpoint_uri: URI | None = None self._resolved_endpoint_cache: ResolvedEndpointCache | None = None self._resolved_endpoint_lock = asyncio.Lock() diff --git a/tests/app/utils/web3_provider_cache_test.py b/tests/app/utils/web3_provider_cache_test.py index 79e43cf29..45d4ce220 100644 --- a/tests/app/utils/web3_provider_cache_test.py +++ b/tests/app/utils/web3_provider_cache_test.py @@ -18,7 +18,7 @@ from typing import cast import pytest -from aiohttp import ClientError +from aiohttp import ClientError, ClientTimeout from eth_typing import URI from requests.exceptions import ConnectionError from web3 import AsyncHTTPProvider, HTTPProvider @@ -167,3 +167,19 @@ async def fake_make_request( assert result == {"result": "http://fresh-node"} assert len(resolve_calls) == 1 + + +# Verify that the AsyncFailOverHTTPProvider keeps HTTP connections alive when using the cached endpoint. +@pytest.mark.asyncio +async def test_async_failover_http_provider_keeps_http_connections_alive(): + provider = web3_utils.AsyncFailOverHTTPProvider("http://127.0.0.1:8545") + + session = await provider._request_session_manager.async_cache_and_return_session( # type: ignore[attr-defined] + cast(URI, provider.endpoint_uri), + request_timeout=ClientTimeout(total=5), + ) + try: + assert session.connector is not None + assert session.connector.force_close is False + finally: + await session.close()