Skip to content

fix: Address some issues in the split pdf logic #165

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions _test_unstructured_client/unit/test_split_pdf_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,21 +105,12 @@ def test_unit_prepare_request_headers():
def test_unit_create_response():
"""Test create response method properly overrides body elements and Content-Length header."""
test_elements = [{"key": "value"}, {"key_2": "value"}]
test_response = requests.Response()
test_response.status_code = 200
test_response._content = b'[{"key_2": "value"}]'
test_response.headers = requests.structures.CaseInsensitiveDict(
{
"Content-Type": "application/json",
"Content-Length": len(test_response._content),
}
)

expected_status_code = 200
expected_content = b'[{"key": "value"}, {"key_2": "value"}]'
expected_content_length = "38"

response = request_utils.create_response(test_response, test_elements)
response = request_utils.create_response(test_elements)

assert response.status_code, expected_status_code
assert response._content, expected_content
Expand Down
6 changes: 5 additions & 1 deletion src/unstructured_client/_hooks/custom/logger_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ def after_success(
) -> Union[requests.Response, Exception]:
self.retries_counter.pop(hook_ctx.operation_id, None)
# NOTE: In case of split page partition this means - at least one of the splits was partitioned successfully
logger.info("Successfully partitioned the document.")
# Note(austin) - pdf splitting returns a mock request
# so we always reach the AfterSuccessHook
# This doesn't mean the splits succeeded
# Need to revisit our logging strategy
# logger.info("Successfully partitioned the document.")
return response

def after_error(
Expand Down
118 changes: 106 additions & 12 deletions src/unstructured_client/_hooks/custom/request_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import io
import json
import logging
import random
import time
from typing import Optional, Tuple, Any

import httpx
Expand Down Expand Up @@ -76,24 +78,85 @@ def create_request(
)


async def retry_with_backoff_async(
    request_func,
    page_number,
    initial_interval,
    max_interval,
    exponent,
    max_elapsed_time,
    retry_status_codes=(502, 503, 504),
):
    """
    Await request_func() repeatedly until it yields a non-retryable
    response or the retry time budget is used up.

    A copy of the autogenerated backoff code adapted for asyncio.

    Args:
        request_func: Zero-argument callable returning an awaitable that
            resolves to a response object exposing ``status_code``.
        page_number: Page identifier; only used in the log messages.
        initial_interval: Base delay before the first retry, in milliseconds.
        max_interval: Upper bound for a single delay, in milliseconds.
        exponent: Growth factor applied to the delay on every retry.
        max_elapsed_time: Total retry budget in milliseconds, measured
            from just before the first attempt.
        retry_status_codes: Status codes that trigger another attempt.
            Defaults to 502, 503 and 504.

    Returns:
        The first response whose status code is not retryable, or the
        most recent retryable response once the budget is exceeded.

    Raises:
        Exception: Whatever request_func raised, re-raised unchanged once
            the budget is exceeded.
    """
    # Use a monotonic clock: the budget is an elapsed-time measurement and
    # must not be affected by wall-clock adjustments while we are waiting.
    start = round(time.monotonic() * 1000)
    retries = 0

    while True:
        try:
            response = await request_func()

            if response.status_code not in retry_status_codes:
                return response

            logger.error("Request (page %d) failed with status code %d. Waiting to retry.", page_number, response.status_code)

            # Is it time to get out of the loop?
            now = round(time.monotonic() * 1000)
            if now - start > max_elapsed_time:
                return response
        except Exception as e:
            # NOTE(review): every Exception is retried, not only transport
            # errors - confirm that is intended for non-transient failures.
            logger.error("Request (page %d) failed (%s). Waiting to retry.", page_number, repr(e))

            # Is it time to get out of the loop?
            now = round(time.monotonic() * 1000)
            if now - start > max_elapsed_time:
                raise

        # Otherwise go back to sleep: exponential delay plus up to one
        # second of jitter, capped at max_interval (all converted to seconds).
        sleep = (initial_interval / 1000) * exponent**retries + random.uniform(0, 1)
        sleep = min(sleep, max_interval / 1000)
        await asyncio.sleep(sleep)
        retries += 1


async def call_api_async(
client: httpx.AsyncClient,
page: Tuple[io.BytesIO, int],
original_request: requests.Request,
form_data: FormData,
filename: str,
limiter: asyncio.Semaphore,
) -> tuple[int, dict]:
) -> httpx.Response:
"""
Issue an httpx POST using a copy of the original requests.Request.
Wrap the call in a retry loop. These values are copied from the API spec,
and will not be auto updated. Long term solution is to reuse SDK logic.
We'll need the hook context to have access to the rest of the SDK.
"""
page_content, page_number = page
body = create_request_body(form_data, page_content, filename, page_number)
new_request = create_httpx_request(original_request, body)

one_second = 1000
one_minute = 1000 * 60
retry_values = {
"initial_interval": one_second * 3,
"max_interval": one_minute * 12,
"max_elapsed_time": one_minute * 30,
"exponent": 1.88,
}

async def do_request():
return await client.send(new_request)

async with limiter:
try:
response = await client.send(new_request)
return response.status_code, response.json()
except Exception:
logger.error("Failed to send request for page %d", page_number)
return 500, {}
response = await retry_with_backoff_async(do_request, page_number=page_number, **retry_values)

return response


def call_api(
Expand Down Expand Up @@ -157,9 +220,13 @@ def prepare_request_payload(form_data: FormData) -> FormData:
return payload


def create_response(response: requests.Response, elements: list) -> requests.Response:
def create_failure_response(response: requests.Response) -> requests.Response:
"""
Creates a modified response object with updated content.
Convert the status code on the given response to a 500.
This is because the split logic already catches and retries 502, 503, etc.
If a failure is passed back to the SDK, we shouldn't trigger
another layer of retries; we just want to print the error. 500 is
non-retryable up above.

Args:
response: The original response object.
Expand All @@ -170,11 +237,38 @@ def create_response(response: requests.Response, elements: list) -> requests.Res
The modified response object with updated content.
"""
response_copy = copy.deepcopy(response)

response_copy.status_code = 500

# Some server errors return a lower case content-type
# The SDK error parsing expects Content-Type
if content_type := response_copy.headers.get("content-type"):
response_copy.headers["Content-Type"] = content_type

return response_copy


def create_response(elements: list) -> requests.Response:
"""
Creates a requests.Response object with the list of elements.

Args:
elements: The list of elements to be serialized and added to
the response.

Returns:
A new response object with status code 200 whose JSON body is the
serialized elements.
"""
response = requests.Response()

content = json.dumps(elements).encode()
content_length = str(len(content))
response_copy.headers.update({"Content-Length": content_length})
setattr(response_copy, "_content", content)
return response_copy

response.headers.update({"Content-Length": content_length, "Content-Type": "application/json"})
response.status_code = 200
setattr(response, "_content", content)
return response


def log_after_split_response(status_code: int, split_number: int):
Expand Down
Loading