Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add unique sequence number for aggregation into supervisor #548

Merged
merged 1 commit into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion analytics/tools/aggregate_runs/core_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from pathlib import Path

import pandas as pd

from analytics.app.data.transform import dfs_models_and_evals, logs_dataframe
from analytics.tools.aggregate_runs.dir_utils import load_multiple_logfiles
from analytics.tools.aggregate_runs.pipeline_equivalence import assert_pipeline_equivalence
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ def run_post_pipeline_evaluations(self, eval_status_queue: Queue, manual_run: bo
# our current process is in terms of position in the dataset.
parent_log=StageLog(
id=PipelineStage.EVALUATE_SINGLE.name,
id_seq_num=-1,
start=datetime.datetime.now(),
batch_idx=-1,
sample_idx=-1,
Expand Down Expand Up @@ -238,6 +239,7 @@ def _launch_evaluations_async(
def worker_func(eval_req: EvalRequest) -> StageLog:
single_log = StageLog(
id=PipelineStage.EVALUATE_SINGLE.name,
id_seq_num=-1, # evaluation don't need sequence numbers, their order is not important
start=datetime.datetime.now(),
batch_idx=parent_log.batch_idx,
sample_idx=parent_log.sample_idx,
Expand Down
10 changes: 10 additions & 0 deletions modyn/supervisor/internal/pipeline_executor/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ class ExecutionState(PipelineExecutionParams):
stage: PipelineStage = PipelineStage.INIT
"""The current stage of the pipeline executor."""

stage_id_seq_counters: dict[str, int] = dataclasses.field(default_factory=dict)
"""Tracks for every stage that can be logged in StageLog how many logs have been created using this id.
This information can be used to uniquely identify logs over multiple pipeline runs given the pipelines use
a deterministic configuration.
"""

# for logging
seen_pipeline_stages: set[PipelineStage] = dataclasses.field(default_factory=set)
current_batch_index: int = 0
Expand Down Expand Up @@ -385,6 +391,10 @@ class StageLog(BaseModel):
id: str
"""Identifier for the pipeline stage, PipelineStage.name in most cases"""

id_seq_num: int
"""Identifies the log within the group of logs with the same id (given by PipelineStage). Used for aggregation over
multiple pipeline runs."""

# experiment time
start: datetime.datetime
end: datetime.datetime | None = Field(None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,12 @@ def report_results(stage_log: StageLog) -> None:
logger.info(f"[pipeline {state.pipeline_id}] Entering <{stage}>.")

# execute stage
stage_seq_num = state.stage_id_seq_counters.get(stage.name, 0)
state.stage_id_seq_counters[stage.name] = stage_seq_num + 1
epoch_micros_start = current_time_micros()
stage_log = StageLog(
id=stage.name,
id_seq_num=stage_seq_num,
start=datetime.now(),
batch_idx=state.current_batch_index,
sample_idx=state.current_sample_index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ def dummy_eval_request() -> EvalRequest:
def dummy_stage_log() -> StageLog:
return StageLog(
id="log",
id_seq_num=-1,
start=datetime.datetime(2021, 1, 1),
batch_idx=-1,
sample_idx=-1,
Expand Down Expand Up @@ -240,6 +241,7 @@ def test_single_evaluation(

stage_log = StageLog(
id="log",
id_seq_num=-1,
start=datetime.datetime(2021, 1, 1),
batch_idx=-1,
sample_idx=-1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def dummy_logs(dummy_pipeline_args: PipelineExecutionParams) -> PipelineLogs:

@pytest.fixture
def dummy_stage_log() -> StageLog:
return StageLog(id="dummy", start=0, sample_idx=1, sample_time=1000, trigger_idx=0)
return StageLog(id="dummy", id_seq_num=-1, start=0, sample_idx=1, sample_time=1000, trigger_idx=0)


@overload
Expand Down
Loading