Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 29 additions & 6 deletions langfuse/_task_manager/media_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,13 +252,36 @@ def _process_upload_media_job(
headers["x-ms-blob-type"] = "BlockBlob"
headers["x-amz-checksum-sha256"] = data["content_sha256_hash"]

def _upload_with_status_check() -> httpx.Response:
response = self._httpx_client.put(
upload_url,
headers=headers,
content=data["content_bytes"],
)
response.raise_for_status()

return response

upload_start_time = time.time()
upload_response = self._request_with_backoff(
self._httpx_client.put,
upload_url,
headers=headers,
content=data["content_bytes"],
)

try:
upload_response = self._request_with_backoff(_upload_with_status_check)
except httpx.HTTPStatusError as e:
upload_time_ms = int((time.time() - upload_start_time) * 1000)
failed_response = e.response

if failed_response is not None:
self._request_with_backoff(
self._api_client.media.patch,
media_id=data["media_id"],
uploaded_at=_get_timestamp(),
upload_http_status=failed_response.status_code,
upload_http_error=failed_response.text,
upload_time_ms=upload_time_ms,
)

raise

upload_time_ms = int((time.time() - upload_start_time) * 1000)

self._request_with_backoff(
Expand Down
80 changes: 80 additions & 0 deletions tests/test_media_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from queue import Queue
from types import SimpleNamespace
from unittest.mock import Mock

import httpx
import pytest

from langfuse._task_manager.media_manager import MediaManager


def _upload_response(status_code: int, text: str = "") -> httpx.Response:
request = httpx.Request("PUT", "https://example.com/upload")
return httpx.Response(status_code=status_code, request=request, text=text)


def _upload_job() -> dict:
return {
"media_id": "media-id",
"content_bytes": b"payload",
"content_type": "image/jpeg",
"content_length": 7,
"content_sha256_hash": "sha256hash",
"trace_id": "trace-id",
"observation_id": None,
"field": "input",
}


def test_media_upload_retries_on_retryable_http_status():
media_api = Mock()
media_api.get_upload_url.return_value = SimpleNamespace(
upload_url="https://example.com/upload",
media_id="media-id",
)
media_api.patch.return_value = None

httpx_client = Mock()
httpx_client.put.side_effect = [
_upload_response(503, "temporary failure"),
_upload_response(200, "ok"),
]

manager = MediaManager(
api_client=SimpleNamespace(media=media_api),
httpx_client=httpx_client,
media_upload_queue=Queue(),
max_retries=3,
)

manager._process_upload_media_job(data=_upload_job())

assert httpx_client.put.call_count == 2
media_api.patch.assert_called_once()
assert media_api.patch.call_args.kwargs["upload_http_status"] == 200


def test_media_upload_gives_up_on_non_retryable_http_status():
media_api = Mock()
media_api.get_upload_url.return_value = SimpleNamespace(
upload_url="https://example.com/upload",
media_id="media-id",
)
media_api.patch.return_value = None

httpx_client = Mock()
httpx_client.put.return_value = _upload_response(403, "forbidden")

manager = MediaManager(
api_client=SimpleNamespace(media=media_api),
httpx_client=httpx_client,
media_upload_queue=Queue(),
max_retries=3,
)

with pytest.raises(httpx.HTTPStatusError):
manager._process_upload_media_job(data=_upload_job())

assert httpx_client.put.call_count == 1
media_api.patch.assert_called_once()
assert media_api.patch.call_args.kwargs["upload_http_status"] == 403
Loading