chore: Adjustments for triggering experiments (#595)
robinholzi authored Sep 2, 2024
1 parent 4208bef commit dc46b6d
Showing 28 changed files with 782 additions and 480 deletions.
4 changes: 2 additions & 2 deletions docs/pipeline/triggers/PERFORMANCE_TRIGGER.md
@@ -95,7 +95,7 @@ classDiagram
+int previous_batch_num_misclassifications()
+float forecast_expected_accuracy(method)
+float forecast_next_accuracy(method)
+float forecast_expected_performance(metric, method)
+float forecast_optimal_performance(metric, method)
+float forecast_next_performance(metric, method)
}
```
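For orientation, the diagram above only names the tracker's forecasting hooks. A minimal sketch of what a `forecast_next_performance(metric, method)`-style forecast could compute, using exponential smoothing; this is an illustration with hypothetical names and logic, not Modyn's actual implementation:

```python
# Illustrative only: mirrors the shape of forecast_next_performance(metric, method)
# from the class diagram above; not Modyn's implementation.
def forecast_next_performance(observed: list[float], alpha: float = 0.5) -> float:
    """Exponentially smoothed forecast of the next performance observation."""
    if not observed:
        raise ValueError("need at least one observed performance value")
    forecast = observed[0]
    for value in observed[1:]:
        forecast = alpha * value + (1 - alpha) * forecast
    return forecast


print(forecast_next_performance([0.82, 0.80, 0.75]))  # 0.78
```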
@@ -120,7 +120,7 @@
}
class DynamicPerformanceThresholdCriterion {
+float allowed_deviation
+float deviation
}
class NumberAvoidableMisclassificationCriterion {
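The second hunk renames `allowed_deviation` to `deviation` on `DynamicPerformanceThresholdCriterion`. As a rough sketch of how a criterion of that shape might be checked (trigger once observed performance falls more than `deviation` below the expected value), with everything except the field name being hypothetical:

```python
from dataclasses import dataclass


@dataclass
class DynamicPerformanceThresholdCriterion:
    deviation: float  # renamed from allowed_deviation in this commit


def should_trigger(criterion: DynamicPerformanceThresholdCriterion,
                   observed: float, expected: float) -> bool:
    # Hypothetical check, not Modyn's implementation: retrain when performance
    # drops further below the expectation than the allowed deviation.
    return observed < expected - criterion.deviation


print(should_trigger(DynamicPerformanceThresholdCriterion(deviation=0.05), 0.71, 0.80))  # True
```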
71 changes: 45 additions & 26 deletions experiments/arxiv/compare_trigger_policies/pipeline_config.py
@@ -20,20 +20,35 @@


def gen_pipeline_config(
name: str, trigger: TriggerConfig, eval_handlers: list[EvalHandlerConfig]
name: str,
trigger_config: TriggerConfig,
eval_handlers: list[EvalHandlerConfig],
gpu_device: str,
seed: int,
) -> ModynPipelineConfig:
num_classes = 172
bytes_parser_function = (
"import torch\n"
"import numpy as np\n"
"def bytes_parser_function(data: bytes) -> str:\n"
" return str(data, 'utf8')"
)
evaluation_transformer_function = (
"import torch\n"
"def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n"
" return torch.argmax(model_output, dim=-1)\n"
)
return ModynPipelineConfig(
pipeline=Pipeline(name=name, description="Arxiv pipeline for comparing trigger policies", version="0.0.1"),
model=ModelConfig(id="ArticleNet", config={"num_classes": num_classes}),
model_storage=PipelineModelStorageConfig(full_model_strategy=FullModelStrategy(name="PyTorchFullModel")),
training=TrainingConfig(
gpus=1,
device="cuda:0",
dataloader_workers=2,
device=gpu_device,
dataloader_workers=1,
use_previous_model=True,
initial_model="random",
batch_size=96,
batch_size=128,
shuffle=True,
optimizers=[
OptimizerConfig(
@@ -49,46 +64,50 @@ def gen_pipeline_config(
],
optimization_criterion=OptimizationCriterion(name="CrossEntropyLoss"),
checkpointing=CheckpointingConfig(activated=False),
epochs_per_trigger=1,
epochs_per_trigger=5,
amp=False,
seed=seed,
),
selection_strategy=NewDataStrategyConfig(
maximum_keys_in_memory=10000, storage_backend="database", limit=-1, tail_triggers=0
maximum_keys_in_memory=200000, storage_backend="database", limit=-1, tail_triggers=0
),
data=DataConfig(
dataset_id="arxiv_kaggle_train",
bytes_parser_function=("def bytes_parser_function(data: bytes) -> str:\n" " return str(data, 'utf8')"),
bytes_parser_function=bytes_parser_function,
tokenizer="DistilBertTokenizerTransform",
),
trigger=trigger,
trigger=trigger_config,
evaluation=EvaluationConfig(
handlers=eval_handlers,
device="cuda:0",
device=gpu_device,
after_pipeline_evaluation_workers=12,
after_training_evaluation_workers=12,
datasets=[
EvalDataConfig(
dataset_id=yb_dataset_name,
bytes_parser_function=(
"def bytes_parser_function(data: bytes) -> str:\n" " return str(data, 'utf8')"
),
batch_size=96,
dataloader_workers=2,
bytes_parser_function=bytes_parser_function,
batch_size=128,
dataloader_workers=1,
tokenizer="DistilBertTokenizerTransform",
metrics=[
AccuracyMetricConfig(
evaluation_transformer_function=(
"import torch\n"
"def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n"
" return torch.argmax(model_output, dim=-1)"
),
),
AccuracyMetricConfig(evaluation_transformer_function=evaluation_transformer_function, topn=1),
AccuracyMetricConfig(evaluation_transformer_function="", topn=2),
AccuracyMetricConfig(evaluation_transformer_function="", topn=5),
F1ScoreMetricConfig(
evaluation_transformer_function=(
"import torch\n"
"def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n"
" return torch.argmax(model_output, dim=-1)"
),
evaluation_transformer_function=evaluation_transformer_function,
num_classes=num_classes,
average="weighted",
),
F1ScoreMetricConfig(
evaluation_transformer_function=evaluation_transformer_function,
num_classes=num_classes,
average="macro",
),
F1ScoreMetricConfig(
evaluation_transformer_function=evaluation_transformer_function,
num_classes=num_classes,
average="micro",
),
],
)
for yb_dataset_name in ["arxiv_kaggle_all", "arxiv_kaggle_train", "arxiv_kaggle_test"]
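Note how the metric entries pass the evaluation transformer as Python source in a string: top-1 accuracy and the F1 metrics collapse the logits with argmax, while the top-2/top-5 accuracy entries leave the transformer empty, presumably because top-n ranking needs the raw logits rather than a single predicted class. A standalone demo of what the serialized function computes, on a dummy batch:

```python
import torch


# Same body as the serialized evaluation_transformer_function in the config:
# collapse a batch of logits to predicted class ids.
def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:
    return torch.argmax(model_output, dim=-1)


logits = torch.tensor([[0.1, 2.3, -0.5], [1.7, 0.2, 0.9]])  # (batch=2, classes=3)
print(evaluation_transformer_function(logits))  # tensor([1, 0])
```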
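With the widened `gen_pipeline_config` signature, call sites now pass the trigger config, GPU device, and seed explicitly. A sketch of such a call, mirroring the (currently commented-out) time-trigger loop in run.py below, where `construct_slicing_eval_handler` and `_FIRST_TIMESTAMP` are defined; the device and seed values are placeholders:

```python
from experiments.arxiv.compare_trigger_policies.pipeline_config import gen_pipeline_config
from modyn.config.schema.pipeline import TimeTriggerConfig

pipeline_config = gen_pipeline_config(
    name="timetrigger_90d",
    trigger_config=TimeTriggerConfig(every="90d", start_timestamp=_FIRST_TIMESTAMP),
    eval_handlers=[construct_slicing_eval_handler("90d", _FIRST_TIMESTAMP)],
    gpu_device="cuda:1",  # placeholder, matches the experiment entries below
    seed=42,  # placeholder
)
```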
165 changes: 84 additions & 81 deletions experiments/arxiv/compare_trigger_policies/run.py
@@ -2,20 +2,21 @@

import pandas as pd

from experiments.arxiv.compare_trigger_policies.pipeline_config import gen_pipeline_config
from experiments.models import Experiment
from experiments.utils.experiment_runner import run_multiple_pipelines
from modyn.config.schema.pipeline import (
DataAmountTriggerConfig,
EvalHandlerConfig,
ModynPipelineConfig,
TimeTriggerConfig,
)
from modyn.config.schema.pipeline.evaluation.strategy.between_two_triggers import BetweenTwoTriggersEvalStrategyConfig
from modyn.config.schema.pipeline.evaluation.strategy.periodic import PeriodicEvalStrategyConfig
from modyn.config.schema.pipeline.evaluation.strategy.slicing import SlicingEvalStrategyConfig
from modyn.config.schema.pipeline.trigger import DataDriftTriggerConfig
from modyn.config.schema.pipeline.trigger.drift.aggregation import MajorityVoteDriftAggregationStrategy
from modyn.config.schema.pipeline.evaluation.strategy.between_two_triggers import (
BetweenTwoTriggersEvalStrategyConfig,
)
from modyn.config.schema.pipeline.evaluation.strategy.periodic import (
PeriodicEvalStrategyConfig,
)
from modyn.config.schema.pipeline.evaluation.strategy.slicing import (
SlicingEvalStrategyConfig,
)
from modynclient.config.schema.client_config import ModynClientConfig, Supervisor

_FIRST_TIMESTAMP = int(pd.to_datetime("1989-10-26").timestamp())
@@ -28,7 +29,9 @@ def construct_slicing_eval_handler(slice: str, first_timestamp: int) -> EvalHandlerConfig:
execution_time="after_pipeline",
models="matrix",
strategy=SlicingEvalStrategyConfig(
eval_every=f"{slice}", eval_start_from=first_timestamp, eval_end_at=_LAST_TIMESTAMP
eval_every=f"{slice}",
eval_start_from=first_timestamp,
eval_end_at=_LAST_TIMESTAMP,
),
datasets=["arxiv_kaggle_test"],
)
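`SlicingEvalStrategyConfig` above appears to partition the span from `eval_start_from` to `eval_end_at` into `eval_every`-sized evaluation windows. A hypothetical helper (not Modyn code) that makes the resulting slice boundaries concrete:

```python
import pandas as pd


def slice_boundaries(start: int, end: int, every: str = "90d") -> list[tuple[int, int]]:
    """Split [start, end) into `every`-sized windows of unix timestamps."""
    step = int(pd.Timedelta(every).total_seconds())
    return [(lo, min(lo + step, end)) for lo in range(start, end, step)]


first = int(pd.to_datetime("1989-10-26").timestamp())
last = int(pd.to_datetime("2024-01-01").timestamp())  # stand-in for _LAST_TIMESTAMP
print(len(slice_boundaries(first, last)))  # number of 90-day evaluation windows
```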
@@ -69,83 +72,83 @@ def construct_between_trigger_eval_handler() -> EvalHandlerConfig:
def construct_pipelines(experiment: Experiment) -> list[ModynPipelineConfig]:
pipeline_configs: list[ModynPipelineConfig] = []

for time in experiment.time_trigger_schedules:
pipeline_configs.append(
gen_pipeline_config(
name=f"timetrigger_{time}",
trigger=TimeTriggerConfig(
every=f"{time}",
start_timestamp=experiment.warmup_until or _FIRST_TIMESTAMP,
),
eval_handlers=experiment.eval_handlers,
)
)

for count in experiment.data_amount_triggers:
pipeline_configs.append(
gen_pipeline_config(
name=f"dataamounttrigger_{count}",
trigger=DataAmountTriggerConfig(num_samples=count),
eval_handlers=experiment.eval_handlers,
)
)

for interval in experiment.drift_detection_intervals:
pipeline_configs.append(
gen_pipeline_config(
name=f"datadrifttrigger_{interval}",
trigger=DataDriftTriggerConfig(
evaluation_interval_data_points=interval,
metrics=experiment.drift_trigger_metrics,
aggregation_strategy=MajorityVoteDriftAggregationStrategy(),
),
eval_handlers=experiment.eval_handlers,
)
)
# for time in experiment.time_trigger_schedules:
# pipeline_configs.append(
# gen_pipeline_config(
# name=f"timetrigger_{time}",
# trigger_config=TimeTriggerConfig(
# every=f"{time}",
# start_timestamp=experiment.warmup_until or _FIRST_TIMESTAMP,
# ),
# eval_handlers=experiment.eval_handlers,
# )
# )

# for count in experiment.data_amount_triggers:
# pipeline_configs.append(
# gen_pipeline_config(
# name=f"dataamounttrigger_{count}",
# trigger_config=DataAmountTriggerConfig(num_samples=count),
# eval_handlers=experiment.eval_handlers,
# )
# )

# for interval in experiment.drift_detection_intervals:
# pipeline_configs.append(
# gen_pipeline_config(
# name=f"datadrifttrigger_{interval}",
# trigger_config=DataDriftTriggerConfig(
# evaluation_interval_data_points=interval,
# metrics=experiment.drift_trigger_metrics,
# aggregation_strategy=MajorityVoteDriftAggregationStrategy(),
# ),
# eval_handlers=experiment.eval_handlers,
# )
# )

return pipeline_configs


_EXPERIMENT_REFS = {
_EXPERIMENT_REFS: dict[int, Experiment] = {
# done
0: Experiment(
# to verify online composite model determination logic
name="arxiv-timetrigger-cold-start",
eval_handlers=[
construct_slicing_eval_handler("90d", _FIRST_TIMESTAMP),
# construct_between_trigger_eval_handler() # TODO: reenable for arxiv_kaggle_all
],
time_trigger_schedules=["90d"],
data_amount_triggers=[],
drift_detection_intervals=[],
drift_trigger_metrics=[],
gpu_device="cuda:1",
warmup_until=_FIRST_TIMESTAMP,
),
# cold training startup vs warmup phase
1: Experiment(
# to verify online composite model determination logic
name="arxiv-timetrigger-warm-start",
eval_handlers=[
construct_slicing_eval_handler("90d", int(pd.to_datetime("2000-01-01").timestamp())),
# construct_between_trigger_eval_handler() # TODO: reenable for arxiv_kaggle_all
],
time_trigger_schedules=["90d"],
data_amount_triggers=[],
drift_detection_intervals=[],
drift_trigger_metrics=[],
gpu_device="cuda:2",
warmup_until=int(pd.to_datetime("2000-01-01").timestamp()),
),
2: Experiment(
name="arxiv-numsamples-training-time",
eval_handlers=[construct_between_trigger_eval_handler()],
time_trigger_schedules=[],
data_amount_triggers=[100_000, 50_000, 25_000, 10_000, 5_000, 2_000, 1_000, 500],
drift_detection_intervals=[],
drift_trigger_metrics=[],
gpu_device="cuda:1",
),
# 0: Experiment(
# # to verify online composite model determination logic
# name="arxiv-timetrigger-cold-start",
# eval_handlers=[
# construct_slicing_eval_handler("90d", _FIRST_TIMESTAMP),
# # construct_between_trigger_eval_handler() # TODO: reenable for arxiv_kaggle_all
# ],
# time_trigger_schedules=["90d"],
# data_amount_triggers=[],
# drift_detection_intervals=[],
# drift_trigger_metrics=[],
# gpu_device="cuda:1",
# warmup_until=_FIRST_TIMESTAMP,
# ),
# # cold training startup vs warmup phase
# 1: Experiment(
# # to verify online composite model determination logic
# name="arxiv-timetrigger-warm-start",
# eval_handlers=[
# construct_slicing_eval_handler("90d", int(pd.to_datetime("2000-01-01").timestamp())),
# # construct_between_trigger_eval_handler() # TODO: reenable for arxiv_kaggle_all
# ],
# time_trigger_schedules=["90d"],
# data_amount_triggers=[],
# drift_detection_intervals=[],
# drift_trigger_metrics=[],
# gpu_device="cuda:2",
# warmup_until=int(pd.to_datetime("2000-01-01").timestamp()),
# ),
# 2: Experiment(
# name="arxiv-numsamples-training-time",
# eval_handlers=[construct_between_trigger_eval_handler()],
# time_trigger_schedules=[],
# data_amount_triggers=[100_000, 50_000, 25_000, 10_000, 5_000, 2_000, 1_000, 500],
# drift_detection_intervals=[],
# drift_trigger_metrics=[],
# gpu_device="cuda:1",
# ),
# tbd. arxiv-timetrigger1y-periodic-eval-intervals
# tbd. arxiv-drift
}
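The registry above drives everything through the `Experiment` model from experiments.models, whose definition sits outside this diff. Reconstructed from the call sites, its shape is roughly the following; field types and defaults are guesses, and the real class may differ:

```python
from dataclasses import dataclass, field

from modyn.config.schema.pipeline import EvalHandlerConfig


@dataclass
class Experiment:
    name: str
    eval_handlers: list[EvalHandlerConfig]
    time_trigger_schedules: list[str] = field(default_factory=list)
    data_amount_triggers: list[int] = field(default_factory=list)
    drift_detection_intervals: list[int] = field(default_factory=list)
    drift_trigger_metrics: list = field(default_factory=list)
    gpu_device: str = "cuda:0"
    warmup_until: int | None = None
```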