diff --git a/.gitignore b/.gitignore
index ba3e575f2..5833ad5d8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -79,10 +79,10 @@ cmake-build-debug/
 clang-tidy-build/
 libbuild/
 
-
 # Data & config files
 .data/
+.debug/
 .env
 exploration/
diff --git a/benchmark/huffpost_kaggle/analytics.ipynb b/benchmark/huffpost_kaggle/analytics.ipynb
deleted file mode 100644
index a13e1ff7e..000000000
--- a/benchmark/huffpost_kaggle/analytics.ipynb
+++ /dev/null
@@ -1,110 +0,0 @@
-{
- "cells": [
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "import plotly.express as px\n",
-    "\n",
-    "from benchmark.huffpost_kaggle.data_generation import HuffpostKaggleDataGenerator"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "from pathlib import Path\n",
-    "\n",
-    "huffpost_dataset = HuffpostKaggleDataGenerator(\n",
-    "    Path(\"/scratch/robinholzi/gh/modyn/.debug.log/.data/huffpost_kaggle\"),\n",
-    "    Path(\"/scratch/robinholzi/gh/modyn/.debug.log/.data/huffpost_kaggle/_raw/news-category-dataset.zip\"),\n",
-    ")\n",
-    "huffpost_dataset.extract_data()\n",
-    "hp_df = huffpost_dataset.load_into_dataframe()"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "hp_df[\"category\"].unique()"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "# number of samples over time\n",
-    "px.histogram(hp_df, x=\"date\")"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "category_and_years = hp_df[[\"category\", \"date\"]]\n",
-    "category_and_years[\"year\"] = category_and_years[\"date\"].dt.year\n",
-    "category_and_years = category_and_years[[\"category\", \"year\"]].drop_duplicates()\n",
-    "category_and_years = category_and_years.groupby(\"category\").size().reset_index()\n",
-    "category_and_years.columns = [\"category\", \"num_years\"]\n",
-    "category_and_years[category_and_years[\"num_years\"] > 9]"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "hp_df_reduced = hp_df.merge(category_and_years, on=\"category\")"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "px.histogram(hp_df_reduced, x=\"date\", color=\"category\")"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": []
-  }
- ],
- "metadata": {
-  "kernelspec": {
-   "display_name": "Python 3",
-   "language": "python",
-   "name": "python3"
-  },
-  "language_info": {
-   "codemirror_mode": {
-    "name": "ipython",
-    "version": 3
-   },
-   "file_extension": ".py",
-   "mimetype": "text/x-python",
-   "name": "python",
-   "nbconvert_exporter": "python",
-   "pygments_lexer": "ipython3",
-   "version": "3.12.3"
-  }
- },
- "nbformat": 4,
- "nbformat_minor": 2
-}
diff --git a/docker-compose.yml b/docker-compose.yml
index 49db7eb97..8b91a0900 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,5 +1,3 @@
-version: "3.8"
-
 # By default, we disable mounting the current directory under /modyn_host. However, this might be helpful for local development.
 # For the trainer server, you additionally might want to enable the runtime and deployment option to enable the GPU in the container.
 # For the storage, you probably want to mount some volume containing the datasets.
@@ -8,7 +6,7 @@ version: "3.8"
 services:
   metadata-db:
-    image: postgres:15.2-alpine
+    image: postgres:16.4-alpine
     restart: always
     environment:
       POSTGRES_USER: postgres
@@ -28,7 +26,7 @@ services:
       timeout: 5s
       retries: 20
   storage-db:
-    image: postgres:15.2-alpine
+    image: postgres:16.4-alpine
     restart: always
     environment:
       POSTGRES_USER: postgres
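Note on the removed notebook: its exploration steps survive as a plain script. A minimal sketch, assuming local paths; the generator API (constructor taking the dataset dir and the raw zip, plus `extract_data()` and `load_into_dataframe()`) is exactly what the deleted notebook used:

```python
from pathlib import Path

from benchmark.huffpost_kaggle.data_generation import HuffpostKaggleDataGenerator

# Illustrative paths; point these at your local copy of the Kaggle dump.
data_dir = Path(".data/huffpost_kaggle")
raw_zip = data_dir / "_raw" / "news-category-dataset.zip"

generator = HuffpostKaggleDataGenerator(data_dir, raw_zip)
generator.extract_data()
hp_df = generator.load_into_dataframe()  # "date" is expected to be datetime-typed

# Count in how many distinct years each category appears
# (the notebook inspected categories spanning more than 9 years).
category_years = hp_df[["category", "date"]].copy()
category_years["year"] = category_years["date"].dt.year
num_years = (
    category_years[["category", "year"]]
    .drop_duplicates()
    .groupby("category")
    .size()
    .reset_index(name="num_years")
)
hp_df_reduced = hp_df.merge(num_years, on="category")
```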
diff --git a/modyn/config/examples/modyn_config.yaml b/modyn/config/examples/modyn_config.yaml
index ace18ddbf..42ec1588e 100644
--- a/modyn/config/examples/modyn_config.yaml
+++ b/modyn/config/examples/modyn_config.yaml
@@ -47,6 +47,24 @@ storage:
       selector_batch_size: 2000000,
     },
     # ---------------------------------- YEARBOOK ---------------------------------- #
+    {
+      name: "yearbook_all",
+      description: "Yearbook Dataset from Wild-Time (full set)",
+      version: "0.0.1",
+      base_path: "/datasets/yearbook/all",
+      filesystem_wrapper_type: "LocalFilesystemWrapper",
+      file_wrapper_type: "BinaryFileWrapper",
+      file_wrapper_config:
+        {
+          byteorder: "big",
+          record_size: 12292,
+          label_size: 4,
+          file_extension: ".bin",
+        },
+      ignore_last_timestamp: false,
+      file_watcher_interval: 5,
+      selector_batch_size: 256,
+    },
     {
       name: "yearbook_train",
       description: "Yearbook Dataset from Wild-Time (training set)",
@@ -114,6 +132,39 @@ storage:
       file_watcher_interval: 5,
       selector_batch_size: 4096,
     },
+    # ------------------------------- HUFFPOST KAGGLE ------------------------------ #
+    {
+      name: "huffpost_kaggle_train",
+      description: "Original Huffpost Dataset from Kaggle (train)",
+      version: "0.0.1",
+      base_path: "/datasets/huffpost_kaggle/train",
+      filesystem_wrapper_type: "LocalFilesystemWrapper",
+      file_wrapper_type: "CsvFileWrapper",
+      file_wrapper_config: {
+        file_extension: ".csv",
+        separator: "\t", # TSV is the best option here since headlines contain commas and semicolons
+        label_index: 1,
+      },
+      ignore_last_timestamp: false,
+      file_watcher_interval: 5,
+      selector_batch_size: 4096,
+    },
+    {
+      name: "huffpost_kaggle_test",
+      description: "Original Huffpost Dataset from Kaggle (test)",
+      version: "0.0.1",
+      base_path: "/datasets/huffpost_kaggle/test",
+      filesystem_wrapper_type: "LocalFilesystemWrapper",
+      file_wrapper_type: "CsvFileWrapper",
+      file_wrapper_config: {
+        file_extension: ".csv",
+        separator: "\t", # TSV is the best option here since headlines contain commas and semicolons
+        label_index: 1,
+      },
+      ignore_last_timestamp: false,
+      file_watcher_interval: 5,
+      selector_batch_size: 4096,
+    },
     # ------------------------------------ ARXIV ----------------------------------- #
     {
       name: "arxiv_train",
@@ -147,6 +198,39 @@ storage:
       file_watcher_interval: 5,
       selector_batch_size: 4096,
     },
+    # -------------------------------- ARXIV KAGGLE -------------------------------- #
+    {
+      name: "arxiv_kaggle_train",
+      description: "Original Arxiv Dataset from Kaggle (training set)",
+      version: "0.0.1",
+      base_path: "/datasets/arxiv_kaggle/train",
+      filesystem_wrapper_type: "LocalFilesystemWrapper",
+      file_wrapper_type: "CsvFileWrapper",
+      file_wrapper_config: {
+        file_extension: ".csv",
+        separator: "\t", # TSV is the best option here since sentences contain commas and semicolons
+        label_index: 1,
+      },
+      ignore_last_timestamp: false,
+      file_watcher_interval: 5,
+      selector_batch_size: 4096,
+    },
+    {
+      name: "arxiv_kaggle_test",
+      description: "Original Arxiv Dataset from Kaggle (test set)",
+      version: "0.0.1",
+      base_path: "/datasets/arxiv_kaggle/test",
+      filesystem_wrapper_type: "LocalFilesystemWrapper",
+      file_wrapper_type: "CsvFileWrapper",
+      file_wrapper_config: {
+        file_extension: ".csv",
+        separator: "\t", # TSV is the best option here since sentences contain commas and semicolons
+        label_index: 1,
+      },
+      ignore_last_timestamp: false,
+      file_watcher_interval: 5,
+      selector_batch_size: 4096,
+    },
     # ------------------------------------ CLOC ------------------------------------ #
     {
       name: "cloc",
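For reference, the `yearbook_all` entry implies a fixed record layout: `record_size: 12292` bytes per sample, of which the first `label_size: 4` bytes are a big-endian integer label, leaving 12288 payload bytes (consistent with a 32x32x3 float32 image, though the config itself does not encode that). A minimal sketch of reading such a file; the unsigned-int interpretation of the label is an assumption:

```python
import struct

RECORD_SIZE = 12292  # record_size from the yearbook_all config
LABEL_SIZE = 4  # label_size from the yearbook_all config


def read_records(path: str):
    """Yield (label, payload) pairs from a yearbook_all-style .bin file."""
    with open(path, "rb") as f:
        while record := f.read(RECORD_SIZE):
            assert len(record) == RECORD_SIZE, "truncated record"
            # byteorder: "big" -> big-endian; unsigned int is assumed here
            (label,) = struct.unpack(">I", record[:LABEL_SIZE])
            yield label, record[LABEL_SIZE:]  # 12288 payload bytes
```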
separator: "\t", #tsv best option here since sentences contain commas and semicolons + label_index: 1, + }, + ignore_last_timestamp: false, + file_watcher_interval: 5, + selector_batch_size: 4096, + }, # ------------------------------------ CLOC ------------------------------------ # { name: "cloc", diff --git a/modyn/config/schema/pipeline/trigger/drift/alibi_detect.py b/modyn/config/schema/pipeline/trigger/drift/alibi_detect.py index 882fec679..b0cecb15f 100644 --- a/modyn/config/schema/pipeline/trigger/drift/alibi_detect.py +++ b/modyn/config/schema/pipeline/trigger/drift/alibi_detect.py @@ -8,11 +8,15 @@ from modyn.config.schema.base_model import ModynBaseModel from modyn.config.schema.pipeline.trigger.drift.metric import BaseMetric +from modyn.config.schema.pipeline.trigger.drift.preprocess.alibi_detect import ( + AlibiDetectNLPreprocessor, +) class _AlibiDetectBaseDriftMetric(BaseMetric): p_val: float = Field(0.05, description="The p-value threshold for the drift detection.") x_ref_preprocessed: bool = Field(False) + preprocessor: AlibiDetectNLPreprocessor | None = Field(None, description="Preprocessor function.") class AlibiDetectDeviceMixin(ModynBaseModel): @@ -65,6 +69,13 @@ def validate_threshold_permutations(self) -> "AlibiDetectMmdDriftMetric": return self +class AlibiDetectClassifierDriftMetric(_AlibiDetectBaseDriftMetric, AlibiDetectDeviceMixin): + id: Literal["AlibiDetectClassifierDriftMetric"] = Field("AlibiDetectClassifierDriftMetric") + classifier_id: str = Field( + description="The model to use for classifications; has to be registered in alibi_detector.py" + ) + + class AlibiDetectKSDriftMetric( _AlibiDetectBaseDriftMetric, _AlibiDetectAlternativeMixin, diff --git a/modyn/config/schema/pipeline/trigger/drift/preprocess/alibi_detect.py b/modyn/config/schema/pipeline/trigger/drift/preprocess/alibi_detect.py new file mode 100644 index 000000000..278d71c92 --- /dev/null +++ b/modyn/config/schema/pipeline/trigger/drift/preprocess/alibi_detect.py @@ -0,0 +1,30 @@ +from collections.abc import Callable +from functools import partial + +from alibi_detect.cd.pytorch import preprocess_drift +from alibi_detect.models.pytorch import TransformerEmbedding +from pydantic import Field +from transformers import AutoTokenizer + +from modyn.config.schema.base_model import ModynBaseModel + + +class AlibiDetectNLPreprocessor(ModynBaseModel): + tokenizer_model: str = Field(description="AutoTokenizer pretrained model name. E.g. 
bert-base-cased") + n_layers: int = Field(8) + max_len: int = Field(..., description="Maximum length of input token sequences.") + batch_size: int = Field(32, description="Batch size for tokenization.") + + def gen_preprocess_fn(self, device: str | None) -> Callable: + tokenizer = AutoTokenizer.from_pretrained(self.tokenizer_model) + emb_type = "hidden_state" + layers = [-_ for _ in range(1, self.n_layers + 1)] + + embedding = TransformerEmbedding(self.tokenizer_model, emb_type, layers) + if device: + embedding = embedding.to(device) + embedding = embedding.eval() + + return partial( + preprocess_drift, model=embedding, tokenizer=tokenizer, max_len=self.max_len, batch_size=self.batch_size + ) diff --git a/modyn/supervisor/internal/triggers/drift/classifier_models/__init__.py b/modyn/supervisor/internal/triggers/drift/classifier_models/__init__.py new file mode 100644 index 000000000..c36ab3e3e --- /dev/null +++ b/modyn/supervisor/internal/triggers/drift/classifier_models/__init__.py @@ -0,0 +1,5 @@ +from modyn.supervisor.internal.triggers.drift.classifier_models.ybnet_classifier import YearbookNetDriftDetector + +alibi_classifier_models = { + "ybnet": YearbookNetDriftDetector(3), +} diff --git a/modyn/supervisor/internal/triggers/drift/classifier_models/ybnet_classifier.py b/modyn/supervisor/internal/triggers/drift/classifier_models/ybnet_classifier.py new file mode 100644 index 000000000..4fdc65d86 --- /dev/null +++ b/modyn/supervisor/internal/triggers/drift/classifier_models/ybnet_classifier.py @@ -0,0 +1,36 @@ +import torch +from torch import nn + +from modyn.models.coreset_methods_support import CoresetSupportingModule + + +class YearbookNetDriftDetector(CoresetSupportingModule): + def __init__(self, num_input_channels: int) -> None: + super().__init__() + self.enc = nn.Sequential( + self.conv_block(num_input_channels, 32), + self.conv_block(32, 32), + self.conv_block(32, 32), + self.conv_block(32, 32), + ) + self.hid_dim = 32 + # Binary classifier for drift detection + # see: https://docs.seldon.io/projects/alibi-detect/en/latest/cd/methods/classifierdrift.html + self.classifier = nn.Sequential(nn.Flatten(), nn.Linear(32, 2)) + + def conv_block(self, in_channels: int, out_channels: int) -> nn.Module: + return nn.Sequential( + nn.Conv2d(in_channels, out_channels, 3, padding=1), + nn.BatchNorm2d(out_channels), + nn.ReLU(), + nn.MaxPool2d(2), + ) + + def forward(self, data: torch.Tensor) -> torch.Tensor: + data = self.enc(data) + data = torch.mean(data, dim=(2, 3)) + data = self.classifier(data) + return data + + def get_last_layer(self) -> nn.Module: + return self.classifier diff --git a/modyn/supervisor/internal/triggers/drift/detector/alibi.py b/modyn/supervisor/internal/triggers/drift/detector/alibi.py index d03880838..239b554f0 100644 --- a/modyn/supervisor/internal/triggers/drift/detector/alibi.py +++ b/modyn/supervisor/internal/triggers/drift/detector/alibi.py @@ -6,6 +6,7 @@ import torch from alibi_detect.cd import ( ChiSquareDrift, + ClassifierDrift, CVMDrift, FETDrift, KSDrift, @@ -19,13 +20,16 @@ MetricResult, ) from modyn.config.schema.pipeline.trigger.drift.alibi_detect import ( + AlibiDetectClassifierDriftMetric, AlibiDetectCVMDriftMetric, AlibiDetectKSDriftMetric, ) +from modyn.supervisor.internal.triggers.drift.classifier_models import ( + alibi_classifier_models, +) +from modyn.supervisor.internal.triggers.drift.detector.drift import DriftDetector -from .drift import DriftDetector - -_AlibiMetrics = MMDDrift | ChiSquareDrift | KSDrift | CVMDrift | FETDrift | LSDDDrift 
diff --git a/modyn/supervisor/internal/triggers/drift/detector/alibi.py b/modyn/supervisor/internal/triggers/drift/detector/alibi.py
index d03880838..239b554f0 100644
--- a/modyn/supervisor/internal/triggers/drift/detector/alibi.py
+++ b/modyn/supervisor/internal/triggers/drift/detector/alibi.py
@@ -6,6 +6,7 @@
 import torch
 from alibi_detect.cd import (
     ChiSquareDrift,
+    ClassifierDrift,
     CVMDrift,
     FETDrift,
     KSDrift,
@@ -19,13 +20,16 @@
     MetricResult,
 )
 from modyn.config.schema.pipeline.trigger.drift.alibi_detect import (
+    AlibiDetectClassifierDriftMetric,
     AlibiDetectCVMDriftMetric,
     AlibiDetectKSDriftMetric,
 )
+from modyn.supervisor.internal.triggers.drift.classifier_models import (
+    alibi_classifier_models,
+)
+from modyn.supervisor.internal.triggers.drift.detector.drift import DriftDetector
 
-from .drift import DriftDetector
-
-_AlibiMetrics = MMDDrift | ChiSquareDrift | KSDrift | CVMDrift | FETDrift | LSDDDrift
+_AlibiMetrics = MMDDrift | ClassifierDrift | ChiSquareDrift | CVMDrift | FETDrift | KSDrift | LSDDDrift
 
 
 class AlibiDriftDetector(DriftDetector):
@@ -42,18 +46,18 @@ def init_detector(self) -> None:
 
     def detect_drift(
         self,
-        embeddings_ref: pd.DataFrame | np.ndarray | torch.Tensor,
-        embeddings_cur: pd.DataFrame | np.ndarray | torch.Tensor,
+        embeddings_ref: pd.DataFrame | pd.Series | np.ndarray | torch.Tensor,
+        embeddings_cur: pd.DataFrame | pd.Series | np.ndarray | torch.Tensor,
         is_warmup: bool,
     ) -> dict[str, MetricResult]:
-        assert isinstance(embeddings_ref, (np.ndarray | torch.Tensor))
-        assert isinstance(embeddings_cur, (np.ndarray | torch.Tensor))
-        embeddings_ref = (
-            embeddings_ref.detach().cpu().numpy() if isinstance(embeddings_ref, torch.Tensor) else embeddings_ref
-        )
-        embeddings_cur = (
-            embeddings_cur.detach().cpu().numpy() if isinstance(embeddings_cur, torch.Tensor) else embeddings_cur
-        )
+        if isinstance(embeddings_ref, (pd.DataFrame, pd.Series)):
+            embeddings_ref = embeddings_ref.to_numpy()
+        if isinstance(embeddings_ref, torch.Tensor):
+            embeddings_ref = embeddings_ref.detach().cpu().numpy()
+        if isinstance(embeddings_cur, (pd.DataFrame, pd.Series)):
+            embeddings_cur = embeddings_cur.to_numpy()
+        if isinstance(embeddings_cur, torch.Tensor):
+            embeddings_cur = embeddings_cur.detach().cpu().numpy()
 
         results: dict[str, MetricResult] = {}
 
@@ -76,14 +80,18 @@ def detect_drift(
                 if isinstance(result["data"]["p_val"], np.ndarray)
                 else result["data"]["p_val"]
             )
-
+            _threshold = (
+                float(result["data"]["threshold"].mean())
+                if isinstance(result["data"]["threshold"], np.ndarray)
+                else result["data"]["threshold"]
+            )
             results[metric_ref] = MetricResult(
                 metric_id=metric_ref,
                 # will be overwritten by DecisionPolicy inside the DataDriftTrigger
                 is_drift=result["data"]["is_drift"],
                 distance=_dist,
                 p_val=_p_val,
-                threshold=result["data"].get("threshold"),
+                threshold=_threshold,
             )
 
         return results
@@ -99,6 +107,10 @@ def _alibi_detect_metric_factory(config: AlibiDetectDriftMetric, embeddings_ref:
     if isinstance(config, AlibiDetectMmdDriftMetric):
         kernel = getattr(alibi_detect.utils.pytorch, config.kernel)
 
+    kwargs = {}
+    if config.preprocessor:
+        # not every metric carries the device mixin, hence the getattr
+        kwargs.update({"preprocess_fn": config.preprocessor.gen_preprocess_fn(getattr(config, "device", None))})
+
     if isinstance(config, AlibiDetectMmdDriftMetric):
         assert kernel is not None
         return MMDDrift(
@@ -110,6 +122,18 @@ def _alibi_detect_metric_factory(config: AlibiDetectDriftMetric, embeddings_ref:
             device=config.device,
             configure_kernel_from_x_ref=config.configure_kernel_from_x_ref,
             x_ref_preprocessed=config.x_ref_preprocessed,
+            **kwargs,
+        )
+
+    if isinstance(config, AlibiDetectClassifierDriftMetric):
+        return ClassifierDrift(
+            embeddings_ref,
+            alibi_classifier_models[config.classifier_id],
+            backend="pytorch",
+            p_val=config.p_val,
+            preds_type="logits",
+            device=config.device,
+            **kwargs,
         )
 
     if isinstance(config, AlibiDetectKSDriftMetric):
@@ -118,6 +142,7 @@ def _alibi_detect_metric_factory(config: AlibiDetectDriftMetric, embeddings_ref:
             p_val=config.p_val,
             correction=config.correction,
             x_ref_preprocessed=config.x_ref_preprocessed,
+            **kwargs,
         )
 
     if isinstance(config, AlibiDetectCVMDriftMetric):
@@ -126,6 +151,7 @@ def _alibi_detect_metric_factory(config: AlibiDetectDriftMetric, embeddings_ref:
             p_val=config.p_val,
             correction=config.correction,
             x_ref_preprocessed=config.x_ref_preprocessed,
+            **kwargs,
        )
 
     raise NotImplementedError(f"Metric {config.id} is not supported in AlibiDetectDriftMetric.")
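End-to-end, the detector consumes metric configurations and returns one MetricResult per metric. A hedged sketch: `detect_drift`, `init_detector`, and the MetricResult fields are taken from the code above, while the constructor argument (a mapping of metric names to configs) is an assumption about AlibiDriftDetector's interface, which this diff does not show:

```python
import numpy as np

from modyn.config.schema.pipeline.trigger.drift.alibi_detect import (
    AlibiDetectClassifierDriftMetric,
)
from modyn.supervisor.internal.triggers.drift.detector.alibi import AlibiDriftDetector

detector = AlibiDriftDetector(
    {"clf": AlibiDetectClassifierDriftMetric(classifier_id="ybnet", device="cpu")}
)
detector.init_detector()

# Two windows of fake yearbook-shaped samples (the shape is illustrative).
ref = np.random.rand(64, 3, 32, 32).astype(np.float32)
cur = np.random.rand(64, 3, 32, 32).astype(np.float32)

results = detector.detect_drift(ref, cur, is_warmup=False)
print(results["clf"].is_drift, results["clf"].p_val)
```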
diff --git a/modyn/supervisor/internal/triggers/drift/detector/evidently.py b/modyn/supervisor/internal/triggers/drift/detector/evidently.py
index 8935628ae..a855865f2 100644
--- a/modyn/supervisor/internal/triggers/drift/detector/evidently.py
+++ b/modyn/supervisor/internal/triggers/drift/detector/evidently.py
@@ -32,18 +32,38 @@ def init_detector(self) -> None:
 
     def detect_drift(
         self,
-        embeddings_ref: pd.DataFrame | np.ndarray | torch.Tensor,
-        embeddings_cur: pd.DataFrame | np.ndarray | torch.Tensor,
+        embeddings_ref: pd.DataFrame | pd.Series | np.ndarray | torch.Tensor,
+        embeddings_cur: pd.DataFrame | pd.Series | np.ndarray | torch.Tensor,
         is_warmup: bool,
     ) -> dict[str, MetricResult]:
-        assert isinstance(embeddings_ref, pd.DataFrame)
-        assert isinstance(embeddings_cur, pd.DataFrame)
+        if isinstance(embeddings_ref, torch.Tensor):
+            embeddings_ref = embeddings_ref.cpu().numpy()
+
+        if isinstance(embeddings_cur, torch.Tensor):
+            embeddings_cur = embeddings_cur.cpu().numpy()
+
+        if isinstance(embeddings_ref, pd.Series):
+            embeddings_ref = embeddings_ref.to_frame()
+
+        if isinstance(embeddings_cur, pd.Series):
+            embeddings_cur = embeddings_cur.to_frame()
+
+        if isinstance(embeddings_ref, np.ndarray):
+            assert len(embeddings_ref.shape) == 2
+            embeddings_ref = pd.DataFrame(embeddings_ref)
+
+        if isinstance(embeddings_cur, np.ndarray):
+            assert len(embeddings_cur.shape) == 2
+            embeddings_cur = pd.DataFrame(embeddings_cur)
 
         # Run Evidently detection
         # ColumnMapping is {mapping name: column indices},
         # an Evidently way of identifying (sub)columns to use in the detection.
         # e.g. {"even columns": [0,2,4]}.
-        column_mapping = ColumnMapping(embeddings={EVIDENTLY_COLUMN_MAPPING_NAME: embeddings_ref.columns})
+        mapped_columns = list(map(str, embeddings_ref.columns))
+        embeddings_ref.columns = mapped_columns
+        embeddings_cur.columns = mapped_columns
+        column_mapping = ColumnMapping(embeddings={EVIDENTLY_COLUMN_MAPPING_NAME: mapped_columns})
 
         # https://docs.evidentlyai.com/user-guide/customization/embeddings-drift-parameters
         report = Report(
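The conversion path above normalizes every accepted input type to a DataFrame with string column names before handing it to Evidently. Stand-alone, the same steps look like this; the mapping key "all_embeddings" stands in for EVIDENTLY_COLUMN_MAPPING_NAME:

```python
import pandas as pd
import torch
from evidently import ColumnMapping

embeddings = torch.randn(100, 16)  # e.g. model embeddings for one window
frame = pd.DataFrame(embeddings.cpu().numpy())  # tensor -> 2D numpy -> DataFrame
frame.columns = [str(c) for c in frame.columns]  # Evidently expects string column names

column_mapping = ColumnMapping(embeddings={"all_embeddings": list(frame.columns)})
```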