Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 2 additions & 257 deletions src/multicalibration/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,17 @@
import threading
import time
import warnings
from typing import Any, Dict, Protocol, Tuple
from typing import Any, Protocol

import numpy as np
import pandas as pd

import psutil
import torch
from scipy import stats
from scipy.optimize._linesearch import LineSearchWarning
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder
from sklearn.preprocessing import OrdinalEncoder

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -450,260 +449,6 @@ def check_range(series, precision_type):
)


# ========================================= #
# FEATURE PREPROCESSING FUNCTIONS FOR MCNET #
# ========================================= #


class FeatureProcessorState:
def __init__(self):
self.enc: OrdinalEncoderWithUnknownSupport | None = None
self.one_hot_enc: OneHotEncoder | None = None
self.missing_value_fill: Dict[str, float] = {}
self.feature_means: torch.Tensor | None = None
self.feature_stds: torch.Tensor | None = None
self.categorical_segment_cols: list[str] = []
self.numerical_segment_cols: list[str] = []
self.feature_dim: int = 0
self._is_fitted: bool = False
self.category_mappings: Dict[str, Dict[str, str]] = {}
self.max_n_categories: int | None = None


def extract_segment_features(
df: pd.DataFrame,
categorical_segment_cols: list[str] | None = None,
numerical_segment_cols: list[str] | None = None,
processor_state: FeatureProcessorState | None = None,
max_n_categories: int | None = None,
is_fit_phase: bool = False,
fillna: bool = True,
) -> Tuple[torch.Tensor, FeatureProcessorState]:
if processor_state is None:
processor_state = FeatureProcessorState()
processor_state.max_n_categories = max_n_categories

if is_fit_phase:
processor_state.categorical_segment_cols = categorical_segment_cols or []
processor_state.numerical_segment_cols = numerical_segment_cols or []

feature_list = []

if categorical_segment_cols:
cat_features = _process_categorical_features(
df, categorical_segment_cols, processor_state, is_fit_phase
)
feature_list.append(cat_features)

if numerical_segment_cols:
num_features = _process_numerical_features(
df, numerical_segment_cols, processor_state, is_fit_phase, fillna
)
feature_list.append(num_features)

if feature_list:
combined_features = torch.cat(feature_list, dim=1)
else:
combined_features = torch.empty((len(df), 0), dtype=torch.float32)

if is_fit_phase:
processor_state.feature_dim = combined_features.shape[1]
processor_state._is_fitted = True

return combined_features, processor_state


def collapse_categorical_features_by_frequency(
df: pd.DataFrame,
categorical_cols: list[str],
max_n_categories: int,
processor_state: FeatureProcessorState,
is_fit_phase: bool = True,
) -> pd.DataFrame:
"""
Collapse categorical features to max_n_categories based on their frequency.
Keeps the most frequent categories and groups the rest into an "OTHER" category.
"""
df_collapsed = df.copy()

if is_fit_phase:
processor_state.max_n_categories = max_n_categories
processor_state.category_mappings = {}

for col in categorical_cols:
if col not in df_collapsed.columns:
continue

if is_fit_phase:
value_counts = df_collapsed[col].value_counts()

if len(value_counts) <= max_n_categories:
mapping = {}
for category in value_counts.index:
mapping[category] = category

processor_state.category_mappings[col] = mapping
else:
top_categories = value_counts.head(max_n_categories - 1).index.tolist()

mapping = {}
for category in value_counts.index:
if category in top_categories:
mapping[category] = category
else:
mapping[category] = "OTHER"

processor_state.category_mappings[col] = mapping

df_collapsed[col] = (
df_collapsed[col].map(mapping).fillna(df_collapsed[col])
)
else:
if col in processor_state.category_mappings:
mapping = processor_state.category_mappings[col]
mapped_values = df_collapsed[col].map(mapping)
df_collapsed[col] = mapped_values.fillna("OTHER")

return df_collapsed


def _process_categorical_features(
df: pd.DataFrame,
categorical_segment_cols: list[str],
processor_state: FeatureProcessorState,
is_fit_phase: bool,
) -> torch.Tensor:
"""Optimized categorical feature processing."""
cat_data = df[categorical_segment_cols].copy()

cat_data = cat_data.fillna("__MISSING__")

if processor_state.max_n_categories is not None:
cat_data = collapse_categorical_features_by_frequency(
cat_data,
categorical_segment_cols,
processor_state.max_n_categories,
processor_state,
is_fit_phase,
)

cat_features = _encode_categorical_features(cat_data, processor_state, is_fit_phase)
return torch.tensor(cat_features, dtype=torch.float32)


def _encode_categorical_features(
cat_data: pd.DataFrame,
processor_state: FeatureProcessorState,
is_fit_phase: bool,
) -> np.ndarray:
cat_features = cat_data.values

if is_fit_phase:
processor_state.enc = OrdinalEncoderWithUnknownSupport()
if processor_state.enc is not None:
processor_state.enc.fit(cat_features)

if processor_state.enc is not None:
cat_features = processor_state.enc.transform(cat_features)
else:
raise ValueError("Fit has to be called before encoder can be applied.")

if np.nanmax(cat_features) >= np.iinfo(np.int32).max:
raise ValueError(
"All categorical feature values must be smaller than 2^32 to prevent integer overflow."
)

if is_fit_phase:
processor_state.one_hot_enc = OneHotEncoder(
sparse_output=False,
handle_unknown="ignore", # This will create zeros for unknown categories
dtype=np.float32,
)
processor_state.one_hot_enc.fit(cat_features)

if processor_state.one_hot_enc is not None:
cat_features = processor_state.one_hot_enc.transform(cat_features)
else:
raise ValueError(
"OneHotEncoder fit has to be called before transform can be applied."
)

return cat_features


def _process_numerical_features(
df: pd.DataFrame,
numerical_segment_cols: list[str],
processor_state: FeatureProcessorState,
is_fit_phase: bool,
fillna: bool = True,
) -> torch.Tensor:
num_data = torch.tensor(df[numerical_segment_cols].values, dtype=torch.float32)

nan_mask = torch.isnan(num_data)
has_nan = nan_mask.any()
if has_nan:
missing_info = []
for i, col_name in enumerate(numerical_segment_cols):
col_nan_count = nan_mask[:, i].sum().item()
if col_nan_count > 0:
missing_pct = (col_nan_count / num_data.shape[0]) * 100
missing_info.append(
f"{col_name}: {missing_pct:.1f}% ({col_nan_count}/{num_data.shape[0]} rows)"
)

if fillna:
logger.warning(
f"Found missing values in numerical features, imputing with mean. "
f"Missing data breakdown: {', '.join(missing_info)}. "
"Set fillna=False to raise an error instead."
)
else:
raise ValueError(
f"Found missing values in numerical features. "
f"Missing data breakdown: {', '.join(missing_info)}. "
"Set fillna=True to replace with mean or handle NaN values beforehand."
)

presence_mask = (~torch.isnan(num_data)).float()
if is_fit_phase:
processor_state.feature_means = torch.nanmean(num_data, dim=0)

feature_stds = []
for i in range(num_data.shape[1]):
col_data = num_data[:, i]
valid_mask = ~torch.isnan(col_data)
if valid_mask.any():
std_val = torch.std(col_data[valid_mask])
else:
std_val = torch.tensor(1.0)
feature_stds.append(std_val)

feature_stds = torch.stack(feature_stds)

processor_state.feature_stds = torch.where(
feature_stds == 0.0,
torch.ones_like(feature_stds),
feature_stds,
)

num_data_filled = (
torch.where(torch.isnan(num_data), torch.zeros_like(num_data), num_data)
if fillna
else num_data
)
if (
processor_state.feature_means is not None
and processor_state.feature_stds is not None
):
feature_means = processor_state.feature_means
feature_stds = processor_state.feature_stds
num_data_filled = (num_data_filled - feature_means) / feature_stds

combined_features = torch.cat([num_data_filled, presence_mask], dim=1)
return combined_features


def log_peak_rss(samples_per_second=10.0):
"""
Decorator factory to log peak RSS while a function runs.
Expand Down
110 changes: 0 additions & 110 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,116 +523,6 @@ def test_train_test_split_wrapper_split_does_not_modify_input_arrays(rng):
np.testing.assert_array_equal(y, y_original)


def test_extract_segment_features_does_not_modify_input_dataframe(rng):
df = pd.DataFrame(
{
"cat_feature": rng.choice(["A", "B", "C"], 50),
"num_feature": rng.uniform(0, 100, 50),
}
)
df_original = df.copy()

_, _ = utils.extract_segment_features(
df=df,
categorical_segment_cols=["cat_feature"],
numerical_segment_cols=["num_feature"],
is_fit_phase=True,
)

pd.testing.assert_frame_equal(df, df_original)


def test_extract_segment_features_predict_does_not_modify_input_dataframe(rng):
df_train = pd.DataFrame(
{
"cat_feature": rng.choice(["A", "B", "C"], 50),
"num_feature": rng.uniform(0, 100, 50),
}
)
df_test = pd.DataFrame(
{
"cat_feature": rng.choice(["A", "B", "C"], 20),
"num_feature": rng.uniform(0, 100, 20),
}
)
df_test_original = df_test.copy()

_, processor_state = utils.extract_segment_features(
df=df_train,
categorical_segment_cols=["cat_feature"],
numerical_segment_cols=["num_feature"],
is_fit_phase=True,
)

_, _ = utils.extract_segment_features(
df=df_test,
categorical_segment_cols=["cat_feature"],
numerical_segment_cols=["num_feature"],
processor_state=processor_state,
is_fit_phase=False,
)

pd.testing.assert_frame_equal(df_test, df_test_original)


def test_collapse_categorical_features_by_frequency_does_not_modify_input_dataframe(
rng,
):
df = pd.DataFrame(
{
"cat_feature_1": rng.choice(["A", "B", "C", "D", "E"], 100),
"cat_feature_2": rng.choice(["X", "Y", "Z"], 100),
}
)
df_original = df.copy()

processor_state = utils.FeatureProcessorState()
_ = utils.collapse_categorical_features_by_frequency(
df=df,
categorical_cols=["cat_feature_1", "cat_feature_2"],
max_n_categories=3,
processor_state=processor_state,
is_fit_phase=True,
)

pd.testing.assert_frame_equal(df, df_original)


def test_collapse_categorical_features_by_frequency_predict_does_not_modify_input_dataframe(
rng,
):
df_train = pd.DataFrame(
{
"cat_feature": rng.choice(["A", "B", "C", "D", "E"], 100),
}
)
df_test = pd.DataFrame(
{
"cat_feature": rng.choice(["A", "B", "C", "D", "E", "F"], 30),
}
)
df_test_original = df_test.copy()

processor_state = utils.FeatureProcessorState()
_ = utils.collapse_categorical_features_by_frequency(
df=df_train,
categorical_cols=["cat_feature"],
max_n_categories=3,
processor_state=processor_state,
is_fit_phase=True,
)

_ = utils.collapse_categorical_features_by_frequency(
df=df_test,
categorical_cols=["cat_feature"],
max_n_categories=3,
processor_state=processor_state,
is_fit_phase=False,
)

pd.testing.assert_frame_equal(df_test, df_test_original)


def test_make_equispaced_bins_does_not_modify_input_array(rng):
predicted_scores = rng.uniform(0.1, 0.9, 100)
predicted_scores_original = predicted_scores.copy()
Expand Down