Skip to content

Commit

Permalink
feat: Add unique sequence number for aggregation into supervisor (#548)
Browse files Browse the repository at this point in the history
  • Loading branch information
robinholzi authored Jun 24, 2024
1 parent b9e255e commit 59d8ee9
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 2 deletions.
1 change: 0 additions & 1 deletion analytics/tools/aggregate_runs/core_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from pathlib import Path

import pandas as pd

from analytics.app.data.transform import dfs_models_and_evals, logs_dataframe
from analytics.tools.aggregate_runs.dir_utils import load_multiple_logfiles
from analytics.tools.aggregate_runs.pipeline_equivalence import assert_pipeline_equivalence
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ def run_post_pipeline_evaluations(self, eval_status_queue: Queue, manual_run: bo
# our current process is in terms of position in the dataset.
parent_log=StageLog(
id=PipelineStage.EVALUATE_SINGLE.name,
id_seq_num=-1,
start=datetime.datetime.now(),
batch_idx=-1,
sample_idx=-1,
Expand Down Expand Up @@ -238,6 +239,7 @@ def _launch_evaluations_async(
def worker_func(eval_req: EvalRequest) -> StageLog:
single_log = StageLog(
id=PipelineStage.EVALUATE_SINGLE.name,
id_seq_num=-1, # evaluations don't need sequence numbers; their order is not important
start=datetime.datetime.now(),
batch_idx=parent_log.batch_idx,
sample_idx=parent_log.sample_idx,
Expand Down
10 changes: 10 additions & 0 deletions modyn/supervisor/internal/pipeline_executor/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ class ExecutionState(PipelineExecutionParams):
stage: PipelineStage = PipelineStage.INIT
"""The current stage of the pipeline executor."""

stage_id_seq_counters: dict[str, int] = dataclasses.field(default_factory=dict)
"""Tracks, for every stage id that can be logged in a StageLog, how many logs have been created with that id.
This information can be used to uniquely identify logs across multiple pipeline runs, provided the pipelines use
a deterministic configuration.
"""

# for logging
seen_pipeline_stages: set[PipelineStage] = dataclasses.field(default_factory=set)
current_batch_index: int = 0
Expand Down Expand Up @@ -385,6 +391,10 @@ class StageLog(BaseModel):
id: str
"""Identifier for the pipeline stage, PipelineStage.name in most cases"""

id_seq_num: int
"""Identifies the log within the group of logs with the same id (given by PipelineStage). Used for aggregation over
multiple pipeline runs."""

# experiment time
start: datetime.datetime
end: datetime.datetime | None = Field(None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,12 @@ def report_results(stage_log: StageLog) -> None:
logger.info(f"[pipeline {state.pipeline_id}] Entering <{stage}>.")

# execute stage
stage_seq_num = state.stage_id_seq_counters.get(stage.name, 0)
state.stage_id_seq_counters[stage.name] = stage_seq_num + 1
epoch_micros_start = current_time_micros()
stage_log = StageLog(
id=stage.name,
id_seq_num=stage_seq_num,
start=datetime.now(),
batch_idx=state.current_batch_index,
sample_idx=state.current_sample_index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ def dummy_eval_request() -> EvalRequest:
def dummy_stage_log() -> StageLog:
return StageLog(
id="log",
id_seq_num=-1,
start=datetime.datetime(2021, 1, 1),
batch_idx=-1,
sample_idx=-1,
Expand Down Expand Up @@ -240,6 +241,7 @@ def test_single_evaluation(

stage_log = StageLog(
id="log",
id_seq_num=-1,
start=datetime.datetime(2021, 1, 1),
batch_idx=-1,
sample_idx=-1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def dummy_logs(dummy_pipeline_args: PipelineExecutionParams) -> PipelineLogs:

@pytest.fixture
def dummy_stage_log() -> StageLog:
return StageLog(id="dummy", start=0, sample_idx=1, sample_time=1000, trigger_idx=0)
return StageLog(id="dummy", id_seq_num=-1, start=0, sample_idx=1, sample_time=1000, trigger_idx=0)


@overload
Expand Down

0 comments on commit 59d8ee9

Please sign in to comment.