Added integration test markers: #73

Merged 2 commits on May 3, 2025
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -25,4 +25,5 @@ repos:
    rev: v4.0.0-alpha.8
    hooks:
      - id: prettier
        files: '.*\.(md|markdown|json|yaml|yml|js|jsx|css|html)$'
        exclude: .venv
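A quick way to sanity-check the new `files` pattern — assuming pre-commit is installed locally; this command is a sketch, not part of the PR — is to run only the prettier hook across the repository and confirm it touches just the listed extensions:

```bash
# Run only the prettier hook against every tracked file
pre-commit run prettier --all-files
```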
84 changes: 66 additions & 18 deletions datafog/services/spark_service.py
@@ -7,6 +7,7 @@

import importlib
import json
import os
import subprocess
import sys
from typing import Any, List
@@ -20,25 +21,63 @@
    data reading and package installation.
    """

    def __init__(self):
        # First import necessary modules
        from pyspark.sql import DataFrame, SparkSession
        from pyspark.sql.functions import udf
        from pyspark.sql.types import ArrayType, StringType
    def __init__(self, master=None):
        self.master = master

        # Assign fields
        self.SparkSession = SparkSession
        self.DataFrame = DataFrame
        self.udf = udf
        self.ArrayType = ArrayType
        self.StringType = StringType

        # Now create spark session and ensure pyspark is installed
        # Ensure pyspark is installed first
        self.ensure_installed("pyspark")
        self.spark = self.create_spark_session()

        # Now import necessary modules after ensuring pyspark is installed
        try:
            from pyspark.sql import DataFrame, SparkSession
            from pyspark.sql.functions import udf
            from pyspark.sql.types import ArrayType, StringType

            # Assign fields
            self.SparkSession = SparkSession
            self.DataFrame = DataFrame
            self.udf = udf
            self.ArrayType = ArrayType
            self.StringType = StringType

            # Create the spark session
            self.spark = self.create_spark_session()
        except ImportError as e:
            raise ImportError(
f"Failed to import PySpark modules: {e}. "
f"Make sure PySpark is installed correctly."
)

    def create_spark_session(self):
        return self.SparkSession.builder.appName("datafog").getOrCreate()
        # Check if we're running in a test environment
        in_test_env = (
            "PYTEST_CURRENT_TEST" in os.environ or "TOX_ENV_NAME" in os.environ
        )

        # Set Java system properties to handle security manager issues
        # This is needed for newer Java versions
        os.environ["PYSPARK_SUBMIT_ARGS"] = (
            "--conf spark.driver.allowMultipleContexts=true pyspark-shell"
        )

        # Create a builder with the app name
        builder = self.SparkSession.builder.appName("datafog")

        # Add configuration to work around security manager issues
        builder = builder.config("spark.driver.allowMultipleContexts", "true")
        builder = builder.config(
            "spark.driver.extraJavaOptions", "-Djava.security.manager=allow"
        )

        # If master is specified, use it
        if self.master:
            builder = builder.master(self.master)
        # Otherwise, if we're in a test environment, use local mode
        elif in_test_env:
            builder = builder.master("local[1]")

        # Create and return the session
        return builder.getOrCreate()

    def read_json(self, path: str) -> List[dict]:
        return self.spark.read.json(path).collect()
@@ -47,6 +86,15 @@
        try:
            importlib.import_module(package_name)
        except ImportError:
            subprocess.check_call(
                [sys.executable, "-m", "pip", "install", package_name]
            )
            print(f"Installing {package_name}...")
            try:
                subprocess.check_call(
                    [sys.executable, "-m", "pip", "install", package_name]
                )
                print(f"{package_name} installed successfully.")
            except subprocess.CalledProcessError as e:
                print(f"Failed to install {package_name}: {e}")
                raise ImportError(
f"Could not install {package_name}. "
f"Please install it manually with 'pip install {package_name}'."
)
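For orientation, here is a minimal usage sketch of the revised `SparkService`, based only on the code in this diff; the file path is hypothetical, and `master="local[1]"` mirrors what the integration fixture below passes.

```python
# Sketch assuming pyspark is importable; not part of the PR.
from datafog.services.spark_service import SparkService

service = SparkService(master="local[1]")  # omit master to fall back to env-based defaults

rows = service.read_json("people.jsonl")  # hypothetical newline-delimited JSON file
print(len(rows), rows[0].asDict() if rows else None)

service.spark.stop()  # release the local Spark context when finished
```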
27 changes: 27 additions & 0 deletions notes/story-1.7-tkt.md
@@ -0,0 +1,27 @@
**Story 1.7: Integration tests (no mocks)**

- [x] Run pytest with `-m "integration"` to run Spark in local mode.
- [ ] Smoke test the CLI with a tmp file.
- [ ] OCR path behind `PYTEST_DONUT=yes` flag.

## Implementation Notes

### Spark Integration Tests

1. Added integration marker to pytest configuration in tox.ini
2. Created test_spark_integration.py with tests for SparkService in local mode
3. Updated SparkService to support local mode for integration testing
4. Added integration markers to existing text_service_integration.py tests
5. Added a dedicated tox environment for running integration tests

To run the integration tests:

```bash
tox -e integration
```

Or directly with pytest:

```bash
pytest -m "integration"
```
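Because the marker is registered in tox.ini (see the bottom of this diff), standard pytest marker selection also applies — a hedged aside on stock pytest behavior, not something this PR adds:

```bash
# Run only the integration tests, verbosely
pytest -m "integration" -v

# Run the rest of the suite while skipping them
pytest -m "not integration"
```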
80 changes: 80 additions & 0 deletions tests/test_spark_integration.py
@@ -0,0 +1,80 @@
"""Integration tests for SparkService in local mode."""

import json
import os
import tempfile

import pytest

from datafog.services.spark_service import SparkService


@pytest.fixture(scope="module")
def spark_service():
"""Create a shared SparkService instance for all tests."""
# Initialize SparkService with explicit local mode
service = SparkService(master="local[1]")

yield service

# Clean up after all tests
if hasattr(service, "spark") and service.spark is not None:
service.spark.stop()


@pytest.fixture
def sample_json_data():
"""Create a temporary JSON file with sample data for testing."""
data = [
{"name": "John Doe", "email": "john.doe@example.com", "age": 30},
{"name": "Jane Smith", "email": "jane.smith@example.com", "age": 25},
{"name": "Bob Johnson", "email": "bob.johnson@example.com", "age": 40},
]

# Create a temporary file
with tempfile.NamedTemporaryFile(suffix=".json", delete=False, mode="w") as f:
for item in data:
f.write(json.dumps(item) + "\n")
temp_file = f.name

yield temp_file

# Clean up the temporary file after the test
if os.path.exists(temp_file):
os.remove(temp_file)


@pytest.mark.integration
def test_spark_service_initialization(spark_service):
    """Test that SparkService can be initialized in local mode."""
    # Verify that the Spark session was created successfully
    assert spark_service.spark is not None
    assert spark_service.spark.sparkContext.appName == "datafog"
    assert spark_service.spark.sparkContext.master.startswith("local")

    # Verify that the necessary Spark classes are available
    assert spark_service.DataFrame is not None
    assert spark_service.SparkSession is not None
    assert spark_service.udf is not None


@pytest.mark.integration
def test_spark_read_json(spark_service, sample_json_data):
    """Test that SparkService can read JSON data in local mode."""
    # Read the JSON data
    result = spark_service.read_json(sample_json_data)

    # Verify the result
    assert len(result) == 3, f"Expected 3 rows, got {len(result)}"

    # PySpark Row objects have a __contains__ method and can be accessed like dictionaries
    # but they're not actually dictionaries
    assert all(hasattr(item, "name") for item in result), "Missing 'name' field"
    assert all(hasattr(item, "email") for item in result), "Missing 'email' field"
    assert all(hasattr(item, "age") for item in result), "Missing 'age' field"

    # Verify specific values
    names = [item.name for item in result]
    assert "John Doe" in names, f"Expected 'John Doe' in {names}"
    assert "Jane Smith" in names, f"Expected 'Jane Smith' in {names}"
    assert "Bob Johnson" in names, f"Expected 'Bob Johnson' in {names}"
5 changes: 5 additions & 0 deletions tests/test_text_service_integration.py
@@ -17,6 +17,7 @@ def real_text_service():
    return TextService(text_chunk_length=1000)  # Larger chunk to avoid multiple calls


@pytest.mark.integration
def test_engine_regex_detects_simple_entities():
    """Test that regex engine correctly detects simple entities like emails and phones."""
    # Sample text with patterns that regex should easily detect
@@ -36,6 +37,7 @@ def test_engine_regex_detects_simple_entities():
    assert "123-45-6789" in result.get("SSN", [])


@pytest.mark.integration
def test_engine_auto_fallbacks_to_spacy():
    """Test that auto mode works correctly with entity detection."""
    # We need to test the auto mode in a more controlled way
@@ -64,6 +66,7 @@ def test_engine_auto_fallbacks_to_spacy():
    assert any("john.smith@example.com" in email for email in auto_result["EMAIL"])


@pytest.mark.integration
def test_engine_spacy_only():
    """Test that spaCy engine is always used regardless of regex potential hits."""
    # Sample text with both regex-detectable and spaCy-detectable entities
@@ -89,6 +92,7 @@ def test_engine_spacy_only():
    assert "EMAIL" not in spacy_result or not spacy_result["EMAIL"]


@pytest.mark.integration
def test_structured_annotation_output():
    """Test that structured=True returns list of Span objects."""
    text = "John Smith's email is john.smith@example.com"
@@ -131,6 +135,7 @@ def test_structured_annotation_output():
    # which we've already verified above


@pytest.mark.integration
def test_debug_entity_types():
    """Debug test to print the actual entity types returned by spaCy."""
    # Sample text with named entities
21 changes: 20 additions & 1 deletion tox.ini
@@ -2,6 +2,23 @@
envlist = py310,py311,py312
isolated_build = True

[testenv:integration]
deps =
    pytest==7.4.0
    pytest-asyncio==0.21.0
    pytest-cov
    pyspark>=3.0.0
    -r requirements-dev.txt
extras = all
allowlist_externals =
    tesseract
    pip
    python
commands =
    pip install --no-cache-dir -r requirements-dev.txt
    pip install --no-cache-dir pyspark>=3.0.0
    python run_tests.py -m integration

[testenv]
deps =
    pytest==7.4.0
@@ -36,4 +53,6 @@ commands =
    mypy datafog tests

[pytest]
asyncio_mode = auto
markers =
    integration: marks tests as integration tests that may require external dependencies
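Registering the marker here keeps pytest from emitting unknown-marker warnings. As a hedged illustration of standard pytest usage (not part of this PR), a test module can also opt in wholesale instead of decorating each test:

```python
# Hypothetical test module: pytestmark applies the marker to every test in the file.
import pytest

pytestmark = pytest.mark.integration


def test_placeholder_smoke():
    assert True  # illustrative body only
```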