[DeepSpeed] Remove partitioning of model in ZeRO 3 #10655

Merged: 11 commits, Dec 17, 2021
Changes from 4 commits
2 changes: 1 addition & 1 deletion .azure-pipelines/gpu-tests.yml
@@ -51,7 +51,7 @@ jobs:
- bash: |
python -c "fname = 'requirements/extra.txt' ; lines = [line for line in open(fname).readlines() if 'horovod' not in line] ; open(fname, 'w').writelines(lines)"
pip install fairscale==0.4.0
pip install deepspeed==0.5.4
pip install deepspeed==0.5.7
pip install . --requirement requirements/devel.txt
pip list
displayName: 'Install dependencies'
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -46,7 +46,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Moved ownership of the `PrecisionPlugin` into `TrainingTypePlugin` and updated all references ([#10570](https://github.com/PyTorchLightning/pytorch-lightning/pull/10570))


-
- DeepSpeed no longer requires an optimizer for inference ([#10655](https://github.com/PyTorchLightning/pytorch-lightning/pull/10655))


-
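The changelog line above is the user-facing effect of this PR. Below is a minimal, illustrative sketch (not from the diff) of what it enables, assuming Lightning 1.5-style `Trainer` arguments; the `InferenceOnlyModel` and dataset are invented for the example. Note there is no `configure_optimizers`, which DeepSpeed inference previously still relied on to initialize its engine.

```python
import torch
from torch.utils.data import DataLoader, Dataset

import pytorch_lightning as pl


class RandomDataset(Dataset):
    """Tiny synthetic dataset used only to drive predict()."""

    def __init__(self, length: int = 8, size: int = 32):
        self.data = torch.randn(length, size)

    def __len__(self) -> int:
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]


class InferenceOnlyModel(pl.LightningModule):
    # No `configure_optimizers` here: with this change, DeepSpeed no longer builds
    # an optimizer just to run inference. Model and sizes are assumptions.

    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(32, 2)

    def forward(self, x):
        return self.layer(x)

    def predict_step(self, batch, batch_idx, dataloader_idx=0):
        return self(batch)


if __name__ == "__main__":
    trainer = pl.Trainer(strategy="deepspeed_stage_3", gpus=2, precision=16)
    trainer.predict(InferenceOnlyModel(), dataloaders=DataLoader(RandomDataset(), batch_size=2))
```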
31 changes: 2 additions & 29 deletions pytorch_lightning/plugins/training_type/deepspeed.py
@@ -129,7 +129,6 @@ def __init__(
contiguous_memory_optimization: bool = False,
synchronize_checkpoint_boundary: bool = False,
load_full_weights: bool = False,
partition_module: bool = True,
precision_plugin: Optional[PrecisionPlugin] = None,
) -> None:
"""Provides capabilities to run training using the DeepSpeed library, with training optimizations for large
@@ -260,12 +259,6 @@ def __init__(
load_full_weights: True when loading a single checkpoint file containing the model state dict
when using ZeRO Stage 3. This differs from the DeepSpeed checkpoint which contains shards
per worker.

partition_module: When True, partitions the ``LightningModule`` across devices when using ZeRO Stage 3.
This is the default behaviour to ensure that the entire module is appropriately initialized
for DeepSpeed. When False we do not explicitly convert the model, which is fine if NO layers
or ALL layers are defined in ``configure_sharded_model``. This is useful for layers such as
``torch.nn.RNN`` which do internal logic when moving to device.
"""
if not _DEEPSPEED_AVAILABLE:
raise MisconfigurationException(
@@ -318,7 +311,6 @@ def __init__(

self.remote_device = remote_device
self.load_full_weights = load_full_weights
self.partition_module = partition_module

# default FP16 parameters.
self.loss_scale = loss_scale
@@ -461,13 +453,6 @@ def init_deepspeed(self):

model = LightningDeepSpeedModule(pl_module=self.model, precision=self.precision)

if self.zero_stage_3 and self.partition_module:
# Ensure the entire model has been moved to the appropriate device
dtype = torch.float16 if self.precision in (16, "mixed") else torch.float32
deepspeed.zero.Init(
module=model, remote_device=self.remote_device, pin_memory=True, config=self.config, dtype=dtype
)

if self.lightning_module.trainer and self.lightning_module.trainer.training:
self._initialize_deepspeed_train(model)
else:
@@ -522,7 +507,7 @@ def model_sharded_context(self) -> Generator[None, None, None]:
assert self._config_initialized
dtype = torch.float16 if self.precision in (16, "mixed") else torch.float32
model_parallel_context = deepspeed.zero.Init(
remote_device=self.remote_device, pin_memory=True, config=self.config, dtype=dtype
remote_device=self.remote_device, pin_memory=True, config_dict_or_path=self.config, dtype=dtype
)
else:
model_parallel_context = super().model_sharded_context()
@@ -542,17 +527,8 @@ def _set_deepspeed_activation_checkpointing(self):
)

def _initialize_deepspeed_inference(self, model):
# todo: Currently DeepSpeed requires optimizers at inference to partition weights correctly
optimizer, scheduler = None, None
if "optimizer" not in self.config:
rank_zero_info(
"You have not specified an optimizer or scheduler within the DeepSpeed config."
" Using `configure_optimizers` to define optimizer and scheduler."
)
optimizer, lr_scheduler, _ = self._init_optimizers()
scheduler = lr_scheduler["scheduler"]
inference_config = {
# todo: this is required for DeepSpeed throughput timers, or throughput timers will be incorrect
# todo: this is required for DeepSpeed throughput timers
"train_micro_batch_size_per_gpu": 1
}
if "fp16" in self.config:
@@ -570,9 +546,6 @@ def _initialize_deepspeed_inference(self, model):
args=argparse.Namespace(device_rank=self.root_device.index),
config=inference_config,
model=model,
optimizer=optimizer,
lr_scheduler=scheduler,
model_parameters=[],
dist_init_required=False,
)
self.model = model
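With the explicit `deepspeed.zero.Init` wrapping removed from `init_deepspeed`, Stage 3 partitioning of user-defined layers now comes from `model_sharded_context()` (shown above, updated here to pass `config_dict_or_path`) plus DeepSpeed's own engine initialization. An illustrative sketch of the pattern this implies for large models; the module name and layer sizes are assumptions, not taken from the diff:

```python
import torch
import pytorch_lightning as pl


class ShardedModel(pl.LightningModule):
    """Hypothetical example module; names and sizes are invented for illustration."""

    def __init__(self):
        super().__init__()
        # Keep __init__ lightweight: no large parameters are allocated here.
        self.criterion = torch.nn.MSELoss()

    def configure_sharded_model(self) -> None:
        # Lightning runs this hook inside `model_sharded_context()`, i.e. under
        # `deepspeed.zero.Init`, so these weights are sharded as they are created
        # rather than materialized in full on every rank.
        self.layer = torch.nn.Sequential(
            torch.nn.Linear(32, 4096),
            torch.nn.ReLU(),
            torch.nn.Linear(4096, 2),
        )

    def forward(self, x):
        return self.layer(x)

    def training_step(self, batch, batch_idx):
        out = self(batch)
        return self.criterion(out, torch.zeros_like(out))

    def configure_optimizers(self):
        return torch.optim.SGD(self.parameters(), lr=0.1)
```

Modules whose layers do their own device handling, such as `torch.nn.RNN` (the case the removed `partition_module=False` escape hatch was meant for), no longer receive an extra explicit conversion pass.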
49 changes: 11 additions & 38 deletions tests/plugins/test_deepspeed_plugin.py
@@ -565,7 +565,10 @@ def test_deepspeed_multigpu_stage_3_manual_optimization(tmpdir, deepspeed_config
_assert_save_model_is_equal(model, tmpdir, trainer)


def run_checkpoint_test(tmpdir: str, automatic_optimization: bool = True, accumulate_grad_batches: int = 2):
@pytest.mark.parametrize("automatic_optimization", [False, True])
@pytest.mark.parametrize("accumulate_grad_batches", [1, 2])
@RunIf(min_gpus=2, deepspeed=True, special=True)
def test_deepspeed_multigpu_stage_3_checkpointing(tmpdir, automatic_optimization, accumulate_grad_batches):
seed_everything(1)
if automatic_optimization:
model = ModelParallelClassificationModel()
@@ -600,13 +603,6 @@ def run_checkpoint_test(tmpdir: str, automatic_optimization: bool = True, accumu
assert results[0]["test_acc"] > 0.7


@RunIf(min_gpus=2, deepspeed=True, special=True)
def test_deepspeed_multigpu_stage_3_checkpointing(tmpdir):
"""Test to ensure with Stage 3 and multiple GPUs that we can save/load a model resuming from a checkpoint, and
see convergence."""
run_checkpoint_test(tmpdir)


@RunIf(min_gpus=1, deepspeed=True, special=True)
def test_deepspeed_multigpu_stage_3_warns_resume_training(tmpdir):
"""Test to ensure with Stage 3 and multiple GPUs that we can resume from training, throwing a warning that the
@@ -688,24 +684,9 @@ def on_train_batch_start(
trainer.fit(model, datamodule=dm, ckpt_path=ck.best_model_path)


@pytest.mark.parametrize("offload_optimizer", [False, True])
@RunIf(min_gpus=2, deepspeed=True, special=True)
def test_deepspeed_multigpu_stage_3_checkpointing_full_weights_manual(tmpdir):
"""Test to ensure with Stage 3 and multiple GPUs that we can save/load a model resuming from a checkpoint,
where we save the full weights to one file."""
run_checkpoint_test(tmpdir, automatic_optimization=False, accumulate_grad_batches=1)


@RunIf(min_gpus=2, deepspeed=True, special=True)
def test_deepspeed_multigpu_stage_2_accumulated_grad_batches(tmpdir):
_deepspeed_multigpu_stage_2_accumulated_grad_batches(tmpdir, offload_optimizer=False)


@RunIf(min_gpus=2, deepspeed=True, special=True)
def test_deepspeed_multigpu_stage_2_accumulated_grad_batches_offload_optimizer(tmpdir):
_deepspeed_multigpu_stage_2_accumulated_grad_batches(tmpdir, offload_optimizer=True)


def _deepspeed_multigpu_stage_2_accumulated_grad_batches(tmpdir, offload_optimizer):
def test_deepspeed_multigpu_stage_2_accumulated_grad_batches(tmpdir, offload_optimizer):
"""Test to ensure with Stage 2 and multiple GPUs, accumulated grad batches works."""
seed_everything(42)

@@ -911,22 +892,14 @@ def test_dataloader(self):


@mock.patch("torch.optim.lr_scheduler.StepLR.step", autospec=True)
@pytest.mark.parametrize("interval", ["step", "epoch"])
@pytest.mark.parametrize("max_epoch", [2])
@pytest.mark.parametrize("limit_train_batches", [2])
@RunIf(min_gpus=1, deepspeed=True, special=True)
def test_deepspeed_scheduler_step_count(mock_step):
def test_scheduler_step_count(mock_step, max_epoch, limit_train_batches, interval):
"""Test to ensure that the scheduler is called the correct amount of times during training when scheduler is
set to step."""
_run_scheduler_test(mock_step, max_epoch=2, limit_train_batches=2, interval="step")


@mock.patch("torch.optim.lr_scheduler.StepLR.step", autospec=True)
@RunIf(min_gpus=1, deepspeed=True, special=True)
def test_deepspeed_scheduler_step_count_epoch(mock_step):
"""Test to ensure that the scheduler is called the correct amount of times during training when scheduler is
set to epoch."""
_run_scheduler_test(mock_step, max_epoch=2, limit_train_batches=2, interval="epoch")

set to step or epoch."""

def _run_scheduler_test(mock_step, max_epoch, limit_train_batches, interval):
class TestModel(BoringModel):
def configure_optimizers(self):
optimizer = torch.optim.SGD(self.layer.parameters(), lr=0.1)
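The test changes above mostly collapse near-duplicate test functions and helper indirection into `pytest.mark.parametrize`. A small, generic illustration of that pattern (the test name and assertions are invented for the example):

```python
import pytest


@pytest.mark.parametrize("interval", ["step", "epoch"])
@pytest.mark.parametrize("accumulate_grad_batches", [1, 2])
def test_parametrize_pattern(interval, accumulate_grad_batches):
    # Stacked parametrize decorators expand into the cross product of their values,
    # so this single body runs as 2 * 2 = 4 independently reported test cases.
    assert interval in ("step", "epoch")
    assert accumulate_grad_batches in (1, 2)
```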