Remove partitioning of model in ZeRO 3 (#10655)
Sean Naren authored Dec 17, 2021
1 parent 4415677 commit c66cd12
Showing 5 changed files with 21 additions and 62 deletions.
2 changes: 1 addition & 1 deletion .azure-pipelines/gpu-tests.yml
@@ -51,7 +51,7 @@ jobs:
- bash: |
python -c "fname = 'requirements/extra.txt' ; lines = [line for line in open(fname).readlines() if 'horovod' not in line] ; open(fname, 'w').writelines(lines)"
pip install fairscale==0.4.0
- pip install deepspeed==0.5.4
+ pip install deepspeed==0.5.7
pip install . --requirement requirements/devel.txt
pip list
displayName: 'Install dependencies'
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -122,6 +122,10 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

- Moved ownership of the `Accelerator` instance to the `TrainingTypePlugin`; all training-type plugins now take an optional parameter `accelerator` ([#11022](https://github.com/PyTorchLightning/pytorch-lightning/pull/11022))


+ - DeepSpeed no longer requires the `LightningModule` to be partitioned for ZeRO Stage 3 ([#10655](https://github.com/PyTorchLightning/pytorch-lightning/pull/10655))


### Deprecated

- Deprecated `ClusterEnvironment.master_{address,port}` in favor of `ClusterEnvironment.main_{address,port}` ([#10103](https://github.com/PyTorchLightning/pytorch-lightning/issues/10103))
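For users, the change above is a one-argument migration: `DeepSpeedPlugin` no longer accepts `partition_module`, and layers that should be sharded as they are created belong in `configure_sharded_model`. A minimal sketch of the new setup; the `DemoModel` class, layer sizes, and Trainer flags are illustrative and not part of this PR:

```python
import torch
from pytorch_lightning import LightningModule, Trainer
from pytorch_lightning.plugins import DeepSpeedPlugin


class DemoModel(LightningModule):
    def __init__(self):
        super().__init__()
        # Layers created in __init__ are no longer wrapped in a ZeRO Stage 3
        # partitioning context by the plugin after this change.
        self.head = torch.nn.Linear(32, 2)

    def configure_sharded_model(self):
        # Layers created here are built inside the plugin's sharded context
        # (``model_sharded_context``), so they are partitioned across ranks
        # as they are instantiated.
        self.backbone = torch.nn.Linear(32, 32)

    def forward(self, x):
        return self.head(self.backbone(x))


# Before: DeepSpeedPlugin(stage=3, partition_module=False)
# After:  the ``partition_module`` argument no longer exists.
trainer = Trainer(strategy=DeepSpeedPlugin(stage=3), gpus=2, precision=16)
```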
2 changes: 1 addition & 1 deletion dockers/base-cuda/Dockerfile
@@ -112,7 +112,7 @@ RUN \

RUN \
# install DeepSpeed
- pip install deepspeed==0.5.4
+ pip install deepspeed==0.5.7

RUN \
# Show what we have
23 changes: 2 additions & 21 deletions pytorch_lightning/plugins/training_type/deepspeed.py
@@ -130,7 +130,6 @@ def __init__(
contiguous_memory_optimization: bool = False,
synchronize_checkpoint_boundary: bool = False,
load_full_weights: bool = False,
- partition_module: bool = True,
precision_plugin: Optional[PrecisionPlugin] = None,
) -> None:
"""Provides capabilities to run training using the DeepSpeed library, with training optimizations for large
@@ -261,12 +260,6 @@ def __init__(
load_full_weights: True when loading a single checkpoint file containing the model state dict
when using ZeRO Stage 3. This differs from the DeepSpeed checkpoint which contains shards
per worker.
- partition_module: When True, partitions the ``LightningModule`` across devices when using ZeRO Stage 3.
- This is the default behaviour to ensure that the entire module is appropriately initialized
- for DeepSpeed. When False we do not explicitly convert the model, which is fine if NO layers
- or ALL layers are defined in ``configure_sharded_model``. This is useful for layers such as
- ``torch.nn.RNN`` which do internal logic when moving to device.
"""
if not _DEEPSPEED_AVAILABLE:
raise MisconfigurationException(
@@ -320,7 +313,6 @@ def __init__(

self.remote_device = remote_device
self.load_full_weights = load_full_weights
- self.partition_module = partition_module

# default FP16 parameters.
self.loss_scale = loss_scale
@@ -446,17 +438,6 @@ def init_deepspeed(self):

model = LightningDeepSpeedModule(pl_module=self.model, precision=self.precision_plugin.precision)

- if self.zero_stage_3 and self.partition_module:
- # Ensure the entire model has been moved to the appropriate device
- dtype = (
- torch.float16
- if self.precision_plugin.precision in (PrecisionType.HALF, PrecisionType.MIXED)
- else torch.float32
- )
- deepspeed.zero.Init(
- module=model, remote_device=self.remote_device, pin_memory=True, config=self.config, dtype=dtype
- )

if self.lightning_module.trainer and self.lightning_module.trainer.training:
self._initialize_deepspeed_train(model)
else:
@@ -515,7 +496,7 @@ def model_sharded_context(self) -> Generator[None, None, None]:
else torch.float32
)
model_parallel_context = deepspeed.zero.Init(
- remote_device=self.remote_device, pin_memory=True, config=self.config, dtype=dtype
+ remote_device=self.remote_device, pin_memory=True, config_dict_or_path=self.config, dtype=dtype
)
else:
model_parallel_context = super().model_sharded_context()
@@ -545,7 +526,7 @@ def _initialize_deepspeed_inference(self, model):
optimizer, lr_scheduler, _ = self._init_optimizers()
scheduler = lr_scheduler["scheduler"]
inference_config = {
- # todo: this is required for DeepSpeed throughput timers, or throughput timers will be incorrect
+ # todo: this is required for DeepSpeed throughput timers
"train_micro_batch_size_per_gpu": 1
}
if "fp16" in self.config:
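Internally, ZeRO Stage 3 sharded construction now happens only inside `model_sharded_context()`, which wraps module creation in `deepspeed.zero.Init`; the keyword rename from `config` to `config_dict_or_path` matches the DeepSpeed 0.5.7 pin above. A standalone sketch of that context manager under a hand-written ZeRO 3 config; the config values and module are illustrative, and a distributed environment is assumed to already be set up (e.g. via the `deepspeed` launcher):

```python
import torch
import deepspeed

# Illustrative ZeRO Stage 3 config; the plugin generates its own from the Trainer settings.
ds_config = {
    "train_micro_batch_size_per_gpu": 1,
    "zero_optimization": {"stage": 3},
}

# Parameters created inside this context are partitioned across ranks at
# construction time instead of being materialized in full on every process.
with deepspeed.zero.Init(
    remote_device="cpu",            # keep partitioned params on CPU until needed
    pin_memory=True,
    config_dict_or_path=ds_config,  # newer spelling of the old ``config=`` keyword
    dtype=torch.float16,
):
    model = torch.nn.Sequential(
        torch.nn.Linear(1024, 1024),
        torch.nn.ReLU(),
        torch.nn.Linear(1024, 2),
    )
```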
52 changes: 13 additions & 39 deletions tests/plugins/test_deepspeed_plugin.py
@@ -595,7 +595,9 @@ def test_deepspeed_multigpu_stage_3_manual_optimization(tmpdir, deepspeed_config
_assert_save_model_is_equal(model, tmpdir, trainer)


- def run_checkpoint_test(tmpdir: str, automatic_optimization: bool = True, accumulate_grad_batches: int = 2):
+ @pytest.mark.parametrize(("accumulate_grad_batches", "automatic_optimization"), [(1, False), (2, True)])
+ @RunIf(min_gpus=2, deepspeed=True, standalone=True)
+ def test_deepspeed_multigpu_stage_3_checkpointing(tmpdir, automatic_optimization, accumulate_grad_batches):
seed_everything(1)
if automatic_optimization:
model = ModelParallelClassificationModel()
@@ -630,13 +632,6 @@ def run_checkpoint_test(tmpdir: str, automatic_optimization: bool = True, accumu
assert results[0]["test_acc"] > 0.7


- @RunIf(min_gpus=2, deepspeed=True, standalone=True)
- def test_deepspeed_multigpu_stage_3_checkpointing(tmpdir):
- """Test to ensure with Stage 3 and multiple GPUs that we can save/load a model resuming from a checkpoint, and
- see convergence."""
- run_checkpoint_test(tmpdir)


@RunIf(min_gpus=1, deepspeed=True, standalone=True)
def test_deepspeed_multigpu_stage_3_warns_resume_training(tmpdir):
"""Test to ensure with Stage 3 and multiple GPUs that we can resume from training, throwing a warning that the
@@ -718,24 +713,9 @@ def on_train_batch_start(
trainer.fit(model, datamodule=dm, ckpt_path=ck.best_model_path)


@pytest.mark.parametrize("offload_optimizer", [False, True])
@RunIf(min_gpus=2, deepspeed=True, standalone=True)
- def test_deepspeed_multigpu_stage_3_checkpointing_full_weights_manual(tmpdir):
- """Test to ensure with Stage 3 and multiple GPUs that we can save/load a model resuming from a checkpoint,
- where we save the full weights to one file."""
- run_checkpoint_test(tmpdir, automatic_optimization=False, accumulate_grad_batches=1)


- @RunIf(min_gpus=2, deepspeed=True, standalone=True)
- def test_deepspeed_multigpu_stage_2_accumulated_grad_batches(tmpdir):
- _deepspeed_multigpu_stage_2_accumulated_grad_batches(tmpdir, offload_optimizer=False)


- @RunIf(min_gpus=2, deepspeed=True, standalone=True)
- def test_deepspeed_multigpu_stage_2_accumulated_grad_batches_offload_optimizer(tmpdir):
- _deepspeed_multigpu_stage_2_accumulated_grad_batches(tmpdir, offload_optimizer=True)


- def _deepspeed_multigpu_stage_2_accumulated_grad_batches(tmpdir, offload_optimizer):
+ def test_deepspeed_multigpu_stage_2_accumulated_grad_batches(tmpdir, offload_optimizer):
"""Test to ensure with Stage 2 and multiple GPUs, accumulated grad batches works."""
seed_everything(42)

@@ -781,6 +761,8 @@ def test_deepspeed_multigpu_test(tmpdir):
trainer.test(model)


+ # TODO(Sean): Once partial parameter partitioning is supported this test should be re-enabled
+ @pytest.mark.skip("Partial parameter partitioning for DeepSpeed is currently broken.")
@RunIf(min_gpus=1, deepspeed=True, standalone=True)
def test_deepspeed_multigpu_partial_partition_parameters(tmpdir):
"""Test to ensure that a module that defines a layer inside the ``__init__`` and ``configure_sharded_model``
@@ -824,7 +806,7 @@ def on_train_epoch_start(self) -> None:
model = TestModel()
trainer = Trainer(
default_root_dir=tmpdir,
- strategy=DeepSpeedPlugin(stage=3, partition_module=False),
+ strategy=DeepSpeedPlugin(stage=3),
gpus=1,
fast_dev_run=True,
precision=16,
@@ -941,22 +923,14 @@ def test_dataloader(self):


@mock.patch("torch.optim.lr_scheduler.StepLR.step", autospec=True)
@pytest.mark.parametrize("interval", ["step", "epoch"])
@pytest.mark.parametrize("max_epoch", [2])
@pytest.mark.parametrize("limit_train_batches", [2])
@RunIf(min_gpus=1, deepspeed=True, standalone=True)
- def test_deepspeed_scheduler_step_count(mock_step):
+ def test_scheduler_step_count(mock_step, max_epoch, limit_train_batches, interval):
"""Test to ensure that the scheduler is called the correct amount of times during training when scheduler is
- set to step."""
- _run_scheduler_test(mock_step, max_epoch=2, limit_train_batches=2, interval="step")


@mock.patch("torch.optim.lr_scheduler.StepLR.step", autospec=True)
@RunIf(min_gpus=1, deepspeed=True, standalone=True)
def test_deepspeed_scheduler_step_count_epoch(mock_step):
"""Test to ensure that the scheduler is called the correct amount of times during training when scheduler is
set to epoch."""
_run_scheduler_test(mock_step, max_epoch=2, limit_train_batches=2, interval="epoch")

+ set to step or epoch."""

- def _run_scheduler_test(mock_step, max_epoch, limit_train_batches, interval):
class TestModel(BoringModel):
def configure_optimizers(self):
optimizer = torch.optim.SGD(self.layer.parameters(), lr=0.1)
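The test-file changes follow one pattern: pairs of near-duplicate tests that differed only in an argument (`offload_optimizer`, the scheduler `interval`, or the checkpointing mode) are collapsed into a single test via `pytest.mark.parametrize`. A generic sketch of that refactor; the names and the trivial body are illustrative stand-ins for the real Trainer-based tests:

```python
import pytest


def _run_accumulation_check(offload_optimizer: bool) -> None:
    # Stand-in for the shared body; the real tests build a Trainer with a
    # DeepSpeedPlugin here and assert on the accumulated-gradient behaviour.
    assert offload_optimizer in (False, True)


# Before: two copies of the test hard-coded offload_optimizer=False / True.
# After: one parametrized test generates both cases.
@pytest.mark.parametrize("offload_optimizer", [False, True])
def test_accumulated_grad_batches(offload_optimizer):
    _run_accumulation_check(offload_optimizer)
```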
