Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions mindtrace/automation/mindtrace/automation/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""Public worker API for ``mindtrace.automation``.

Re-exports the worker classes so callers can import them directly from
``mindtrace.automation`` instead of the individual worker submodules.
"""
from mindtrace.automation.workers.inference_worker import AnnotatorWorker
from mindtrace.automation.workers.image_similarity_worker import ImageSimilarityWorker
from mindtrace.automation.workers.anomaly_detection_worker import AnomalyDetectionWorker
from mindtrace.automation.workers.dataset_analysis_worker import DatasetAnalysisWorker

# Names exported via ``from mindtrace.automation import *``.
__all__ = [
    "AnnotatorWorker",
    "ImageSimilarityWorker",
    "AnomalyDetectionWorker",
    "DatasetAnalysisWorker"
]
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from typing import Dict, Any, List, Union
from mindtrace.models import BaseModel
from mindtrace.core import Worker
from mindtrace.registry import Registry
from mindtrace.datalake import Datalake


class AnomalyDetectionWorker(Worker):
    """
    Static worker that performs anomaly detection on datasets.

    The model is either loaded by name from a ``Registry`` (which requires
    ``registry_path``) or supplied directly as a ``BaseModel`` instance.

    Job inputs:
    - dataset_name_in_datalake: str (required)
    - version: str (default "latest")
    """

    def __init__(self, model: Union[str, BaseModel], registry_path: str = None):
        """Initialize Anomaly Detection Worker with specific model.

        Args:
            model: Registry name of the model, or a model instance.
            registry_path: Path used to open the ``Registry``; required when
                ``model`` is given by name.

        Raises:
            ValueError: If ``model`` is a name but ``registry_path`` is missing.
        """
        # NOTE(review): Worker.__init__ is not called here (matches the other
        # workers in this package) — confirm the base class needs no setup.
        self.registry = Registry(path=registry_path) if registry_path else None
        self.datalake = Datalake()

        if isinstance(model, str):
            if not self.registry:
                raise ValueError("Registry path required when loading model by name")
            self.model = self.registry.load(model)
        else:
            self.model = model

    def run(self, job) -> Dict[str, Any]:
        """Load the requested dataset from the datalake.

        Args:
            job: Mapping with ``dataset_name_in_datalake`` and optional ``version``.

        Returns:
            Summary dict with the dataset name and version that were loaded.

        Raises:
            ValueError: If ``dataset_name_in_datalake`` is missing from the job.
        """
        dataset_name = job.get('dataset_name_in_datalake')
        # Fail fast with a clear message instead of passing None to the datalake.
        if not dataset_name:
            raise ValueError("Job is missing required key 'dataset_name_in_datalake'")
        version = job.get('version', 'latest')
        self.dataset = self.datalake.load(dataset_name, version=version)
        # Return a summary so the Dict[str, Any] annotation holds for callers
        # (the original did a bare ``return``, yielding None).
        return {"dataset_name_in_datalake": dataset_name, "version": version}

    def predict(self) -> List[Dict[str, Any]]:
        """Run anomaly detection on the loaded dataset (not yet implemented)."""
        pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from typing import Dict, Any
from mindtrace.datalake import Datalake
from mindtrace.core import Worker



class DatasetAnalysisWorker(Worker):
    """
    Worker that performs dataset analysis.

    Job inputs:
    - dataset_name_in_datalake: str (required)
    - version: str (default "latest")
    """

    def __init__(self):
        """Initialize Dataset Analysis Worker."""
        # NOTE(review): Worker.__init__ is not called here (matches the other
        # workers in this package) — confirm the base class needs no setup.
        self.datalake = Datalake()
        self.dataset = None

    def run(self, job) -> Dict[str, Any]:
        """Execute dataset analysis job.

        Args:
            job: Mapping with ``dataset_name_in_datalake`` and optional ``version``.

        Returns:
            Summary dict with the dataset name and version that were loaded.

        Raises:
            ValueError: If ``dataset_name_in_datalake`` is missing from the job.
        """
        dataset_name = job.get('dataset_name_in_datalake')
        # Fail fast with a clear message instead of passing None to the datalake.
        if not dataset_name:
            raise ValueError("Job is missing required key 'dataset_name_in_datalake'")
        version = job.get('version', 'latest')
        self.dataset = self.datalake.load(dataset_name, version=version)
        # Return a summary so the Dict[str, Any] annotation holds for callers
        # (the original did a bare ``return``, yielding None).
        return {"dataset_name_in_datalake": dataset_name, "version": version}

    def analyze_dataset_quality(self) -> Dict[str, Any]:
        """Analyze quality of the loaded dataset (not yet implemented)."""
        pass

    def analyze_class_balance(self) -> Dict[str, Any]:
        """Analyze class balance of the loaded dataset (not yet implemented)."""
        pass

    def analyze_data_drift(self) -> Dict[str, Any]:
        """Analyze data drift of the loaded dataset (not yet implemented)."""
        pass

    def analyze_model_drift(self) -> Dict[str, Any]:
        """Analyze model drift of the loaded dataset (not yet implemented)."""
        pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from typing import Dict, Any, List

from mindtrace.core import Worker
from mindtrace.datalake import Datalake


class ImageSimilarityWorker(Worker):
    """
    Worker that performs image similarity analysis.

    Job inputs:
    - dataset_name_in_datalake: str (required)
    - version: str (default "latest")
    - query_image: str or None (if provided, finds similar images to this one)
    """

    def __init__(self):
        """Initialize Image Similarity Worker with a datalake client."""
        # The sibling workers construct their own Datalake in __init__;
        # without this, run() raised AttributeError on self.datalake.
        # NOTE(review): Worker.__init__ is not called here (matches the other
        # workers in this package) — confirm the base class needs no setup.
        self.datalake = Datalake()
        self.dataset = None

    def run(self, job) -> Dict[str, Any]:
        """Load the requested dataset from the datalake.

        Args:
            job: Mapping with ``dataset_name_in_datalake`` and optional ``version``.

        Returns:
            Summary dict with the dataset name and version that were loaded.

        Raises:
            ValueError: If ``dataset_name_in_datalake`` is missing from the job.
        """
        dataset_name = job.get('dataset_name_in_datalake')
        # Fail fast with a clear message instead of passing None to the datalake.
        if not dataset_name:
            raise ValueError("Job is missing required key 'dataset_name_in_datalake'")
        version = job.get('version', 'latest')
        self.dataset = self.datalake.load(dataset_name, version=version)
        # Return a summary so the Dict[str, Any] annotation holds for callers
        # (the original did a bare ``return``, yielding None).
        return {"dataset_name_in_datalake": dataset_name, "version": version}

    def embedding(self, images: Any) -> Any:
        """Compute embeddings for the given images (not yet implemented)."""
        pass

    def similarities(self, embeddings: Any,
                     mode: str = "query", top_k: int = 10) -> List[Dict[str, Any]]:
        """Compute pairwise/query similarities over embeddings (not yet implemented)."""
        pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from typing import Dict, Any, List, Callable, Union
from mindtrace.datalake import Datalake
from mindtrace.models import BaseModel
from mindtrace.core import Worker
from mindtrace.registry import Registry
from mindtrace.models import ClassificationModel, DetectionModel, SegmentationModel


class AnnotatorWorker(Worker):
    """
    Static worker that runs inference using models from Registry.
    Loads datasets from datalake for each job execution.

    Job inputs:
    - dataset_name_in_datalake: str (required)
    - version: str (default "latest")
    - output_callback: Callable (optional)
    - output_key: str (optional)
    """

    def __init__(self, model: Union[str, BaseModel], registry_path: str = None):
        """Initialize Annotator with model from registry or direct model instance.

        Args:
            model: Registry name of the model, or an instance of one of the
                supported model classes.
            registry_path: Path used to open the ``Registry``; required when
                ``model`` is given by name.

        Raises:
            ValueError: If ``model`` is a name but ``registry_path`` is missing.
            NotImplementedError: If the model (given or loaded) is not a
                Classification, Detection, or Segmentation model.
        """
        # NOTE(review): Worker.__init__ is not called here (matches the other
        # workers in this package) — confirm the base class needs no setup.
        self.registry = Registry(path=registry_path) if registry_path else None
        self.datalake = Datalake()

        if isinstance(model, str):
            if not self.registry:
                raise ValueError("Registry path required when loading model by name")
            self.model = self.registry.load(model)
        elif isinstance(model, (ClassificationModel, DetectionModel, SegmentationModel)):
            self.model = model
        else:
            raise NotImplementedError("Model type not supported")

        # Bind the type-specific inference entry point. A model loaded from
        # the registry by name may be of an unsupported type; fail fast here
        # instead of leaving self.inference unset (AttributeError later).
        if isinstance(self.model, ClassificationModel):
            self.inference = self.model.classify
        elif isinstance(self.model, DetectionModel):
            self.inference = self.model.detect
        elif isinstance(self.model, SegmentationModel):
            self.inference = self.model.segment
        else:
            raise NotImplementedError("Model type not supported")

    def run(self, job) -> Dict[str, Any]:
        """Execute annotation job on dataset loaded from datalake.

        Args:
            job: Mapping with ``dataset_name_in_datalake`` and optional ``version``.

        Returns:
            Summary dict with the dataset name and version that were loaded.

        Raises:
            ValueError: If ``dataset_name_in_datalake`` is missing from the job.
        """
        dataset_name = job.get('dataset_name_in_datalake')
        # Fail fast with a clear message instead of passing None to the datalake.
        if not dataset_name:
            raise ValueError("Job is missing required key 'dataset_name_in_datalake'")
        version = job.get('version', 'latest')
        self.dataset = self.datalake.load(dataset_name, version=version)
        # Return a summary so the Dict[str, Any] annotation holds for callers
        # (the original did a bare ``return``, yielding None).
        return {"dataset_name_in_datalake": dataset_name, "version": version}

    def predict_dataset(self, output_callback: Callable = None, *args, **kwargs) -> List[Dict[str, Any]]:
        """Run inference over the loaded dataset (not yet implemented)."""
        pass