Forward shuffle to trainer server and measure GPU #526

Merged (11 commits) on Jun 19, 2024
Changes from 3 commits
5 changes: 2 additions & 3 deletions experiments/utils/experiment_runner.py
@@ -13,7 +13,7 @@ def run_multiple_pipelines(
start_replay_at: int = 0,
stop_replay_at: int | None = None,
maximum_triggers: int | None = None,
show_eval_progress: bool = True
show_eval_progress: bool = True,
) -> None:
logger.info("Start running multiple experiments!")

@@ -27,10 +27,9 @@ def run_multiple_pipelines(
if started:
result = client.poll_pipeline_status(show_eval_progress=show_eval_progress)
logger.info(f"Finished pipeline: {pipeline_config.pipeline.name}")

if not result:
logger.error("Client exited with error, aborting.")
return

logger.info("Finished running multiple experiments!")

57 changes: 34 additions & 23 deletions modyn/common/grpc/grpc_helpers.py
@@ -177,20 +177,16 @@
# TODO(#130): Implement this at trainer server.
logger.error("The trainer server currently does not support remotely stopping training, ignoring.")

# pylint: disable=too-many-branches,too-many-locals,too-many-statements
def start_training(
self,
# pylint: disable=too-many-locals, too-many-branches
@staticmethod
def prepare_start_training_request(
pipeline_id: int,
trigger_id: int,
training_config: TrainingConfig,
data_config: DataConfig,
previous_model_id: Optional[int],
num_samples_to_pass: Optional[int] = None,
) -> int:
assert self.trainer_server is not None
if not self.connected_to_trainer_server:
raise ConnectionError("Tried to start training at trainer server, but not there is no gRPC connection.")

) -> StartTrainingRequest:
optimizers_config = {}
for optimizer in training_config.optimizers:
optimizer_config: dict[str, Any] = {
@@ -207,15 +203,7 @@
lr_scheduler_configs = training_config.lr_scheduler.model_dump(by_alias=True)

criterion_config = json.dumps(training_config.optimization_criterion.config)

epochs_per_trigger = training_config.epochs_per_trigger
num_prefetched_partitions = training_config.num_prefetched_partitions
parallel_prefetch_requests = training_config.parallel_prefetch_requests

seed = training_config.seed
tokenizer = data_config.tokenizer
transform_list = data_config.transformations
label_transformer = data_config.label_transformer_function

if training_config.checkpointing.activated:
# the None-ility of the two fields is already validated by pydantic
@@ -244,23 +232,46 @@
num_dataloaders=training_config.dataloader_workers,
),
"checkpoint_info": checkpoint_info,
"transform_list": transform_list,
"transform_list": data_config.transformations,
"bytes_parser": PythonString(value=data_config.bytes_parser_function),
"label_transformer": PythonString(value=label_transformer),
"label_transformer": PythonString(value=data_config.label_transformer_function),
"lr_scheduler": TrainerServerJsonString(value=json.dumps(lr_scheduler_configs)),
"grad_scaler_configuration": TrainerServerJsonString(value=json.dumps(grad_scaler_config)),
"epochs_per_trigger": epochs_per_trigger,
"num_prefetched_partitions": num_prefetched_partitions,
"parallel_prefetch_requests": parallel_prefetch_requests,
"seed": seed,
"epochs_per_trigger": training_config.epochs_per_trigger,
"num_prefetched_partitions": training_config.num_prefetched_partitions,
"parallel_prefetch_requests": training_config.parallel_prefetch_requests,
"seed": training_config.seed,
"tokenizer": PythonString(value=tokenizer) if tokenizer is not None else None,
"num_samples_to_pass": num_samples_to_pass,
"shuffle": training_config.shuffle,
"measure_operation_time": training_config.measure_operation_time,
}

cleaned_kwargs: dict[str, Any] = {k: v for k, v in start_training_kwargs.items() if v is not None}

req = StartTrainingRequest(**cleaned_kwargs)
return StartTrainingRequest(**cleaned_kwargs)

def start_training(
self,
pipeline_id: int,
trigger_id: int,
training_config: TrainingConfig,
data_config: DataConfig,
previous_model_id: Optional[int],
num_samples_to_pass: Optional[int] = None,
) -> int:
assert self.trainer_server is not None
if not self.connected_to_trainer_server:
raise ConnectionError("Tried to start training at trainer server, but not there is no gRPC connection.")

Codecov / codecov/patch check warning on line 265 in modyn/common/grpc/grpc_helpers.py: added line #L265 was not covered by tests.

req = self.prepare_start_training_request(
pipeline_id,
trigger_id,
training_config,
data_config,
previous_model_id,
num_samples_to_pass,
)
response: StartTrainingResponse = self.trainer_server.start_training(req)

if not response.training_started:
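Not part of the diff: since prepare_start_training_request is now a static method, the request construction can be exercised without a live gRPC channel, which is exactly what the new test further down does. A minimal usage sketch, assuming valid TrainingConfig and DataConfig instances (e.g. built like the fixtures in the test file):

from modyn.common.grpc.grpc_helpers import TrainerServerGRPCHandlerMixin

# Static method: no handler instance and no trainer-server connection required.
req = TrainerServerGRPCHandlerMixin.prepare_start_training_request(
    pipeline_id=42,
    trigger_id=21,
    training_config=training_config,  # assumed TrainingConfig instance
    data_config=data_config,          # assumed DataConfig instance
    previous_model_id=None,
)
# The two fields forwarded by this PR end up on the request:
print(req.shuffle, req.measure_operation_time)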
6 changes: 6 additions & 0 deletions modyn/config/schema/pipeline/training/config.py
@@ -104,6 +104,12 @@ class TrainingConfig(ModynBaseModel):
"Otherwise, the output order is deterministic."
)
)
measure_operation_time: bool = Field(
default=False,
description="If True, we measure the time of individual operations within a training process and log them. "
"Note that this can have a significant impact on performance as it requires cuda synchronization "
"at every measurement.",
)
use_previous_model: bool = Field(
description=(
"If True, on trigger, we continue training on the model outputted by the previous trigger. If False, "
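Not part of the diff: the performance caveat in the field description stems from having to synchronize with the CUDA stream around each measured region; otherwise the timer only captures kernel launch, not execution. The actual trainer-server changes are outside this diff view. A minimal sketch of the general pattern, assuming PyTorch and a hypothetical timed_operation helper:

import time
from contextlib import contextmanager

import torch


@contextmanager
def timed_operation(name: str, measurements: dict, enabled: bool):
    # Accumulates wall-clock seconds per named operation (illustrative only).
    if enabled and torch.cuda.is_available():
        # Earlier kernels may still be running; wait so the timer does not
        # start before the GPU is actually idle.
        torch.cuda.synchronize()
    start = time.perf_counter()
    yield
    if enabled and torch.cuda.is_available():
        torch.cuda.synchronize()  # wait for the measured kernels to finish
    measurements[name] = measurements.get(name, 0.0) + time.perf_counter() - start

# Usage (hypothetical):
#   timings: dict = {}
#   with timed_operation("forward", timings, enabled=True):
#       output = model(batch)

The repeated synchronization is why the option defaults to False.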
1 change: 1 addition & 0 deletions modyn/protos/trainer_server.proto
@@ -56,6 +56,7 @@ message StartTrainingRequest {
optional PythonString tokenizer = 22;
int64 num_samples_to_pass = 23;
bool shuffle = 24;
bool measure_operation_time = 25;
}

message StartTrainingResponse {
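Not part of the diff: because shuffle and measure_operation_time are plain proto3 bools, requests built without them simply decode with the proto3 default of False on the trainer server. An illustrative check, assuming the regenerated Python stubs:

from modyn.trainer_server.internal.grpc.generated.trainer_server_pb2 import StartTrainingRequest

old_style_request = StartTrainingRequest(pipeline_id=1, trigger_id=1)  # new fields not set
assert old_style_request.measure_operation_time is False  # proto3 bool default
assert old_style_request.shuffle is False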
108 changes: 107 additions & 1 deletion modyn/tests/common/grpc/test_grpc_helpers.py
@@ -1,9 +1,20 @@
import json
import multiprocessing as mp
from pathlib import Path
from typing import Optional
from unittest.mock import MagicMock, patch

import pytest
from modyn.common.grpc import GenericGRPCServer
from modyn.common.grpc.grpc_helpers import TrainerServerGRPCHandlerMixin
from modyn.config import ModynConfig, ModynPipelineConfig
from modyn.config import (
CheckpointingConfig,
DataConfig,
LrSchedulerConfig,
ModynConfig,
ModynPipelineConfig,
TrainingConfig,
)
from modyn.supervisor.internal.utils import TrainingStatusReporter
from modyn.trainer_server.internal.grpc.generated.trainer_server_pb2 import JsonString as TrainerJsonString
from modyn.trainer_server.internal.grpc.generated.trainer_server_pb2 import (
@@ -41,8 +52,100 @@ def test_init_and_trainer_server_available(
avail_method.assert_called_once()


@pytest.fixture()
def pipeline_data_config():
return DataConfig(
dataset_id="test",
bytes_parser_function="def bytes_parser_function(x):\n\treturn x",
label_transformer_function="def label_transformer_function(x):\n\treturn x",
transformations=["transformation1", "transformation2"],
)


@pytest.fixture()
def lr_scheduler_config():
return LrSchedulerConfig(
name="CosineAnnealingLR",
source="PyTorch",
step_every="batch",
optimizers=["default"],
config={"T_max": "MODYN_NUM_BATCHES", "eta_min": 0.001},
)


@pytest.mark.parametrize("previous_model_id", [1, None])
@pytest.mark.parametrize("num_samples_to_pass", [5, None])
@pytest.mark.parametrize("set_lr_scheduler_to_none", [True, False])
@pytest.mark.parametrize("disable_checkpointing", [True, False])
def test_prepare_start_training_request(
disable_checkpointing: bool,
set_lr_scheduler_to_none: bool,
num_samples_to_pass: Optional[int],
previous_model_id: Optional[int],
pipeline_training_config: TrainingConfig,
pipeline_data_config: DataConfig,
lr_scheduler_config: LrSchedulerConfig,
):
# For the bool fields, False is the default value, so we only test the True case here.
pipeline_training_config.shuffle = True
pipeline_training_config.measure_operation_time = True
pipeline_training_config.optimization_criterion.config = {"key": "value"}
pipeline_training_config.use_previous_model = previous_model_id is not None

pipeline_training_config.lr_scheduler = None if set_lr_scheduler_to_none else lr_scheduler_config
if set_lr_scheduler_to_none:
expected_lr_scheduler_config = {}
else:
expected_lr_scheduler_config = lr_scheduler_config.model_dump(by_alias=True)
if disable_checkpointing:
pipeline_training_config.checkpointing = CheckpointingConfig(activated=False)
else:
pipeline_training_config.checkpointing = CheckpointingConfig(activated=True, interval=1, path=Path("test"))

pipeline_id = 42
trigger_id = 21

req = TrainerServerGRPCHandlerMixin.prepare_start_training_request(
pipeline_id, trigger_id, pipeline_training_config, pipeline_data_config, previous_model_id, num_samples_to_pass
)

assert req.pipeline_id == pipeline_id
assert req.trigger_id == trigger_id
assert req.device == pipeline_training_config.device
assert req.use_pretrained_model == pipeline_training_config.use_previous_model
assert req.pretrained_model_id == (previous_model_id if previous_model_id is not None else -1)
assert req.batch_size == pipeline_training_config.batch_size
assert req.torch_criterion == pipeline_training_config.optimization_criterion.name
assert json.loads(req.criterion_parameters.value) == pipeline_training_config.optimization_criterion.config
assert req.data_info.dataset_id == pipeline_data_config.dataset_id
assert req.data_info.num_dataloaders == pipeline_training_config.dataloader_workers
if disable_checkpointing:
assert req.checkpoint_info.checkpoint_path == ""
assert req.checkpoint_info.checkpoint_interval == 0
else:
assert req.checkpoint_info.checkpoint_path == str(pipeline_training_config.checkpointing.path)
assert req.checkpoint_info.checkpoint_interval == pipeline_training_config.checkpointing.interval
assert req.bytes_parser.value == pipeline_data_config.bytes_parser_function
assert req.transform_list == pipeline_data_config.transformations
assert req.label_transformer.value == pipeline_data_config.label_transformer_function
assert json.loads(req.lr_scheduler.value) == expected_lr_scheduler_config
assert req.epochs_per_trigger == pipeline_training_config.epochs_per_trigger
assert req.num_prefetched_partitions == pipeline_training_config.num_prefetched_partitions
assert req.parallel_prefetch_requests == pipeline_training_config.parallel_prefetch_requests
assert req.seed == 0
assert req.num_samples_to_pass == (num_samples_to_pass if num_samples_to_pass is not None else 0)
assert req.shuffle
assert req.measure_operation_time


@patch("modyn.common.grpc.grpc_helpers.grpc_connection_established", return_value=True)
@patch.object(
TrainerServerGRPCHandlerMixin,
"prepare_start_training_request",
wraps=TrainerServerGRPCHandlerMixin.prepare_start_training_request,
)
def test_start_training(
test_prepare_start_training_request: MagicMock,
test_common_connection_established: MagicMock,
dummy_pipeline_config: ModynPipelineConfig,
dummy_system_config: ModynConfig,
@@ -65,6 +168,9 @@ def test_start_training(
== 42
)
avail_method.assert_called_once()
test_prepare_start_training_request.assert_called_once_with(
pipeline_id, trigger_id, dummy_pipeline_config.training, dummy_pipeline_config.data, None, None
)


@patch("modyn.common.grpc.grpc_helpers.grpc_connection_established", return_value=True)