Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/analytics/drift/pipelines.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import pandas as pd
from typing import Dict, Union, Any
import json
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
from src.schemas.driftingMetric import DataDriftTable
from src.schemas.driftingMetric import DataDriftTable, ConceptDriftTable


def run_data_drift_pipeline(
Expand Down Expand Up @@ -54,15 +53,14 @@ def run_data_drift_pipeline(
initial_report = json.loads(initial_report)

data_drift_report = {}
data_drift_report["timestamp"] = initial_report["timestamp"]
data_drift_report["drift_summary"] = initial_report["metrics"][1]["result"]

return DataDriftTable(**data_drift_report["drift_summary"])


def run_concept_drift_pipeline(
reference_dataset: pd.DataFrame, current_dataset: pd.DataFrame, target_feature: str
) -> Dict[str, Union[TargetDriftPreset, str]]:
) -> ConceptDriftTable:
"""
To estimate the categorical target drift, we compare the distribution of the target in the two datasets.
This solution works for both binary and multi-class classification.
Expand All @@ -89,10 +87,12 @@ def run_concept_drift_pipeline(
initial_report = drift_report.json()
initial_report = json.loads(initial_report)
concept_drift_report = {}
concept_drift_report["timestamp"] = initial_report["timestamp"]
concept_drift_report["concept_drift_summary"] = initial_report["metrics"][0][
"result"
]
concept_drift_report["column_correlation"] = initial_report["metrics"][1]["result"]

return concept_drift_report
return ConceptDriftTable(
concept_drift_summary=concept_drift_report["concept_drift_summary"],
column_correlation=concept_drift_report["column_correlation"],
)
32 changes: 20 additions & 12 deletions src/analytics/tests/test_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,36 +203,44 @@ def test_create_data_drift_pipeline(self):
)

def test_create_concept_drift_pipeline_drift_not_detected(self):
concept_drift_report = run_concept_drift_pipeline(
reference_concept_drift, current_concept_drift, "y_testing_multi"
concept_drift_report = vars(
run_concept_drift_pipeline(
reference_concept_drift, current_concept_drift, "y_testing_multi"
)
)
assert list(concept_drift_report.keys()) == [
"timestamp",
"concept_drift_summary",
"column_correlation",
]
assert (
round(concept_drift_report["concept_drift_summary"]["drift_score"], 3)
round(vars(concept_drift_report["concept_drift_summary"])["drift_score"], 3)
== 0.082
)
assert concept_drift_report["concept_drift_summary"]["drift_detected"] == False
assert (
vars(concept_drift_report["concept_drift_summary"])["drift_detected"]
== False
)

def test_create_concept_drift_pipeline_drift_detected(self):
concept_drift_report = run_concept_drift_pipeline(
reference_concept_drift_detected,
current_concept_drift_detected,
"discount_price__currency",
concept_drift_report = vars(
run_concept_drift_pipeline(
reference_concept_drift_detected,
current_concept_drift_detected,
"discount_price__currency",
)
)
assert list(concept_drift_report.keys()) == [
"timestamp",
"concept_drift_summary",
"column_correlation",
]
assert (
round(concept_drift_report["concept_drift_summary"]["drift_score"], 3)
round(vars(concept_drift_report["concept_drift_summary"])["drift_score"], 3)
== 0.008
)
assert concept_drift_report["concept_drift_summary"]["drift_detected"] == True
assert (
vars(concept_drift_report["concept_drift_summary"])["drift_detected"]
== True
)

def test_create_binary_classification_training_model_pipeline(self):
model, eval = create_binary_classification_training_model_pipeline(
Expand Down
11 changes: 10 additions & 1 deletion src/cron_tasks/monitoring_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
from sqlalchemy.orm import sessionmaker, Session

from src import crud, entities
from src.analytics.drift.pipelines import run_data_drift_pipeline
from src.analytics.drift.pipelines import (
run_data_drift_pipeline,
run_concept_drift_pipeline,
)
from src.analytics.metrics.pipelines import (
create_binary_classification_evaluation_metrics_pipeline,
create_feature_metrics_pipeline,
Expand Down Expand Up @@ -56,10 +59,16 @@ async def run_calculate_drifting_metrics_pipeline(
data_drift_report = run_data_drift_pipeline(
processed_training_dropped_target_df, processed_inference_dropped_target_df
)
concept_drift_report = run_concept_drift_pipeline(
training_processed_df,
inference_processed_df,
model.prediction,
)

new_drifting_metric = entities.DriftingMetric(
timestamp=str(datetime.utcnow()),
model_id=model.id,
concept_drift_summary=concept_drift_report,
data_drift_summary=data_drift_report,
)

Expand Down
5 changes: 2 additions & 3 deletions src/entities/DriftingMetric.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from sqlalchemy import Column, Float, ForeignKey, String, DateTime, JSON
from sqlalchemy import Column, ForeignKey, String, DateTime, JSON
from src.entities.Base import Base
from src.utils.id_gen import generate_uuid

Expand All @@ -9,8 +9,7 @@ class DriftingMetric(Base):
id = Column(String, primary_key=True, unique=True, default=generate_uuid)
model_id = Column(String, ForeignKey("models.id", ondelete="CASCADE"))
timestamp = Column(DateTime)
# TODO: Fix pipeline to return a DataDriftTable first
# concept_drift_summary = Column(JSON)
concept_drift_summary = Column(JSON)
data_drift_summary = Column(JSON)
created_at = Column(DateTime)
updated_at = Column(DateTime)
12 changes: 5 additions & 7 deletions src/schemas/driftingMetric.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Dict, Union
from typing import Dict, List, Union
from pydantic import BaseModel
from src.schemas.base import ItemBase

Expand Down Expand Up @@ -28,15 +28,15 @@ class CramerV(BaseModel):

column_name: str
kind: str
values: Dict[str, Dict[str, str]]
values: Dict[str, List[str]]


class ColumnConceptDriftCorrelationMetrics(BaseModel):
"""One column concept drift correlation metrics"""

column_name: str
current: CramerV
reference: CramerV
current: Dict[str, CramerV]
reference: Dict[str, CramerV]


class ColumnConceptDriftMetrics(BaseModel):
Expand All @@ -57,12 +57,10 @@ class ConceptDriftTable(BaseModel):
column_correlation: ColumnConceptDriftCorrelationMetrics


# TODO: Need to include the class of the concept drift
class DriftingMetricBase(ItemBase):
model_id: str
timestamp: Union[str, datetime]
# TODO: The pipeline needs to return a DataDriftTable. If evidently does not provide it we should create it.
# concept_drift_summary: DataDriftTable
concept_drift_summary: ConceptDriftTable
data_drift_summary: DataDriftTable


Expand Down