-
Notifications
You must be signed in to change notification settings - Fork 0
NannyML drift calculator #45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Here's the diff for the NannyML dashboard ( import os
import pickle
import time
from collections import defaultdict
from contextlib import asynccontextmanager
from functools import wraps
from pathlib import Path
from statistics import mean

import nannyml
import pandas as pd
import psycopg2
import psycopg2.extensions
import psycopg2.extras
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from mlserver.codecs import NumpyCodec
# Turn off NannyML's usage logging for this process before anything else runs.
nannyml.disable_usage_logging()
# Shared PostgreSQL connection. It stays None until `lifespan` opens it at
# application startup, and is closed again by `lifespan` on shutdown.
db_conn: psycopg2.extensions.connection | None = None
@asynccontextmanager
async def lifespan(_: FastAPI):
    """Open the shared PostgreSQL connection for the lifetime of the app.

    The connection is stored in the module-level ``db_conn`` before the
    application starts serving and is closed (and the global reset to
    ``None``) when the application shuts down.

    The function is wrapped in ``asynccontextmanager`` because FastAPI's
    ``lifespan=`` parameter expects an async context manager; passing a bare
    async generator function is deprecated in Starlette.

    Args:
        _: The FastAPI application instance (unused).

    Yields:
        None. Control returns to the framework while the app is running.
    """
    global db_conn
    # NOTE(review): credentials are hard-coded; consider sourcing them from
    # the environment or a secret store before deploying outside local dev.
    db_conn = psycopg2.connect(
        host="postgres",
        user="postgres_user",
        password="postgres_password",
        dbname="hr_assistant",
    )
    try:
        yield
    finally:
        if db_conn is not None:
            db_conn.close()
            # Drop the reference so later code cannot use a closed connection.
            db_conn = None
# ASGI application; `lifespan` is handed to FastAPI so the database connection
# is opened and closed around the app's lifetime.
app = FastAPI(lifespan=lifespan)
# The estimator and the drift calculator are unpickled once, at import time,
# from the files named by the NANNYML_ESTIMATOR and NANNYML_DRIFT_CALCULATOR
# environment variables.
# NOTE(review): os.getenv() returns None when a variable is unset and
# Path(None) raises TypeError -- confirm both variables are always set.
# NOTE(review): pickle.loads can execute arbitrary code embedded in the file;
# these paths must only ever point at trusted artifacts.
estimator_path = Path(os.getenv("NANNYML_ESTIMATOR"))
estimator: nannyml.CBPE = pickle.loads(estimator_path.read_bytes())
drift_calc_path = Path(os.getenv("NANNYML_DRIFT_CALCULATOR"))
drift_calc: nannyml.UnivariateDriftCalculator = pickle.loads(
drift_calc_path.read_bytes()
)
def trace_execution_time(fn):
    """Decorate ``fn`` so every call prints how long it took.

    The wrapper forwards all positional and keyword arguments, returns the
    wrapped function's result unchanged, and prints
    ``<name>(), elapsed: <seconds> s`` after each call -- including calls that
    raise, so slow failures are still visible.

    Args:
        fn: The callable to time.

    Returns:
        A wrapper with the same call signature and metadata as ``fn``.
    """

    @wraps(fn)  # keep __name__/__doc__ so the wrapper is transparent to tooling
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the measurement cannot go negative or
        # jump when the system clock is adjusted.
        start = time.perf_counter()
        try:
            return fn(*args, **kwargs)
        finally:
            end = time.perf_counter()
            print(f"{fn.__name__}(), elapsed: {end - start} s")

    return wrapper
class Tracer:
    """Collect per-step timings and report the mean duration of each step.

    Each call to ``trace(name)`` records how long it took to reach that point
    from the previous tracepoint (or from construction, for the first call).
    A name that is hit once per loop iteration therefore accumulates one
    duration per iteration, and ``dump()`` reports the mean of those.
    """

    def __init__(self):
        self.tracepoints = defaultdict(list)  # {name: list[elapsed_seconds]}
        # perf_counter is monotonic, which makes it safe for measuring
        # intervals even if the wall clock is adjusted.
        self.start_time = time.perf_counter()
        # Timestamp of the most recent tracepoint; the next duration is
        # measured from here.
        self._last_time = self.start_time

    def trace(self, name):
        """Record the time elapsed since the previous tracepoint under ``name``."""
        now = time.perf_counter()
        self.tracepoints[name].append(now - self._last_time)
        self._last_time = now

    def dump(self):
        """Print ``<name>: <mean seconds> s`` for every recorded tracepoint."""
        for name, durations in self.tracepoints.items():
            # A key can exist without samples if the defaultdict was merely
            # read; mean() would raise on an empty list.
            if durations:
                print(f"{name}: {mean(durations)} s")
def load_predictions(cursor: psycopg2.extensions.cursor) -> pd.DataFrame:
    """Load every logged inference request/response pair as one DataFrame.

    Each row returned by the query is decoded with the MLServer codecs: the
    request becomes the feature columns, the ``predict`` output becomes a
    ``prediction`` column and the ``predict_proba`` output becomes one
    ``prob_<i>`` column per class. The per-row frames are then stacked.

    Timing for each step is collected with ``Tracer`` and printed before
    returning.

    Args:
        cursor: An open cursor whose rows support lookup by column name
            (the code reads ``row["request"]`` and ``row["response"]``).

    Returns:
        The concatenated per-request frames, or an empty DataFrame when the
        query returns no rows.
    """
    from mlserver.codecs import PandasCodec
    from mlserver.types import InferenceRequest, InferenceResponse

    query = """
SELECT
req.raw_request as request,
resp.raw_response as response
FROM inference_requests req INNER JOIN inference_responses resp
ON (req.id = resp.id)
"""
    tracer = Tracer()
    cursor.execute(query)
    tracer.trace("query_execute")
    rows = cursor.fetchall()
    tracer.trace("fetchall")

    frames = []
    for row in rows:
        req = InferenceRequest.model_validate(row["request"])
        tracer.trace("request_validate")
        resp = InferenceResponse.model_validate(row["response"])
        tracer.trace("response_validate")
        req_df = PandasCodec.decode_request(req)
        tracer.trace("decode_request")
        outputs = {o.name: NumpyCodec.decode_output(o) for o in resp.outputs}
        tracer.trace("decode_outputs")
        prediction = pd.Series(outputs["predict"].ravel(), name="prediction")
        probabilities = outputs["predict_proba"]
        prediction_probability = pd.DataFrame(
            probabilities.tolist(),
            columns=[f"prob_{i}" for i in range(probabilities.shape[1])],
        )
        tracer.trace("build_df")
        # concat already builds a new frame, so no defensive copy of req_df
        # is needed first.
        frames.append(
            pd.concat([req_df, prediction, prediction_probability], axis=1)
        )
        # Distinct label: this step used to be recorded as a second
        # "decode_outputs", which blended two unrelated timings.
        tracer.trace("concat")
    tracer.dump()

    if not frames:
        # pd.concat raises ValueError on an empty list; hand back an empty
        # frame instead when nothing has been logged yet.
        return pd.DataFrame()
    return pd.concat(frames, axis=0)
def plot_performance(analysis_df: pd.DataFrame):
    """Return the estimated-performance plot for ``analysis_df`` as HTML.

    Runs the module-level ``estimator`` on the data, plots the result and
    serialises the figure as an HTML fragment (``full_html=False``) rather
    than a complete document.
    """
    return estimator.estimate(analysis_df).plot().to_html(full_html=False)
def plot_drift(analysis_df: pd.DataFrame):
    """Return the univariate drift plot for ``analysis_df`` as HTML.

    Runs the module-level ``drift_calc`` on the data, plots the result with
    ``kind="drift"`` and serialises the figure as an HTML fragment
    (``full_html=False``) rather than a complete document.

    The rendered HTML is returned only; it is no longer echoed to stdout,
    which was a debugging leftover that flooded the logs with the whole
    figure markup on every call.
    """
    drift_results = drift_calc.calculate(analysis_df)
    fig = drift_results.plot(kind="drift")
    return fig.to_html(full_html=False)
def load_analysis_data() -> pd.DataFrame:
    """Fetch all logged predictions through the shared database connection.

    Returns:
        The DataFrame produced by ``load_predictions``.

    Raises:
        RuntimeError: If the shared connection has not been opened yet.
    """
    if db_conn is None:
        raise RuntimeError(
            "Database connection is not initialised; "
            "the application lifespan has not started."
        )
    # `with db_conn` ends the transaction on exit (commit on success, rollback
    # on error) without closing the connection. Without it, one failed query
    # would leave the shared connection in an aborted transaction and every
    # later request would fail too.
    with db_conn, db_conn.cursor(
        cursor_factory=psycopg2.extras.RealDictCursor
    ) as cur:
        return load_predictions(cur)
@app.get("/", response_class=HTMLResponse)
def index():
    """Serve the dashboard shell page.

    The page loads htmx and contains a single container element that issues a
    GET to ``/update_data`` every 5 seconds.
    """
    page = """
<!DOCTYPE html>
<html lang="en">
<head>
<script src="https://unpkg.com/htmx.org@1.9.4"></script>
</head>
<body>
<div id="graph-container" hx-get="/update_data" hx-trigger="every 5s"></div>
</body>
</html>
"""
    return page
@app.get("/update_data", response_class=HTMLResponse)
def update_data():
    """Reload the logged predictions and return the performance plot as HTML."""
    return plot_performance(load_analysis_data())
|
Quick note on the performance of And obviously, buffering decoded requests/responses in memory and only fetching new rows when updating the plot. |
…ng falses unsync warning in dagster
No description provided.