
feat(text-service): Add engine selection and structured output #66


Merged: 1 commit, May 2, 2025
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -36,4 +36,5 @@ error_log.txt
docs/*
!docs/*.rst
!docs/conf.py
scratch.py
scratch.py
.coverage*
11 changes: 10 additions & 1 deletion CHANGELOG.MD
@@ -1,8 +1,17 @@
# ChangeLog

## [2025-05-02]

### `datafog-python` [4.1.0-dev]

- Added engine selection functionality to TextService class, allowing users to choose between 'regex', 'spacy', or 'auto' annotation engines (#XX)
- Enhanced TextService with intelligent fallback mechanism in 'auto' mode that tries regex first and falls back to spaCy if no entities are found
- Added comprehensive integration tests for the new engine selection feature
- Improved documentation for TextService class and its methods

## [2024-03-25]

### `datafog-python` [2.3.2]
### `datafog-python` [4.0.0]

- Added datafog-python/examples/uploading-file-types.ipynb to show JSON uploading example (#16)
- Added datafog-python/tests/regex_issue.py to show issue with regex recognizer creation
23 changes: 23 additions & 0 deletions README.md
@@ -190,6 +190,29 @@ client = DataFog(operations="scan")
ocr_client = DataFog(operations="extract")
```

## Engine Selection

DataFog now supports multiple annotation engines through the `TextService` class. You can choose between different engines for PII detection:

```python
from datafog.services.text_service import TextService

# Use regex engine only (fastest, pattern-based detection)
regex_service = TextService(engine="regex")

# Use spaCy engine only (more comprehensive NLP-based detection)
spacy_service = TextService(engine="spacy")

# Use auto mode (default) - tries regex first, falls back to spaCy if no entities found
auto_service = TextService() # engine="auto" is the default
```

Each engine has different strengths:

- **regex**: Fast pattern matching, good for structured data like emails, phone numbers, credit cards, etc.
- **spacy**: NLP-based entity recognition, better for detecting names, organizations, locations, etc.
- **auto**: Best of both worlds - uses regex for speed, falls back to spaCy for comprehensive detection
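The fallback decision in `auto` mode can be sketched as follows. This is a simplified stand-in, not the DataFog API: `annotate_auto`, `regex_stub`, and `spacy_stub` are hypothetical names used only to illustrate the control flow described above.

```python
from typing import Callable, Dict, List

Annotations = Dict[str, List[str]]

def annotate_auto(
    text: str,
    regex_annotate: Callable[[str], Annotations],
    spacy_annotate: Callable[[str], Annotations],
) -> Annotations:
    """Run the fast regex pass first; fall back to spaCy only if it finds nothing."""
    regex_results = regex_annotate(text)
    if any(entities for entities in regex_results.values()):
        return regex_results
    return spacy_annotate(text)

# Hypothetical stand-in annotators, for illustration only
def regex_stub(text: str) -> Annotations:
    return {"EMAIL": ["a@b.com"]} if "@" in text else {"EMAIL": []}

def spacy_stub(text: str) -> Annotations:
    return {"PERSON": ["Jane"]} if "Jane" in text else {"PERSON": []}

print(annotate_auto("mail a@b.com", regex_stub, spacy_stub))  # regex finds an entity
print(annotate_auto("Jane called", regex_stub, spacy_stub))   # falls back to spaCy
```

Because the regex pass short-circuits the spaCy model entirely when it finds anything, `auto` keeps the common structured-data case fast while still catching free-text entities.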

## Text PII Annotation

Here's an example of how to annotate PII in a text document:
278 changes: 248 additions & 30 deletions datafog/services/text_service.py
@@ -1,24 +1,46 @@
"""
Text processing service for PII annotation.
"""Text processing service for PII annotation.

Provides synchronous and asynchronous methods for annotating text with personally identifiable information (PII) using SpaCy. Supports chunking long texts and batch processing.
Provides synchronous and asynchronous methods for annotating text with personally identifiable information (PII) using SpaCy or regex patterns. Supports chunking long texts and batch processing.
"""

import asyncio
from typing import Dict, List
from typing import Dict, List, Optional, Union

from datafog.processing.text_processing.regex_annotator.regex_annotator import (
AnnotationResult,
RegexAnnotator,
Span,
)
from datafog.processing.text_processing.spacy_pii_annotator import SpacyPIIAnnotator


class TextService:
"""
Manages text annotation operations.
Service for annotating text with PII entities.

Handles text chunking, PII annotation, and result combination for both single texts and batches. Offers both synchronous and asynchronous interfaces.
This service provides methods to detect and annotate personally identifiable information (PII)
in text using different annotation engines. It supports chunking long texts for efficient processing
and combining annotations from multiple chunks.
"""

def __init__(self, text_chunk_length: int = 1000):
self.annotator = SpacyPIIAnnotator.create()
def __init__(self, text_chunk_length: int = 1000, engine: str = "auto"):
"""
Initialize the TextService with specified chunk length and annotation engine.

Args:
text_chunk_length: Maximum length of text chunks for processing. Default is 1000 characters.
engine: The annotation engine to use. Options are:
- "regex": Use only the RegexAnnotator for pattern-based entity detection
- "spacy": Use only the SpacyPIIAnnotator for NLP-based entity detection
- "auto": (Default) Try RegexAnnotator first and fall back to SpacyPIIAnnotator if no entities are found

Raises:
AssertionError: If an invalid engine type is provided
"""
assert engine in {"regex", "spacy", "auto"}, "Invalid engine"
self.engine = engine
self.spacy_annotator = SpacyPIIAnnotator.create()
self.regex_annotator = RegexAnnotator()
self.text_chunk_length = text_chunk_length

def _chunk_text(self, text: str) -> List[str]:
@@ -38,36 +60,232 @@ def _combine_annotations(self, annotations: List[Dict]) -> Dict:
combined[key].extend(value)
return combined

def annotate_text_sync(self, text: str) -> Dict:
"""Synchronously annotate a text string."""
def _annotate_with_engine(
self, text: str, structured: bool = False
) -> Union[Dict[str, List[str]], List[Span]]:
"""
Annotate text using the selected engine based on the engine parameter.

This method implements the engine selection logic:
- For "regex" mode: Uses only the RegexAnnotator
- For "spacy" mode: Uses only the SpacyPIIAnnotator
- For "auto" mode: Tries RegexAnnotator first and falls back to SpacyPIIAnnotator if no entities are found

Args:
text: The text to annotate
structured: If True, return structured output (list of Span objects)

Returns:
If structured=False: Dictionary of annotations by entity type where keys are entity types (e.g., "EMAIL", "PERSON", "ORG")
and values are lists of detected entities of that type
If structured=True: List of Span objects with entity information
"""
if structured:
# Handle structured output mode
if self.engine == "regex":
_, annotation_result = self.regex_annotator.annotate_with_spans(text)
return annotation_result.spans
elif self.engine == "spacy":
# For spaCy, we need to convert the dictionary format to spans
spacy_dict = self.spacy_annotator.annotate(text)
spans = []
for label, entities in spacy_dict.items():
for entity in entities:
# Find the entity in the text to get its position
start = text.find(entity)
if start >= 0:
end = start + len(entity)
spans.append(
Span(label=label, start=start, end=end, text=entity)
)
return spans
else: # auto mode
# Try regex first
regex_dict, annotation_result = (
self.regex_annotator.annotate_with_spans(text)
)

# Check if any entities were found
has_entities = any(
len(entities) > 0 for entities in regex_dict.values()
)

# If regex found entities, return those results
if has_entities:
return annotation_result.spans

# Otherwise, fall back to spaCy and convert to spans
spacy_dict = self.spacy_annotator.annotate(text)
spans = []
for label, entities in spacy_dict.items():
for entity in entities:
# Find the entity in the text to get its position
start = text.find(entity)
if start >= 0:
end = start + len(entity)
spans.append(
Span(label=label, start=start, end=end, text=entity)
)
return spans
else:
# Handle legacy dictionary output mode
if self.engine == "regex":
return self.regex_annotator.annotate(text)
elif self.engine == "spacy":
return self.spacy_annotator.annotate(text)
else: # auto mode
# Try regex first
regex_results = self.regex_annotator.annotate(text)

# Check if any entities were found
has_entities = any(
len(entities) > 0 for entities in regex_results.values()
)

# If regex found entities, return those results
if has_entities:
return regex_results

# Otherwise, fall back to spaCy
return self.spacy_annotator.annotate(text)

def annotate_text_sync(
self, text: str, structured: bool = False
) -> Union[Dict[str, List[str]], List[Span]]:
"""
Synchronously annotate a text string.

Args:
text: The text to annotate
structured: If True, return structured output (list of Span objects)

Returns:
If structured=False: Dictionary mapping entity types to lists of strings
If structured=True: List of Span objects with entity information
"""
if not text:
return {}
return [] if structured else {}

print(f"Starting on {text.split()[0]}")
chunks = self._chunk_text(text)
annotations = []
for chunk in chunks:
res = self.annotator.annotate(chunk)
annotations.append(res)
combined = self._combine_annotations(annotations)
print(f"Done processing {text.split()[0]}")
return combined

def batch_annotate_text_sync(self, texts: List[str]) -> Dict[str, Dict]:
"""Synchronously annotate a list of text input."""
results = [self.annotate_text_sync(text) for text in texts]
if structured:
# Handle structured output mode
all_spans = []
chunk_offset = 0 # Track the offset for each chunk in the original text

for chunk in chunks:
# Get spans for this chunk
chunk_spans = self._annotate_with_engine(chunk, structured=True)

# Adjust span positions based on chunk offset in the original text
for span in chunk_spans:
span.start += chunk_offset
span.end += chunk_offset
# Verify the span text matches the text at the adjusted position
# This helps catch any positioning errors
if span.start < len(text) and span.end <= len(text):
span.text = text[span.start : span.end]
all_spans.append(span)

# Update offset for the next chunk
chunk_offset += len(chunk)

print(f"Done processing {text.split()[0]}")
return all_spans
else:
# Handle legacy dictionary output mode
annotations = []
for chunk in chunks:
res = self._annotate_with_engine(chunk)
annotations.append(res)
combined = self._combine_annotations(annotations)
print(f"Done processing {text.split()[0]}")
return combined

def batch_annotate_text_sync(
self, texts: List[str], structured: bool = False
) -> Dict[str, Union[Dict[str, List[str]], List[Span]]]:
"""
Synchronously annotate a list of text input.

Args:
texts: List of text strings to annotate
structured: If True, return structured output (list of Span objects) for each text

Returns:
Dictionary mapping each input text to its annotation result
"""
results = [
self.annotate_text_sync(text, structured=structured) for text in texts
]
return dict(zip(texts, results, strict=True))

async def annotate_text_async(self, text: str) -> Dict:
"""Asynchronously annotate a text string."""
async def annotate_text_async(
self, text: str, structured: bool = False
) -> Union[Dict[str, List[str]], List[Span]]:
"""
Asynchronously annotate a text string.

Args:
text: The text to annotate
structured: If True, return structured output (list of Span objects)

Returns:
If structured=False: Dictionary mapping entity types to lists of strings
If structured=True: List of Span objects with entity information
"""
if not text:
return {}
return [] if structured else {}

chunks = self._chunk_text(text)
tasks = [asyncio.to_thread(self.annotator.annotate, chunk) for chunk in chunks]
annotations = await asyncio.gather(*tasks)
return self._combine_annotations(annotations)

async def batch_annotate_text_async(self, texts: List[str]) -> Dict[str, Dict]:
"""Asynchronously annotate a list of text input."""
tasks = [self.annotate_text_async(txt) for txt in texts]
if structured:
# Handle structured output mode asynchronously
all_spans = []
chunk_offset = 0 # Track the offset for each chunk in the original text

for chunk in chunks:
# We can't easily parallelize this due to the need to track offsets sequentially
# In a production environment, you might want a more sophisticated approach
chunk_spans = self._annotate_with_engine(chunk, structured=True)

# Adjust span positions based on chunk offset in the original text
for span in chunk_spans:
span.start += chunk_offset
span.end += chunk_offset
# Verify the span text matches the text at the adjusted position
if span.start < len(text) and span.end <= len(text):
span.text = text[span.start : span.end]
all_spans.append(span)

# Update offset for the next chunk
chunk_offset += len(chunk)

return all_spans
else:
# Handle legacy dictionary output mode asynchronously
tasks = [
asyncio.to_thread(self._annotate_with_engine, chunk) for chunk in chunks
]
annotations = await asyncio.gather(*tasks)
return self._combine_annotations(annotations)

async def batch_annotate_text_async(
self, texts: List[str], structured: bool = False
) -> Dict[str, Union[Dict[str, List[str]], List[Span]]]:
"""
Asynchronously annotate a list of text input.

Args:
texts: List of text strings to annotate
structured: If True, return structured output (list of Span objects) for each text

Returns:
Dictionary mapping each input text to its annotation result
"""
tasks = [
self.annotate_text_async(text, structured=structured) for text in texts
]
results = await asyncio.gather(*tasks)
return dict(zip(texts, results, strict=True))
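The per-chunk span re-basing that `annotate_text_sync` performs in the diff above can be sketched independently of DataFog. `Span` here is a minimal stand-in for the real dataclass, and `find_pii` is a toy annotator that flags a fixed marker string:

```python
from dataclasses import dataclass
from typing import Callable, List

@dataclass
class Span:
    label: str
    start: int
    end: int
    text: str

def merge_chunk_spans(
    text: str,
    chunk_length: int,
    annotate_chunk: Callable[[str], List[Span]],
) -> List[Span]:
    """Annotate fixed-size chunks, then shift spans back into original-text coordinates."""
    all_spans: List[Span] = []
    offset = 0
    for i in range(0, len(text), chunk_length):
        chunk = text[i : i + chunk_length]
        for span in annotate_chunk(chunk):
            span.start += offset
            span.end += offset
            # Re-read the text at the adjusted position to catch offset mistakes
            if span.end <= len(text):
                span.text = text[span.start : span.end]
            all_spans.append(span)
        offset += len(chunk)
    return all_spans

# Toy annotator: flags every occurrence of "PII" inside a single chunk
def find_pii(chunk: str) -> List[Span]:
    spans, pos = [], chunk.find("PII")
    while pos != -1:
        spans.append(Span("FLAG", pos, pos + 3, chunk[pos : pos + 3]))
        pos = chunk.find("PII", pos + 1)
    return spans

text = "aaaPII" + "b" * 4 + "PIIccc"  # second hit lands in the second chunk
spans = merge_chunk_spans(text, chunk_length=8, annotate_chunk=find_pii)
print([(s.start, s.end, s.text) for s in spans])  # → [(3, 6, 'PII'), (10, 13, 'PII')]
```

Note that, as in the implementation above, an entity straddling a chunk boundary is missed, since each chunk is annotated in isolation.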