Skip to content

Conversation

@AdrianoKF
Copy link
Collaborator

No description provided.

@AdrianoKF
Copy link
Collaborator Author

Here's the diff for the NannyML dashboard (main.py). When adding the drift calculator, latency goes through the roof:

import functools
import os
import pickle
import time
from collections import defaultdict
from contextlib import asynccontextmanager
from pathlib import Path
from statistics import mean

import nannyml
import pandas as pd
import psycopg2
import psycopg2.extensions
import psycopg2.extras
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from mlserver.codecs import NumpyCodec

# Opt out of NannyML's anonymous usage telemetry.
nannyml.disable_usage_logging()
# Module-level Postgres connection; opened/closed by the FastAPI lifespan handler.
db_conn: psycopg2.extensions.connection | None = None


@asynccontextmanager
async def lifespan(_: FastAPI):
    """Application lifespan handler: open the Postgres connection on startup
    and close it on shutdown.

    FastAPI's ``lifespan`` parameter expects an async *context manager*;
    a bare async generator function is not supported (newer Starlette only
    auto-wraps it with a deprecation warning), hence the decorator.
    """
    global db_conn
    db_conn = psycopg2.connect(
        host="postgres",
        user="postgres_user",
        password="postgres_password",
        dbname="hr_assistant",
    )

    try:
        yield
    finally:
        # Always release the connection, even if the app crashed mid-shutdown.
        if db_conn:
            db_conn.close()


app = FastAPI(lifespan=lifespan)

# Pre-fitted CBPE performance estimator, loaded from the path in the
# NANNYML_ESTIMATOR env var.
# NOTE(review): if the env var is unset, Path(None) raises TypeError here —
# confirm deployment always sets it. pickle.loads is only safe if the
# artifact source is trusted.
estimator_path = Path(os.getenv("NANNYML_ESTIMATOR"))
estimator: nannyml.CBPE = pickle.loads(estimator_path.read_bytes())

# Pre-fitted univariate drift calculator, loaded the same way from
# NANNYML_DRIFT_CALCULATOR.
drift_calc_path = Path(os.getenv("NANNYML_DRIFT_CALCULATOR"))
drift_calc: nannyml.UnivariateDriftCalculator = pickle.loads(
    drift_calc_path.read_bytes()
)


def trace_execution_time(fn):
    """Decorator that prints the wall-clock runtime of each call to *fn*.

    Returns the wrapped function's result unchanged.
    """

    # functools.wraps preserves fn's __name__/__doc__/signature metadata,
    # which the original wrapper clobbered.
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = fn(*args, **kwargs)
        end = time.time()
        print(f"{fn.__name__}(), elapsed: {end - start} s")
        return result

    return wrapper


class Tracer:
    """Accumulates wall-clock time elapsed between successive ``trace()``
    calls, grouped by tracepoint name.

    The previous implementation stored raw timestamps per name and tried to
    reconstruct durations in ``dump()``. That was wrong twice over: calls to
    different names interleave, so within-name diffs are not call durations,
    and ``prev_start`` was updated with a *duration* (``elapsed[-1]`` after
    the diff transform) rather than a timestamp. Durations are now recorded
    at trace time, which makes ``dump()`` trivial.
    """

    def __init__(self):
        # {name: list of elapsed seconds since the previous trace() call}
        self.tracepoints = defaultdict(list)
        self.start_time = time.time()  # kept for backward compatibility
        self._last = self.start_time   # timestamp of the most recent trace()

    def trace(self, name):
        """Record the time elapsed since the previous trace() (or __init__)."""
        now = time.time()
        self.tracepoints[name].append(now - self._last)
        self._last = now

    def dump(self):
        """Print the mean elapsed time per tracepoint name."""
        for name, elapsed in self.tracepoints.items():
            print(f"{name}: {mean(elapsed)} s")


def load_predictions(cursor: psycopg2.extensions.cursor) -> pd.DataFrame:
    """Load every recorded inference request/response pair from the DB and
    decode them into a single analysis DataFrame.

    Each output row holds the decoded request features plus the model's
    ``prediction`` and per-class ``prob_<i>`` probability columns.

    Args:
        cursor: An open psycopg2 cursor with a dict-style row factory
            (rows are accessed as ``row["request"]`` / ``row["response"]``).

    Returns:
        One DataFrame with a row per inference call; empty if nothing has
        been recorded yet.
    """
    query = """
        SELECT
            req.raw_request as request,
            resp.raw_response as response
        FROM inference_requests req INNER JOIN inference_responses resp
        ON (req.id = resp.id)
    """

    tracer = Tracer()

    cursor.execute(query)
    tracer.trace("query_execute")
    rows = cursor.fetchall()
    tracer.trace("fetchall")

    # No data yet: bail out early — pd.concat([]) below would raise ValueError.
    if not rows:
        return pd.DataFrame()

    # Imported lazily: mlserver is only needed once there is data to decode.
    from mlserver.codecs import PandasCodec
    from mlserver.types import InferenceRequest, InferenceResponse

    results = []
    for row in rows:
        req = InferenceRequest.model_validate(row["request"])
        tracer.trace("request_validate")
        resp = InferenceResponse.model_validate(row["response"])
        tracer.trace("response_validate")
        # NOTE(review): profiling showed decode_request()/_to_series()
        # dominate runtime — consider buffering decoded rows and only
        # decoding new ones on refresh.
        req_df = PandasCodec.decode_request(req)
        tracer.trace("decode_request")

        outputs = {o.name: NumpyCodec.decode_output(o) for o in resp.outputs}
        tracer.trace("decode_outputs")

        prediction = pd.Series(outputs["predict"].ravel(), name="prediction")
        prediction_probability = pd.DataFrame(
            outputs["predict_proba"].tolist(),
            columns=[f"prob_{i}" for i in range(outputs["predict_proba"].shape[1])],
        )
        tracer.trace("build_df")

        df = pd.concat([req_df.copy(), prediction, prediction_probability], axis=1)
        results.append(df)

    # Was a duplicate of the in-loop "decode_outputs" tracepoint, which
    # polluted its mean; give the post-loop marker its own name.
    tracer.trace("decode_loop_done")
    tracer.dump()

    return pd.concat(results, axis=0)


def plot_performance(analysis_df: pd.DataFrame):
    """Estimate model performance (CBPE) on *analysis_df* and return the
    resulting plot as an embeddable HTML fragment."""
    estimate = estimator.estimate(analysis_df)
    return estimate.plot().to_html(full_html=False)


def plot_drift(analysis_df: pd.DataFrame):
    """Run the univariate drift calculator on *analysis_df* and return the
    drift plot as an embeddable HTML fragment."""
    drift_results = drift_calc.calculate(analysis_df)
    fig = drift_results.plot(kind="drift")
    # Leftover debug print removed: dumping the full plot HTML to stdout on
    # every refresh added serious log noise/latency.
    return fig.to_html(full_html=False)


def load_analysis_data() -> pd.DataFrame:
    """Fetch and decode all stored predictions using a fresh dict-row cursor
    on the module-level DB connection."""
    with db_conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        return load_predictions(cur)


@app.get("/", response_class=HTMLResponse)
def index():
    return """
    <!DOCTYPE html>
    <html lang="en">
    <head>
        <script src="https://unpkg.com/htmx.org@1.9.4"></script>
    </head>
    <body>
        <div id="graph-container" hx-get="/update_data" hx-trigger="every 5s"></div>
    </body>
    </html>
    """


@app.get("/update_data", response_class=HTMLResponse)
def update_data():
    analysis_df = load_analysis_data()
    return plot_performance(analysis_df)

@AdrianoKF
Copy link
Collaborator Author

AdrianoKF commented Mar 26, 2025

Quick note on the performance of load_predictions: I just profiled it and the bulk of the execution time is spent in PandasCodec.decode_request() and _to_series() (which it calls). Probably not that much we can do, other than maybe trying to concatenate the request data before calling decode_request.

And obviously, buffering decoded requests/responses in memory and only fetching new rows when updating the plot.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants