|
7 | 7 | "source": [
|
8 | 8 | "(vicuna_lightning_deepspeed_finetuning)=\n",
|
9 | 9 | "\n",
|
10 |
| - "# Fine-tune `vicuna-13b` with Ray LightningTrainer and DeepSpeed\n", |
| 10 | + "# Fine-tune `vicuna-13b` with Lightning and DeepSpeed\n", |
11 | 11 | "\n",
|
12 |
| - "In this example, we will demonstrate how to perform full fine-tuning for a [`vicuna-13b-v1.3`](https://huggingface.co/lmsys/vicuna-13b-v1.3) model using LightningTrainer with the DeepSpeed ZeRO-3 strategy.\n", |
| 12 | + "In this example, we will demonstrate how to perform full fine-tuning for a [`vicuna-13b-v1.3`](https://huggingface.co/lmsys/vicuna-13b-v1.3) model using Ray Train PyTorch Lightning integrations with the DeepSpeed ZeRO-3 strategy.\n", |
13 | 13 | "\n",
|
14 | 14 | "- [DeepSpeed](<https://github.com/microsoft/DeepSpeed>) is an open-source deep learning optimization library for PyTorch. It's designed to reduce computing power and memory usage, and to train large distributed models by leveraging state-of-the-art innovations like ZeRO, 3D-Parallelism, DeepSpeed-MoE, and ZeRO-Infinity. \n",
|
15 | 15 | "- PyTorch Lightning offers a [DeepSpeed integration](https://lightning.ai/docs/pytorch/stable/api/pytorch_lightning.strategies.DeepSpeedStrategy.html), which provides a simple interface to configure the knobs for DeepSpeed and automatically trigger your training process with the DeepSpeed Engine.\n",
|
16 |
| - "- {class}`Ray LightningTrainer <ray.train.lightning.LightningTrainer>` allows you to easily scale your PyTorch Lightning job across multiple nodes in a Ray cluster, without worrying about the underlying cluster management, autoscaling, and distributed process group settings.\n", |
| 16 | + "- {class}`Ray TorchTrainer <ray.train.torch.TorchTrainer>` allows you to easily scale your PyTorch Lightning job across multiple nodes in a Ray cluster, without worrying about the underlying cluster management, autoscaling, and distributed process group settings.\n", |
17 | 17 | "\n",
|
18 | 18 | "Our demo aims to illustrate how these three tools can be combined effectively to finetune the Vicuna-13B model, leveraging the strengths of each to create an efficient and high-performance deep learning solution.\n"
|
19 | 19 | ]
|
|
24 | 24 | "metadata": {},
|
25 | 25 | "source": [
|
26 | 26 | "```{note}\n",
|
27 |
| - "This is an advanced example of Large Language Model fine-tuning with Ray Train. If you're a beginner or new to the concepts of Ray Train and LightningTrainer, it would be beneficial to first explore the introductory documentation below to build a foundational understanding. \n", |
| 27 | + "This is an advanced example of Large Language Model fine-tuning with Ray Train. If you're a beginner or new to the concepts of Ray Train and our Lightning integrations, it would be beneficial to first explore the introductory documentation below to build a foundational understanding. \n", |
28 | 28 | "- [Ray Train Key Concepts](train-key-concepts) \n",
|
29 | 29 | "- [Ray Data Key Concepts](data_key_concepts)\n",
|
30 |
| - "- {ref}`[Basic] Image Classification with LightningTrainer <lightning_mnist_example>`\n", |
31 |
| - "- {ref}`[Intermediate] Using LightningTrainer with Ray Data <lightning_advanced_example>`\n", |
| 30 | + "- {ref}`[Basic] Image Classification with PyTorch Lightning and Ray Train <lightning_mnist_example>`\n", |
| 31 | + "- {ref}`[Intermediate] Fine-tuning Lightning Modules with with Ray Data <lightning_advanced_example>`\n", |
32 | 32 | "```\n"
|
33 | 33 | ]
|
34 | 34 | },
|
|
81 | 81 | "```"
|
82 | 82 | ]
|
83 | 83 | },
|
| 84 | + { |
| 85 | + "cell_type": "code", |
| 86 | + "execution_count": null, |
| 87 | + "metadata": { |
| 88 | + "tags": [ |
| 89 | + "remove-cell" |
| 90 | + ] |
| 91 | + }, |
| 92 | + "outputs": [], |
| 93 | + "source": [ |
| 94 | + "# TODO(@justinvyu): Remove it\n", |
| 95 | + "import os\n", |
| 96 | + "os.environ[\"RAY_AIR_NEW_PERSISTENCE_MODE\"] = \"1\"" |
| 97 | + ] |
| 98 | + }, |
84 | 99 | {
|
85 | 100 | "cell_type": "code",
|
86 | 101 | "execution_count": null,
|
|
102 | 117 | " \"accelerate==0.20.3\",\n",
|
103 | 118 | " \"transformers==4.30.2\",\n",
|
104 | 119 | " \"pytorch_lightning==2.0.3\",\n",
|
105 |
| - " ]\n", |
| 120 | + " ],\n", |
| 121 | + " \"env_vars\": {\"RAY_AIR_NEW_PERSISTENCE_MODE\": \"1\"} # TODO(@justinvyu): Remove it\n", |
106 | 122 | " }\n",
|
107 | 123 | ")"
|
108 | 124 | ]
|
|
219 | 235 | "processed_ds = ray_ds.map_batches(fill_prompt, batch_format=\"pandas\").map_batches(tokenize, batch_format=\"pandas\")"
|
220 | 236 | ]
|
221 | 237 | },
|
| 238 | + { |
| 239 | + "cell_type": "code", |
| 240 | + "execution_count": null, |
| 241 | + "metadata": { |
| 242 | + "tags": [ |
| 243 | + "remove-cell" |
| 244 | + ] |
| 245 | + }, |
| 246 | + "outputs": [], |
| 247 | + "source": [ |
| 248 | + "# To accelerate release tests\n", |
| 249 | + "processed_ds = processed_ds.limit(16 * 8 * 16) # each worker has 16 batches" |
| 250 | + ] |
| 251 | + }, |
222 | 252 | {
|
223 | 253 | "attachments": {},
|
224 | 254 | "cell_type": "markdown",
|
225 | 255 | "metadata": {},
|
226 | 256 | "source": [
|
227 |
| - "## Define your model\n", |
| 257 | + "## Define a Lightning Module\n", |
228 | 258 | "\n",
|
229 | 259 | "Here we load the pre-trained model weights from HuggingFace Model Hub, and wrap them into `pl.LightningModule`. We adopted the efficient model initialization techniques introduced in [Lightning-transformers](https://github.com/Lightning-Universe/lightning-transformers) to avoid unnecessary full weights loading."
|
230 | 260 | ]
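The full `Vicuna13BModel` definition appears in the code cell that follows. For orientation only, here is a minimal, hypothetical sketch of wrapping a Hugging Face causal LM in a `pl.LightningModule`; it omits the efficient-initialization tricks the notebook actually uses, and the batch keys (`input_ids`, `attention_mask`, `labels`) and learning rate are assumptions:

```python
import pytorch_lightning as pl
import torch
from transformers import AutoModelForCausalLM


class CausalLMModule(pl.LightningModule):
    """Minimal sketch (not the notebook's Vicuna13BModel) of a causal-LM LightningModule."""

    def __init__(self, model_name: str):
        super().__init__()
        # The real example avoids an eager full-weights load; this sketch loads directly.
        self.model = AutoModelForCausalLM.from_pretrained(model_name)

    def training_step(self, batch, batch_idx):
        # Assumes the tokenized batch provides these keys.
        outputs = self.model(
            input_ids=batch["input_ids"],
            attention_mask=batch["attention_mask"],
            labels=batch["labels"],
        )
        self.log("train_loss", outputs.loss, prog_bar=True)
        return outputs.loss

    def configure_optimizers(self):
        # Learning rate chosen arbitrarily for the sketch.
        return torch.optim.AdamW(self.model.parameters(), lr=2e-5)
```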
|
|
306 | 336 | "cell_type": "markdown",
|
307 | 337 | "metadata": {},
|
308 | 338 | "source": [
|
309 |
| - "## Training Configurations\n", |
| 339 | + "## DeepSpeed Configurations\n", |
310 | 340 | "\n",
|
311 | 341 | "Before training, let's calculate the memory usage of finetuning a `vicuna-13b` model. Assume we are using FP16 mixed-precision training, and the optimizer is Adam with FP32 states.\n",
|
312 | 342 | "\n",
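As a rough, hedged sketch of this estimate: each parameter costs about 2 bytes for FP16/BF16 weights, 2 bytes for gradients, and 12 bytes for Adam's FP32 master weights, momentum, and variance, all of which ZeRO-3 shards across workers. Assuming 13B parameters and 16 GPU workers:

```python
# Back-of-the-envelope memory estimate (a sketch; assumes 13B params and 16 GPUs).
NUM_PARAMS = 13e9
GiB = 1024 ** 3

fp16_weights = 2 * NUM_PARAMS          # FP16/BF16 model weights
fp16_grads = 2 * NUM_PARAMS            # FP16/BF16 gradients
fp32_optim = (4 + 4 + 4) * NUM_PARAMS  # Adam: FP32 master weights + momentum + variance

total = fp16_weights + fp16_grads + fp32_optim
print(f"Total model states: ~{total / GiB:.0f} GiB")                     # ~194 GiB
print(f"Per GPU with ZeRO-3 (16 workers): ~{total / 16 / GiB:.0f} GiB")  # ~12 GiB
```

This counts only the model states; activations and temporary buffers add to the per-GPU footprint.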
|
|
324 | 354 | "metadata": {},
|
325 | 355 | "outputs": [],
|
326 | 356 | "source": [
|
327 |
| - "from ray.train.lightning import LightningTrainer, LightningConfigBuilder\n", |
328 | 357 | "from transformers import AutoConfig\n",
|
329 | 358 | "\n",
|
330 | 359 | "config = AutoConfig.from_pretrained(MODEL_NAME)\n",
|
|
342 | 371 | " \"stage3_prefetch_bucket_size\": 0.9 * HIDDEN_SIZE * HIDDEN_SIZE,\n",
|
343 | 372 | " \"stage3_param_persistence_threshold\": 10 * HIDDEN_SIZE,\n",
|
344 | 373 | " },\n",
|
345 |
| - "}\n", |
346 |
| - "\n", |
347 |
| - "lightning_config = (\n", |
348 |
| - " LightningConfigBuilder()\n", |
349 |
| - " .module(cls=Vicuna13BModel)\n", |
350 |
| - " .trainer(\n", |
351 |
| - " max_epochs=1,\n", |
352 |
| - " accelerator=\"gpu\",\n", |
353 |
| - " precision=\"bf16-mixed\",\n", |
354 |
| - " accumulate_grad_batches=2,\n", |
355 |
| - " )\n", |
356 |
| - " .strategy(name=\"deepspeed\", config=deepspeed_configs)\n", |
357 |
| - " .checkpointing(save_top_k=0, save_weights_only=True, save_last=True)\n", |
358 |
| - ")" |
359 |
| - ] |
360 |
| - }, |
361 |
| - { |
362 |
| - "cell_type": "code", |
363 |
| - "execution_count": null, |
364 |
| - "metadata": { |
365 |
| - "tags": [ |
366 |
| - "remove-cell" |
367 |
| - ] |
368 |
| - }, |
369 |
| - "outputs": [], |
370 |
| - "source": [ |
371 |
| - "from pytorch_lightning.callbacks import TQDMProgressBar\n", |
372 |
| - "\n", |
373 |
| - "# Create a customized progress bar for LightningTrainer\n", |
374 |
| - "class VicunaProgressBar(TQDMProgressBar):\n", |
375 |
| - " def __init__(self, num_iters_per_epoch, *args, **kwargs):\n", |
376 |
| - " super().__init__(*args, **kwargs)\n", |
377 |
| - " self.num_iters_per_epoch = num_iters_per_epoch\n", |
378 |
| - "\n", |
379 |
| - " def on_train_epoch_start(self, trainer, *_):\n", |
380 |
| - " super().on_train_epoch_start(trainer, *_)\n", |
381 |
| - " self.train_progress_bar.reset(self.num_iters_per_epoch)\n", |
382 |
| - "\n", |
383 |
| - "\n", |
384 |
| - "total_batches = processed_ds.count()\n", |
385 |
| - "num_iters_per_epoch = total_batches // (NUM_WORKERS * BATCH_SIZE_PER_WORKER)\n", |
386 |
| - "progress_bar = VicunaProgressBar(num_iters_per_epoch)\n", |
387 |
| - "\n", |
388 |
| - "\n", |
389 |
| - "lightning_config.trainer(\n", |
390 |
| - " callbacks=[progress_bar],\n", |
391 |
| - " # Take a subset to accelerate release tests\n", |
392 |
| - " limit_train_batches=20,\n", |
393 |
| - ")" |
| 374 | + "}" |
394 | 375 | ]
|
395 | 376 | },
|
396 | 377 | {
|
397 | 378 | "attachments": {},
|
398 | 379 | "cell_type": "markdown",
|
399 | 380 | "metadata": {},
|
400 | 381 | "source": [
|
401 |
| - "Finally, combine all the configurations with {class}`LightningConfigBuilder <ray.train.lightning.LightningConfigBuilder>` and instantiate a LightningTrainer. " |
| 382 | + "## Define your training function\n", |
| 383 | + "\n", |
| 384 | + "Finally, define the training function that will be launched on multiple workers. The training function is generally the same as the pure pytorch Lightning training code, with additional Ray Train utilities:\n", |
| 385 | + "\n", |
| 386 | + "- {class}`~ray.train.lightning.RayDeepSpeedStrategy`: Same argument list as Lightning DeepSpeedStrategy but integrated with Ray Train.\n", |
| 387 | + "- {class}`~ray.train.lightning.RayLightningEnvironment`: Lightning environments for Ray cluster.\n", |
| 388 | + "- {class}`~ray.train.lightning.RayTrainReportCallback`: On each epoch end, it reports the checkpoint from each worker to the ray train (distributed checkpointing).\n", |
| 389 | + "- {meth}`~ray.train.lightning.prepare_trainer`: Validate your lightning Trainer configurations.\n", |
| 390 | + "\n", |
| 391 | + "For Ray Data ingestion, we fetched the preprocessed and sharded dataset with {meth}`~ray.train.get_dataset_shard`, and created a dataloader with {meth}`~ray.data.Dataset.iter_torch_batches`. It returns a custom iterator that replaces the Torch DataLoader.\n" |
402 | 392 | ]
|
403 | 393 | },
|
404 | 394 | {
|
|
407 | 397 | "metadata": {},
|
408 | 398 | "outputs": [],
|
409 | 399 | "source": [
|
| 400 | + "import ray.train\n", |
410 | 401 | "from ray.train import CheckpointConfig, RunConfig, ScalingConfig\n",
|
| 402 | + "from ray.train.torch import TorchTrainer\n", |
| 403 | + "from ray.train.lightning import (\n", |
| 404 | + " prepare_trainer,\n", |
| 405 | + " RayDeepSpeedStrategy, \n", |
| 406 | + " RayLightningEnvironment, \n", |
| 407 | + " RayTrainReportCallback\n", |
| 408 | + ")\n", |
411 | 409 | "\n",
|
412 |
| - "trainer = LightningTrainer(\n", |
413 |
| - " lightning_config=lightning_config.build(),\n", |
| 410 | + "\n", |
| 411 | + "def train_func(config):\n", |
| 412 | + " \"\"\"Training function for each worker.\"\"\"\n", |
| 413 | + "\n", |
| 414 | + " # Unpack the `train_loop_config`\n", |
| 415 | + " max_epochs = config[\"max_epochs\"]\n", |
| 416 | + " batch_size = config[\"batch_size\"]\n", |
| 417 | + " accumulate_grad_batches = config[\"accumulate_grad_batches\"]\n", |
| 418 | + "\n", |
| 419 | + " model = Vicuna13BModel()\n", |
| 420 | + " \n", |
| 421 | + " # Prepare Ray Data Ingestion\n", |
| 422 | + " train_ds = ray.train.get_dataset_shard(\"train\")\n", |
| 423 | + " train_dataloader = train_ds.iter_torch_batches(batch_size=batch_size)\n", |
| 424 | + " \n", |
| 425 | + " pl_trainer = pl.Trainer(\n", |
| 426 | + " devices=\"auto\",\n", |
| 427 | + " accelerator=\"auto\",\n", |
| 428 | + " strategy=RayDeepSpeedStrategy(config=deepspeed_configs),\n", |
| 429 | + " plugins=[RayLightningEnvironment()],\n", |
| 430 | + " callbacks=[RayTrainReportCallback()],\n", |
| 431 | + " enable_checkpointing=False, # RayTrainReportCallback will save the checkpoints\n", |
| 432 | + " max_epochs=max_epochs,\n", |
| 433 | + " precision=\"bf16-mixed\",\n", |
| 434 | + " accumulate_grad_batches=accumulate_grad_batches,\n", |
| 435 | + " )\n", |
| 436 | + " pl_trainer = prepare_trainer(pl_trainer)\n", |
| 437 | + "\n", |
| 438 | + " pl_trainer.fit(model, train_dataloaders=train_dataloader)\n", |
| 439 | + " \n", |
| 440 | + "\n", |
| 441 | + "trainer = TorchTrainer(\n", |
| 442 | + " train_loop_per_worker=train_func,\n", |
| 443 | + " train_loop_config={\n", |
| 444 | + " \"max_epochs\": 1,\n", |
| 445 | + " \"batch_size\": BATCH_SIZE_PER_WORKER,\n", |
| 446 | + " \"accumulate_grad_batches\": 2\n", |
| 447 | + " },\n", |
414 | 448 | " run_config=RunConfig(\n",
|
415 | 449 | " name=\"vicuna-13b-finetune\",\n",
|
416 | 450 | " storage_path=\"s3://anyscale-staging-data-cld-kvedzwag2qa8i5bjxuevf5i7/air-release-tests\",\n",
|
417 |
| - " checkpoint_config=CheckpointConfig(\n", |
418 |
| - " num_to_keep=1,\n", |
419 |
| - " # Enable distributed checkpointing\n", |
420 |
| - " _checkpoint_keep_all_ranks=True,\n", |
421 |
| - " _checkpoint_upload_from_workers=True,\n", |
422 |
| - " ),\n", |
| 451 | + " checkpoint_config=CheckpointConfig(num_to_keep=1),\n", |
423 | 452 | " ),\n",
|
424 | 453 | " scaling_config=ScalingConfig(\n",
|
425 | 454 | " num_workers=NUM_WORKERS,\n",
|
426 | 455 | " use_gpu=True,\n",
|
427 | 456 | " resources_per_worker={\"CPU\": 15, \"GPU\": 1},\n",
|
428 | 457 | " ),\n",
|
429 | 458 | " datasets={\"train\": processed_ds},\n",
|
430 |
| - " datasets_iter_config={\"batch_size\": BATCH_SIZE_PER_WORKER},\n", |
431 | 459 | ")"
|
432 | 460 | ]
|
433 | 461 | },
|
434 |
| - { |
435 |
| - "attachments": {}, |
436 |
| - "cell_type": "markdown", |
437 |
| - "metadata": {}, |
438 |
| - "source": [ |
439 |
| - "```{tip}\n", |
440 |
| - "\n", |
441 |
| - "Here, we highly recommend saving checkpoints with cloud storage and enabling distributed checkpointing by setting `_checkpoint_keep_all_ranks` and `_checkpoint_upload_from_workers` to True when training huge models. Otherwise, all checkpoint shards will be synced to the head node, which may introduce enormous syncing overhead and even cause out-of-memory.\n", |
442 |
| - "\n", |
443 |
| - "```" |
444 |
| - ] |
445 |
| - }, |
446 | 462 | {
|
447 | 463 | "attachments": {},
|
448 | 464 | "cell_type": "markdown",
|
449 | 465 | "metadata": {},
|
450 | 466 | "source": [
|
451 | 467 | "## Model Fine-tuning\n",
|
452 | 468 | "\n",
|
453 |
| - "Once everything is configured in LightningTrainer, training becomes easy. Simply call `trainer.fit()`, and your workload will be scaled to the Ray cluster, initiating ZeRO-3 parallel training." |
| 469 | + "Once everything is configured in TorchTrainer, training becomes easy. Simply call `trainer.fit()`, and your workload will be scaled to the Ray cluster, initiating ZeRO-3 parallel training." |
454 | 470 | ]
|
455 | 471 | },
|
456 | 472 | {
|
|
1022 | 1038 | "- Training takes: 36:06 = 2166s\n",
|
1023 | 1039 | "- Training + initialization + checkpointing takes 2473s\n",
|
1024 | 1040 | "\n",
|
1025 |
| - "Therefore, the model initialization and checkpoint syncing takes 307s. It will be amortized when you have larger datasets and spend more time on training." |
| 1041 | + "Model initialization and checkpoint synchronization took 307 seconds. It will be amortized as you have larger datasets and take more time to train." |
1026 | 1042 | ]
|
1027 | 1043 | },
|
1028 | 1044 | {
|
|
1091 | 1107 | "source": [
|
1092 | 1108 | "import os\n",
|
1093 | 1109 | "\n",
|
1094 |
| - "os.system(f\"awsv2 s3 sync {result.checkpoint.uri} /mnt/local_storage/checkpoint\")" |
| 1110 | + "os.system(f\"awsv2 s3 sync s3://{result.checkpoint.path} /mnt/local_storage\")" |
1095 | 1111 | ]
|
1096 | 1112 | },
|
1097 | 1113 | {
|
|
1136 | 1152 | " torch.save(vicuna_state_dict, os.path.join(zero_ckpt_dir, \"full_model.pt\"))\n",
|
1137 | 1153 | "\n",
|
1138 | 1154 | "\n",
|
1139 |
| - "full_model_ckpt_path = \"/mnt/local_storage/checkpoint/model/full_model.pt\"\n", |
1140 |
| - "extract_fp32_ckpt_from_zero(\"/mnt/local_storage/checkpoint/model\")" |
| 1155 | + "full_model_ckpt_path = \"/mnt/local_storage/checkpoint.ckpt/full_model.pt\"\n", |
| 1156 | + "extract_fp32_ckpt_from_zero(\"/mnt/local_storage/checkpoint.ckpt\")" |
1141 | 1157 | ]
|
1142 | 1158 | },
|
1143 | 1159 | {
|
|