Skip to content

Commit 68bbf08

Browse files
authored
Merge pull request #389 from Modalities/benchmark_tooling
Benchmark Tooling
2 parents 5acf98c + c0c633a commit 68bbf08

File tree

57 files changed

+4499
-136
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

57 files changed

+4499
-136
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,11 @@ tests/tmp/*
164164
*wandb_storage*
165165
.coverage/*
166166
*.pbin
167+
tutorials/scaling_up2/experiments
167168
tutorials/scaling_up/experiments
168169
tutorials/profiling/experiments
169170
tutorials/instruction_tuning/prepared_data
170171
config_files/instruction_tuning
171172
data/lorem_ipsum_instruct.jsonl
173+
tutorials/scaling_up/logs*
174+
tutorials/scaling_up/experiments_old/*

CHANGELOG_DEV.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,3 +186,19 @@ There are now three AC variants:
186186
* adds support for Tensor Parallelism (including Sequence Parallelism).
187187
* adds a debugging toolkit to track the input and output tensors during a forward pass, gradients during the backward pass and weight tensors.
188188
Tensors can be either normal Tensors or DTensors.
189+
190+
191+
## PR #389 Benchmark Tooling
192+
* adds benchmarking tooling to Modalities and allows for scaling benchmarks across varying numbers of nodes and the Cartesian product of configurable hyperparameters.
193+
194+
**Breaking Changes**
195+
* Renaming: EvaluationResultToDiscSubscriberConfig.output_path -> EvaluationResultToDiscSubscriberConfig.output_file_path
196+
197+
198+
199+
## PR #410 MFU incorporates dp_degree now instead of world_size
200+
201+
This PR fixes the MFU and throughput calculations by taking the dp degree into account instead of the world size. When we use parallelization strategies on top of FSDP, then the world size is different from the data parallel degree. This needs to be reflected in throughput and MFU metric calculations, as done by this PR.
202+
203+
**Breaking Changes**
204+
* Existing configs need to be adapted to correctly use dp degree rather than world size.

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,12 @@ Even though Modalities significantly simplifies LLM training, there is still som
198198
- [Library Usage](tutorials/library_usage/README.md)</br>
199199
How to use Modalities as a library and register custom components with Modalities.
200200

201+
- [Instruction Tuning](tutorials/instruction_tuning/README.md)</br>
202+
Teaches you how to apply instruction tuning on a pre-trained model.
203+
204+
- [Scaling Up](tutorials/scaling_up/README.md)</br>
205+
When scaling up your training to hundreds or thousands of GPUs, you want to maintain linear scalability.
206+
This tutorial teaches you how to find the optimal throughput setting for various hyperparameter settings at different scales.
201207

202208

203209

src/modalities/__main__.py

Lines changed: 150 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
#!/usr/bin/env python
22

33
import json
4+
import os
5+
import socket
6+
import traceback
47
from functools import partial
58
from pathlib import Path
6-
from typing import Optional
9+
from typing import Any, Optional
710

811
import click
912
import click_pathlib
@@ -29,6 +32,8 @@
2932
from modalities.models.huggingface_adapters.hf_adapter import HFModelAdapter
3033
from modalities.running_env.cuda_env import CudaEnv
3134
from modalities.util import print_rank_0
35+
from modalities.utils.benchmarking.benchmarking_utils import SweepSets, get_updated_sweep_status
36+
from modalities.utils.benchmarking.sweep_utils import SweepGenerator
3237
from modalities.utils.communication_test import run_communication_test
3338

3439

@@ -50,21 +55,71 @@ def main() -> None:
5055
default=False,
5156
help="If set, run a communication test before training.",
5257
)
53-
def CMD_entry_point_run_modalities(config_file_path: Path, test_comm: bool = False):
58+
@click.option(
59+
"--experiment_id",
60+
type=str,
61+
default=None,
62+
help="Optional experiment ID to use for this run. If not provided, it will be derived from the config file path.",
63+
)
64+
@click.option(
65+
"--error_log_folder",
66+
type=click_pathlib.Path(),
67+
default=None,
68+
help="Optional path to a folder where error logs will be written.",
69+
)
70+
def CMD_entry_point_run_modalities(
71+
config_file_path: Path,
72+
test_comm: bool = False,
73+
experiment_id: Optional[str] = None,
74+
error_log_folder: Optional[Path] = None,
75+
):
5476
"""Entrypoint to run the model training.
5577
5678
Args:
5779
config_file_path (Path): Path to the YAML training config file.
80+
test_comm (bool): If set, run a communication test before training.
81+
experiment_id (Optional[str]): Optional experiment ID to use for this run.
82+
If not provided it will be generated. Default is None.
83+
error_log_folder (Optional[Path]): Optional path to a folder where error logs will be written.
5884
"""
59-
with CudaEnv(process_group_backend=ProcessGroupBackendType.nccl):
60-
if test_comm:
61-
print_rank_0("Running communication test...")
62-
run_communication_test()
63-
print_rank_0("Communication test succeeded.")
6485

65-
main_obj = Main(config_file_path)
66-
components = main_obj.build_components(components_model_type=TrainingComponentsInstantiationModel)
67-
main_obj.run(components)
86+
def _format_exception_as_json(e: Exception, environment: dict[str, Any]) -> str:
87+
# Format an exception into a structured JSON string with error message, type, and stack trace.
88+
error = {
89+
"error": str(e),
90+
"type": type(e).__name__,
91+
"stacktrace": traceback.format_exception(type(e), e, e.__traceback__),
92+
}
93+
94+
return json.dumps({"environment": environment, "error": error}, indent=2)
95+
96+
try:
97+
with CudaEnv(process_group_backend=ProcessGroupBackendType.nccl):
98+
if test_comm:
99+
print_rank_0("Running communication test...")
100+
run_communication_test()
101+
print_rank_0("Communication test succeeded.")
102+
103+
main_obj = Main(config_file_path, experiment_id=experiment_id)
104+
components = main_obj.build_components(components_model_type=TrainingComponentsInstantiationModel)
105+
main_obj.run(components)
106+
except Exception as e:
107+
if error_log_folder is not None:
108+
environment = {
109+
"rank": int(os.environ["RANK"] if "RANK" in os.environ else -1),
110+
"local_rank": int(os.environ["LOCAL_RANK"] if "LOCAL_RANK" in os.environ else -1),
111+
"world_size": int(os.environ["WORLD_SIZE"] if "WORLD_SIZE" in os.environ else -1),
112+
"hostname": socket.gethostname(),
113+
}
114+
error_log_folder = (
115+
error_log_folder.parent
116+
/ f"{error_log_folder.stem}_{environment['hostname']}_{environment['local_rank']}.log"
117+
)
118+
error_log_folder.parent.mkdir(parents=True, exist_ok=True)
119+
with open(error_log_folder, "w", encoding="utf-8") as f:
120+
f.write(_format_exception_as_json(e, environment))
121+
122+
raise RuntimeError(f"An error occurred while running the training: {e}. ") from e
68123

69124

70125
@main.command(name="warmstart")
@@ -523,5 +578,90 @@ def CMD_shuffle_jsonl_data(
523578
)
524579

525580

581+
@main.group(name="benchmark")
582+
def benchmark():
583+
"""
584+
Collection of utilities to prepare and run benchmarks.
585+
"""
586+
pass
587+
588+
589+
@benchmark.command(name="prepare_sweep_configs")
590+
@click.option(
591+
"--sweep_config_path",
592+
type=click.Path(exists=True, path_type=Path),
593+
required=True,
594+
help="Path to the sweep configuration YAML file.",
595+
)
596+
@click.option(
597+
"--output_dir",
598+
type=click.Path(file_okay=False, writable=True, path_type=Path),
599+
required=True,
600+
help="Directory to save the generated sweep configurations.",
601+
)
602+
@click.option(
603+
"--world_sizes",
604+
type=str,
605+
default="2",
606+
help="Comma-separated list of world sizes (must not have spaces), e.g. --world_sizes '2,4,8'",
607+
)
608+
def prepare_sweep_configs(sweep_config_path: Path, output_dir: Path, world_sizes: str):
609+
"""
610+
Utility for preparing sweep configurations.
611+
"""
612+
try:
613+
world_sizes_list: list[int] = list(map(int, world_sizes.split(",")))
614+
except ValueError as e:
615+
raise ValueError("Invalid world_sizes format. Please provide a comma-separated list of integers.") from e
616+
SweepGenerator.generate_sweep_configs(sweep_config_path, output_dir, world_sizes_list)
617+
618+
619+
@benchmark.command(name="list_remaining_runs")
620+
@click.option(
621+
"--exp_root",
622+
type=click.Path(exists=True, file_okay=False, path_type=Path),
623+
required=True,
624+
help="Path to the root directory of the experiment containing config files.",
625+
)
626+
@click.option(
627+
"--file_list_path",
628+
type=click.Path(path_type=Path),
629+
required=True,
630+
help="Output file to store paths of configs to run.",
631+
)
632+
@click.option(
633+
"--expected_steps",
634+
type=int,
635+
required=True,
636+
help="Expected number of steps in evaluation_results.jsonl",
637+
)
638+
@click.option(
639+
"--skip_exception_types",
640+
type=str,
641+
default="",
642+
help="Exception types to skip when checking for successful runs. "
643+
"Typically, we would add 'OutOfMemoryError', as rerunning the experiment would result in the same error. "
644+
" List of exceptions is comma-separated.",
645+
)
646+
def CMD_entry_point_list_remaining_runs(
647+
exp_root: Path,
648+
file_list_path: Path,
649+
expected_steps: int,
650+
skip_exception_types: str = "",
651+
):
652+
"""
653+
Prepare a file list of remaining runs from a grid search experiment directory.
654+
"""
655+
skip_exception_types_list = skip_exception_types.split(",") if skip_exception_types != "" else []
656+
file_list_dict = get_updated_sweep_status(
657+
exp_root=exp_root,
658+
expected_steps=expected_steps,
659+
skip_exception_types=skip_exception_types_list,
660+
)
661+
with file_list_path.open("w", encoding="utf-8") as f:
662+
for cfg in file_list_dict[SweepSets.UPDATED_CONFIGS.value]:
663+
f.write(f"{cfg}\n")
664+
665+
526666
if __name__ == "__main__":
527667
main()

src/modalities/config/config.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -458,8 +458,7 @@ class DummyResultSubscriberConfig(BaseModel):
458458

459459

460460
class EvaluationResultToDiscSubscriberConfig(BaseModel):
461-
output_folder_path: Path
462-
experiment_id: str
461+
output_file_path: Path
463462

464463

465464
class WandBEvaluationResultSubscriberConfig(BaseModel):
@@ -517,7 +516,10 @@ def node_env_resolver_fun(var_name: str) -> int:
517516
return os.cpu_count()
518517

519518
OmegaConf.register_new_resolver("cuda_env", cuda_env_resolver_fun, replace=True)
520-
modalities_env_kwargs = {"config_file_path": config_file_path}
519+
modalities_env_kwargs: dict[str, Any] = {
520+
"config_file_path": config_file_path,
521+
"config_folder_path": config_file_path.parent,
522+
}
521523
if experiment_id is not None:
522524
modalities_env_kwargs["experiment_id"] = experiment_id
523525
OmegaConf.register_new_resolver(

src/modalities/config/instantiation_models.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@ class StepProfile(BaseModel):
3535
sequence_length: Annotated[int, Field(strict=True, ge=1)]
3636

3737

38+
class MeshDefinition(BaseModel):
39+
dp_degree: Annotated[int, Field(strict=True, gt=0)]
40+
tp_degree: Annotated[int, Field(strict=True, gt=0)] = 1
41+
pp_degree: Annotated[int, Field(strict=True, gt=0)] = 1
42+
cp_degree: Annotated[int, Field(strict=True, gt=0)] = 1
43+
44+
3845
class ConsistencyEnforcement(BaseModel):
3946
enforce_tokens_per_step_consistency: bool = True
4047
enforce_last_step_logged: bool = True
@@ -92,6 +99,7 @@ class DCPWarmstartCheckpointPaths(BaseModel):
9299
intervals: Intervals
93100
consistency_enforcement: ConsistencyEnforcement
94101
step_profile: StepProfile
102+
mesh_definition: MeshDefinition
95103
training_target: TrainingTarget
96104
training_progress: TrainingProgress
97105
warmstart_checkpoint_paths: Optional[WarmstartCheckpointPaths | DCPWarmstartCheckpointPaths] = None
@@ -106,7 +114,7 @@ def _check_tokens_per_step_conistency(self) -> "TrainingComponentsInstantiationM
106114
self.step_profile.local_train_micro_batch_size
107115
* self.step_profile.sequence_length
108116
* self.step_profile.gradient_accumulation_steps
109-
* self.cuda_env.world_size
117+
* self.mesh_definition.dp_degree
110118
)
111119
if required_num_tokens_per_step != step_profile_num_tokens_per_step:
112120
warning_message = (

src/modalities/conversion/gpt2/conversion_model.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ def convert_model_config(modalities_config: dict) -> GPT2Config:
5151
num_hidden_layers=config["n_layer"],
5252
num_key_value_heads=config["n_head_kv"],
5353
num_attention_heads=config["n_head_q"],
54-
intermediate_size=SwiGLU._get_hidden_dim(ffn_hidden=config["ffn_hidden"]),
54+
intermediate_size=SwiGLU._get_hidden_dim(
55+
ffn_hidden=config["ffn_hidden"], enforce_swiglu_hidden_dim_multiple_of=256
56+
),
5557
attention_bias=config["bias"],
5658
mlp_bias=config["bias"],
5759
hidden_act="silu",

src/modalities/logging_broker/subscriber_impl/results_subscriber.py

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import json
2-
from dataclasses import asdict, is_dataclass
2+
from dataclasses import fields, is_dataclass
33
from pathlib import Path
44
from typing import Any
55

@@ -10,7 +10,7 @@
1010
from rich.console import Group
1111
from rich.panel import Panel
1212

13-
from modalities.batch import EvaluationResultBatch
13+
from modalities.batch import EvaluationResultBatch, ResultItem
1414
from modalities.config.config import WandbMode
1515
from modalities.logging_broker.messages import Message
1616
from modalities.logging_broker.subscriber import MessageSubscriberIF
@@ -115,34 +115,41 @@ class EvaluationResultToDiscSubscriber(MessageSubscriberIF[EvaluationResultBatch
115115
def __init__(self, output_file_path: Path) -> None:
116116
super().__init__()
117117
self.output_file_path = output_file_path
118+
self.output_file_path.parent.mkdir(parents=True, exist_ok=True)
118119

119120
def consume_dict(self, message_dict: dict[str, Any]):
120121
"""Optional: log config data if needed (here: no-op)."""
121122
pass
122123

123124
@staticmethod
124-
def _convert_evaluation_result_batch(eval_result_batch: EvaluationResultBatch) -> dict[str, Any]:
125+
def _convert_evaluation_result_batch(obj: EvaluationResultBatch) -> dict[str, Any]:
125126
"""
126127
Recursively convert EvaluationResultBatch structure to JSON-serializable format.
127128
Handles dataclasses and torch.Tensor.
128129
"""
129-
if is_dataclass(eval_result_batch):
130+
131+
def shallow_asdict(obj):
132+
# Converts a dataclass to a dictionary without deep recursion.
133+
if not is_dataclass(obj):
134+
raise TypeError("shallow_asdict() should be called on dataclass instances")
135+
return {f.name: getattr(obj, f.name) for f in fields(obj)}
136+
137+
if isinstance(obj, ResultItem):
138+
return obj.value.item() if obj.value.ndim == 0 else obj.value.tolist()
139+
elif is_dataclass(obj):
130140
result_dict = {}
131-
for k, v in asdict(eval_result_batch).items():
141+
for k, v in shallow_asdict(obj).items():
132142
result_dict[k] = EvaluationResultToDiscSubscriber._convert_evaluation_result_batch(v)
133143
return result_dict
134144

135-
elif isinstance(eval_result_batch, dict):
136-
return {
137-
k: EvaluationResultToDiscSubscriber._convert_evaluation_result_batch(v)
138-
for k, v in eval_result_batch.items()
139-
}
140-
elif isinstance(eval_result_batch, list):
141-
return [EvaluationResultToDiscSubscriber._convert_evaluation_result_batch(v) for v in eval_result_batch]
142-
elif isinstance(eval_result_batch, torch.Tensor):
143-
return eval_result_batch.item() if eval_result_batch.ndim == 0 else eval_result_batch.tolist()
145+
elif isinstance(obj, dict):
146+
return {k: EvaluationResultToDiscSubscriber._convert_evaluation_result_batch(v) for k, v in obj.items()}
147+
elif isinstance(obj, list):
148+
return [EvaluationResultToDiscSubscriber._convert_evaluation_result_batch(v) for v in obj]
149+
elif isinstance(obj, torch.Tensor):
150+
return obj.item() if obj.ndim == 0 else obj.tolist()
144151
else:
145-
return eval_result_batch
152+
return obj
146153

147154
def consume_message(self, message: Message[EvaluationResultBatch]):
148155
"""Writes the evaluation result to the JSONL file if rank 0."""

src/modalities/logging_broker/subscriber_impl/subscriber_factory.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,8 @@ def get_dummy_result_subscriber() -> DummyResultSubscriber:
5757
return DummyResultSubscriber()
5858

5959
@staticmethod
60-
def get_evaluation_result_to_disc_subscriber(
61-
output_folder_path: Path, experiment_id: str
62-
) -> EvaluationResultToDiscSubscriber:
63-
return EvaluationResultToDiscSubscriber(
64-
output_file_path=output_folder_path / experiment_id / "evaluation_results.jsonl"
65-
)
60+
def get_evaluation_result_to_disc_subscriber(output_file_path: Path) -> EvaluationResultToDiscSubscriber:
61+
return EvaluationResultToDiscSubscriber(output_file_path=output_file_path)
6662

6763
@staticmethod
6864
def get_wandb_result_subscriber(

0 commit comments

Comments
 (0)