Bug description
When using the DeepSpeed strategy, changes made to the checkpoint (adding/removing keys) in on_save_checkpoint are not preserved in the saved checkpoint. After switching the strategy to ddp, the changes are saved as expected.
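For context, the pattern in question is overriding on_save_checkpoint to edit the checkpoint dict right before it is written. A minimal sketch of the hook (the full repro follows below; MyModel is a made-up name):

import torch
from lightning.pytorch import LightningModule

class MyModel(LightningModule):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(32, 2)

    def on_save_checkpoint(self, checkpoint) -> None:
        # Drop a parameter from the checkpoint right before it is saved.
        # With strategy="ddp" the key is gone from the file on disk;
        # with strategy="deepspeed" the saved checkpoint still contains it.
        checkpoint["state_dict"].pop("layer.weight", None)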
Environment
- Lightning: 2.0.3
- DeepSpeed: 0.10.1
What version are you seeing the problem on?
v2.0
How to reproduce the bug
import os

import torch
from torch.utils.data import DataLoader, Dataset

from lightning.pytorch import LightningModule, Trainer
from lightning.pytorch.callbacks import ModelCheckpoint
from lightning.pytorch.utilities.deepspeed import ds_checkpoint_dir
from deepspeed.utils.zero_to_fp32 import (
    get_fp32_state_dict_from_zero_checkpoint,
    get_model_state_file,
    get_optim_files,
)
class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size)

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return self.len
class BoringModel(LightningModule):
    def __init__(self, remove_key=None):
        super().__init__()
        self.layer = torch.nn.Linear(32, 2)
        self.remove_key = remove_key

    def forward(self, x):
        return self.layer(x)

    def training_step(self, batch, batch_idx):
        loss = self(batch).sum()
        self.log("train_loss", loss)
        return {"loss": loss}

    def validation_step(self, batch, batch_idx):
        loss = self(batch).sum()
        self.log("valid_loss", loss)

    def test_step(self, batch, batch_idx):
        loss = self(batch).sum()
        self.log("test_loss", loss)

    def configure_optimizers(self):
        return torch.optim.SGD(self.layer.parameters(), lr=0.1)

    def on_save_checkpoint(self, checkpoint) -> None:
        # Edit the checkpoint dict right before it is written to disk.
        if self.remove_key is not None:
            print(f"{checkpoint['state_dict'].keys()} in state_dict ")
            self.remove_params(checkpoint, key=self.remove_key)
            print(f"{checkpoint['state_dict'].keys()} in state_dict after remove params\n")

    def remove_params(self, checkpoint, key) -> None:
        # Drop every state_dict entry whose name contains `key`.
        del_keys = []
        for k in checkpoint["state_dict"]:
            if key in k:
                del_keys.append(k)
        print(f"{len(del_keys)} keys to remove")
        for k in del_keys:
            checkpoint["state_dict"].pop(k)
def run(key, strategy):
    train_data = DataLoader(RandomDataset(32, 64), batch_size=2)
    val_data = DataLoader(RandomDataset(32, 64), batch_size=2)
    test_data = DataLoader(RandomDataset(32, 64), batch_size=2)

    model = BoringModel(key)
    trainer = Trainer(
        strategy=strategy,
        accelerator='gpu',
        devices=1,
        callbacks=[ModelCheckpoint(save_top_k=1, save_last=True)],
        default_root_dir=os.getcwd(),
        limit_train_batches=1,
        limit_val_batches=1,
        limit_test_batches=1,
        num_sanity_val_steps=0,
        max_epochs=10,
        enable_model_summary=False,
    )
    trainer.fit(model, train_dataloaders=train_data, val_dataloaders=val_data)
    trainer.test(model, dataloaders=test_data)
def load_ckpt(checkpoint_dir, tag=None, map_location="cpu"):
    # Reconstruct the fp32 state dict from the DeepSpeed ZeRO checkpoint directory.
    state_dict = get_fp32_state_dict_from_zero_checkpoint(checkpoint_dir, tag)

    # Keep the Lightning client state saved by rank 0, filtering out the
    # DeepSpeed-internal entries.
    deepspeed_states = [
        "module",
        "optimizer",
        "lr_scheduler",
        "csr_tensor_module_names",
        "skipped_steps",
        "global_steps",
        "dp_world_size",
        "mp_world_size",
    ]
    checkpoint_dir = ds_checkpoint_dir(checkpoint_dir)
    optim_files = get_optim_files(checkpoint_dir)
    optim_state = torch.load(optim_files[0], map_location=map_location)
    zero_stage = optim_state["optimizer_state_dict"]["zero_stage"]
    model_file = get_model_state_file(checkpoint_dir, zero_stage)
    client_state = torch.load(model_file, map_location=map_location)
    client_state = {
        key: value for key, value in client_state.items() if key not in deepspeed_states
    }

    # Strip the "module." prefix DeepSpeed adds to parameter names.
    state_dict = {k.partition("module.")[2]: state_dict[k] for k in state_dict}
    client_state["state_dict"] = state_dict
    return client_state
if __name__ == "__main__":
    run(None, 'deepspeed')
    run("layer.weight", 'deepspeed')
    run(None, 'ddp')
    run("layer.weight", 'ddp')

    ckpt_v0 = load_ckpt('./lightning_logs/version_0/checkpoints/last.ckpt')
    print(f"keys in ckpt_v0 {ckpt_v0['state_dict'].keys()} \n")
    ckpt_v1 = load_ckpt('./lightning_logs/version_1/checkpoints/last.ckpt')
    print(f"keys in ckpt_v1 {ckpt_v1['state_dict'].keys()} \n")
    ckpt_v2 = torch.load('./lightning_logs/version_2/checkpoints/last.ckpt')
    print(f"keys in ckpt_v2 {ckpt_v2['state_dict'].keys()} \n")
    ckpt_v3 = torch.load('./lightning_logs/version_3/checkpoints/last.ckpt')
    print(f"keys in ckpt_v3 {ckpt_v3['state_dict'].keys()} \n")
Error messages and logs
Processing zero checkpoint './lightning_logs/version_0/checkpoints/last.ckpt/checkpoint'
Detected checkpoint of type zero stage 2, world_size: 1
Parsing checkpoint created by deepspeed==0.10.1
Reconstructed fp32 state dict with 2 params 66 elements
keys in ckpt_v0 dict_keys(['layer.weight', 'layer.bias'])
Processing zero checkpoint './lightning_logs/version_1/checkpoints/last.ckpt/checkpoint'
Detected checkpoint of type zero stage 2, world_size: 1
Parsing checkpoint created by deepspeed==0.10.1
Reconstructed fp32 state dict with 2 params 66 elements
keys in ckpt_v1 dict_keys(['layer.weight', 'layer.bias'])
keys in ckpt_v2 odict_keys(['layer.weight', 'layer.bias'])
keys in ckpt_v3 odict_keys(['layer.bias'])
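Note the discrepancy: ckpt_v1 (DeepSpeed run with remove_key="layer.weight") still contains both layer.weight and layer.bias, while ckpt_v3 (DDP run with the same hook) correctly contains only layer.bias. It looks like the DeepSpeed strategy lets the engine persist the module weights itself, so edits made to checkpoint["state_dict"] in on_save_checkpoint never reach the files on disk.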
cc @awaelchli