Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions s3proxy/handlers/objects/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import hashlib
import xml.etree.ElementTree as ET
from datetime import UTC, datetime
from urllib.parse import quote

import structlog
from botocore.exceptions import ClientError
Expand All @@ -22,12 +23,18 @@
load_multipart_metadata,
save_multipart_metadata,
)
from ...state.metadata import _internal_meta_key
from ...utils import format_http_date, format_iso8601
from ...xml_utils import find_element, find_elements
from ..base import BaseHandler

logger: BoundLogger = structlog.get_logger(__name__)

# S3 CopyObject is a single server-side operation capped at 5 GiB; larger
# ciphertext objects fall back to the decrypt/re-encrypt streaming path.
# The value is compared (inclusively, `<=`) against the source's stored
# ContentLength from HEAD, i.e. the ciphertext size, not the plaintext size.
# ponytail: 5 GiB ceiling; upgrade path is server-side UploadPartCopy per range.
MAX_SERVER_SIDE_COPY_BYTES = 5 * 1024**3


class MiscObjectMixin(BaseHandler):
async def handle_head_object(self, request: Request, creds: S3Credentials) -> Response:
Expand Down Expand Up @@ -198,6 +205,37 @@ async def handle_copy_object(self, request: Request, creds: S3Credentials) -> Re
request,
)

# Encrypted source. A same-credential plain COPY needs no re-encrypt:
# the ciphertext is self-describing and key-independent (GCM AAD is
# None, the DEK is random and stored in metadata / the sidecar,
# nonces are embedded), so a native server-side CopyObject yields a
# byte-identical, decryptable destination and skips the
# download+re-encrypt+re-upload amplification (Scylla Manager dedup).
# A cross-credential copy must re-key under the copier's KEK, and
# REPLACE / oversized objects also fall back to the re-encrypt path.
if src_multipart_meta:
src_kid = src_multipart_meta.kid
else:
src_kid = head_resp.get("Metadata", {}).get(self.settings.kidtag_name, "")
same_credential = bool(src_kid) and src_kid == client.credentials.access_key
ciphertext_size = head_resp.get("ContentLength", 0) or 0
if (
metadata_directive == "COPY"
and same_credential
and ciphertext_size <= MAX_SERVER_SIDE_COPY_BYTES
):
return await self._copy_passthrough_encrypted(
client,
bucket,
key,
content_type,
copy_source,
src_bucket,
src_key,
head_resp,
src_multipart_meta,
)

# Encrypted - need to decrypt and re-encrypt
return await self._copy_encrypted(
client,
Expand Down Expand Up @@ -262,6 +300,69 @@ async def _copy_passthrough(
media_type="application/xml",
)

async def _copy_passthrough_encrypted(
    self,
    client: S3Client,
    bucket: str,
    key: str,
    content_type: str | None,
    copy_source: str,
    src_bucket: str,
    src_key: str,
    head_resp: dict,
    src_multipart_meta,
) -> Response:
    """Server-side copy of an *encrypted* object, moving no bulk bytes.

    The ciphertext, wrapped DEK and kid are self-describing and not bound to
    the object key, so a native CopyObject with MetadataDirective=COPY yields
    a byte-identical, decryptable destination. This collapses Scylla Manager's
    dedup copies from a full download+re-encrypt+re-upload into a
    metadata-only op and keeps them off the in-flight memory governor.

    Args:
        client: S3 client used to issue the server-side copies.
        bucket: Destination bucket.
        key: Destination object key.
        content_type: Content type forwarded to the object copy.
        copy_source: Copy-source string for the object itself, passed
            through to ``client.copy_object`` unchanged.
        src_bucket: Source bucket; used for logging and to build the
            sidecar copy source.
        src_key: Source object key; used for logging and to derive the
            source sidecar key.
        head_resp: HEAD response of the source object; only its
            ``Metadata`` mapping is read here (for ``client-etag``).
        src_multipart_meta: Truthy when the source is a multipart object
            with a sidecar; only its truthiness is used here.

    Returns:
        An ``application/xml`` CopyObjectResult response carrying the
        reported ETag and last-modified timestamp.
    """
    logger.info(
        "COPY_PASSTHROUGH_ENCRYPTED",
        src_bucket=src_bucket,
        src_key=src_key,
        dest_bucket=bucket,
        dest_key=key,
        is_multipart=bool(src_multipart_meta),
    )

    # Multipart objects keep their part/frame map in a separate sidecar
    # object; the destination needs its own copy or the read path can't
    # reconstruct (and decrypt) it. Copy the sidecar BEFORE the ciphertext:
    # once the destination object is visible its sidecar already exists, and
    # a failed sidecar copy aborts before any destination ciphertext is
    # written, instead of leaving an object that cannot be decrypted.
    if src_multipart_meta:
        sidecar_source = f"{src_bucket}/{quote(_internal_meta_key(src_key), safe='/')}"
        await client.copy_object(
            bucket,
            _internal_meta_key(key),
            sidecar_source,
            metadata_directive="COPY",
        )

    resp = await client.copy_object(
        bucket,
        key,
        copy_source,
        metadata_directive="COPY",
        content_type=content_type,
    )

    # Encrypted objects report the plaintext md5 (client-etag), not the
    # ciphertext ETag, to match GET/HEAD and the re-encrypt path.
    src_metadata = head_resp.get("Metadata", {})
    result = resp.get("CopyObjectResult", {})
    etag = src_metadata.get("client-etag") or str(result.get("ETag", "")).strip('"')

    # Use the timestamp returned by the copy when it is datetime-like;
    # otherwise fall back to "now" so the XML body always has a value.
    last_modified = result.get("LastModified")
    if hasattr(last_modified, "isoformat"):
        last_modified = last_modified.isoformat().replace("+00:00", "Z")
    else:
        last_modified = format_iso8601(datetime.now(UTC))

    return Response(
        content=xml_responses.copy_object_result(etag, last_modified),
        media_type="application/xml",
    )

async def _copy_encrypted(
self,
client: S3Client,
Expand Down
150 changes: 150 additions & 0 deletions tests/integration/test_copy_passthrough.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
"""Server-side passthrough for encrypted COPY (Scylla Manager dedup path).

A plain COPY (MetadataDirective=COPY) of an encrypted object must be a native
server-side CopyObject, not a download + re-encrypt + re-upload. These tests
drive the real handler against the in-memory MockS3Client and assert:

- the source data is never downloaded and the destination is never re-uploaded
(i.e. no bulk-byte amplification), yet
- the destination round-trips back to the original plaintext, for both single
and multipart objects (multipart also copies the frame-map sidecar), and
- REPLACE still takes the decrypt/re-encrypt path.
"""

from unittest.mock import AsyncMock, MagicMock

import pytest

from s3proxy.handlers import S3ProxyHandler
from s3proxy.state import MultipartStateManager
from s3proxy.state.metadata import _internal_meta_key

BUCKET = "backups"


def _handler(settings, mock_s3, credentials):
    """Return an S3ProxyHandler wired to hand out *mock_s3* as its S3 client.

    The mock client is tagged with *credentials*, and the handler's
    ``_client`` factory is replaced so every lookup yields that mock.
    """
    proxy = S3ProxyHandler(
        settings,
        settings.credentials_store,
        MultipartStateManager(),
    )
    mock_s3.credentials = credentials
    client_factory = MagicMock(return_value=mock_s3)
    proxy._client = client_factory
    return proxy


def _put_request(path, body):
req = MagicMock()
req.url.path = path
req.headers = {"content-length": str(len(body)), "content-type": "application/octet-stream"}
req.body = AsyncMock(return_value=body)
return req


def _stream_put_request(path, body, chunk=64 * 1024):
"""Force the streaming/multipart PUT path via UNSIGNED-PAYLOAD."""
req = MagicMock()
req.url.path = path
req.headers = {
"content-type": "application/octet-stream",
"x-amz-content-sha256": "UNSIGNED-PAYLOAD",
}

async def _stream():
for i in range(0, len(body), chunk):
yield body[i : i + chunk]

req.stream = _stream
return req


def _copy_request(dest_path, copy_source, directive="COPY"):
req = MagicMock()
req.url.path = dest_path
req.headers = {"x-amz-copy-source": copy_source, "x-amz-metadata-directive": directive}
return req


def _get_request(path):
req = MagicMock()
req.url.path = path
req.headers = {}
return req


async def _read(response) -> bytes:
if hasattr(response, "body_iterator"):
return b"".join([c async for c in response.body_iterator])
return response.body


def _keys_touched(history, op):
return [c[1].get("key") for c in history if c[0] == op]


@pytest.mark.asyncio
async def test_single_object_copy_is_server_side_passthrough(settings, mock_s3, credentials):
    """A plain COPY of a single-part encrypted object stays server-side."""
    proxy = _handler(settings, mock_s3, credentials)
    await mock_s3.create_bucket(BUCKET)

    plaintext = b"scylla sstable component bytes" * 100
    source_path = f"/{BUCKET}/sst/data.db"
    snapshot_path = f"/{BUCKET}/sst/data.db.snap"
    await proxy.handle_put_object(_put_request(source_path, plaintext), credentials)

    # Only calls recorded after this point belong to the COPY itself.
    calls_before = len(mock_s3.call_history)
    await proxy.handle_copy_object(_copy_request(snapshot_path, source_path), credentials)
    copy_calls = mock_s3.call_history[calls_before:]

    # Native copy happened; no download of source data, no re-upload of dest.
    assert "sst/data.db.snap" in _keys_touched(copy_calls, "copy_object")
    assert "sst/data.db" not in _keys_touched(copy_calls, "get_object")
    assert "sst/data.db.snap" not in _keys_touched(copy_calls, "put_object")

    # The copied object must still decrypt back to the original plaintext.
    fetched = await proxy.handle_get_object(_get_request(snapshot_path), credentials)
    assert await _read(fetched) == plaintext


@pytest.mark.asyncio
async def test_multipart_object_copy_copies_sidecar_and_roundtrips(settings, mock_s3, credentials):
    """A plain COPY of a multipart object also copies its sidecar server-side."""
    proxy = _handler(settings, mock_s3, credentials)
    await mock_s3.create_bucket(BUCKET)

    plaintext = b"m" * (256 * 1024)  # streamed as multiple parts -> real sidecar
    source_path = f"/{BUCKET}/sst/big.db"
    snapshot_path = f"/{BUCKET}/sst/big.db.snap"
    await proxy.handle_put_object(_stream_put_request(source_path, plaintext), credentials)

    # sanity: a multipart sidecar exists for the source
    source_sidecar = mock_s3._key(BUCKET, _internal_meta_key("sst/big.db"))
    assert source_sidecar in mock_s3.objects

    # Only calls recorded after this point belong to the COPY itself.
    calls_before = len(mock_s3.call_history)
    await proxy.handle_copy_object(_copy_request(snapshot_path, source_path), credentials)
    copy_calls = mock_s3.call_history[calls_before:]

    server_side_keys = _keys_touched(copy_calls, "copy_object")
    assert "sst/big.db.snap" in server_side_keys  # assembled ciphertext, server-side
    assert _internal_meta_key("sst/big.db.snap") in server_side_keys  # sidecar, server-side
    assert "sst/big.db" not in _keys_touched(copy_calls, "get_object")  # no bulk download
    assert "sst/big.db.snap" not in _keys_touched(copy_calls, "put_object")  # no re-upload

    # The copied object must still decrypt back to the original plaintext.
    fetched = await proxy.handle_get_object(_get_request(snapshot_path), credentials)
    assert await _read(fetched) == plaintext


@pytest.mark.asyncio
async def test_replace_directive_still_reencrypts(settings, mock_s3, credentials):
    """MetadataDirective=REPLACE keeps using the decrypt/re-encrypt path."""
    proxy = _handler(settings, mock_s3, credentials)
    await mock_s3.create_bucket(BUCKET)

    plaintext = b"needs a real re-encrypt because metadata changes"
    source_path = f"/{BUCKET}/a.db"
    dest_path = f"/{BUCKET}/b.db"
    await proxy.handle_put_object(_put_request(source_path, plaintext), credentials)

    # Only calls recorded after this point belong to the COPY itself.
    calls_before = len(mock_s3.call_history)
    replace_request = _copy_request(dest_path, source_path, directive="REPLACE")
    await proxy.handle_copy_object(replace_request, credentials)
    copy_calls = mock_s3.call_history[calls_before:]

    # REPLACE must NOT use server-side copy; it re-reads and re-writes.
    assert "b.db" not in _keys_touched(copy_calls, "copy_object")
    assert "a.db" in _keys_touched(copy_calls, "get_object")
    assert "b.db" in _keys_touched(copy_calls, "put_object")

    # The rewritten object must still decrypt back to the original plaintext.
    fetched = await proxy.handle_get_object(_get_request(dest_path), credentials)
    assert await _read(fetched) == plaintext