diff --git a/paddlenlp/data/dist_dataloader.py b/paddlenlp/data/dist_dataloader.py
index e97cc60c84a8..5d5c6cc7512c 100644
--- a/paddlenlp/data/dist_dataloader.py
+++ b/paddlenlp/data/dist_dataloader.py
@@ -12,13 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import numpy as np
 import paddle
 from paddle.distributed import fleet
 
 from paddlenlp.utils.log import logger
-
-_MAX_DATA_DIM = 64
+from paddlenlp.utils.nested import (
+    nested_broadcast_tensor,
+    nested_copy_place,
+    nested_empty_tensor,
+    nested_reduce_tensor,
+)
 
 
 class DummyDataset(paddle.io.Dataset):
@@ -53,6 +56,7 @@ def __init__(
         timeout=0,
         worker_init_fn=None,
         persistent_workers=False,
+        eval=False,
     ):
 
         if dataset is None:
@@ -62,12 +66,15 @@ def __init__(
         super().__init__(dataset=dataset, batch_sampler=batch_sampler, collate_fn=collate_fn, num_workers=num_workers)
 
         self._hcg = fleet.get_hybrid_communicate_group()
+        self.eval = eval
 
         # Init pp data comm group.
         if self._hcg.get_pipe_parallel_world_size() > 1:
             self._pp_data_group = self._init_dataloader_comm_group()
+            self._pp_group = self._hcg.get_pipe_parallel_group()
         else:
             self._pp_data_group = None
+            self._pp_group = None
 
         self.mp_group = self._hcg.get_model_parallel_group()
         self.mp_rank = self._hcg.get_model_parallel_rank()
@@ -78,10 +85,6 @@ def __init__(
         sharding_rank = self._hcg.get_sharding_parallel_rank()
         self._need_data = (self.mp_rank == 0) and (self.pp_rank == 0)
 
-        # When needed other data types, we can modify dtype_list.
-        self.dtype_list = [paddle.int64, paddle.float32, paddle.int32]
-        self._data_keys_list, self._data_keys_size = None, None
-
         if self._need_data:
             self._dataloader = paddle.io.DataLoader(
                 dataset,
@@ -127,7 +130,6 @@ def _init_dataloader_comm_group(self):
         parallel_groups = topo.get_comm_list("pipe")
 
         for group in parallel_groups:
-            # only first rank and last rank
             ranks = [group[0], group[-1]]
             comm_group = paddle.distributed.new_group(ranks=ranks)
             if paddle.distributed.get_rank() in ranks:
@@ -137,127 +139,68 @@ def __iter__(self):
         return self
 
-    def __next__(self):
-        data_keys_size = [0 for i in range(len(self.dtype_list))]
-        if self._need_data:
-            data = next(self._dataloader_iter)
-            data_keys = list(data.keys())
-
-            for key in data_keys:
-                if data[key].dtype not in self.dtype_list:
-                    raise ValueError(
-                        f"Dist dataloader requires dtype as `int64`, `float32` or `int32` currently, but got: {data[key].dtype}"
+    def _broadcast_data(self, data):
+        process_rank = paddle.distributed.get_rank()
+        if self.mp_group.nranks > 1:
+            if process_rank == self.mp_src_rank:
+                fake_data = [nested_reduce_tensor(data)]
+            else:
+                if data is not None:
+                    logger.warning(
+                        f"Your local rank {paddle.distributed.get_rank()} is not expected to have data."
                     )
-
-            data_list, data_keys_list = [], []
-            for i, dtype in enumerate(self.dtype_list):
-                data_list.append([data[key] for key in data_keys if data[key].dtype == dtype])
-                data_keys_list.append([key for key in data_keys if data[key].dtype == dtype])
-            data_keys_size = [len(keys) for keys in data_keys_list]
-
-        # Broadcast data keys size.
-        if self._data_keys_size is None:
-            if self.mp_group.nranks > 1 and self.pp_rank == 0:
-                paddle.distributed.broadcast_object_list(data_keys_size, src=self.mp_src_rank, group=self.mp_group)
-            if self._pp_data_group is not None:
-                paddle.distributed.broadcast_object_list(
-                    data_keys_size, src=self._pp_data_group.ranks[0], group=self._pp_data_group
-                )
-            self._data_keys_size = data_keys_size
-
-        if not self._need_data:
-            data_keys_list = [[None for i in range(keys_size)] for keys_size in self._data_keys_size]
-
-        # Broadcast data keys name.
-        if self._data_keys_list is None:
-            if self.mp_group.nranks > 1 and self.pp_rank == 0:
-                paddle.distributed.broadcast_object_list(data_keys_list, src=self.mp_src_rank, group=self.mp_group)
-            if self._pp_data_group is not None:
-                paddle.distributed.broadcast_object_list(
-                    data_keys_list, src=self._pp_data_group.ranks[0], group=self._pp_data_group
-                )
-            self._data_keys_list = data_keys_list
-
-        # Broadcast data.
-        if not self._need_data:
-            data_list = [[None for i in range(keys_size)] for keys_size in self._data_keys_size]
-
-        if self.mp_group.nranks > 1 and self.pp_rank == 0:
-            for i, dtype in enumerate(self.dtype_list):
-                if self._data_keys_size[i] > 0:
-                    data_list[i] = broadcast_data_list(
-                        data_list[i], dtype, self.mp_rank, self.mp_group, self.mp_src_rank
+                fake_data = [None]
+        if self._pp_group is not None:
+            if process_rank == self._pp_group.ranks[0]:
+                fake_data = [nested_reduce_tensor(data)]
+            else:
+                if data is not None:
+                    logger.warning(
+                        f"Your local rank {paddle.distributed.get_rank()} is not expected to have data."
                     )
+                fake_data = [None]
+        if self.mp_group.nranks > 1 and self.pp_rank == 0:
+            paddle.distributed.broadcast_object_list(
+                fake_data,
+                src=self.mp_src_rank,
+                group=self.mp_group,
+            )
+        if self._pp_group is not None:
+            paddle.distributed.broadcast_object_list(
+                fake_data,
+                src=self._pp_group.ranks[0],
+                group=self._pp_group,
+            )
 
-        if self._pp_data_group is not None:
-            # Note(daisimng): In last stage of pp, we don't need input_ids.
-            # It will be removed in future.
-            for i, dtype in enumerate(self.dtype_list):
-                if self._data_keys_size[i] > 0:
-                    data_list[i] = broadcast_data_list(
-                        data_list[i],
-                        dtype,
-                        self.pp_rank,
-                        self._pp_data_group,
-                        self._pp_data_group.ranks[0],
-                    )
-
-        out_data = {}
-        for keys, datas in zip(self._data_keys_list, data_list):
-            out_data.update([(k, d) for k, d in zip(keys, datas)])
-
-        return out_data
-
-
-def broadcast_data_list(data_list, datatype, comm_rank=0, comm_group=None, src_rank=0):
-    """
-    Broadcast data from src_rank to all ranks in comm_group.
-    """
-    # Move to GPU and broadcast.
-    size_cpu = []
-    if comm_rank == 0:
-        for data in data_list:
-            size_cpu.append(len(data.shape))
-            size_cpu += data.shape
-    size_cpu = size_cpu + [0] * (_MAX_DATA_DIM - len(size_cpu))
-    size_cuda = paddle.to_tensor(size_cpu)
-    paddle.distributed.broadcast(size_cuda, src_rank, group=comm_group).wait()
-
-    size_cpu = size_cuda.tolist()
-    i = 0
-    numel = 0
-    sizes = []
-    while size_cpu[i] > 0:
-        rank = size_cpu[i]
-        this_size = size_cpu[i + 1 : i + 1 + rank]
-        numel += int(np.prod(this_size))
-        sizes.append(this_size)
-        i += rank + 1
-
-    if comm_rank == 0:
-        assert data.dtype == datatype, "input has data type {} which " "is different than {}".format(
-            data.dtype, datatype
-        )
-        if paddle.is_compiled_with_cuda():
-            data_b = paddle.concat([d.cuda().reshape([-1]) for d in data_list], 0)
-        else:
-            data_b = paddle.concat([d.reshape([-1]) for d in data_list], 0)
+        fake_data = fake_data[0]
+        if fake_data is None:
+            raise StopIteration
 
-        assert numel == sum([d.numel().item() for d in data_list]), (numel, [d.numel().item() for d in data_list])
-    else:
-        if paddle.is_compiled_with_cuda():
-            data_b = paddle.empty([numel], dtype=datatype).cuda()
-        else:
-            data_b = paddle.empty([numel], dtype=datatype)
+        dst_pp_group = self._pp_group if self.eval else self._pp_data_group
+        if self.mp_group.nranks > 1:
+            if process_rank != self.mp_src_rank:
+                data = nested_empty_tensor(fake_data)
+        if dst_pp_group is not None:
+            if process_rank != dst_pp_group.ranks[0]:
+                data = nested_empty_tensor(fake_data)
 
-    # Broadcast
-    paddle.distributed.broadcast(data_b, src_rank, group=comm_group).wait()
+        if self.mp_group.nranks > 1 and self.pp_rank == 0:
+            data = nested_broadcast_tensor(data, src=self.mp_src_rank, group=self.mp_group)
+        if dst_pp_group is not None:
+            data = nested_broadcast_tensor(data, src=dst_pp_group.ranks[0], group=dst_pp_group)
+        # For pp stages 1 to n-1, Paddle needs to receive an empty dict for pipeline parallel.
+        if data is None:
+            data = {}
 
-    ret = []
-    offset = 0
-    for size in sizes:
-        numel = int(np.prod(size))
-        ret.append(data_b[offset : offset + numel].reshape(size))
-        offset += numel
+        return data
 
-    return ret
+    def __next__(self):
+        data = None
+        if self._need_data:
+            try:
+                data = next(self._dataloader_iter)
+                data = nested_copy_place(data, place=paddle.framework._current_expected_place())
+            except:
+                pass
+        data = self._broadcast_data(data)
+        return data
diff --git a/paddlenlp/trainer/plugins/unified_checkpoint.py b/paddlenlp/trainer/plugins/unified_checkpoint.py
index f8b62a15b77e..9a14ebba2882 100644
--- a/paddlenlp/trainer/plugins/unified_checkpoint.py
+++ b/paddlenlp/trainer/plugins/unified_checkpoint.py
@@ -62,6 +62,7 @@
     SAFE_WEIGHTS_NAME,
 )
 from paddlenlp.utils.log import logger
+from paddlenlp.utils.nested import nested_copy, nested_copy_place
 
 if is_safetensors_available():
     from safetensors import safe_open
@@ -1876,26 +1877,6 @@ def mapping_optimizer_tp_actions(tp_actions, optimizer_loaded_keys):
     return new_actions
 
 
-def nested_copy(inputs):
-    if isinstance(inputs, dict):
-        outputs = {}
-        for key in list(inputs.keys()):
-            outputs[key] = nested_copy(inputs[key])
-        return outputs
-    return inputs
-
-
-def nested_copy_place(inputs, place=None, blocking=False):
-    if isinstance(inputs, dict):
-        outputs = {}
-        for key in list(inputs.keys()):
-            outputs[key] = nested_copy_place(inputs[key], place, blocking)
-        return outputs
-    if isinstance(inputs, paddle.Tensor):
-        inputs = inputs if inputs.place == place else inputs._copy_to(place, blocking)
-    return inputs
-
-
 def flatten_list(nested_list):
     flattened_list = []
     for item in nested_list:
diff --git a/paddlenlp/trainer/trainer.py b/paddlenlp/trainer/trainer.py
index 419349e02d21..bf83420acf85 100644
--- a/paddlenlp/trainer/trainer.py
+++ b/paddlenlp/trainer/trainer.py
@@ -1419,8 +1419,6 @@ def get_eval_dataloader(self, eval_dataset: Optional[Dataset] = None) -> DataLoa
         if is_datasets_available() and eval_dataset is not None and isinstance(eval_dataset, datasets.Dataset):
             eval_dataset = self._remove_unused_columns(eval_dataset, description="evaluation")
 
-        _DataLoader = DistDataLoader if self.args.distributed_dataloader else DataLoader
-
         if self._is_iterable_dataset(eval_dataset):
             if self.args.dataset_world_size > 1:
                 eval_dataset = IterableDatasetShard(
@@ -1431,24 +1429,41 @@ def get_eval_dataloader(self, eval_dataset: Optional[Dataset] = None) -> DataLoa
                 process_index=self.args.dataset_rank,
             )
 
-            return _DataLoader(
-                eval_dataset,
-                batch_size=self.args.per_device_eval_batch_size,
-                collate_fn=self.data_collator,
-                num_workers=self.args.dataloader_num_workers,
-            )
+            if self.args.distributed_dataloader:
+                return DistDataLoader(
+                    eval_dataset,
+                    batch_size=self.args.per_device_eval_batch_size,
+                    collate_fn=self.data_collator,
+                    num_workers=0,
+                    eval=True,
+                )
+            else:
+                return DataLoader(
+                    eval_dataset,
+                    batch_size=self.args.per_device_eval_batch_size,
+                    collate_fn=self.data_collator,
+                    num_workers=0,
+                )
 
         eval_sampler = self._get_eval_sampler(eval_dataset)
         if self.args.distributed_dataloader:
             logger.info("Eval using DistDataLoader.")
-            return _DataLoader(
-                eval_dataset,
-                batch_sampler=eval_sampler,
-                collate_fn=self.data_collator,
-                num_workers=self.args.dataloader_num_workers,
-            )
+            return DistDataLoader(
+                eval_dataset,
+                batch_sampler=eval_sampler,
+                collate_fn=self.data_collator,
+                num_workers=self.args.dataloader_num_workers,
+                eval=True,
+            )
+        else:
+            return DataLoader(
+                eval_dataset,
+                batch_sampler=eval_sampler,
+                collate_fn=self.data_collator,
+                num_workers=self.args.dataloader_num_workers,
+            )
 
     def get_test_dataloader(self, test_dataset: Dataset) -> DataLoader:
         """
@@ -1469,8 +1484,6 @@ def get_test_dataloader(self, test_dataset: Dataset) -> DataLoader:
         if is_datasets_available() and test_dataset is not None and isinstance(test_dataset, datasets.Dataset):
             test_dataset = self._remove_unused_columns(test_dataset, description="test")
 
-        _DataLoader = DistDataLoader if self.args.distributed_dataloader else DataLoader
-
         if self._is_iterable_dataset(test_dataset):
             if self.args.dataset_world_size > 1:
                 test_dataset = IterableDatasetShard(
@@ -1481,25 +1494,42 @@ def get_test_dataloader(self, test_dataset: Dataset) -> DataLoader:
                 process_index=self.args.dataset_rank,
             )
 
-            return _DataLoader(
-                test_dataset,
-                batch_size=self.args.per_device_eval_batch_size * self.world_size,
-                collate_fn=self.data_collator,  # _get_collator_with_removed_columns
-                num_workers=self.args.dataloader_num_workers,
-            )
+            if self.args.distributed_dataloader:
+                return DistDataLoader(
+                    test_dataset,
+                    batch_size=self.args.per_device_eval_batch_size * self.world_size,
+                    collate_fn=self.data_collator,  # _get_collator_with_removed_columns
+                    num_workers=0,
+                    eval=True,
+                )
+            else:
+                return DataLoader(
+                    test_dataset,
+                    batch_size=self.args.per_device_eval_batch_size * self.world_size,
+                    collate_fn=self.data_collator,  # _get_collator_with_removed_columns
+                    num_workers=0,
+                )
 
         test_sampler = self._get_eval_sampler(test_dataset)
         if self.args.distributed_dataloader:
             logger.info("Test using DistDataLoader.")
-            # We use the same batch_size as for eval.
-            return _DataLoader(
-                test_dataset,
-                batch_sampler=test_sampler,
-                collate_fn=self.data_collator,
-                drop_last=self.args.dataloader_drop_last,
-            )
+            # We use the same batch_size as for eval.
+            return DistDataLoader(
+                test_dataset,
+                batch_sampler=test_sampler,
+                collate_fn=self.data_collator,
+                drop_last=self.args.dataloader_drop_last,
+                eval=True,
+            )
+        else:
+            return DataLoader(
+                test_dataset,
+                batch_sampler=test_sampler,
+                collate_fn=self.data_collator,
+                drop_last=self.args.dataloader_drop_last,
+            )
 
     def create_optimizer_and_scheduler(self, num_training_steps: int):
         """
diff --git a/paddlenlp/trainer/utils/helper.py b/paddlenlp/trainer/utils/helper.py
index ff68e51f127b..25f593f71e35 100644
--- a/paddlenlp/trainer/utils/helper.py
+++ b/paddlenlp/trainer/utils/helper.py
@@ -16,8 +16,6 @@
 # This file is modified from
 # https://github.com/huggingface/transformers/blob/main/src/transformers
 
-import collections
-import copy
 import os
 from typing import Any, Optional
 
@@ -27,6 +25,11 @@
 from paddle.distributed import fleet
 
 from paddlenlp.utils.log import logger
+from paddlenlp.utils.nested import (
+    nested_broadcast_tensor,
+    nested_empty_tensor,
+    nested_reduce_tensor,
+)
 
 __all__ = [
     "distributed_concat",
@@ -180,52 +183,6 @@ def distributed_file(filename):
     return filename
 
 
-TensorHolder = collections.namedtuple("TensorHolder", ["shape", "dtype", "name"])
-
-
-def nested_reduce_tensor(tensor):
-    if isinstance(tensor, dict):
-        # copy tensor since it will be inplace modified dict
-        tensor = copy.copy(tensor)
-        for key in list(tensor.keys()):
-            tensor[key] = nested_reduce_tensor(tensor[key])
-    if isinstance(tensor, (tuple, list)):
-        return type(tensor)(nested_reduce_tensor(t) for t in tensor)
-
-    if isinstance(tensor, paddle.Tensor):
-        return TensorHolder(tensor.shape, tensor.dtype, tensor.name)
-
-    return tensor
-
-
-def nested_empty_tensor(tensor):
-    if isinstance(tensor, dict):
-        for key in list(tensor.keys()):
-            tensor[key] = nested_empty_tensor(tensor[key])
-    if isinstance(tensor, list):
-        return type(tensor)(nested_empty_tensor(t) for t in tensor)
-
-    # TensorHolder is tuple
-    if isinstance(tensor, TensorHolder):
-        t = paddle.empty(tensor.shape, dtype=tensor.dtype, name=tensor.name)
-        t.name = tensor.name
-        return t
-
-    return tensor
-
-
-def nested_broadcast_tensor(tensor, src=0, group=None):
-    if isinstance(tensor, dict):
-        for key in list(tensor.keys()):
-            tensor[key] = nested_broadcast_tensor(tensor[key], src=src, group=group)
-    if isinstance(tensor, list):
-        return type(tensor)(nested_broadcast_tensor(t, src=src, group=group) for t in tensor)
-
-    if isinstance(tensor, paddle.Tensor):
-        paddle.distributed.broadcast(tensor, src=src, group=group, sync_op=True)
-    return tensor
-
-
 def broadcast_dp_optimizer(state_dict):
     if paddle.distributed.get_world_size() <= 1:
         return state_dict
diff --git a/paddlenlp/utils/nested.py b/paddlenlp/utils/nested.py
new file mode 100644
index 000000000000..27942b8cb256
--- /dev/null
+++ b/paddlenlp/utils/nested.py
@@ -0,0 +1,83 @@
+# Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import copy
+
+import paddle
+
+TensorHolder = collections.namedtuple("TensorHolder", ["shape", "dtype", "name"])
+
+
+def nested_reduce_tensor(tensor):
+    if isinstance(tensor, dict):
+        # shallow-copy the dict, since it is modified in place below
+        tensor = copy.copy(tensor)
+        for key in list(tensor.keys()):
+            tensor[key] = nested_reduce_tensor(tensor[key])
+    if isinstance(tensor, (tuple, list)):
+        return type(tensor)(nested_reduce_tensor(t) for t in tensor)
+
+    if isinstance(tensor, paddle.Tensor):
+        return TensorHolder(tensor.shape, tensor.dtype, tensor.name)
+
+    return tensor
+
+
+def nested_empty_tensor(tensor):
+    if isinstance(tensor, dict):
+        for key in list(tensor.keys()):
+            tensor[key] = nested_empty_tensor(tensor[key])
+    if isinstance(tensor, list):
+        return type(tensor)(nested_empty_tensor(t) for t in tensor)
+
+    # TensorHolder is a namedtuple (a tuple subclass), so check for it explicitly
+    if isinstance(tensor, TensorHolder):
+        t = paddle.empty(tensor.shape, dtype=tensor.dtype, name=tensor.name)
+        t.name = tensor.name
+        return t
+
+    return tensor
+
+
+def nested_broadcast_tensor(tensor, src=0, group=None):
+    if isinstance(tensor, dict):
+        for key in list(tensor.keys()):
+            tensor[key] = nested_broadcast_tensor(tensor[key], src=src, group=group)
+    if isinstance(tensor, list):
+        return type(tensor)(nested_broadcast_tensor(t, src=src, group=group) for t in tensor)
+
+    if isinstance(tensor, paddle.Tensor):
+        paddle.distributed.broadcast(tensor, src=src, group=group, sync_op=True)
+    return tensor
+
+
+def nested_copy(inputs):
+    if isinstance(inputs, dict):
+        outputs = {}
+        for key in list(inputs.keys()):
+            outputs[key] = nested_copy(inputs[key])
+        return outputs
+    return inputs
+
+
+def nested_copy_place(inputs, place=None, blocking=False):
+    if isinstance(inputs, dict):
+        outputs = {}
+        for key in list(inputs.keys()):
+            outputs[key] = nested_copy_place(inputs[key], place, blocking)
+        return outputs
+    if isinstance(inputs, paddle.Tensor):
+        inputs = inputs if inputs.place == place else inputs._copy_to(place, blocking)
+    return inputs
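
Illustrative note (not part of the patch): the refactor replaces the dtype-bucketed `broadcast_data_list` with a two-step protocol built on the new `paddlenlp.utils.nested` helpers. The source rank first broadcasts lightweight `TensorHolder(shape, dtype, name)` metadata with `broadcast_object_list`; the other ranks then allocate empty tensors from that metadata and join a per-tensor `broadcast`. The sketch below is a minimal, hypothetical standalone version of that flow (the `broadcast_batch` helper and its `src`/`group` arguments are assumptions for illustration, not code from the PR).

# Sketch of the metadata-then-payload broadcast used by DistDataLoader._broadcast_data.
# Assumes paddle.distributed is initialized; `group` and `src` come from the caller.
import paddle.distributed as dist

from paddlenlp.utils.nested import (
    nested_broadcast_tensor,
    nested_empty_tensor,
    nested_reduce_tensor,
)


def broadcast_batch(batch, src, group):
    """`batch` is a dict of tensors on rank `src` and None on every other rank."""
    is_src = dist.get_rank() == src
    # Step 1: broadcast only TensorHolder metadata (shape/dtype/name), not tensors.
    holder = [nested_reduce_tensor(batch)] if is_src else [None]
    dist.broadcast_object_list(holder, src=src, group=group)
    if holder[0] is None:
        # The source rank had no batch left, so every rank stops together.
        raise StopIteration
    # Step 2: non-source ranks allocate empty tensors from the metadata, then all
    # ranks join a tensor broadcast for every leaf of the nested structure.
    if not is_src:
        batch = nested_empty_tensor(holder[0])
    return nested_broadcast_tensor(batch, src=src, group=group)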