feat: Adjust yearbook multi-pipeline experiment (#498)
robinholzi authored Jun 24, 2024
1 parent be4c4fc commit 7d694d0
Showing 2 changed files with 126 additions and 58 deletions.
81 changes: 47 additions & 34 deletions experiments/yearbook/compare_trigger_policies/pipeline_config.py
@@ -7,24 +7,27 @@
TrainingConfig,
)
from modyn.config.schema.pipeline.config import (
EvalDataConfig,
EvalStrategyModel,
EvaluationConfig,
FullModelStrategy,
Metric,
ModelConfig,
ModynPipelineConfig,
NewDataStrategyConfig,
Pipeline,
PipelineModelStorageConfig,
TriggerConfig,
)
from modyn.config.schema.pipeline.evaluation.config import EvalDataConfig
from modyn.config.schema.pipeline.evaluation.handler import EvalHandlerConfig
from modyn.config.schema.pipeline.evaluation.metrics import AccuracyMetricConfig, F1ScoreMetricConfig
from modyn.config.schema.pipeline.model_storage import FullModelStrategy
from modyn.config.schema.pipeline.sampling.config import NewDataStrategyConfig


def gen_pipeline_config(name: str, trigger: TriggerConfig, eval_strategy: EvalStrategyModel) -> ModynPipelineConfig:
def gen_pipeline_config(
name: str, trigger: TriggerConfig, eval_handlers: list[EvalHandlerConfig]
) -> ModynPipelineConfig:
num_classes = 2
return ModynPipelineConfig(
pipeline=Pipeline(name=name, description="Yearbook pipeline for comparing trigger policies", version="0.0.1"),
model=ModelConfig(id="YearbookNet", config={"num_input_channels": 3, "num_classes": 2}),
model=ModelConfig(id="YearbookNet", config={"num_input_channels": 3, "num_classes": num_classes}),
model_storage=PipelineModelStorageConfig(full_model_strategy=FullModelStrategy(name="PyTorchFullModel")),
training=TrainingConfig(
gpus=1,
@@ -33,6 +36,7 @@ def gen_pipeline_config(name: str, trigger: TriggerConfig, eval_strategy: EvalSt
use_previous_model=True,
initial_model="random",
batch_size=64,
shuffle=True,
optimizers=[
OptimizerConfig(
name="default",
@@ -43,49 +47,58 @@ def gen_pipeline_config(name: str, trigger: TriggerConfig, eval_strategy: EvalSt
],
optimization_criterion=OptimizationCriterion(name="CrossEntropyLoss"),
checkpointing=CheckpointingConfig(activated=False),
selection_strategy=NewDataStrategyConfig(
maximum_keys_in_memory=1000, storage_backend="database", limit=-1, tail_triggers=0
),
),
selection_strategy=NewDataStrategyConfig(
maximum_keys_in_memory=1000, storage_backend="database", limit=-1, tail_triggers=0
),
data=DataConfig(
dataset_id="yearbook",
transformations=[],
bytes_parser_function="""
import warnings
import torch
def bytes_parser_function(data: memoryview) -> torch.Tensor:
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=UserWarning)
return torch.frombuffer(data, dtype=torch.float32).reshape(3, 32, 32)
""",
bytes_parser_function=(
"import warnings\n"
"import torch\n"
"def bytes_parser_function(data: memoryview) -> torch.Tensor:\n"
" with warnings.catch_warnings():\n"
" warnings.simplefilter('ignore', category=UserWarning)\n"
" return torch.frombuffer(data, dtype=torch.float32).reshape(3, 32, 32)"
),
),
trigger=trigger,
evaluation=EvaluationConfig(
eval_strategy=eval_strategy,
handlers=eval_handlers,
device="cuda:0",
result_writers=["json"],
datasets=[
EvalDataConfig(
dataset_id="yearbook",
bytes_parser_function="""
import torch
import numpy as np
def bytes_parser_function(data: bytes) -> torch.Tensor:
return torch.from_numpy(np.frombuffer(data, dtype=np.float32)).reshape(3, 32, 32)
""",
dataset_id=yb_dataset_name,
bytes_parser_function=(
"import torch\n"
"import numpy as np\n"
"def bytes_parser_function(data: bytes) -> torch.Tensor:\n"
" return torch.from_numpy(np.frombuffer(data, dtype=np.float32)).reshape(3, 32, 32)\n"
),
batch_size=64,
dataloader_workers=2,
metrics=[
Metric(
name="Accuracy",
evaluation_transformer_function="""
import torch
def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:
return torch.argmax(model_output, dim=-1)
""",
)
AccuracyMetricConfig(
evaluation_transformer_function=(
"import torch\n"
"def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n"
" return torch.argmax(model_output, dim=-1)\n"
),
),
F1ScoreMetricConfig(
evaluation_transformer_function=(
"import torch\n"
"def evaluation_transformer_function(model_output: torch.Tensor) -> torch.Tensor:\n"
" return torch.argmax(model_output, dim=-1)"
),
num_classes=num_classes,
average="weighted",
),
],
)
for yb_dataset_name in ["yearbook", "yearbook_test"]
],
),
)
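
For orientation, here is a minimal sketch of how the refactored generator is invoked. All class and argument names appear in this diff; the concrete values mirror run.py below, so treat the snippet as illustrative rather than part of the commit.

from experiments.yearbook.compare_trigger_policies.pipeline_config import gen_pipeline_config
from modyn.config.schema.pipeline import TimeTriggerConfig
from modyn.config.schema.pipeline.evaluation.handler import EvalHandlerConfig
from modyn.config.schema.pipeline.evaluation.strategy.between_two_triggers import BetweenTwoTriggersEvalStrategyConfig

# Faked timestamps: one day of pipeline time stands for one yearbook year.
config = gen_pipeline_config(
    name="timetrigger_5y",
    trigger=TimeTriggerConfig(every="5d", start_timestamp=0),
    eval_handlers=[
        EvalHandlerConfig(
            name="full",
            execution_time="manual",
            models="most_recent",
            strategy=BetweenTwoTriggersEvalStrategyConfig(),
            datasets=["yearbook", "yearbook_test"],
        )
    ],
)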
103 changes: 79 additions & 24 deletions experiments/yearbook/compare_trigger_policies/run.py
@@ -1,48 +1,103 @@
import os

from experiments.utils.experiment_runner import run_multiple_pipelines
from experiments.yearbook.compare_trigger_policies.pipeline_config import gen_pipeline_config
from modyn.config.schema.pipeline import (
DataAmountTriggerConfig,
ModynPipelineConfig,
OffsetEvalStrategyConfig,
OffsetEvalStrategyModel,
TimeTriggerConfig,
)
from modynclient.client.client import run_multiple_pipelines
from modyn.config.schema.pipeline import DataAmountTriggerConfig, ModynPipelineConfig, TimeTriggerConfig
from modyn.config.schema.pipeline.evaluation.handler import EvalHandlerConfig
from modyn.config.schema.pipeline.evaluation.strategy.between_two_triggers import BetweenTwoTriggersEvalStrategyConfig
from modyn.config.schema.pipeline.evaluation.strategy.periodic import PeriodicEvalStrategyConfig
from modyn.config.schema.pipeline.trigger import DataDriftTriggerConfig
from modyn.utils.utils import SECONDS_PER_UNIT
from modynclient.config.schema.client_config import ModynClientConfig, Supervisor


def run_experiment() -> None:
pipeline_configs: ModynPipelineConfig = []
def construct_pipelines() -> list[ModynPipelineConfig]:
pipeline_configs: list[ModynPipelineConfig] = []
first_timestamp = 0
last_timestamp = SECONDS_PER_UNIT["d"] * (2015 - 1930)

eval_handlers = [
EvalHandlerConfig(
name=f"scheduled-{interval}",
execution_time="manual",
models="matrix",
strategy=PeriodicEvalStrategyConfig(
every="1d", # every year
interval=f"[-{fake_interval}; +{fake_interval}]",
start_timestamp=first_timestamp,
end_timestamp=last_timestamp,
),
datasets=["yearbook_test"],
)
for (interval, fake_interval) in [("~1y", "300d"), ("1y", "1d"), ("2y", "2d"), ("3y", "3d"), ("5y", "5d")]
] + [
EvalHandlerConfig(
name="full",
execution_time="manual",
models="most_recent",
strategy=BetweenTwoTriggersEvalStrategyConfig(),
datasets=["yearbook", "yearbook_test"],
),
]

# time based triggers: every: 1y, 5y, 15y, 25y
for years in [1, 5, 15, 25]:
# time based triggers: every: 1y, 3y, 5y, 15y, 25y, 40y
for years in [1, 3, 5, 15, 25, 40]:
pipeline_configs.append(
gen_pipeline_config(
name=f"TimeTrigger_{years}y",
trigger=TimeTriggerConfig(every=years, unit="y"),
# as OffsetEvalStrategy uses time offsets, this is compliant with the trigger config
eval_strategy=OffsetEvalStrategyModel(
name="OffsetEvalStrategy", config=OffsetEvalStrategyConfig(offsets=[f"{years}d"])
),
name=f"timetrigger_{years}y",
trigger=TimeTriggerConfig(every=f"{years}d", start_timestamp=first_timestamp), # faked timestamps
eval_handlers=eval_handlers,
)
)

# sample count based triggers: every: 100, 500, 1000, 2000, 10_000
for count in [100, 500, 1000, 2000, 10_000]:
pipeline_configs.append(
gen_pipeline_config(
name=f"DataAmountTrigger_{count}",
name=f"dataamounttrigger_{count}",
trigger=DataAmountTriggerConfig(num_samples=count),
eval_strategy=OffsetEvalStrategyModel(
name="OffsetEvalStrategy", config=OffsetEvalStrategyConfig(offsets=[f"{count}"])
eval_handlers=eval_handlers,
)
)

for interval, threshold in [
(500, 0.7),
(1_000, 0.5),
(1_000, 0.6),
(1_000, 0.7),
(1_000, 0.8),
(1_000, 0.9),
(5_000, 0.7),
(10_000, 0.7),
]:
pipeline_configs.append(
gen_pipeline_config(
name=f"datadrifttrigger_{interval}_{threshold}",
trigger=DataDriftTriggerConfig(
detection_interval_data_points=interval,
sample_size=None,
metric="model",
metric_config={"threshold": threshold},
),
eval_handlers=eval_handlers,
)
)

host = input("Enter the supervisors host address: ") or "localhost"
port = int(input("Enter the supervisors port: ")) or 50063
return pipeline_configs


def run_experiment() -> None:
host = os.getenv("MODYN_SUPERVISOR_HOST")
port = os.getenv("MODYN_SUPERVISOR_PORT")

if not host:
host = input("Enter the supervisor's host address: ") or "localhost"
if not port:
port = int(input("Enter the supervisor's port: ") or 50063)

run_multiple_pipelines(
client_config=ModynClientConfig(supervisor=Supervisor(ip=host, port=port)),
pipeline_configs=pipeline_configs,
pipeline_configs=construct_pipelines(),
start_replay_at=0,
stop_replay_at=None,
maximum_triggers=None,
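
A note on the faked-timestamp arithmetic in construct_pipelines: the yearbook range 1930-2015 is compressed so that one day of fake time represents one year of data, which is why a trigger meant to fire every N years is configured as every=f"{N}d". A short sketch, assuming SECONDS_PER_UNIT["d"] in modyn.utils.utils equals 86_400 (seconds per day):

# Assumption: SECONDS_PER_UNIT["d"] == 86_400, the number of seconds in one day.
SECONDS_PER_DAY = 86_400

first_timestamp = 0
last_timestamp = SECONDS_PER_DAY * (2015 - 1930)  # 85 fake "years" == 85 days

# A "5-year" trigger therefore becomes every="5d" and fires ~17 times over the replay.
years = 5
print(f"{years}d", last_timestamp)  # 5d 7344000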

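Finally, a note on the bytes_parser_function fields above: they are passed to Modyn as source-code strings. How Modyn materializes them internally is not shown in this diff; the following is only a generic sketch of the pattern, with exec standing in for whatever mechanism the library actually uses.

import torch

SNIPPET = (
    "import torch\n"
    "import numpy as np\n"
    "def bytes_parser_function(data: bytes) -> torch.Tensor:\n"
    "    return torch.from_numpy(np.frombuffer(data, dtype=np.float32)).reshape(3, 32, 32)\n"
)

namespace: dict = {}
exec(SNIPPET, namespace)  # defines bytes_parser_function inside `namespace`
parser = namespace["bytes_parser_function"]

sample = torch.rand(3, 32, 32).numpy().tobytes()  # fake 32x32 RGB float32 sample
print(parser(sample).shape)  # torch.Size([3, 32, 32])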