diff --git a/analytics/tools/aggregate_runs/core_aggregation.py b/analytics/tools/aggregate_runs/core_aggregation.py
index d6a5e9fd9..9c0ae6714 100644
--- a/analytics/tools/aggregate_runs/core_aggregation.py
+++ b/analytics/tools/aggregate_runs/core_aggregation.py
@@ -3,7 +3,6 @@
 from pathlib import Path
 
 import pandas as pd
-
 from analytics.app.data.transform import dfs_models_and_evals, logs_dataframe
 from analytics.tools.aggregate_runs.dir_utils import load_multiple_logfiles
 from analytics.tools.aggregate_runs.pipeline_equivalence import assert_pipeline_equivalence
diff --git a/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py b/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py
index e74eb080f..4e5fefd64 100644
--- a/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py
+++ b/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py
@@ -203,6 +203,7 @@ def run_post_pipeline_evaluations(self, eval_status_queue: Queue, manual_run: bo
             # our current process is in terms of position in the dataset.
             parent_log=StageLog(
                 id=PipelineStage.EVALUATE_SINGLE.name,
+                id_seq_num=-1,
                 start=datetime.datetime.now(),
                 batch_idx=-1,
                 sample_idx=-1,
@@ -238,6 +239,7 @@ def _launch_evaluations_async(
         def worker_func(eval_req: EvalRequest) -> StageLog:
             single_log = StageLog(
                 id=PipelineStage.EVALUATE_SINGLE.name,
+                id_seq_num=-1,  # evaluations don't need sequence numbers, their order is not important
                 start=datetime.datetime.now(),
                 batch_idx=parent_log.batch_idx,
                 sample_idx=parent_log.sample_idx,
diff --git a/modyn/supervisor/internal/pipeline_executor/models.py b/modyn/supervisor/internal/pipeline_executor/models.py
index 380e34d6d..87f479263 100644
--- a/modyn/supervisor/internal/pipeline_executor/models.py
+++ b/modyn/supervisor/internal/pipeline_executor/models.py
@@ -104,6 +104,12 @@ class ExecutionState(PipelineExecutionParams):
     stage: PipelineStage = PipelineStage.INIT
     """The current stage of the pipeline executor."""
 
+    stage_id_seq_counters: dict[str, int] = dataclasses.field(default_factory=dict)
+    """Tracks, for every stage that can be logged in a StageLog, how many logs have been created with that stage id.
+    This information can be used to uniquely identify logs across multiple pipeline runs, given that the pipelines
+    use a deterministic configuration.
+    """
+
     # for logging
     seen_pipeline_stages: set[PipelineStage] = dataclasses.field(default_factory=set)
     current_batch_index: int = 0
@@ -385,6 +391,10 @@ class StageLog(BaseModel):
     id: str
     """Identifier for the pipeline stage, PipelineStage.name in most cases"""
 
+    id_seq_num: int
+    """Identifies the log within the group of logs with the same id (given by PipelineStage). Used for aggregation over
+    multiple pipeline runs."""
+
     # experiment time
     start: datetime.datetime
     end: datetime.datetime | None = Field(None)
diff --git a/modyn/supervisor/internal/pipeline_executor/pipeline_executor.py b/modyn/supervisor/internal/pipeline_executor/pipeline_executor.py
index 3062243d8..fdb93a4b1 100644
--- a/modyn/supervisor/internal/pipeline_executor/pipeline_executor.py
+++ b/modyn/supervisor/internal/pipeline_executor/pipeline_executor.py
@@ -136,9 +136,12 @@ def report_results(stage_log: StageLog) -> None:
         logger.info(f"[pipeline {state.pipeline_id}] Entering <{stage}>.")
 
         # execute stage
+        stage_seq_num = state.stage_id_seq_counters.get(stage.name, 0)
+        state.stage_id_seq_counters[stage.name] = stage_seq_num + 1
         epoch_micros_start = current_time_micros()
         stage_log = StageLog(
             id=stage.name,
+            id_seq_num=stage_seq_num,
             start=datetime.now(),
             batch_idx=state.current_batch_index,
             sample_idx=state.current_sample_index,
diff --git a/modyn/tests/supervisor/internal/pipeline_executor/test_evaluation_executor.py b/modyn/tests/supervisor/internal/pipeline_executor/test_evaluation_executor.py
index f5a6c5f35..190870bc8 100644
--- a/modyn/tests/supervisor/internal/pipeline_executor/test_evaluation_executor.py
+++ b/modyn/tests/supervisor/internal/pipeline_executor/test_evaluation_executor.py
@@ -132,6 +132,7 @@ def dummy_eval_request() -> EvalRequest:
 def dummy_stage_log() -> StageLog:
     return StageLog(
         id="log",
+        id_seq_num=-1,
        start=datetime.datetime(2021, 1, 1),
         batch_idx=-1,
         sample_idx=-1,
@@ -240,6 +241,7 @@ def test_single_evaluation(
 
     stage_log = StageLog(
         id="log",
+        id_seq_num=-1,
        start=datetime.datetime(2021, 1, 1),
         batch_idx=-1,
         sample_idx=-1,
diff --git a/modyn/tests/supervisor/internal/pipeline_executor/test_pipeline_executor.py b/modyn/tests/supervisor/internal/pipeline_executor/test_pipeline_executor.py
index 9ecebb658..c5280fcaf 100644
--- a/modyn/tests/supervisor/internal/pipeline_executor/test_pipeline_executor.py
+++ b/modyn/tests/supervisor/internal/pipeline_executor/test_pipeline_executor.py
@@ -98,7 +98,7 @@ def dummy_logs(dummy_pipeline_args: PipelineExecutionParams) -> PipelineLogs:
 
 @pytest.fixture
 def dummy_stage_log() -> StageLog:
-    return StageLog(id="dummy", start=0, sample_idx=1, sample_time=1000, trigger_idx=0)
+    return StageLog(id="dummy", id_seq_num=-1, start=0, sample_idx=1, sample_time=1000, trigger_idx=0)
 
 
 @overload
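
Note: the sketch below illustrates the per-stage sequence counter this patch introduces. It is not Modyn code; `DummyState` and `DummyLog` are made-up stand-ins for `ExecutionState` and `StageLog`, and the stage names are invented, but the counter logic mirrors the `stage_id_seq_counters.get(...)` / increment pattern added in `pipeline_executor.py`.

```python
# Minimal sketch (not Modyn code): a per-stage counter yields a deterministic
# (id, id_seq_num) pair for every log created during a pipeline run.
from dataclasses import dataclass, field


@dataclass
class DummyLog:  # stand-in for StageLog, illustration only
    id: str
    id_seq_num: int


@dataclass
class DummyState:  # stand-in for ExecutionState, illustration only
    stage_id_seq_counters: dict[str, int] = field(default_factory=dict)

    def next_seq_num(self, stage_name: str) -> int:
        # same pattern as the pipeline executor change: read current count, then increment
        seq_num = self.stage_id_seq_counters.get(stage_name, 0)
        self.stage_id_seq_counters[stage_name] = seq_num + 1
        return seq_num


state = DummyState()
logs = [DummyLog(stage, state.next_seq_num(stage)) for stage in ["TRAIN", "TRAIN", "EVALUATE_SINGLE"]]
assert [(log.id, log.id_seq_num) for log in logs] == [("TRAIN", 0), ("TRAIN", 1), ("EVALUATE_SINGLE", 0)]
```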
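With a deterministic pipeline configuration, the `(id, id_seq_num)` pair can then act as a join key when aggregating stage logs from repeated runs, in the spirit of `analytics/tools/aggregate_runs`. The pandas sketch below is hypothetical: the column names (`run`, `id`, `id_seq_num`, `duration`) and the grouping are assumptions for illustration, not the actual aggregation code.

```python
# Hypothetical example: averaging a stage-level metric over three runs of the
# same pipeline by grouping on the composite key (id, id_seq_num).
import pandas as pd

logs = pd.DataFrame(
    {
        "run": [0, 0, 1, 1, 2, 2],
        "id": ["TRAIN", "TRAIN", "TRAIN", "TRAIN", "TRAIN", "TRAIN"],
        "id_seq_num": [0, 1, 0, 1, 0, 1],
        "duration": [10.0, 12.0, 11.0, 13.0, 9.0, 14.0],
    }
)

# Without id_seq_num the two TRAIN logs of each run would be indistinguishable;
# with it, corresponding stage executions line up across runs.
aggregated = logs.groupby(["id", "id_seq_num"])["duration"].mean().reset_index()
print(aggregated)
```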