Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 53 additions & 25 deletions netra/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
from opentelemetry.trace import SpanKind

from netra.config import Config
from netra.evaluation import Evaluation, EvaluationScore

# Instrumentor functions
from netra.evaluation import Evaluation
from netra.instrumentation import init_instrumentations
from netra.instrumentation.instruments import NetraInstruments
from netra.logging_utils import configure_package_logging
Expand All @@ -19,13 +17,19 @@
from netra.tracer import Tracer
from netra.usage import Usage

# Public API of the ``netra`` package (this list controls ``from netra import *``).
# "SpanType" stays exported: it is the type of the public
# ``Netra.start_span(as_type=...)`` parameter and was part of the previous
# export list, so dropping it would silently break star-imports.
# NOTE(review): "EvaluationScore" was also exported previously, but it is no
# longer imported in this module, so it cannot be listed here without restoring
# that import -- confirm the removal is intentional.
__all__ = [
    "Netra",
    "UsageModel",
    "ActionModel",
    "SpanType",
]

logger = logging.getLogger(__name__)


class Netra:
"""
Main SDK class. Call SDK.init(...) at the start of your application
to configure OpenTelemetry and enable all built-in LLM + VectorDB instrumentations.
Main SDK class. Call Netra.init(...) at the start of your application
to configure OpenTelemetry and enable all instrumentations.
"""

_initialized = False
Expand All @@ -36,7 +40,8 @@ class Netra:

@classmethod
def is_initialized(cls) -> bool:
"""Thread-safe check if Netra has been initialized.
"""
Thread-safe check if Netra has been initialized.

Returns:
bool: True if Netra has been initialized, False otherwise
Expand All @@ -60,11 +65,27 @@ def init(
instruments: Optional[Set[NetraInstruments]] = None,
block_instruments: Optional[Set[NetraInstruments]] = None,
) -> None:
# Acquire lock at the start of the method and hold it throughout
# to prevent race conditions during initialization
with cls._init_lock:
"""
Thread-safe initialization of Netra.

# Check if already initialized while holding the lock
Args:
app_name: Name of the application
headers: Headers to be sent to the server
disable_batch: Whether to disable batch processing
trace_content: Whether to trace content
debug_mode: Whether to enable debug mode
enable_root_span: Whether to enable root span
resource_attributes: Resource attributes to be sent to the server
environment: Environment to be sent to the server
enable_scrubbing: Whether to enable scrubbing
blocked_spans: List of spans to be blocked
instruments: Set of instruments to be enabled
block_instruments: Set of instruments to be blocked

Returns:
None
"""
with cls._init_lock:
if cls._initialized:
logger.warning("Netra.init() called more than once; ignoring subsequent calls.")
return
Expand Down Expand Up @@ -104,8 +125,6 @@ def init(
cls.usage = None # type:ignore[attr-defined]

# Instrument all supported modules
# Pass trace_content flag to instrumentors that can capture prompts/completions

init_instrumentations(
should_enrich_metrics=True,
base64_image_uploader=None,
Expand Down Expand Up @@ -174,7 +193,7 @@ def shutdown(cls) -> None:
@classmethod
def set_session_id(cls, session_id: str) -> None:
"""
Set session_id context attributes in the current OpenTelemetry context.
Set session_id context attributes for all spans.

Args:
session_id: Session identifier
Expand All @@ -190,7 +209,7 @@ def set_session_id(cls, session_id: str) -> None:
@classmethod
def set_user_id(cls, user_id: str) -> None:
"""
Set user_id context attributes in the current OpenTelemetry context.
Set user_id context attributes for all spans.

Args:
user_id: User identifier
Expand All @@ -206,10 +225,10 @@ def set_user_id(cls, user_id: str) -> None:
@classmethod
def set_tenant_id(cls, tenant_id: str) -> None:
"""
Set user_account_id context attributes in the current OpenTelemetry context.
Set tenant_id context attributes for all spans.

Args:
user_account_id: User account identifier
tenant_id: Tenant identifier
"""
if not isinstance(tenant_id, str):
logger.error(f"set_tenant_id: tenant_id must be a string, got {type(tenant_id)}")
Expand All @@ -222,7 +241,7 @@ def set_tenant_id(cls, tenant_id: str) -> None:
@classmethod
def set_custom_attributes(cls, key: str, value: Any) -> None:
"""
Set a custom attribute on the currently active OpenTelemetry span only.
Set a custom attribute on the currently active span.

Args:
key: Custom attribute key
Expand All @@ -237,7 +256,7 @@ def set_custom_attributes(cls, key: str, value: Any) -> None:
@classmethod
def set_custom_event(cls, event_name: str, attributes: Any) -> None:
"""
Set custom event in the current OpenTelemetry context.
Set a custom event on the currently active span.

Args:
event_name: Name of the custom event
Expand All @@ -251,9 +270,12 @@ def set_custom_event(cls, event_name: str, attributes: Any) -> None:
@classmethod
def add_conversation(cls, conversation_type: ConversationType, role: str, content: Any) -> None:
"""
Append a conversation entry and set span attribute 'conversation' as an array.
If a conversation array already exists for the current active span, this appends
to it; otherwise, it initializes a new array.
Append a conversation entry to the currently active span.

Args:
conversation_type: Type of the conversation
role: Role of the conversation
content: Content of the conversation
"""
SessionManager.add_conversation(conversation_type=conversation_type, role=role, content=content)

Expand All @@ -266,9 +288,15 @@ def start_span(
as_type: Optional[SpanType] = SpanType.SPAN,
) -> SpanWrapper:
"""
Start a new session.
"""
return SpanWrapper(name, attributes, module_name, as_type=as_type)
Start a new span.

Args:
name: Name of the span
attributes: Attributes of the span
module_name: Name of the module
as_type: Type of the span (SPAN, TOOL, GENERATION, EMBEDDING, AGENT)

__all__ = ["Netra", "UsageModel", "ActionModel", "SpanType", "EvaluationScore"]
Returns:
SpanWrapper: SpanWrapper object
"""
return SpanWrapper(name, attributes, module_name, as_type=as_type)
8 changes: 0 additions & 8 deletions netra/anonymizer/anonymizer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
"""
Custom anonymizer for PII data that provides consistent hashing of entities.

This module provides a custom anonymizer that can be used to replace PII entities
with consistent hash values, allowing for tracking the same entities across multiple
texts while maintaining privacy.
"""

from typing import Callable, List, Optional

try:
Expand Down
7 changes: 0 additions & 7 deletions netra/anonymizer/base.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
"""
Base anonymizer class for PII data anonymization.

This module provides the base anonymizer class that contains the core anonymization
logic that can be extended by specific anonymizer implementations.
"""

import hashlib
from collections import OrderedDict
from dataclasses import dataclass
Expand Down
109 changes: 43 additions & 66 deletions netra/anonymizer/fp_anonymizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,28 @@ def __init__(self, preserve_length: bool = True, preserve_structure: bool = True
self.letters = "abcdefghijklmnopqrstuvwxyz"

def _get_deterministic_random(self, seed: str) -> random.Random:
"""Create a deterministic random generator from a seed."""
"""Create a deterministic random generator from a seed.

Args:
seed: The seed to use for the random generator.

Returns:
A random generator with a deterministic seed.
"""
# Use hash of the seed as random seed for consistency
hash_int = int(hashlib.md5(seed.encode()).hexdigest()[:8], 16)
return random.Random(hash_int)

def _preserve_structure_replace(self, text: str, seed: str) -> str:
"""
Replace text while preserving structure (length, special chars, case pattern).

Args:
text: The text to anonymize.
seed: The seed to use for the random generator.

Returns:
The anonymized text.
"""
if text in self.part_cache:
return self.part_cache[text]
Expand All @@ -54,7 +68,16 @@ def _preserve_structure_replace(self, text: str, seed: str) -> str:
return anonymized

def _simple_hash_replace(self, text: str, target_length: Optional[int] = None) -> str:
"""Simple hash replacement with optional length preservation."""
"""
Simple hash replacement with optional length preservation.

Args:
text: The text to anonymize.
target_length: The target length of the anonymized text.

Returns:
The anonymized text.
"""
if target_length is None:
target_length = len(text)

Expand All @@ -78,6 +101,12 @@ def _simple_hash_replace(self, text: str, target_length: Optional[int] = None) -
def _anonymize_email(self, email: str) -> str:
"""
Anonymize a single email while preserving format and structure.

Args:
email: The email to anonymize.

Returns:
The anonymized email.
"""
if email in self.email_cache:
return self.email_cache[email]
Expand Down Expand Up @@ -105,6 +134,12 @@ def _anonymize_email(self, email: str) -> str:
def anonymize_text(self, text: str) -> str:
"""
Anonymize all emails in the given text while preserving format.

Args:
text: The text to anonymize.

Returns:
The anonymized text.
"""
email_pattern = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b"

Expand All @@ -115,68 +150,10 @@ def replace_email(match: re.Match[str]) -> str:
return re.sub(email_pattern, replace_email, text)

def get_mapping(self) -> Dict[str, str]:
"""Return the mapping of original emails to anonymized versions."""
return self.email_cache.copy()

"""
Return the mapping of original emails to anonymized versions.

# Example usage and comparison
if __name__ == "__main__":
print("Format-Preserving Email Anonymization - Structure Preserving:\n")

# Structure-preserving anonymizer
anonymizer1 = FormatPreservingEmailAnonymizer(preserve_structure=True)

test_emails = [
"john@gmail.com",
"john@gmail.com",
"john@outlook.com",
"joe@outlook.com",
"user.name@company.co.uk",
"test-email@sub.example.org",
"Admin123@BigCorp.net",
"a@b.co",
]

print("Structure-Preserving Mode:")
print("=" * 50)
for email in test_emails:
anonymized = anonymizer1._anonymize_email(email)
print(f"{email:25} -> {anonymized}")

print("\n" + "=" * 50)
print("Length-Preserving Mode:")
print("=" * 50)

# Length-preserving but simpler anonymizer
anonymizer2 = FormatPreservingEmailAnonymizer(preserve_length=True, preserve_structure=False)

for email in test_emails:
anonymized = anonymizer2._anonymize_email(email)
print(f"{email:25} -> {anonymized}")

# Test with full text
print("\n" + "=" * 70)
print("Full Text Anonymization Examples:")
print("=" * 70)

test_texts = [
"Hi, my name is John and my email is john@gmail.com",
"Contact: support@company.com or admin@BigCorp.net",
"Emails: user.name@test.co.uk, simple@domain.org",
]

for text in test_texts:
anonymized = anonymizer1.anonymize_text(text)
print(f"Original: {text}")
print(f"Anonymized: {anonymized}")
print()

# Consistency test
print("Consistency Test:")
print("-" * 30)
email = "john@gmail.com"
result1 = anonymizer1._anonymize_email(email)
result2 = anonymizer1._anonymize_email(email)
print(f"First call: {email} -> {result1}")
print(f"Second call: {email} -> {result2}")
print(f"Consistent: {'✓' if result1 == result2 else '✗'}")
Returns:
A dictionary mapping original emails to anonymized versions.
"""
return self.email_cache.copy()
27 changes: 14 additions & 13 deletions netra/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,6 @@
class Config:
"""
Holds configuration options for the tracer.

Attributes:
app_name: Logical name for this service
otlp_endpoint: URL for OTLP collector
api_key: API key for the collector (sent as Bearer token)
headers: Additional headers (W3C Correlation-Context format)
disable_batch: Whether to disable batch span processor
trace_content: Whether to capture prompt/completion content
debug_mode: Whether to enable SDK logging (default: False)
enable_root_span: Whether to create a process root span (default: False)
resource_attributes: Custom resource attributes dict (e.g., {'env': 'prod', 'version': '1.0.0'})
enable_scrubbing: Whether to enable pydantic logfire scrubbing (default: False)
blocked_spans: List of span names (prefix/suffix patterns) to block from export
"""

# SDK Constants
Expand All @@ -45,6 +32,20 @@ def __init__(
enable_scrubbing: Optional[bool] = None,
blocked_spans: Optional[List[str]] = None,
):
"""
Initialize the configuration.

Args:
app_name: Logical name for this service
headers: Additional headers (W3C Correlation-Context format)
disable_batch: Whether to disable batch span processor
trace_content: Whether to capture prompt/completion content
debug_mode: Whether to enable SDK logging (default: False)
enable_root_span: Whether to create a process root span (default: False)
resource_attributes: Custom resource attributes dict (e.g., {'env': 'prod', 'version': '1.0.0'})
enable_scrubbing: Whether to enable pydantic logfire scrubbing (default: False)
blocked_spans: List of span names (prefix/suffix patterns) to block from export
"""
self.app_name = self._get_app_name(app_name)
self.otlp_endpoint = self._get_otlp_endpoint()
self.api_key = os.getenv("NETRA_API_KEY")
Expand Down
Loading