2 changes: 2 additions & 0 deletions .pre-commit-config.yaml
@@ -9,6 +9,8 @@ repos:
# line too long and line before binary operator (black is ok with these)
types:
- python
args:
- "--max-line-length=90"
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
127 changes: 69 additions & 58 deletions vetiver/monitor.py
@@ -1,9 +1,6 @@
import datetime
import pins
from pins.errors import PinsError
import plotly.express as px
import pandas as pd
from datetime import datetime, timedelta
from datetime import timedelta


def compute_metrics(
@@ -75,60 +72,74 @@ def _rolling_df(df: pd.DataFrame, td: timedelta):
first = stop


def pin_metrics(board, df_metrics, metrics_pin_name, overwrite=False):
pass


# """
# Update an existing pin storing model metrics over time

# Parameters
# ----------
# board :
# Pins board
# df_metrics: pd.DataFrame
# Dataframe of metrics over time, such as created by `vetiver_compute_metrics()`
# metrics_pin_name:
# Pin name for where the metrics are stored
# overwrite: bool
# If TRUE (the default), overwrite any metrics for
# dates that exist both in the existing pin and
# new metrics with the new values. If FALSE, error
# when the new metrics contain overlapping dates with
# the existing pin.
# """
# date_types = (datetime.date, datetime.time, datetime.datetime)
# if not isinstance(df_metrics.index, date_types):
# try:
# df_metrics = df_metrics.index.astype("datetime")
# except TypeError:
# raise TypeError(f"Index of {df_metrics} must be a date type")

# new_metrics = df_metrics.sort_index()

# new_dates = df_metrics.index.unique()

# try:
# old_metrics = board.pin_read(metrics_pin_name)
# except PinsError:
# board.pin_write(metrics_pin_name)

# overlapping_dates = old_metrics.index in new_dates

# if overwrite is True:
# old_metrics = old_metrics not in overlapping_dates
# else:
# if overlapping_dates:
# raise ValueError(
# f"The new metrics overlap with dates \
# already stored in {repr(metrics_pin_name)} \
# Check the aggregated dates or use `overwrite = True`"
# )

# new_metrics = old_metrics + df_metrics
# new_metrics = new_metrics.sort_index()

# pins.pin_write(board, new_metrics, metrics_pin_name)
def pin_metrics(
board,
df_metrics: pd.DataFrame,
metrics_pin_name: str,
pin_type: "str | None" = None,
index_name: str = "index",
overwrite: bool = False,
) -> pd.DataFrame:
"""
Update an existing pin storing model metrics over time

Parameters
----------
board :
Pins board
df_metrics: pd.DataFrame
Dataframe of metrics over time, such as created by `compute_metrics()`
metrics_pin_name:
Pin name for where the metrics are stored
pin_type: str | None
File type used to write the updated metrics pin. If None, the type
of the existing metrics pin is reused.
index_name: str
The column in df_metrics containing the aggregated dates or datetimes.
Note that this defaults to a column named "index".
overwrite: bool
If True, overwrite any metrics for dates that exist both in the
existing pin and the new metrics with the new values. If False
(the default), raise an error when the new metrics contain dates
that overlap with the existing pin.

Returns
-------
pd.DataFrame
The combined metrics that were written to the pin.
"""

old_metrics_raw = board.pin_read(metrics_pin_name)

# need to coerce date index to a datetime, since pandas does not infer
# date columns from CSV (but note that formats like arrow do)
old_metrics = old_metrics_raw.copy()
old_metrics[index_name] = pd.to_datetime(old_metrics[index_name])

# handle overlapping dates ----
dt_new = pd.to_datetime(df_metrics[index_name])
dt_old = old_metrics[index_name]

indx_old_overlap = dt_old.isin(dt_new)

if overwrite:
# get only rows specific to old metrics, so when we concat below
# it effectively is an upsert
old_metrics = old_metrics.loc[~indx_old_overlap, :]

elif not overwrite and indx_old_overlap.any():
raise ValueError(
f"The new metrics overlap with dates already stored in {metrics_pin_name}."
" Check the aggregated dates or use `overwrite=True`."
)

# update and pin ----
combined_metrics = pd.concat([old_metrics, df_metrics], ignore_index=True)
sorted_metrics = combined_metrics.sort_values(index_name)

# reuse the existing pin's file type when pin_type is not given
if pin_type is None:
meta = board.pin_meta(metrics_pin_name)

final_pin_type = meta.type
else:
final_pin_type = pin_type

board.pin_write(sorted_metrics, metrics_pin_name, type=final_pin_type)

return sorted_metrics


def plot_metrics(
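For reference, a minimal usage sketch of the new `pin_metrics`, modeled on the tests added below; the board, pin name, and sample data are illustrative and not part of this diff:

import pandas as pd
import pins
import vetiver

# a temporary board holding an existing metrics pin (illustrative data)
board = pins.board_temp()
old_metrics = pd.DataFrame(
    {
        "index": pd.to_datetime(["2021-01-01", "2021-01-02"]),
        "n": [1, 2],
        "metric": ["mae", "mae"],
        "estimate": [0.1, 0.2],
    }
)
board.pin_write(old_metrics, "model_metrics", type="csv")

# newly computed metrics; the 2021-01-02 row overlaps the existing pin,
# so overwrite=True replaces it instead of raising a ValueError
new_metrics = pd.DataFrame(
    {
        "index": pd.to_datetime(["2021-01-02", "2021-01-03"]),
        "n": [5, 6],
        "metric": ["mae", "mae"],
        "estimate": [0.15, 0.25],
    }
)
combined = vetiver.pin_metrics(board, new_metrics, "model_metrics", overwrite=True)
# combined now holds three rows (2021-01-01 through 2021-01-03) sorted by date,
# and the "model_metrics" pin has been rewritten with the existing csv type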
93 changes: 91 additions & 2 deletions vetiver/tests/test_monitor.py
@@ -1,20 +1,27 @@
from sklearn import metrics
from datetime import timedelta

import pandas as pd
import pins
import numpy
import time
import vetiver

import pytest

rng = pd.date_range("1/1/2012", periods=10, freq="S")
new = dict(x=range(len(rng)), y=range(len(rng)))
df = pd.DataFrame(new, index=rng)
td = timedelta(seconds=2)
metric_set = [metrics.mean_squared_error, metrics.mean_absolute_error]


def test_rolling():
m = [_ for _ in vetiver._rolling_df(df, td)]
assert len(m) == 5
assert len(m[0]) == 2


def test_compute():
df.reset_index(inplace=True)
m = vetiver.compute_metrics(
@@ -27,10 +34,92 @@ def test_compute():
numpy.array(["mean_squared_error", "mean_absolute_error"], dtype=object),
)


def test_monitor(snapshot):
snapshot.snapshot_dir = './vetiver/tests/snapshots'
snapshot.snapshot_dir = "./vetiver/tests/snapshots"
m = vetiver.compute_metrics(
df, "index", td, metric_set=metric_set, truth="x", estimate="y"
)
vetiver.plot_metrics(m)
snapshot.assert_match(m.to_json(), 'test_monitor.json')
snapshot.assert_match(m.to_json(), "test_monitor.json")


@pytest.fixture
def df_metrics_old():
return pd.DataFrame(
{
"index": pd.to_datetime(["2021-01-01", "2021-01-02"]),
"n": [1, 2],
"metric": ["x", "x"],
"estimate": [0.1, 0.2],
}
)


def test_vetiver_pin_metrics_simple(df_metrics_old):
board = pins.board_temp()
board.pin_write(df_metrics_old, "test_metrics", type="csv")
time.sleep(1)

df_metrics_new = pd.DataFrame(
{
"index": pd.to_datetime(["2021-01-03", "2021-01-04"]),
"n": [3, 4],
"metric": ["x", "x"],
"estimate": [0.8, 0.9],
}
)

df_res = vetiver.pin_metrics(board, df_metrics_new, "test_metrics")

assert len(df_res) == 4
assert df_res.equals(pd.concat([df_metrics_old, df_metrics_new], ignore_index=True))


def test_vetiver_pin_metrics_overlap_error(df_metrics_old):
board = pins.board_temp()
board.pin_write(df_metrics_old, "test_metrics", type="csv")
time.sleep(0.1)

with pytest.raises(ValueError) as exc_info:
vetiver.pin_metrics(board, df_metrics_old, "test_metrics")

assert "The new metrics overlap" in exc_info.value.args[0]


def test_vetiver_pin_metrics_overwrite(df_metrics_old):
board = pins.board_temp()
board.pin_write(df_metrics_old, "test_metrics", type="csv")
time.sleep(1)

# first row should update existing metrics
df_metrics_new = pd.DataFrame(
{
"index": pd.to_datetime(["2021-01-01", "2021-01-03"]),
"n": [200, 201],
"metric": ["y", "y"],
"estimate": [0.8, 0.9],
}
)

df_res = vetiver.pin_metrics(board, df_metrics_new, "test_metrics", overwrite=True)
assert len(df_res) == 3

df_dst = pd.concat([df_metrics_old.iloc[[1], :], df_metrics_new], ignore_index=True)
assert df_res.equals(df_dst.sort_values("index"))


def test_vetiver_pin_metrics_manual_pin_type(df_metrics_old):
board = pins.board_temp()
board.pin_write(df_metrics_old, "test_metrics", type="csv")
time.sleep(1)

df_res = vetiver.pin_metrics(
board, df_metrics_old, "test_metrics", overwrite=True, pin_type="joblib"
)

assert len(df_res) == 2

meta = board.pin_meta("test_metrics")

assert meta.type == "joblib"