Skip to content

Commit a0761ac

Browse files
kylebarrond-v-b
andauthored
fix: Special-case suffix requests in obstore backend to support Azure (#2994)
* Special-case suffix requests in obstore backend Co-authored-by: Davis Bennett <davis.v.bennett@gmail.com>
1 parent bb55f0c commit a0761ac

File tree

1 file changed

+85
-7
lines changed

1 file changed

+85
-7
lines changed

src/zarr/storage/_obstore.py

Lines changed: 85 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,25 @@ async def get(
106106
)
107107
return prototype.buffer.from_bytes(await resp.bytes_async()) # type: ignore[arg-type]
108108
elif isinstance(byte_range, SuffixByteRequest):
109-
resp = await obs.get_async(
110-
self.store, key, options={"range": {"suffix": byte_range.suffix}}
111-
)
112-
return prototype.buffer.from_bytes(await resp.bytes_async()) # type: ignore[arg-type]
109+
# some object stores (Azure) don't support suffix requests. In this
110+
# case, our workaround is to first get the length of the object and then
111+
# manually request the byte range at the end.
112+
try:
113+
resp = await obs.get_async(
114+
self.store, key, options={"range": {"suffix": byte_range.suffix}}
115+
)
116+
return prototype.buffer.from_bytes(await resp.bytes_async()) # type: ignore[arg-type]
117+
except obs.exceptions.NotSupportedError:
118+
head_resp = await obs.head_async(self.store, key)
119+
file_size = head_resp["size"]
120+
suffix_len = byte_range.suffix
121+
buffer = await obs.get_range_async(
122+
self.store,
123+
key,
124+
start=file_size - suffix_len,
125+
length=suffix_len,
126+
)
127+
return prototype.buffer.from_bytes(buffer) # type: ignore[arg-type]
113128
else:
114129
raise ValueError(f"Unexpected byte_range, got {byte_range}")
115130
except _ALLOWED_EXCEPTIONS:
@@ -265,10 +280,29 @@ class _OtherRequest(TypedDict):
265280
path: str
266281
"""The path to request from."""
267282

268-
range: OffsetRange | SuffixRange | None
283+
range: OffsetRange | None
284+
# Note: suffix requests are handled separately because some object stores (Azure)
285+
# don't support them
269286
"""The range request type."""
270287

271288

289+
class _SuffixRequest(TypedDict):
290+
"""Offset or suffix range requests.
291+
292+
These requests cannot be concurrent on the Rust side, and each need their own call
293+
to `obstore.get_async`, passing in the `range` parameter.
294+
"""
295+
296+
original_request_index: int
297+
"""The positional index in the original key_ranges input"""
298+
299+
path: str
300+
"""The path to request from."""
301+
302+
range: SuffixRange
303+
"""The suffix range."""
304+
305+
272306
class _Response(TypedDict):
273307
"""A response buffer associated with the original index that it should be restored to."""
274308

@@ -317,7 +351,7 @@ async def _make_other_request(
317351
prototype: BufferPrototype,
318352
semaphore: asyncio.Semaphore,
319353
) -> list[_Response]:
320-
"""Make suffix or offset requests.
354+
"""Make offset or full-file requests.
321355
322356
We return a `list[_Response]` for symmetry with `_make_bounded_requests` so that all
323357
futures can be gathered together.
@@ -339,6 +373,46 @@ async def _make_other_request(
339373
]
340374

341375

376+
async def _make_suffix_request(
377+
store: _UpstreamObjectStore,
378+
request: _SuffixRequest,
379+
prototype: BufferPrototype,
380+
semaphore: asyncio.Semaphore,
381+
) -> list[_Response]:
382+
"""Make suffix requests.
383+
384+
This is separated out from `_make_other_request` because some object stores (Azure)
385+
don't support suffix requests. In this case, our workaround is to first get the
386+
length of the object and then manually request the byte range at the end.
387+
388+
We return a `list[_Response]` for symmetry with `_make_bounded_requests` so that all
389+
futures can be gathered together.
390+
"""
391+
import obstore as obs
392+
393+
async with semaphore:
394+
try:
395+
resp = await obs.get_async(store, request["path"], options={"range": request["range"]})
396+
buffer = await resp.bytes_async()
397+
except obs.exceptions.NotSupportedError:
398+
head_resp = await obs.head_async(store, request["path"])
399+
file_size = head_resp["size"]
400+
suffix_len = request["range"]["suffix"]
401+
buffer = await obs.get_range_async(
402+
store,
403+
request["path"],
404+
start=file_size - suffix_len,
405+
length=suffix_len,
406+
)
407+
408+
return [
409+
{
410+
"original_request_index": request["original_request_index"],
411+
"buffer": prototype.buffer.from_bytes(buffer), # type: ignore[arg-type]
412+
}
413+
]
414+
415+
342416
async def _get_partial_values(
343417
store: _UpstreamObjectStore,
344418
prototype: BufferPrototype,
@@ -358,6 +432,7 @@ async def _get_partial_values(
358432
key_ranges = list(key_ranges)
359433
per_file_bounded_requests: dict[str, list[_BoundedRequest]] = defaultdict(list)
360434
other_requests: list[_OtherRequest] = []
435+
suffix_requests: list[_SuffixRequest] = []
361436

362437
for idx, (path, byte_range) in enumerate(key_ranges):
363438
if byte_range is None:
@@ -381,7 +456,7 @@ async def _get_partial_values(
381456
}
382457
)
383458
elif isinstance(byte_range, SuffixByteRequest):
384-
other_requests.append(
459+
suffix_requests.append(
385460
{
386461
"original_request_index": idx,
387462
"path": path,
@@ -402,6 +477,9 @@ async def _get_partial_values(
402477
for request in other_requests:
403478
futs.append(_make_other_request(store, request, prototype, semaphore=semaphore)) # noqa: PERF401
404479

480+
for suffix_request in suffix_requests:
481+
futs.append(_make_suffix_request(store, suffix_request, prototype, semaphore=semaphore)) # noqa: PERF401
482+
405483
buffers: list[Buffer | None] = [None] * len(key_ranges)
406484

407485
for responses in await asyncio.gather(*futs):

0 commit comments

Comments
 (0)