chore: add error handle for pdf extract and pdf chunk #1490

Merged · 9 commits · Mar 26, 2025
137 changes: 75 additions & 62 deletions bigframes/blob/_functions.py
@@ -398,81 +398,94 @@ def image_normalize_to_bytes_func(

# Extracts all text from a PDF url
def pdf_extract_func(src_obj_ref_rt: str) -> str:
    try:
        import io
        import json

        from pypdf import PdfReader  # type: ignore
        import requests
        from requests import adapters

        session = requests.Session()
        session.mount("https://", adapters.HTTPAdapter(max_retries=3))

        src_obj_ref_rt_json = json.loads(src_obj_ref_rt)
        src_url = src_obj_ref_rt_json["access_urls"]["read_url"]

        response = session.get(src_url, timeout=30, stream=True)
        response.raise_for_status()
        pdf_bytes = response.content

        pdf_file = io.BytesIO(pdf_bytes)
        reader = PdfReader(pdf_file, strict=False)

        all_text = ""
        for page in reader.pages:
            page_extract_text = page.extract_text()
            if page_extract_text:
                all_text += page_extract_text

        result_dict = {"status": "", "content": all_text}

    except Exception as e:
        result_dict = {"status": str(e), "content": ""}

    result_json = json.dumps(result_dict)
    return result_json


pdf_extract_def = FunctionDef(pdf_extract_func, ["pypdf", "requests", "pypdf[crypto]"])

# Extracts text from a PDF url and chunks it simultaneously
def pdf_chunk_func(src_obj_ref_rt: str, chunk_size: int, overlap_size: int) -> str:
    try:
        import io
        import json

        from pypdf import PdfReader  # type: ignore
        import requests
        from requests import adapters

        session = requests.Session()
        session.mount("https://", adapters.HTTPAdapter(max_retries=3))

        src_obj_ref_rt_json = json.loads(src_obj_ref_rt)
        src_url = src_obj_ref_rt_json["access_urls"]["read_url"]

        response = session.get(src_url, timeout=30, stream=True)
        response.raise_for_status()
        pdf_bytes = response.content

        pdf_file = io.BytesIO(pdf_bytes)
        reader = PdfReader(pdf_file, strict=False)

        # extract and chunk text simultaneously
        all_text_chunks = []
        curr_chunk = ""
        for page in reader.pages:
            page_text = page.extract_text()
            if page_text:
                curr_chunk += page_text
                # split the accumulated text into chunks of a specific size with overlap;
                # this loop implements a sliding window approach to create chunks
                while len(curr_chunk) >= chunk_size:
                    split_idx = curr_chunk.rfind(" ", 0, chunk_size)
                    if split_idx == -1:
                        split_idx = chunk_size
                    actual_chunk = curr_chunk[:split_idx]
                    all_text_chunks.append(actual_chunk)
                    overlap = curr_chunk[split_idx + 1 : split_idx + 1 + overlap_size]
                    curr_chunk = overlap + curr_chunk[split_idx + 1 + overlap_size :]
        if curr_chunk:
            all_text_chunks.append(curr_chunk)

        result_dict = {"status": "", "content": all_text_chunks}

    except Exception as e:
        result_dict = {"status": str(e), "content": []}

    result_json = json.dumps(result_dict)
    return result_json


pdf_chunk_def = FunctionDef(pdf_chunk_func, ["pypdf", "requests", "pypdf[crypto]"])
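
Note for reviewers: the sliding-window logic above is easiest to sanity-check outside the UDF runtime. Below is a minimal, self-contained sketch of the same loop (the sample text and sizes are illustrative, not from the test fixtures), wrapped in the same `{"status": ..., "content": ...}` envelope the UDFs now return:

import json


def chunk_text(text: str, chunk_size: int, overlap_size: int) -> list:
    """Mirror of the sliding-window loop in pdf_chunk_func."""
    chunks = []
    curr = text
    while len(curr) >= chunk_size:
        # Prefer breaking at the last space before chunk_size; fall back
        # to a hard cut when the window contains no space at all.
        split_idx = curr.rfind(" ", 0, chunk_size)
        if split_idx == -1:
            split_idx = chunk_size
        chunks.append(curr[:split_idx])
        # Carry overlap_size characters forward into the next window.
        overlap = curr[split_idx + 1 : split_idx + 1 + overlap_size]
        curr = overlap + curr[split_idx + 1 + overlap_size :]
    if curr:
        chunks.append(curr)
    return chunks


# Success is signaled by an empty "status", matching the UDF envelope.
print(json.dumps({"status": "", "content": chunk_text("one two three four five", 10, 4)}))
# {"status": "", "content": ["one two", "three", "four five"]}
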
49 changes: 39 additions & 10 deletions bigframes/operations/blob.py
@@ -615,6 +615,7 @@ def pdf_extract(
        max_batching_rows: int = 1,
        container_cpu: Union[float, int] = 2,
        container_memory: str = "1Gi",
        verbose: bool = False,
    ) -> bigframes.series.Series:
        """Extracts text from PDF URLs and saves the text as string.

@@ -630,12 +631,20 @@

            send to cloud run to execute the function.
            container_cpu (int or float, default 2): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to integers.
            container_memory (str, default "1Gi"): container memory size. String of the format <number><unit>. Possible values are from 512Mi to 32Gi.
            verbose (bool, default False): controls the verbosity of the output.
                When set to True, both error messages and the extracted content
                are returned. When set to False, only the extracted content is
                returned and error messages are suppressed.

        Returns:
            bigframes.series.Series: str or struct[str, str], depending on the
                "verbose" parameter. Contains the text extracted from the PDF
                file, including error messages if verbose is enabled.
        """

        import bigframes.bigquery as bbq
        import bigframes.blob._functions as blob_func
        import bigframes.pandas as bpd

        connection = self._resolve_connection(connection)

@@ -649,11 +658,19 @@

        ).udf()

        src_rt = self._get_runtime_json_str(mode="R")

        res = src_rt.apply(pdf_extract_udf)

        content_series = res._apply_unary_op(ops.JSONValue(json_path="$.content"))

        self._add_to_cleanup_set(pdf_extract_udf)
        if verbose:
            status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status"))
            res_df = bpd.DataFrame({"status": status_series, "content": content_series})
            struct_series = bbq.struct(res_df)
            return struct_series
        else:
            return content_series

    def pdf_chunk(
        self,
@@ -664,6 +681,7 @@
        max_batching_rows: int = 1,
        container_cpu: Union[float, int] = 2,
        container_memory: str = "1Gi",
        verbose: bool = False,
    ) -> bigframes.series.Series:
        """Extracts and chunks text from PDF URLs and saves the text as
        arrays of strings.
@@ -684,14 +702,21 @@

            send to cloud run to execute the function.
            container_cpu (int or float, default 2): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to integers.
            container_memory (str, default "1Gi"): container memory size. String of the format <number><unit>. Possible values are from 512Mi to 32Gi.
            verbose (bool, default False): controls the verbosity of the output.
                When set to True, both error messages and the extracted content
                are returned. When set to False, only the extracted content is
                returned and error messages are suppressed.

        Returns:
            bigframes.series.Series: array[str] or struct[str, array[str]],
                depending on the "verbose" parameter. Each string is a chunk of
                text extracted from the PDF, with error messages included if
                verbose is enabled.
        """

        import bigframes.bigquery as bbq
        import bigframes.blob._functions as blob_func
        import bigframes.pandas as bpd

        connection = self._resolve_connection(connection)

@@ -718,8 +743,12 @@

        res = self._df_apply_udf(df, pdf_chunk_udf)

        content_series = bbq.json_extract_string_array(res, "$.content")

        self._add_to_cleanup_set(pdf_chunk_udf)
        if verbose:
            status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status"))
            res_df = bpd.DataFrame({"status": status_series, "content": content_series})
            struct_series = bbq.struct(res_df)
            return struct_series
        else:
            return content_series
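
For context, a hedged usage sketch of the new `verbose` switch; the bucket path and connection name here are placeholders, not part of this PR:

import bigframes
import bigframes.pandas as bpd

bigframes.options.experiments.blob = True
session = bpd.get_global_session()

# Hypothetical multimodal DataFrame over a folder of PDFs.
df = session.from_glob_path(
    "gs://my-bucket/pdfs/*", name="pdf", connection="us.my-connection"
)

# Default (verbose=False): a plain Series; rows that failed come back empty.
text = df["pdf"].blob.pdf_extract(connection="us.my-connection")

# verbose=True: a struct Series with "status" and "content" fields, so
# per-row error messages (e.g. encrypted PDFs) are surfaced instead of lost.
detailed = df["pdf"].blob.pdf_extract(connection="us.my-connection", verbose=True)

chunks = df["pdf"].blob.pdf_chunk(
    connection="us.my-connection", chunk_size=1000, overlap_size=100, verbose=True
)

Returning the struct via bbq.struct over a two-column DataFrame keeps status and content aligned row by row, rather than handing callers two separate series to join.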
14 changes: 14 additions & 0 deletions tests/system/conftest.py
@@ -1480,3 +1480,17 @@ def reset_default_session_and_location():
    yield
    bpd.close_session()
    bpd.options.bigquery.location = None


@pytest.fixture(scope="session")
def pdf_gcs_path() -> str:
    return "gs://bigframes_blob_test/pdfs/*"


@pytest.fixture(scope="session")
def pdf_mm_df(
    pdf_gcs_path, session: bigframes.Session, bq_connection: str
) -> bpd.DataFrame:
    bigframes.options.experiments.blob = True

    return session.from_glob_path(pdf_gcs_path, name="pdf", connection=bq_connection)
104 changes: 104 additions & 0 deletions tests/system/large/blob/test_function.py
@@ -285,3 +285,107 @@ def test_blob_image_normalize_to_bq(images_mm_df: bpd.DataFrame, bq_connection:
    assert isinstance(actual, bpd.Series)
    assert len(actual) == 2
    assert actual.dtype == dtypes.BYTES_DTYPE


@pytest.mark.parametrize(
    "verbose, expected",
    [
        (
            True,
            pd.Series(
                [
                    {"status": "File has not been decrypted", "content": ""},
                    {
                        "status": "",
                        "content": "Sample PDF This is a testing file. Some dummy messages are used for testing purposes. ",
                    },
                ]
            ),
        ),
        (
            False,
            pd.Series(
                [
                    "",
                    "Sample PDF This is a testing file. Some dummy messages are used for testing purposes. ",
                ],
                name="pdf",
            ),
        ),
    ],
)
def test_blob_pdf_extract(
    pdf_mm_df: bpd.DataFrame,
    verbose: bool,
    bq_connection: str,
    expected: pd.Series,
):
    bigframes.options.experiments.blob = True

    actual = (
        pdf_mm_df["pdf"]
        .blob.pdf_extract(connection=bq_connection, verbose=verbose)
        .explode()
        .to_pandas()
    )

    pd.testing.assert_series_equal(
        actual,
        expected,
        check_dtype=False,
        check_index=False,
    )


@pytest.mark.parametrize(
    "verbose, expected",
    [
        (
            True,
            pd.Series(
                [
                    {"status": "File has not been decrypted", "content": []},
                    {
                        "status": "",
                        "content": [
                            "Sample PDF This is a testing file. Some ",
                            "dummy messages are used for testing ",
                            "purposes. ",
                        ],
                    },
                ]
            ),
        ),
        (
            False,
            pd.Series(
                [
                    pd.NA,
                    "Sample PDF This is a testing file. Some ",
                    "dummy messages are used for testing ",
                    "purposes. ",
                ],
            ),
        ),
    ],
)
def test_blob_pdf_chunk(
    pdf_mm_df: bpd.DataFrame, verbose: bool, bq_connection: str, expected: pd.Series
):
    bigframes.options.experiments.blob = True

    actual = (
        pdf_mm_df["pdf"]
        .blob.pdf_chunk(
            connection=bq_connection, chunk_size=50, overlap_size=10, verbose=verbose
        )
        .explode()
        .to_pandas()
    )

    pd.testing.assert_series_equal(
        actual,
        expected,
        check_dtype=False,
        check_index=False,
    )