Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions s3proxy/concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,22 @@ async def try_acquire_memory(bytes_needed: int) -> int:
return await _default.try_acquire(bytes_needed)


@contextlib.asynccontextmanager
async def reserve_memory(bytes_needed: int):
"""Reserve memory for the duration of a block, releasing on exit.

For operations whose real peak isn't reflected by the request body size
(e.g. server-side copies, which decrypt+re-encrypt the source), so they get
gated by the limiter like uploads instead of running unbounded.
"""
reserved = await _default.try_acquire(bytes_needed)
try:
yield
finally:
if reserved > 0:
await _default.release(reserved)


async def release_memory(bytes_reserved: int) -> None:
await _default.release(bytes_reserved)

Expand Down
16 changes: 16 additions & 0 deletions s3proxy/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,22 @@ def streaming_upload_peak(content_length: int) -> int:
return 2 * part + 2 * frame


def copy_pipeline_peak(plaintext_size: int) -> int:
"""Peak memory a server-side copy holds while decrypting + re-encrypting.

A copy request has no body, so the request-level memory limiter reserves
~nothing for it -- yet the copy reads the source, decrypts it and re-encrypts
it. Large copies stream in MAX_BUFFER_SIZE chunks, so their peak is the
pipeline (source chunk + plaintext buffer + dest ciphertext), independent of
object size; small copies buffer the whole object (~3x). Reserving this lets
the limiter BOUND concurrent copies instead of admitting an unbounded dedup
flood that OOMs the pod.
"""
if plaintext_size > STREAMING_THRESHOLD:
return 4 * MAX_BUFFER_SIZE
return max(MAX_BUFFER_SIZE, 3 * plaintext_size)


@dataclass(slots=True)
class EncryptedData:
"""Container for encrypted data and metadata."""
Expand Down
46 changes: 44 additions & 2 deletions s3proxy/handlers/multipart/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from fastapi import Request, Response
from structlog.stdlib import BoundLogger

from ... import crypto, xml_responses
from ... import concurrency, crypto, xml_responses
from ...client import S3Client, S3Credentials
from ...errors import S3Error
from ...state import (
Expand Down Expand Up @@ -213,6 +213,45 @@ async def _streaming_copy_part(
head_resp: dict,
src_metadata: dict,
plaintext_size: int,
) -> Response:
# UploadPartCopy carries no request body, so the request-level limiter
# reserved ~nothing -- but this streams the source through decrypt +
# re-encrypt. Reserve the pipeline peak so concurrent copies are bounded
# (a dedup flood otherwise runs unbounded and OOMs the pod).
async with concurrency.reserve_memory(crypto.copy_pipeline_peak(plaintext_size)):
return await self._streaming_copy_part_inner(
client,
bucket,
key,
upload_id,
part_num,
state,
src_bucket,
src_key,
copy_source_range,
src_wrapped_dek,
src_multipart_meta,
head_resp,
src_metadata,
plaintext_size,
)

async def _streaming_copy_part_inner(
self,
client: S3Client,
bucket: str,
key: str,
upload_id: str,
part_num: int,
state: MultipartUploadState,
src_bucket: str,
src_key: str,
copy_source_range: str | None,
src_wrapped_dek: str | None,
src_multipart_meta,
head_resp: dict,
src_metadata: dict,
plaintext_size: int,
) -> Response:
"""Stream-decrypt the source and encrypt+upload each chunk as an internal S3 part."""
chunk_size = crypto.calculate_optimal_part_size(plaintext_size)
Expand Down Expand Up @@ -321,8 +360,11 @@ async def _iter_copy_source(
else:
resp = await client.get_object(src_bucket, src_key, range_header=copy_source_range)
async with resp["Body"] as body:
# resp["Body"] enters as an aiohttp ClientResponse, whose read()
# takes no size arg; stream via its StreamReader in bounded chunks
# (body.read(n) raised TypeError and 500'd every passthrough copy).
while True:
chunk = await body.read(crypto.MAX_BUFFER_SIZE)
chunk = await body.content.read(crypto.MAX_BUFFER_SIZE)
if not chunk:
break
yield chunk
Expand Down
40 changes: 39 additions & 1 deletion s3proxy/handlers/objects/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from fastapi import Request, Response
from structlog.stdlib import BoundLogger

from ... import crypto, xml_responses
from ... import concurrency, crypto, xml_responses
from ...client import S3Client, S3Credentials
from ...errors import S3Error
from ...state import (
Expand Down Expand Up @@ -275,6 +275,44 @@ async def _copy_encrypted(
src_multipart_meta,
metadata_directive: str,
new_metadata: dict[str, str] | None,
) -> Response:
# Copies decrypt+re-encrypt the source in memory but carry no request
# body, so the request-level limiter reserved ~nothing. Reserve the copy
# pipeline peak here so concurrent copies are bounded (a dedup flood would
# otherwise run unbounded and OOM the pod).
if src_multipart_meta:
pt_size = src_multipart_meta.total_plaintext_size
else:
_s = head_resp.get("Metadata", {}).get("plaintext-size")
pt_size = int(_s) if _s else crypto.plaintext_size(head_resp.get("ContentLength", 0))
async with concurrency.reserve_memory(crypto.copy_pipeline_peak(pt_size)):
return await self._copy_encrypted_inner(
client,
bucket,
key,
content_type,
src_bucket,
src_key,
head_resp,
src_wrapped_dek,
src_multipart_meta,
metadata_directive,
new_metadata,
)

async def _copy_encrypted_inner(
self,
client: S3Client,
bucket: str,
key: str,
content_type: str | None,
src_bucket: str,
src_key: str,
head_resp: dict,
src_wrapped_dek: str | None,
src_multipart_meta,
metadata_directive: str,
new_metadata: dict[str, str] | None,
) -> Response:
logger.info(
"COPY_ENCRYPTED",
Expand Down
7 changes: 7 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ class MockS3Response:
def __init__(self, data: bytes):
self._buf = bytearray(data)

@property
def content(self):
# Real aiohttp exposes the StreamReader as resp.content; the streaming
# copy path reads it in bounded chunks via content.read(n). The buffer
# is shared, so chunked reads consume the same bytes as read().
return self

async def read(self, n: int = -1) -> bytes:
"""Read response body, optionally limited to n bytes."""
if n == -1:
Expand Down
75 changes: 75 additions & 0 deletions tests/unit/test_copy_memory_governing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""Server-side copies must be gated by the memory limiter.

A CopyObject / UploadPartCopy carries no request body, so the request-level
limiter reserved ~nothing for it -- yet each copy decrypts the source and
re-encrypts it in RAM. Under a dedup flood, dozens ran concurrently with nothing
throttling them and the pod was OOMKilled (reproduced locally: a 64-concurrent
copy flood at a 256MiB cap killed the pod, exit 137, 0/64 copies succeeded;
with the reservation it peaks ~195MiB and completes 64/64).

These tests pin the two halves of the fix:
1. copy_pipeline_peak() reports the real peak so the limiter reserves enough.
2. reserve_memory() actually bounds concurrent copies to the budget.
"""

import asyncio

import pytest

from s3proxy import concurrency, crypto

MB = 1024 * 1024


def test_copy_pipeline_peak_streamed_is_bounded():
# Large copies stream in MAX_BUFFER_SIZE chunks, so their peak is the
# pipeline (source chunk + plaintext + dest ciphertext), independent of size.
for size in (64 * MB, 512 * MB, 5 * 1024 * MB):
peak = crypto.copy_pipeline_peak(size)
assert peak == 4 * crypto.MAX_BUFFER_SIZE
assert peak < size # never scales with object size


def test_copy_pipeline_peak_small_is_three_x():
# Small copies buffer the whole object and re-encrypt it (~3x), floored at
# one buffer so a tiny copy still reserves something meaningful.
assert crypto.copy_pipeline_peak(0) == crypto.MAX_BUFFER_SIZE
assert crypto.copy_pipeline_peak(1 * MB) == crypto.MAX_BUFFER_SIZE # floor
assert crypto.copy_pipeline_peak(20 * MB) == 60 * MB # 3x


@pytest.mark.asyncio
async def test_reserve_memory_bounds_concurrent_copies():
# With a budget that fits ~2 copy pipelines, a flood of concurrent copies
# must never let active reservations exceed the budget -- that bound is what
# stops the OOM. Without the reservation, all of them would run at once.
# The unit conftest forces immediate rejection (timeout 0); this test needs
# the wait-then-succeed path, so restore a real timeout for its duration.
original_timeout = concurrency.BACKPRESSURE_TIMEOUT
concurrency.BACKPRESSURE_TIMEOUT = 30
limiter = concurrency._default
limiter.set_memory_limit(64) # 64MB budget
limiter.active_bytes = 0
per_copy = crypto.copy_pipeline_peak(64 * MB) # 32MB -> budget fits 2

peak_active = 0
inside = 0
max_inside = 0

async def one_copy():
nonlocal peak_active, inside, max_inside
async with concurrency.reserve_memory(per_copy):
inside += 1
max_inside = max(max_inside, inside)
peak_active = max(peak_active, limiter.active_bytes)
await asyncio.sleep(0.02) # hold the reservation
inside -= 1

try:
await asyncio.gather(*[one_copy() for _ in range(16)])
finally:
concurrency.BACKPRESSURE_TIMEOUT = original_timeout

assert peak_active <= limiter.limit_bytes # never overran the budget
assert max_inside <= 2 # 64MB / 32MB == at most 2 copies at once
assert limiter.active_bytes == 0 # everything released