Skip to content

fix: Address some issues in the split pdf logic #170

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Sep 10, 2024
16 changes: 13 additions & 3 deletions _test_unstructured_client/integration/test_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,19 +291,27 @@ async def test_split_pdf_requests_do_retry(monkeypatch):
"""
Test that when we split a pdf, the split requests will honor retryable errors.
"""
mock_endpoint_called = False
number_of_split_502s = 2
number_of_last_page_502s = 2

async def mock_send(_, request):
async def mock_send(_, request: httpx.Request):
"""
Return a predefined number of 502s for requests with certain starting_page_number values.

This is because N-1 splits are sent off in the hook logic. These need explicit retry handling.
The final split is returned to the SDK and gets the built in retry code.
This is to make sure specific portions of the doc are retried properly.

We want to make sure both code paths are retried.
"""
# Assert that the SDK issues our no-op request
# returned by the BeforeRequestHook
nonlocal mock_endpoint_called
if request.url.host == "no-op":
mock_endpoint_called = True
return Response(200, request=request)

request_body = request.read()

decoded_body = MultipartDecoder(request_body, request.headers.get("Content-Type"))
form_data = form_utils.parse_form_data(decoded_body)

Expand Down Expand Up @@ -360,4 +368,6 @@ async def mock_send(_, request):

assert number_of_split_502s == 0
assert number_of_last_page_502s == 0
assert mock_endpoint_called

assert res.status_code == 200
23 changes: 23 additions & 0 deletions _test_unstructured_client/integration/test_integration_freemium.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,29 @@ def test_partition_handling_server_error(error, split_pdf, monkeypatch, doc_path
response = client.general.partition(request=req)


@pytest.mark.asyncio
async def test_partition_async_returns_elements(client, doc_path):
    """Partition a PDF via ``partition_async`` with page splitting turned on.

    The test passes when the awaited response has status code 200 and a
    non-empty ``elements`` collection.
    """
    pdf_name = "layout-parser-paper.pdf"
    with open(doc_path / pdf_name, "rb") as pdf_file:
        pdf_bytes = pdf_file.read()

    parameters = shared.PartitionParameters(
        files=shared.Files(content=pdf_bytes, file_name=pdf_name),
        strategy="fast",
        languages=["eng"],
        # Send the document through the split-pdf code path.
        split_pdf_page=True,
    )
    req = operations.PartitionRequest(partition_parameters=parameters)

    response = await client.general.partition_async(request=req)

    assert response.status_code == 200
    assert len(response.elements)


def test_uvloop_partitions_without_errors(client, doc_path):
async def call_api():
filename = "layout-parser-paper-fast.pdf"
Expand Down
24 changes: 2 additions & 22 deletions _test_unstructured_client/unit/test_split_pdf_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from collections import Counter
from typing import Coroutine

import httpx
import pytest
import requests
from requests_toolbelt import MultipartDecoder, MultipartEncoder
Expand All @@ -27,18 +28,6 @@
from unstructured_client.models import shared


def test_unit_sdk_init():
    """Check that ``sdk_init`` stores the supplied client on the hook."""
    split_hook = SplitPdfHook()
    # Placeholder address only; this test never issues an API call.
    fake_url = "http://localhost:5000"
    session = requests.Session()

    split_hook.sdk_init(fake_url, session)

    assert split_hook.client == session


def test_unit_clear_operation():
"""Test clear operation method properly clears request/response data."""
hook = SplitPdfHook()
Expand Down Expand Up @@ -105,21 +94,12 @@ def test_unit_prepare_request_headers():
def test_unit_create_response():
    """Test create response method properly overrides body elements and Content-Length header."""
    test_elements = [{"key": "value"}, {"key_2": "value"}]

    expected_status_code = 200
    expected_content = b'[{"key": "value"}, {"key_2": "value"}]'
    # NOTE(review): not referenced in the visible part of this test;
    # presumably consumed by a later Content-Length assertion - confirm.
    expected_content_length = "38"

    response = request_utils.create_response(test_elements)

    # Compare with ``==``. The form ``assert value, expected`` only checks
    # that ``value`` is truthy and uses ``expected`` as the failure message,
    # so it can never detect a wrong status code or body.
    assert response.status_code == expected_status_code
    assert response._content == expected_content
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from urllib.parse import ParseResult, urlparse, urlunparse

from unstructured_client._hooks.types import SDKInitHook
from unstructured_client.httpclient import HttpClient
from unstructured_client.httpclient import HttpClient, AsyncHttpClient


class CleanServerUrlSDKInitHook(SDKInitHook):
Expand All @@ -25,9 +25,9 @@ def clean_server_url(self, base_url) -> str:
return urlunparse(parsed_url._replace(path=""))

def sdk_init(
self, base_url: str, client: HttpClient
) -> Tuple[str, HttpClient]:
self, base_url: str, client: HttpClient, async_client: AsyncHttpClient
) -> Tuple[str, HttpClient, AsyncHttpClient]:
"""Concrete implementation for SDKInitHook."""
cleaned_url = self.clean_server_url(base_url)

return cleaned_url, client
return cleaned_url, client, async_client
15 changes: 9 additions & 6 deletions src/unstructured_client/_hooks/custom/logger_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
SDKInitHook,
AfterSuccessHook,
)
from unstructured_client.httpclient import HttpClient
from unstructured_client.httpclient import HttpClient, AsyncHttpClient
from collections import defaultdict

logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME)
Expand Down Expand Up @@ -46,17 +46,20 @@ def log_retries(self, response: Optional[httpx.Response], error: Optional[Excep


def sdk_init(
self, base_url: str, client: HttpClient
) -> Tuple[str, HttpClient]:
self, base_url: str, client: HttpClient, async_client: AsyncHttpClient
) -> Tuple[str, HttpClient, AsyncHttpClient]:
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
return base_url, client
return base_url, client, async_client

def after_success(
self, hook_ctx: AfterSuccessContext, response: httpx.Response
) -> Union[httpx.Response, Exception]:
self.retries_counter.pop(hook_ctx.operation_id, None)
# NOTE: In case of split page partition this means - at least one of the splits was partitioned successfully
logger.info("Successfully partitioned the document.")
# Note(austin) - pdf splitting returns a mock request
# so we always reach the AfterSuccessHook
# This doesn't mean the splits succeeded
# Need to revisit our logging strategy
# logger.info("Successfully partitioned the document.")
return response

def after_error(
Expand Down
56 changes: 18 additions & 38 deletions src/unstructured_client/_hooks/custom/request_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,16 @@ async def call_api_async(
headers={**original_headers, "Content-Type": body.content_type},
)

async with limiter:
response = await send_request_async_with_retries(client, new_request)
return response
one_second = 1000
one_minute = 1000 * 60


async def send_request_async_with_retries(client: httpx.AsyncClient, request: httpx.Request):
# Hardcode the retry config until we can
# properly reuse the SDK logic
# (Values are in ms)
retry_config = utils.RetryConfig(
"backoff",
utils.BackoffStrategy(
initial_interval=3000, # 3 seconds
max_interval=1000 * 60 * 12, # 12 minutes
exponent=1.88,
max_elapsed_time=1000 * 60 * 30 # 30 minutes
initial_interval = one_second * 3,
max_interval = one_minute * 12,
max_elapsed_time = one_minute * 30,
exponent = 1.88,
),
retry_connection_errors=True
)
Expand All @@ -96,14 +90,14 @@ async def send_request_async_with_retries(client: httpx.AsyncClient, request: ht
]

async def do_request():
return await client.send(request)
return await client.send(new_request)

response = await utils.retry_async(
do_request,
utils.Retries(retry_config, retryable_codes)
)

return response
async with limiter:
response = await utils.retry_async(
do_request,
utils.Retries(retry_config, retryable_codes)
)
return response


def prepare_request_headers(
Expand Down Expand Up @@ -145,34 +139,20 @@ def prepare_request_payload(form_data: FormData) -> FormData:
return payload


def create_response(response: httpx.Response, elements: list) -> httpx.Response:
def create_response(elements: list) -> httpx.Response:
"""
Creates a modified response object with updated content.

Args:
response: The original response object.
elements: The list of elements to be serialized and added to
the response.

Returns:
The modified response object with updated content.
"""
response_copy = copy.deepcopy(response)
response = httpx.Response(status_code=200, headers={"Content-Type": "application/json"})
content = json.dumps(elements).encode()
content_length = str(len(content))
response_copy.headers.update({"Content-Length": content_length})
setattr(response_copy, "_content", content)
return response_copy


def log_after_split_response(status_code: int, split_number: int):
if status_code == 200:
logger.info(
"Successfully partitioned set #%d, elements added to the final result.",
split_number,
)
else:
logger.warning(
"Failed to partition set #%d, its elements will be omitted in the final result.",
split_number,
)
response.headers.update({"Content-Length": content_length})
setattr(response, "_content", content)
return response
Loading