Skip to content

Commit 0d0da61

Browse files
perf: reuse one aiobotocore client per credential set
Creating a client per request built a fresh aiohttp connector + SSLContext that loads the whole CA store each time -- memray showed ~9.5M allocations in _create_connector and repeated load_default_certs. Clients are pool-safe and meant to be long-lived, so cache one per (endpoint, access key, secret key, region) and close them on shutdown. Big CPU/allocation win; modest (~5MB) memory.
1 parent 9cea3ad commit 0d0da61

4 files changed

Lines changed: 157 additions & 21 deletions

File tree

s3proxy/app.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from structlog.stdlib import BoundLogger
2323

2424
from . import concurrency
25-
from .client import SigV4Verifier
25+
from .client import SigV4Verifier, close_cached_clients
2626
from .config import Settings
2727
from .errors import S3Error, get_s3_error_code
2828
from .handlers import S3ProxyHandler
@@ -200,6 +200,7 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
200200
await stats_store.aclose() # flush buffered samples before Redis closes
201201
await close_redis()
202202
await close_http_client()
203+
await close_cached_clients()
203204
logger.info("Shutting down")
204205

205206
return lifespan

s3proxy/client/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"""S3 client layer - credentials, verification, and API wrapper."""
22

3-
from .s3 import S3Client, get_shared_session
3+
from .s3 import S3Client, close_cached_clients, get_shared_session
44
from .types import ParsedRequest, S3Credentials
55
from .verifier import CLOCK_SKEW_TOLERANCE, SigV4Verifier, _derive_signing_key
66

@@ -11,5 +11,6 @@
1111
"S3Credentials",
1212
"SigV4Verifier",
1313
"_derive_signing_key",
14+
"close_cached_clients",
1415
"get_shared_session",
1516
]

s3proxy/client/s3.py

Lines changed: 61 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
from __future__ import annotations
44

5+
import asyncio
6+
import contextlib
57
import time
68
from typing import TYPE_CHECKING, Any
79

@@ -29,6 +31,33 @@ def get_shared_session() -> aioboto3.Session:
2931
return _shared_session
3032

3133

34+
# Long-lived aiobotocore clients, keyed by (endpoint, access_key, secret_key, region). Creating
35+
# a client per request builds a fresh aiohttp connector + SSLContext that loads the
36+
# whole CA store each time -- large native (OpenSSL) allocations invisible to
37+
# tracemalloc, which pile up under concurrency and dominate RSS (memray: millions
38+
# of allocations in _create_connector / load_default_certs). aiobotocore clients
39+
# are pool-safe and meant to be reused, so we cache one per credential set and
40+
# keep it open for the app's lifetime (closed via close_cached_clients on shutdown).
41+
_client_cache: dict[tuple, tuple[Any, Any]] = {}
42+
_client_cache_lock: asyncio.Lock | None = None
43+
44+
45+
def _cache_lock() -> asyncio.Lock:
46+
global _client_cache_lock
47+
if _client_cache_lock is None:
48+
_client_cache_lock = asyncio.Lock()
49+
return _client_cache_lock
50+
51+
52+
async def close_cached_clients() -> None:
53+
"""Close all cached aiobotocore clients (call on app shutdown)."""
54+
async with _cache_lock():
55+
for ctx, _client in _client_cache.values():
56+
with contextlib.suppress(Exception):
57+
await ctx.__aexit__(None, None, None)
58+
_client_cache.clear()
59+
60+
3261
def _add_optional_kwargs(kwargs: dict[str, Any], **optional: Any) -> None:
3362
"""Add non-None optional kwargs to the dict."""
3463
for key, value in optional.items():
@@ -63,28 +92,41 @@ def __init__(self, settings: Settings, credentials: S3Credentials):
6392
self._client_context = None
6493

6594
async def __aenter__(self):
66-
"""Enter async context - create client from shared session."""
67-
# Use shared session to avoid loading JSON service models repeatedly
68-
# Each new session costs ~30-150MB for botocore service definitions
69-
session = get_shared_session()
70-
self._client_context = session.client(
71-
"s3",
72-
endpoint_url=self.settings.s3_endpoint,
73-
config=self._config,
74-
aws_access_key_id=self.credentials.access_key,
75-
aws_secret_access_key=self.credentials.secret_key,
76-
region_name=self.credentials.region,
77-
)
78-
self._cached_client = await self._client_context.__aenter__()
95+
"""Enter async context - reuse a long-lived client for these credentials."""
96+
self._cached_client = await self._get_or_create_client()
7997
return self
8098

99+
async def _get_or_create_client(self):
100+
key = (
101+
self.settings.s3_endpoint,
102+
self.credentials.access_key,
103+
self.credentials.secret_key,
104+
self.credentials.region,
105+
)
106+
cached = _client_cache.get(key)
107+
if cached is not None:
108+
return cached[1]
109+
async with _cache_lock():
110+
cached = _client_cache.get(key) # double-checked under lock
111+
if cached is not None:
112+
return cached[1]
113+
# Shared session avoids reloading botocore JSON models per client.
114+
ctx = get_shared_session().client(
115+
"s3",
116+
endpoint_url=self.settings.s3_endpoint,
117+
config=self._config,
118+
aws_access_key_id=self.credentials.access_key,
119+
aws_secret_access_key=self.credentials.secret_key,
120+
region_name=self.credentials.region,
121+
)
122+
client = await ctx.__aenter__()
123+
_client_cache[key] = (ctx, client)
124+
return client
125+
81126
async def __aexit__(self, exc_type, exc_val, exc_tb):
82-
"""Exit async context - clean up client."""
83-
if self._client_context is not None:
84-
await self._client_context.__aexit__(exc_type, exc_val, exc_tb)
85-
self._cached_client = None
86-
self._client_context = None
87-
logger.debug("Cleaned up S3 client context")
127+
"""Exit - the client is cached and shared, so it stays open (closed on
128+
app shutdown via close_cached_clients). Nothing to tear down per request."""
129+
self._cached_client = None
88130

89131
async def get_object(
90132
self,

tests/unit/test_client_reuse.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
"""S3Client must reuse one aiobotocore client per credential set.
2+
3+
Creating a client per request builds a fresh aiohttp connector + SSLContext that
4+
loads the whole CA store each time -- large native allocations (invisible to
5+
tracemalloc) that pile up under concurrency and drive RSS. memray showed millions
6+
of allocations in _create_connector / load_default_certs. Clients are pool-safe
7+
and meant to be long-lived, so the wrapper caches one per (endpoint, access key, secret key, region)
8+
and reuses it. These tests pin that: same creds => one underlying client; distinct
9+
creds => distinct clients; and they're closed on shutdown.
10+
"""
11+
12+
import pytest
13+
14+
from s3proxy.client import s3
15+
from s3proxy.client.types import S3Credentials
16+
17+
18+
class _FakeCtx:
19+
def __init__(self, client):
20+
self._client = client
21+
22+
async def __aenter__(self):
23+
return self._client
24+
25+
async def __aexit__(self, *a):
26+
self._client.closed = True
27+
return False
28+
29+
30+
class _FakeClient:
31+
def __init__(self, n):
32+
self.n = n
33+
self.closed = False
34+
35+
36+
class _FakeSession:
37+
def __init__(self):
38+
self.calls = 0
39+
40+
def client(self, *a, **k):
41+
self.calls += 1
42+
return _FakeCtx(_FakeClient(self.calls))
43+
44+
45+
class _Settings:
46+
s3_endpoint = "http://minio:9000"
47+
48+
49+
def _creds(key="AKIA1"):
50+
return S3Credentials(access_key=key, secret_key="s", region="us-east-1")
51+
52+
53+
@pytest.fixture(autouse=True)
54+
async def _clean(monkeypatch):
55+
fake = _FakeSession()
56+
monkeypatch.setattr(s3, "get_shared_session", lambda: fake)
57+
await s3.close_cached_clients()
58+
yield fake
59+
await s3.close_cached_clients()
60+
61+
62+
@pytest.mark.asyncio
63+
async def test_same_credentials_reuse_one_client(_clean):
64+
settings = _Settings()
65+
async with s3.S3Client(settings, _creds()) as c1:
66+
first = c1._cached_client
67+
async with s3.S3Client(settings, _creds()) as c2:
68+
second = c2._cached_client
69+
assert first is second # reused
70+
assert _clean.calls == 1 # session.client() called once, not per request
71+
assert first.closed is False # not torn down between requests
72+
73+
74+
@pytest.mark.asyncio
75+
async def test_distinct_credentials_distinct_clients(_clean):
76+
settings = _Settings()
77+
async with s3.S3Client(settings, _creds("AKIA1")) as c1:
78+
a = c1._cached_client
79+
async with s3.S3Client(settings, _creds("AKIA2")) as c2:
80+
b = c2._cached_client
81+
assert a is not b
82+
assert _clean.calls == 2
83+
84+
85+
@pytest.mark.asyncio
86+
async def test_close_cached_clients_tears_down(_clean):
87+
settings = _Settings()
88+
async with s3.S3Client(settings, _creds()) as c:
89+
client = c._cached_client
90+
await s3.close_cached_clients()
91+
assert client.closed is True
92+
assert len(s3._client_cache) == 0

0 commit comments

Comments
 (0)