chore: Adjustments for triggering experiments #595

Merged
Changes from 1 commit
fixes
robinholzi committed Sep 2, 2024
commit e29c1bc90ec611851de46ac08e6567dc5e98659c
14 changes: 7 additions & 7 deletions experiments/yearbook/compare_trigger_policies/run.py
@@ -149,7 +149,7 @@ def construct_pipelines(experiment: Experiment) -> list[ModynPipelineConfig]:
f"{num_samples}": DataAmountTriggerConfig(num_samples=num_samples)
for num_samples in [100, 200, 500, 1_000, 2_500, 5_000, 10_000, 15_000, 30_000]
},
gpu_device="cuda:0",
gpu_device="cuda:1",
),
# -------------------------------- Drift triggers -------------------------------- #
# Static threshold drift
@@ -168,18 +168,18 @@ def construct_pipelines(experiment: Experiment) -> list[ModynPipelineConfig]:
metrics={
"mmd": AlibiDetectMmdDriftMetric(
decision_criterion=criterion,
device="cuda:0",
device="cuda:2",
)
},
)
for interval in [100, 250, 500, 1_000]
for window_size in [3, 5]
for window_size in [1, 4, 10]
for criterion_name, criterion in {
f"mmd-{threshold}": ThresholdDecisionCriterion(threshold=threshold)
for threshold in [0.03, 0.05, 0.07, 0.09, 0.12, 0.15]
for threshold in [0.03, 0.05, 0.07, 0.09, 0.12, 0.15, 0.2]
}.items()
},
gpu_device="cuda:0",
gpu_device="cuda:2",
),
# Dynamic threshold drift
4: Experiment(
@@ -197,7 +197,7 @@ def construct_pipelines(experiment: Experiment) -> list[ModynPipelineConfig]:
metrics={
"mmd": AlibiDetectMmdDriftMetric(
decision_criterion=criterion,
device="cuda:0",
device="cuda:1",
)
},
)
@@ -220,7 +220,7 @@ def construct_pipelines(experiment: Experiment) -> list[ModynPipelineConfig]:
}
).items()
},
gpu_device="cuda:0",
gpu_device="cuda:1",
),
# ----------------------------- Performance triggers ----------------------------- #
5: Experiment(
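The run.py hunks above retune an existing grid search: the drift-trigger experiments are generated as the cross product of detection interval, window size, and MMD threshold, and this commit widens the window-size and threshold lists while spreading experiments across cuda:1 and cuda:2. The following minimal sketch reproduces that grid-building pattern with stand-in dataclasses instead of Modyn's real config classes (class names and fields below are illustrative, not the project's API):

from dataclasses import dataclass

# Stand-ins for Modyn's trigger/metric config classes; illustrative only.
@dataclass
class ThresholdCriterion:
    threshold: float

@dataclass
class MmdDriftTriggerConfig:
    interval: int
    window_size: int
    criterion: ThresholdCriterion
    device: str = "cuda:2"

def build_drift_trigger_grid() -> dict[str, MmdDriftTriggerConfig]:
    """Cross product of detection interval x window size x MMD threshold,
    mirroring the nested dict comprehension in construct_pipelines."""
    return {
        f"int{interval}_win{window_size}_mmd{threshold}": MmdDriftTriggerConfig(
            interval=interval,
            window_size=window_size,
            criterion=ThresholdCriterion(threshold=threshold),
        )
        for interval in [100, 250, 500, 1_000]
        for window_size in [1, 4, 10]  # values used after this commit
        for threshold in [0.03, 0.05, 0.07, 0.09, 0.12, 0.15, 0.2]
    }

if __name__ == "__main__":
    print(len(build_drift_trigger_grid()), "trigger configurations")  # 4 * 3 * 7 = 84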
14 changes: 7 additions & 7 deletions modyn/evaluator/evaluator_entrypoint.py
@@ -15,13 +15,13 @@
)
logger = logging.getLogger(__name__)

# # We need to do this at the top because other dependencies otherwise set fork.
# try:
# mp.set_start_method("spawn")
# except RuntimeError as error:
# if mp.get_start_method() != "spawn" and "PYTEST_CURRENT_TEST" not in os.environ:
# logger.error("Start method is already set to {}", mp.get_start_method())
# raise error
# We need to do this at the top because other dependencies otherwise set fork.
try:
mp.set_start_method("spawn")
except RuntimeError as error:
if mp.get_start_method() != "spawn" and "PYTEST_CURRENT_TEST" not in os.environ:
logger.error("Start method is already set to {}", mp.get_start_method())
raise error

from modyn.evaluator.evaluator import Evaluator # noqa # pylint: disable=wrong-import-position

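The evaluator_entrypoint.py change re-enables the previously commented-out block: the multiprocessing start method must be pinned to "spawn" at the top of the module, before other dependencies implicitly fix it to "fork". A standalone sketch of that guard pattern follows; the worker function and device string are illustrative and not part of the evaluator:

import multiprocessing as mp

def _worker(device: str) -> None:
    # In the real evaluator the child process would initialize CUDA on `device`;
    # with the "fork" start method it would inherit the parent's CUDA context,
    # which is what pinning the start method to "spawn" avoids.
    print(f"start method: {mp.get_start_method()}, device: {device}")

if __name__ == "__main__":
    try:
        mp.set_start_method("spawn")
    except RuntimeError:
        # Already set elsewhere (e.g. by a test runner); only a non-spawn method is a problem.
        if mp.get_start_method() != "spawn":
            raise
    p = mp.Process(target=_worker, args=("cuda:1",))
    p.start()
    p.join()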
49 changes: 40 additions & 9 deletions modyn/supervisor/internal/pipeline_executor/evaluation_executor.py
@@ -2,6 +2,7 @@

import datetime
import logging
import os
import pickle
import sys
from collections import defaultdict
@@ -395,14 +396,11 @@ def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str:
# DevTools #
# ------------------------------------------------------------------------------------ #

if __name__ == "__main__":
snapshot_path = Path(input("Enter pipeline log directory path to (re)run evaluation executor: "))
if not snapshot_path.exists():
print("Path not found")
sys.exit(1)

def eval_executor_single_pipeline(
pipeline_dir: Path
) -> None:
# restart evaluation executor
ex = EvaluationExecutor.init_from_path(snapshot_path)
ex = EvaluationExecutor.init_from_path(pipeline_dir)

logs_ = PipelineLogs(
pipeline_id=ex.pipeline_id,
@@ -413,6 +411,39 @@ def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str:
)

logs_.supervisor_logs = ex.run_post_pipeline_evaluations(manual_run=True)
logs_.materialize(snapshot_path, mode="final")
logger.info("Done with manual evaluation!")
logs_.materialize(pipeline_dir, mode="final")
logger.info(f"Done with manual evaluation.")

def eval_executor_multi_pipeline(
pipelines_dir: Path
) -> None:
"""Run the evaluation executor for multiple pipelines."""
pipeline_dirs = [p for p in pipelines_dir.iterdir() if p.is_dir()]
for p_dir in pipeline_dirs:
pipeline_logfile = p_dir / "pipeline.log"
if not pipeline_logfile.exists():
# move file to _faulty subdir
faulty_dir = pipelines_dir / "_faulty"
faulty_dir.mkdir(exist_ok=True)
os.rename(p_dir, faulty_dir / p_dir.name)
continue

eval_executor_single_pipeline(p_dir)
logger.info(f"Done with pipeline {p_dir.name}")

if __name__ == "__main__":
single_pipeline_mode = input("Run evaluation executor for single pipeline? (y/n): ")
userpath = Path(input("Enter pipeline log directory path to (re)run evaluation executor: "))
if not userpath.exists():
print("Path not found")
sys.exit(1)

if single_pipeline_mode.lower() == "y":
eval_executor_single_pipeline(userpath)
elif single_pipeline_mode.lower() == "n":
eval_executor_multi_pipeline(userpath)
else:
print("Invalid input")
sys.exit(1)

sys.exit(0)
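The new eval_executor_multi_pipeline helper scans a parent directory, moves pipeline runs without a pipeline.log into a _faulty subdirectory, and re-runs the evaluation for the rest. Below is a self-contained sketch of just that scan-and-quarantine step using pathlib; the function name is hypothetical, and the PR itself uses os.rename:

from pathlib import Path

def quarantine_incomplete_runs(pipelines_dir: Path) -> list[Path]:
    # Returns the pipeline run directories that contain a pipeline.log;
    # the others are moved into a _faulty subdirectory.
    faulty_dir = pipelines_dir / "_faulty"
    healthy: list[Path] = []
    for p_dir in pipelines_dir.iterdir():
        if not p_dir.is_dir() or p_dir.name == "_faulty":
            continue
        if (p_dir / "pipeline.log").exists():
            healthy.append(p_dir)
        else:
            faulty_dir.mkdir(exist_ok=True)
            p_dir.rename(faulty_dir / p_dir.name)  # pathlib equivalent of os.rename
    return healthy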
2 changes: 1 addition & 1 deletion modyn/utils/timer.py
@@ -14,7 +14,7 @@ def timed_generator(generator: Generator[X, None, None]) -> Generator[tuple[X, f
Returns:
A generator that yields (item, elapsed_time_millis) for each item in the original generator.
"""
start_time = time.time() # first evaluation starts when loop is entered9
start_time = time.time() # first evaluation starts when loop is entered
for item in generator:
yield item, (time.time() - start_time) * 1000 # yield item and compute elapsed time
start_time = time.time() # next item requested: start timer again
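For context, timed_generator wraps a generator and yields (item, elapsed_time_millis) pairs, restarting the timer each time the next item is requested. A small usage sketch follows; the import path is assumed from the file location modyn/utils/timer.py:

import time
from modyn.utils.timer import timed_generator  # import path assumed from the diff header

def slow_numbers():
    for i in range(3):
        time.sleep(0.01)  # simulate work done while producing the item
        yield i

for item, millis in timed_generator(slow_numbers()):
    print(f"item {item} was produced in {millis:.1f} ms")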