
Commit

Merge branch 'main' into XianzheMa/bug/fix-505
XianzheMa authored Jun 24, 2024
2 parents 73cd413 + 97a2b5f commit 8f587b8
Showing 56 changed files with 1,469 additions and 582 deletions.
3 changes: 3 additions & 0 deletions analytics/app/data/const.py
@@ -0,0 +1,3 @@
from typing import Literal

CompositeModelOptions = Literal["currently_active_model", "currently_trained_model"]
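
For illustration only (not part of this commit), a minimal sketch of how a Literal alias like CompositeModelOptions constrains call sites; the select_composite_model function below is hypothetical:

from typing import Literal

CompositeModelOptions = Literal["currently_active_model", "currently_trained_model"]

def select_composite_model(option: CompositeModelOptions) -> str:
    # Static type checkers (e.g. mypy) reject any string outside the two allowed values.
    return f"selected: {option}"

select_composite_model("currently_active_model")   # accepted
# select_composite_model("latest_model")           # flagged by the type checker
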
2 changes: 1 addition & 1 deletion analytics/app/data/load.py
@@ -27,7 +27,7 @@ def list_pipelines() -> dict[int, tuple[str, Path]]:

pipelines[pipeline_id] = (pipeline_name, Path(pipeline))

return pipelines
return dict(sorted(pipelines.items()))


def load_pipeline_logs(pipeline_id: int) -> PipelineLogs:
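
An illustrative toy example (not from this commit) of the effect of the dict(sorted(...)) change above: pipeline entries now come back ordered by pipeline id, regardless of the order they were discovered in.

pipelines = {7: ("exp_b", "logs/7"), 2: ("exp_a", "logs/2")}
ordered = dict(sorted(pipelines.items()))
print(list(ordered))  # [2, 7] -- keys ascend, so pipelines are listed deterministically
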
183 changes: 109 additions & 74 deletions analytics/app/data/transform.py
@@ -3,7 +3,9 @@

import pandas as pd
from modyn.supervisor.internal.grpc.enums import PipelineStage
from modyn.supervisor.internal.pipeline_executor.models import PipelineLogs, SingleEvaluationInfo
from modyn.supervisor.internal.pipeline_executor.models import PipelineLogs, SingleEvaluationInfo, StageLog
from modyn.supervisor.internal.utils.time_tools import generate_real_training_end_timestamp
from modyn.utils.utils import SECONDS_PER_UNIT

AGGREGATION_FUNCTION = Literal["mean", "median", "max", "min", "sum", "std"]
EVAL_AGGREGATION_FUNCTION = Literal["time_weighted_avg", "mean", "median", "max", "min", "sum", "std"]
@@ -14,16 +16,17 @@
# -------------------------------------------------------------------------------------------------------------------- #


def logs_dataframe(logs: PipelineLogs) -> pd.DataFrame:
def logs_dataframe(logs: PipelineLogs, pipeline_ref: str = "pipeline") -> pd.DataFrame:
df = logs.supervisor_logs.df
df["pipeline_ref"] = pipeline_ref
df["duration"] = df["duration"].apply(lambda x: x.total_seconds())
convert_epoch_to_datetime(df, "sample_time")
return df


def logs_dataframe_agg_by_stage(stage_run_df: pd.DataFrame) -> pd.DataFrame:
df_agg = (
stage_run_df.groupby(["id"] + [c for c in stage_run_df.columns if c == "pipeline_ref"])
stage_run_df.groupby((["pipeline_ref"] if "pipeline_ref" in stage_run_df.columns else []) + ["id"])
.agg(
max=("duration", "max"),
min=("duration", "min"),
@@ -33,77 +36,126 @@ def logs_dataframe_agg_by_stage(stage_run_df: pd.DataFrame) -> pd.DataFrame:
sum=("duration", "sum"),
count=("duration", "count"),
)
.reset_index()
.fillna(-1)
)
df_agg.reset_index(inplace=True)
return df_agg
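
A small standalone sketch (made-up data, not part of the diff) of the guarded group key introduced above: grouping falls back to "id" alone when the "pipeline_ref" column is absent.

import pandas as pd

df = pd.DataFrame({"id": ["TRAIN", "TRAIN", "STORE"], "duration": [1.0, 3.0, 2.0]})
keys = (["pipeline_ref"] if "pipeline_ref" in df.columns else []) + ["id"]
agg = df.groupby(keys).agg(mean=("duration", "mean"), count=("duration", "count"))
agg.reset_index(inplace=True)  # same pattern as above: reset the index after aggregating
print(agg)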


def pipeline_stage_parents(logs: PipelineLogs) -> pd.DataFrame:
ids = []
parents = []
for i, (_, parent_list) in logs.pipeline_stages.items():
if len(parent_list) == 1:
ids.append(i)
parents.append(parent_list[0])
if len(parent_list) > 1:
if i == PipelineStage.PROCESS_NEW_DATA.name:
if logs.experiment:
ids.append(i)
parents.append(PipelineStage.REPLAY_DATA.name)
else:
ids.append(i)
parents.append(PipelineStage.FETCH_NEW_DATA.name)
else:
raise ValueError(f"Stage {i} has multiple parents: {parent_list}")

df = pd.DataFrame({"id": ids, "parent_id": parents})
return df


def dfs_models_and_evals(
logs: PipelineLogs, max_sample_time: Any
logs: PipelineLogs, max_sample_time: Any, pipeline_ref: str = "pipeline"
) -> tuple[pd.DataFrame, pd.DataFrame | None, pd.DataFrame | None]:
"""Returns a dataframe with the stored models and the dataframe for evaluations"""

# ---------------------------------------------------- MODELS ---------------------------------------------------- #

store_models = [x for x in logs.supervisor_logs.stage_runs if x.id == PipelineStage.STORE_TRAINED_MODEL.name]
df_models = pd.concat([x.df(extended=True) for x in store_models])
# df_models.sort_values(by=["sample_time"])
# PipelineStage.STORE_TRAINED_MODEL
df_store_models = StageLog.df(
(x for x in logs.supervisor_logs.stage_runs if x.id == PipelineStage.STORE_TRAINED_MODEL.name), extended=True
)
df_store_models.set_index(["trigger_idx"], inplace=True)

_list_single_triggers = [
x for x in logs.supervisor_logs.stage_runs if x.id == PipelineStage.HANDLE_SINGLE_TRIGGER.name
]
df_single_triggers = pd.concat([x.df(extended=True) for x in _list_single_triggers])
# PipelineStage.HANDLE_SINGLE_TRIGGER
df_single_triggers = StageLog.df(
(x for x in logs.supervisor_logs.stage_runs if x.id == PipelineStage.HANDLE_SINGLE_TRIGGER.name), extended=True
)[["trigger_idx", "trigger_id", "first_timestamp", "last_timestamp"]]
df_single_triggers.set_index(["trigger_idx"], inplace=True)

_list_single_trainings = [x for x in logs.supervisor_logs.stage_runs if x.id == PipelineStage.TRAIN.name]
df_single_trainings = pd.concat([x.df(extended=True) for x in _list_single_trainings])
# PipelineStage.TRAIN
df_single_trainings = StageLog.df(
(x for x in logs.supervisor_logs.stage_runs if x.id == PipelineStage.TRAIN.name), extended=True
)[["trigger_idx", "num_batches", "num_samples"]]
df_single_trainings.set_index(["trigger_idx"], inplace=True)

# MERGE
joined_models = df_store_models.merge(
df_single_triggers, on="trigger_idx", how="left", suffixes=("", "_trigger")
).merge(df_single_trainings, on="trigger_idx", how="left", suffixes=("", "_training"))

# sort models by trigger_id (we need that for the shift functions in generate_real_training_end_timestamp etc.)
joined_models.sort_index(level="trigger_idx", inplace=True)

joined_models = df_models.merge(df_single_triggers, on="trigger_idx", how="left", suffixes=("", "_trigger")).merge(
df_single_trainings, on="trigger_idx", how="left", suffixes=("", "_training")
)
joined_models["train_start"] = joined_models["first_timestamp"]
joined_models["train_end"] = joined_models["last_timestamp"]
joined_models["real_train_end"] = generate_real_training_end_timestamp(joined_models)

convert_epoch_to_datetime(joined_models, "sample_time")
convert_epoch_to_datetime(joined_models, "train_start")
convert_epoch_to_datetime(joined_models, "train_end")
convert_epoch_to_datetime(joined_models, "real_train_end")

df_models = joined_models[
[col for col in df_models.columns] + ["train_start", "train_end", "num_batches", "num_samples"]
[col for col in df_store_models.columns if col not in joined_models.index.names]
+ ["train_start", "train_end", "real_train_end", "num_batches", "num_samples"]
]

convert_epoch_to_datetime(df_models, "train_start")
convert_epoch_to_datetime(df_models, "train_end")

# sort models by trigger_id
df_models.sort_values(by=["trigger_id"], inplace=True)
df_models.reset_index(inplace=True)

# model_usage period
df_models["usage_start"] = df_models["train_end"] + pd.DateOffset(seconds=1)
df_models["usage_end"] = df_models["train_end"].shift(-1)
df_models["usage_start"] = df_models["real_train_end"] + pd.DateOffset(seconds=1)
df_models["usage_end"] = df_models["real_train_end"].shift(-1)
df_models["usage_end"] = df_models["usage_end"].fillna(max_sample_time)

# linearize ids:
df_models["trigger_idx"] = df_models["trigger_id"]
df_models["training_idx"] = df_models["training_id"]
df_models["model_idx"] = df_models["id_model"]
_, trigger_idx_mappings = linearize_ids(df_models, [], "training_idx")
_, model_idx_mappings = linearize_ids(df_models, [], "model_idx")

df_models["pipeline_ref"] = pipeline_ref

# -------------------------------------------------- EVALUATIONS ------------------------------------------------- #

dfs_requests = [
run.df(extended=True)
for run in logs.supervisor_logs.stage_runs
if run.id == PipelineStage.EVALUATE_SINGLE.name and run.info.failure_reason is None and run.info.eval_request
]
dfs_metrics = [
cast(SingleEvaluationInfo, run.info).results_df()
for run in logs.supervisor_logs.stage_runs
if run.id == PipelineStage.EVALUATE_SINGLE.name and run.info.failure_reason is None and run.info.eval_request
]
if not dfs_requests and not dfs_metrics:
return df_models, None, None
dfs_requests = StageLog.df(
(
run
for run in logs.supervisor_logs.stage_runs
if (
run.id == PipelineStage.EVALUATE_SINGLE.name
and run.info.failure_reason is None
and run.info.eval_request
)
),
extended=True,
)

dfs_metrics = SingleEvaluationInfo.results_df(
(
cast(SingleEvaluationInfo, run.info)
for run in logs.supervisor_logs.stage_runs
if run.id == PipelineStage.EVALUATE_SINGLE.name
and run.info.failure_reason is None
and run.info.eval_request
)
)

eval_requests = pd.concat(dfs_requests)
evals_metrics = pd.concat(dfs_metrics)
if dfs_requests.shape[0] == 0 or dfs_metrics.shape[0] == 0:
return df_models, None, None

for evals_df in [eval_requests, evals_metrics]:
for evals_df in [dfs_requests, dfs_metrics]:
evals_df["interval_center"] = (evals_df["interval_start"] + evals_df["interval_end"]) / 2
convert_epoch_to_datetime(evals_df, "interval_start")
convert_epoch_to_datetime(evals_df, "interval_end")
@@ -116,11 +168,10 @@ def dfs_models_and_evals(
linearize_ids(evals_df, [], "training_idx", trigger_idx_mappings)
linearize_ids(evals_df, [], "model_idx", model_idx_mappings)

return df_models, eval_requests, evals_metrics

dfs_requests["pipeline_ref"] = pipeline_ref
dfs_metrics["pipeline_ref"] = pipeline_ref

def logs_dataframe_pipeline_stage_logs(logs: PipelineLogs, stage: PipelineStage) -> pd.DateOffset:
return pd.concat([x.df(extended=True) for x in logs.supervisor_logs.stage_runs if x.id == stage.name])
return df_models, dfs_requests, dfs_metrics
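
A hypothetical helper (named remap_ids here to avoid implying it is Modyn's linearize_ids) illustrating the idea used above: map raw trigger/model ids to consecutive indices and reuse the same mapping for the evaluation frames so they stay aligned with the model frame.

import pandas as pd

def remap_ids(df: pd.DataFrame, col: str, mapping: dict | None = None) -> dict:
    # Build (or reuse) a mapping from raw ids to consecutive indices 0..n-1, then apply it in place.
    if mapping is None:
        mapping = {raw: idx for idx, raw in enumerate(sorted(df[col].unique()))}
    df[col] = df[col].map(mapping)
    return mapping

models = pd.DataFrame({"model_idx": [12, 17, 23]})
evals = pd.DataFrame({"model_idx": [17, 17, 23]})
mapping = remap_ids(models, "model_idx")
remap_ids(evals, "model_idx", mapping)  # evaluations use the same linearized ids as the models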


# -------------------------------------------------------------------------------------------------------------------- #
@@ -137,32 +188,6 @@ def leaf_stages(logs: PipelineLogs) -> list[str]:
return [stage for stage in logs.pipeline_stages if stage not in referenced_as_parent]


def pipeline_stage_parents(logs: PipelineLogs) -> pd.DataFrame:
ids = []
parents = []
for i, (_, parent_list) in logs.pipeline_stages.items():
if len(parent_list) == 1:
ids.append(i)
parents.append(parent_list[0])
if len(parent_list) > 1:
if i == PipelineStage.PROCESS_NEW_DATA.name:
if logs.experiment:
ids.append(i)
parents.append(PipelineStage.REPLAY_DATA.name)
else:
ids.append(i)
parents.append(PipelineStage.FETCH_NEW_DATA.name)
else:
raise ValueError(f"Stage {i} has multiple parents: {parent_list}")

return pd.DataFrame(
{
"id": ids,
"parent_id": parents,
}
)


# -------------------------------------------------------------------------------------------------------------------- #
# TRANSFORM dataframe #
# -------------------------------------------------------------------------------------------------------------------- #
@@ -232,7 +257,13 @@ def patch_yearbook_time(df: pd.DataFrame, column: str) -> pd.DataFrame:
Returns:
DataFrame with patched yearbook time.
"""
df[column] = pd.to_datetime(1930 + (df[column] - datetime.datetime(1970, 1, 1)).dt.days, format="%Y")
if df.shape[0] == 0:
df[column] = pd.to_datetime([])
return df
delta = df[column] - pd.to_datetime("1970-01-01")
partial_years = delta.dt.seconds / SECONDS_PER_UNIT["d"]
partial_years_delta = partial_years.apply(lambda x: datetime.timedelta(seconds=x * SECONDS_PER_UNIT["y"]))
df[column] = pd.to_datetime(delta.apply(lambda x: f"{1930 + x.days}-01-01")) + partial_years_delta
return df
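
A worked toy example of the new mapping, with assumed stand-in constants (86400 seconds per day and 365 * 86400 seconds per year; Modyn's SECONDS_PER_UNIT values may differ): day k since the epoch maps to year 1930 + k, and the within-day fraction becomes the same fraction of that year.

import datetime
import pandas as pd

SECONDS_PER_DAY = 86_400          # assumed stand-in for SECONDS_PER_UNIT["d"]
SECONDS_PER_YEAR = 365 * 86_400   # assumed stand-in for SECONDS_PER_UNIT["y"]

delta = pd.Timestamp("1970-01-03 12:00:00") - pd.Timestamp("1970-01-01")  # 2 days + 12 h
partial_year = delta.seconds / SECONDS_PER_DAY                            # half a day -> half a year
patched = pd.Timestamp(f"{1930 + delta.days}-01-01") + datetime.timedelta(seconds=partial_year * SECONDS_PER_YEAR)
print(patched)  # 1932-07-01 12:00:00 -- third yearbook "year", roughly halfway through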


Expand All @@ -259,13 +290,15 @@ def df_aggregate_eval_metric(
if aggregate_func == "time_weighted_avg":
# Compute the duration (end - start) as the weight
df["weight"] = df[interval_end] - df[interval_start]
group_total_weights = df.groupby(group_by)["weight"].agg(weight_sum="sum").reset_index()
group_total_weights = df.groupby(group_by)["weight"].agg(weight_sum="sum")
group_total_weights.reset_index(inplace=True)

# Compute the weighted value
df["weighted_value"] = df[in_col] * df["weight"]

# Group by `group_by` and compute the weighted average
grouped = df.groupby(group_by)["weighted_value"].agg(sum_weighted_value="sum").reset_index()
grouped = df.groupby(group_by)["weighted_value"].agg(sum_weighted_value="sum")
grouped.reset_index(inplace=True)

# add weightsum info
grouped = grouped.merge(group_total_weights, on=group_by)
@@ -275,4 +308,6 @@

else:
# normal average
return df.groupby(group_by).agg({in_col: aggregate_func}).reset_index().rename(columns={in_col: out_col})
df = df.groupby(group_by).agg({in_col: aggregate_func})
df.reset_index(inplace=True)
return df.rename(columns={in_col: out_col})
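
A tiny hand-checkable example (synthetic numbers, not from the diff) of the time_weighted_avg branch: longer evaluation intervals pull the aggregate toward their metric value.

import pandas as pd

df = pd.DataFrame({
    "model_idx": [1, 1],
    "interval_start": [0, 10],
    "interval_end": [10, 40],   # weights 10 and 30
    "value": [0.8, 0.4],
})
df["weight"] = df["interval_end"] - df["interval_start"]
df["weighted_value"] = df["value"] * df["weight"]
grouped = df.groupby("model_idx").agg(sum_weighted_value=("weighted_value", "sum"), weight_sum=("weight", "sum"))
grouped["metric"] = grouped["sum_weighted_value"] / grouped["weight_sum"]
print(grouped["metric"])  # 0.5 = (0.8 * 10 + 0.4 * 30) / 40
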
