Skip to content

Commit 4389855

Browse files
authored
Merge pull request #32 from DataFog/synchronous_processing
Add synchronous processing
2 parents a7c6151 + 0b3b084 commit 4389855

File tree

6 files changed

+63
-48
lines changed

6 files changed

+63
-48
lines changed

datafog/__about__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "3.2.2"
1+
__version__ = "3.3.0"

datafog/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from .__about__ import __version__
22
from .config import OperationType
3-
from .main import DataFog, OCRPIIAnnotator, TextPIIAnnotator
3+
from .main import DataFog, TextPIIAnnotator
44
from .processing.image_processing.donut_processor import DonutProcessor
55
from .processing.image_processing.image_downloader import ImageDownloader
66
from .processing.image_processing.pytesseract_processor import PytesseractProcessor
@@ -13,7 +13,6 @@
1313
"DonutProcessor",
1414
"DataFog",
1515
"ImageService",
16-
"OCRPIIAnnotator",
1716
"OperationType",
1817
"SparkService",
1918
"TextPIIAnnotator",

datafog/main.py

Lines changed: 27 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import json
22
import logging
3-
from logging import INFO
43
from typing import List
54

65
from .config import OperationType
@@ -10,7 +9,7 @@
109
from .services.text_service import TextService
1110

1211
logger = logging.getLogger("datafog_logger")
13-
logger.setLevel(INFO)
12+
logger.setLevel(logging.INFO)
1413

1514

1615
class DataFog:
@@ -37,7 +36,7 @@ def __init__(
3736
self.logger.info(f"Operations: {operations}")
3837

3938
async def run_ocr_pipeline(self, image_urls: List[str]):
40-
"""Run the OCR pipeline asynchronously."""
39+
"""Run the OCR pipeline asynchronously on a list of images provided via url."""
4140
try:
4241
extracted_text = await self.image_service.ocr_extract(image_urls)
4342
self.logger.info(f"OCR extraction completed for {len(image_urls)} images.")
@@ -46,7 +45,7 @@ async def run_ocr_pipeline(self, image_urls: List[str]):
4645
)
4746

4847
if OperationType.ANNOTATE_PII in self.operations:
49-
annotated_text = await self.text_service.batch_annotate_texts(
48+
annotated_text = await self.text_service.batch_annotate_text_async(
5049
extracted_text
5150
)
5251
self.logger.info(
@@ -59,55 +58,45 @@ async def run_ocr_pipeline(self, image_urls: List[str]):
5958
self.logger.error(f"Error in run_ocr_pipeline: {str(e)}")
6059
raise
6160

62-
async def run_text_pipeline(self, texts: List[str]):
63-
"""Run the text pipeline asynchronously."""
61+
async def run_text_pipeline(self, str_list: List[str]):
62+
"""Run the text pipeline asynchronously on a list of input text."""
6463
try:
65-
self.logger.info(f"Starting text pipeline with {len(texts)} texts.")
64+
self.logger.info(f"Starting text pipeline with {len(str_list)} texts.")
6665
if OperationType.ANNOTATE_PII in self.operations:
67-
annotated_text = await self.text_service.batch_annotate_texts(texts)
66+
annotated_text = await self.text_service.batch_annotate_text_async(
67+
str_list
68+
)
6869
self.logger.info(
6970
f"Text annotation completed with {len(annotated_text)} annotations."
7071
)
7172
return annotated_text
7273

7374
self.logger.info("No annotation operation found; returning original texts.")
74-
return texts
75+
return str_list
7576
except Exception as e:
7677
self.logger.error(f"Error in run_text_pipeline: {str(e)}")
7778
raise
7879

79-
def _add_attributes(self, attributes: dict):
80-
"""Add multiple attributes."""
81-
for key, value in attributes.items():
82-
pass
83-
84-
85-
class OCRPIIAnnotator:
86-
def __init__(self):
87-
self.image_service = ImageService(use_donut=True, use_tesseract=False)
88-
self.text_annotator = SpacyPIIAnnotator.create()
89-
self.spark_service: SparkService = None
90-
91-
async def run(self, image_urls: List[str], output_path=None):
80+
def run_text_pipeline_sync(self, str_list: List[str]):
81+
"""Run the text pipeline synchronously on a list of input text."""
9282
try:
93-
# Download and process the image to extract text
94-
# downloaded_images = await self.image_service.download_images(image_urls)
95-
# extracted_texts = await self.image_service.ocr_extract(downloaded_images)
96-
97-
# # Annotate the extracted text for PII
98-
# annotated_texts = [self.text_annotator.annotate(text) for text in extracted_texts]
99-
100-
# # Optionally, output the results to a JSON file
101-
# if output_path:
102-
# with open(output_path, "w") as f:
103-
# json.dump(annotated_texts, f)
83+
self.logger.info(f"Starting text pipeline with {len(str_list)} texts.")
84+
if OperationType.ANNOTATE_PII in self.operations:
85+
annotated_text = self.text_service.batch_annotate_text_sync(str_list)
86+
self.logger.info(
87+
f"Text annotation completed with {len(annotated_text)} annotations."
88+
)
89+
return annotated_text
10490

105-
# return annotated_texts
106-
pass
91+
self.logger.info("No annotation operation found; returning original texts.")
92+
return str_list
93+
except Exception as e:
94+
self.logger.error(f"Error in run_text_pipeline: {str(e)}")
95+
raise
10796

108-
finally:
109-
# Ensure Spark resources are released
110-
# self.spark_processor.spark.stop()
97+
def _add_attributes(self, attributes: dict):
98+
"""Add multiple attributes."""
99+
for key, value in attributes.items():
111100
pass
112101

113102

datafog/services/text_service.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,24 @@ class TextService:
77
def __init__(self):
88
self.annotator = SpacyPIIAnnotator.create()
99

10-
async def annotate_text(self, text):
11-
"""Asynchronously annotate a single piece of text."""
10+
def annotate_text_sync(self, text):
11+
"""Synchronously Annotate a text string."""
12+
print(f"Starting on {text.split()[0]}")
13+
res = self.annotator.annotate(text)
14+
print(f"Done processing {text.split()[0]}")
15+
return res
16+
17+
def batch_annotate_text_sync(self, texts: list):
18+
"""Synchronously annotate a list of text input."""
19+
results = [self.annotate_text_sync(text) for text in texts]
20+
return dict(zip(texts, results, strict=True))
21+
22+
async def annotate_text_async(self, text):
23+
"""Asynchronously annotate a text string."""
1224
return await asyncio.to_thread(self.annotator.annotate, text)
1325

14-
async def batch_annotate_texts(self, texts: list):
15-
"""Asynchronously annotate a batch of texts."""
16-
tasks = [self.annotate_text(text) for text in texts]
26+
async def batch_annotate_text_async(self, text: list):
27+
"""Asynchronously annotate a list of text input."""
28+
tasks = [self.annotate_text_async(txt) for txt in text]
1729
results = await asyncio.gather(*tasks)
18-
return dict(zip(texts, results, strict=True))
30+
return dict(zip(text, results, strict=True))

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
long_description = f.read()
66

77
# Use a single source of truth for the version
8-
__version__ = "3.2.2"
8+
__version__ = "3.3.0"
99

1010
project_urls = {
1111
"Homepage": "https://datafog.ai",

tests/test_main.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,21 @@ def test_textpii_annotator():
3636
# assert "Satya Nadella" in annotated_text[0].get("PER", []), "PII not annotated correctly."
3737

3838

39+
def test_datafog_text_annotation_sync():
40+
"""Test DataFog class for synchronous text annotation."""
41+
text = ["Joe Biden is the President of the United States."]
42+
datafog = DataFog()
43+
annotated_text = datafog.run_text_pipeline_sync(text)
44+
45+
assert annotated_text # Ensure that some results are returned.
46+
assert search_nested_dict(
47+
annotated_text, "Joe Biden"
48+
), "Joe Biden not found in annotated results."
49+
assert search_nested_dict(
50+
annotated_text, "the United States"
51+
), "United States not found in annotated results."
52+
53+
3954
@pytest.mark.asyncio
4055
async def test_datafog_text_annotation():
4156
"""Test DataFog class for text annotation."""

0 commit comments

Comments
 (0)