Skip to content

Commit ceefc47

Browse files
fix: parallelize per-object HEAD on list-objects to stop recursive-list stalls
ListObjects resolved the SSE plaintext size/etag with one sequential head_object per key. A recursive (empty-delimiter) list of up to max-keys objects stacked into a multi-second stall that tripped client timeouts, hanging ClickHouse/Postgres remote backups at the S3 list step. Run the HEADs concurrently, bounded by LIST_HEAD_CONCURRENCY (50), preserving output order and the per-object fallback to the listed size/etag.
1 parent 25cb3dd commit ceefc47

2 files changed

Lines changed: 157 additions & 22 deletions

File tree

s3proxy/handlers/buckets.py

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020

2121
logger: BoundLogger = structlog.get_logger()
2222

23+
# Max concurrent HEAD requests when resolving plaintext sizes for a list page.
24+
LIST_HEAD_CONCURRENCY = 50
25+
2326

2427
def _strip_minio_cache_suffix(value: str | None) -> str | None:
2528
"""Strip MinIO cache metadata suffix from marker/token values.
@@ -170,28 +173,32 @@ def _is_internal_key(self, key: str) -> bool:
170173
)
171174

172175
async def _process_list_objects(self, client, bucket: str, contents: list[dict]) -> list[dict]:
173-
objects = []
174-
for obj in contents:
175-
if self._is_internal_key(obj["Key"]):
176-
continue
177-
try:
178-
head = await client.head_object(bucket, obj["Key"])
179-
meta = head.get("Metadata", {})
180-
size = self._get_plaintext_size(meta, obj.get("Size", 0))
181-
etag = self._get_effective_etag(meta, obj.get("ETag", ""))
182-
except Exception:
183-
size, etag = obj.get("Size", 0), obj.get("ETag", "").strip('"')
184-
185-
objects.append(
186-
{
187-
"key": obj["Key"],
188-
"last_modified": obj["LastModified"].isoformat(),
189-
"etag": etag,
190-
"size": size,
191-
"storage_class": obj.get("StorageClass", "STANDARD"),
192-
}
193-
)
194-
return objects
176+
# One HEAD per object is needed to recover the SSE plaintext size/etag.
177+
# Run them concurrently (bounded) — sequential HEADs on a recursive list
178+
# of up to max-keys objects stack into a multi-second stall that trips
179+
# client timeouts. ponytail: fixed fan-out cap; raise LIST_HEAD_CONCURRENCY
180+
# if backend HEAD latency dominates large pages.
181+
listable = [obj for obj in contents if not self._is_internal_key(obj["Key"])]
182+
sem = asyncio.Semaphore(LIST_HEAD_CONCURRENCY)
183+
184+
async def resolve(obj: dict) -> dict:
185+
async with sem:
186+
try:
187+
head = await client.head_object(bucket, obj["Key"])
188+
meta = head.get("Metadata", {})
189+
size = self._get_plaintext_size(meta, obj.get("Size", 0))
190+
etag = self._get_effective_etag(meta, obj.get("ETag", ""))
191+
except Exception:
192+
size, etag = obj.get("Size", 0), obj.get("ETag", "").strip('"')
193+
return {
194+
"key": obj["Key"],
195+
"last_modified": obj["LastModified"].isoformat(),
196+
"etag": etag,
197+
"size": size,
198+
"storage_class": obj.get("StorageClass", "STANDARD"),
199+
}
200+
201+
return await asyncio.gather(*[resolve(obj) for obj in listable])
195202

196203
async def handle_create_bucket(self, request: Request, creds: S3Credentials) -> Response:
197204
bucket = self._parse_bucket(request.url.path)
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
"""Self-check for the parallel HEAD fan-out in BucketHandlerMixin._process_list_objects.
2+
3+
Sequential HEADs on a recursive list stack into a client-timeout-tripping stall.
4+
This proves: HEADs run concurrently, output order matches input order, internal
5+
keys are skipped, and a failing HEAD falls back to the listed size/etag.
6+
7+
The repo's package __init__ chain currently pulls in modules with pre-existing
8+
Py2 `except A, B:` syntax (utils.py, dashboard/*) that won't import under Py3,
9+
so we load buckets.py directly with stubbed siblings to exercise the real code.
10+
"""
11+
12+
import asyncio
13+
import datetime as dt
14+
import importlib.util
15+
import sys
16+
import types
17+
from pathlib import Path
18+
19+
REPO = Path(__file__).resolve().parents[2]
20+
21+
22+
def _load_buckets():
23+
def stub(name, **attrs):
24+
m = types.ModuleType(name)
25+
for k, v in attrs.items():
26+
setattr(m, k, v)
27+
sys.modules[name] = m
28+
return m
29+
30+
stub("s3proxy")
31+
stub("s3proxy.handlers")
32+
stub("s3proxy.xml_responses")
33+
stub("s3proxy.client", S3Credentials=object)
34+
stub("s3proxy.errors", S3Error=type("S3Error", (Exception,), {}))
35+
stub(
36+
"s3proxy.state",
37+
INTERNAL_PREFIX="s3proxy-internal/",
38+
META_SUFFIX_LEGACY=".s3proxy-meta",
39+
delete_multipart_metadata=lambda *a, **k: None,
40+
)
41+
stub("s3proxy.xml_utils", find_element=lambda *a, **k: None, find_elements=lambda *a, **k: [])
42+
stub("s3proxy.handlers.base", BaseHandler=object)
43+
44+
spec = importlib.util.spec_from_file_location(
45+
"s3proxy.handlers.buckets", REPO / "s3proxy" / "handlers" / "buckets.py"
46+
)
47+
mod = importlib.util.module_from_spec(spec)
48+
sys.modules["s3proxy.handlers.buckets"] = mod
49+
spec.loader.exec_module(mod)
50+
return mod
51+
52+
53+
buckets = _load_buckets()
54+
INTERNAL_PREFIX = "s3proxy-internal/"
55+
56+
57+
class FakeHandler:
58+
_process_list_objects = buckets.BucketHandlerMixin._process_list_objects
59+
60+
def __init__(self):
61+
self.inflight = 0
62+
self.max_inflight = 0
63+
64+
def _is_internal_key(self, key):
65+
return key.startswith(INTERNAL_PREFIX)
66+
67+
def _get_plaintext_size(self, meta, fallback):
68+
return int(meta.get("plaintext-size", fallback))
69+
70+
def _get_effective_etag(self, meta, fallback):
71+
return meta.get("client-etag", fallback.strip('"'))
72+
73+
74+
class FakeClient:
75+
def __init__(self, handler, fail_key=None):
76+
self.handler = handler
77+
self.fail_key = fail_key
78+
79+
async def head_object(self, bucket, key):
80+
self.handler.inflight += 1
81+
self.handler.max_inflight = max(self.handler.max_inflight, self.handler.inflight)
82+
try:
83+
await asyncio.sleep(0.02) # simulate backend round-trip
84+
if key == self.fail_key:
85+
raise RuntimeError("backend HEAD failed")
86+
return {"Metadata": {"plaintext-size": "111", "client-etag": f"etag-{key}"}}
87+
finally:
88+
self.handler.inflight -= 1
89+
90+
91+
def _obj(key, size=999):
92+
return {
93+
"Key": key,
94+
"Size": size,
95+
"ETag": '"raw-etag"',
96+
"LastModified": dt.datetime(2026, 6, 24, 9, 0, 0),
97+
"StorageClass": "STANDARD",
98+
}
99+
100+
101+
def test_parallel_order_and_fallback():
102+
handler = FakeHandler()
103+
client = FakeClient(handler, fail_key="b")
104+
contents = [
105+
_obj("a"),
106+
_obj(f"{INTERNAL_PREFIX}skip-me"), # internal -> dropped
107+
_obj("b", size=42), # HEAD fails -> fallback to listed size/etag
108+
_obj("c"),
109+
]
110+
111+
result = asyncio.run(handler._process_list_objects(client, "bucket", contents))
112+
113+
# Internal key dropped, order preserved.
114+
assert [o["key"] for o in result] == ["a", "b", "c"]
115+
# Successful HEAD -> plaintext size + client-etag.
116+
assert result[0]["size"] == 111
117+
assert result[0]["etag"] == "etag-a"
118+
# Failed HEAD -> fallback to listed size + stripped raw etag.
119+
assert result[1]["size"] == 42
120+
assert result[1]["etag"] == "raw-etag"
121+
# HEADs actually ran concurrently (would be 1 if sequential), and stayed bounded.
122+
assert handler.max_inflight > 1
123+
assert handler.max_inflight <= buckets.LIST_HEAD_CONCURRENCY
124+
125+
126+
if __name__ == "__main__":
127+
test_parallel_order_and_fallback()
128+
print("ok")

0 commit comments

Comments
 (0)