23 changes: 20 additions & 3 deletions deepspeed/runtime/engine.py
@@ -501,6 +501,9 @@ def steps_per_print(self):
def zero_allgather_partitions(self):
return self._config.zero_config.allgather_partitions

def zero_round_robin_gradients(self):
return self._config.zero_config.round_robin_gradients

def dump_state(self):
return self._config.dump_state

@@ -592,7 +595,7 @@ def _configure_with_arguments(self, args, mpu):
if "OMPI_COMM_WORLD_LOCAL_RANK" in os.environ:
ompi_local_rank = os.environ.get("OMPI_COMM_WORLD_LOCAL_RANK")
local_rank = os.environ.get('LOCAL_RANK', ompi_local_rank)
assert ompi_local_rank == local_rank, f"LOCAL_RANK ({local_rank}) != OMPI_COMM_WORLD_LOCAL_RANK ({mpi_local_rank}), " \
assert ompi_local_rank == local_rank, f"LOCAL_RANK ({local_rank}) != OMPI_COMM_WORLD_LOCAL_RANK ({ompi_local_rank}), " \
"not sure how to proceed as we're seeing conficting local rank info."
os.environ['LOCAL_RANK'] = local_rank

@@ -926,6 +929,15 @@ def _configure_zero_optimizer(self, optimizer):
gradient_predivide=self.gradient_predivide)
elif zero_stage <= ZERO_OPTIMIZATION_GRADIENTS:
overlap_comm = self.zero_overlap_comm()
contiguous_gradients = self.zero_contiguous_gradients()
round_robin_gradients = self.zero_round_robin_gradients()

# Overlap and contiguous grads are meaningless in stage 1 and are ignored
if zero_stage == ZERO_OPTIMIZATION_OPTIMIZER_STATES:
overlap_comm = False
contiguous_gradients = False
round_robin_gradients = False

if isinstance(self.module, PipelineModule):
if overlap_comm:
logger.warning(
@@ -940,7 +952,7 @@ def _configure_zero_optimizer(self, optimizer):
dynamic_loss_scale=self.dynamic_loss_scale(),
dynamic_loss_args=self.dynamic_loss_scale_args(),
clip_grad=self.gradient_clipping(),
contiguous_gradients=self.zero_contiguous_gradients(),
contiguous_gradients=contiguous_gradients,
reduce_bucket_size=self.zero_reduce_bucket_size(),
allgather_bucket_size=self.zero_allgather_bucket_size(),
dp_process_group=self.data_parallel_group,
@@ -952,7 +964,8 @@ def _configure_zero_optimizer(self, optimizer):
gradient_predivide_factor=self.gradient_predivide_factor(),
gradient_accumulation_steps=self.gradient_accumulation_steps(),
ignore_unused_parameters=self.zero_ignore_unused_parameters(),
partition_grads=zero_stage == ZERO_OPTIMIZATION_GRADIENTS)
partition_grads=zero_stage == ZERO_OPTIMIZATION_GRADIENTS,
round_robin_gradients=round_robin_gradients)
elif zero_stage == ZERO_OPTIMIZATION_WEIGHTS:
logger.info("Initializing ZeRO Stage 3") if dist.get_rank() == 0 else None
from deepspeed.runtime.zero.stage3 import FP16_DeepSpeedZeroOptimizer_Stage3
@@ -1163,6 +1176,10 @@ def forward(self, *inputs, **kwargs):
return loss

def allreduce_gradients(self, bucket_size=MEMORY_OPT_ALLREDUCE_SIZE):
# Pass (PP) gas boundary flag to optimizer (required for zero)
self.optimizer.is_gradient_accumulation_boundary = self.is_gradient_accumulation_boundary(
)

Contributor: Is self.optimizer guaranteed to have an is_gradient_accumulation_boundary attribute?

Contributor: We traced the break to this line. The crash went away after commenting out this line.

# ZeRO stage 2 communicates during non gradient accumulation boundaries as well
if self.zero_optimization_partition_gradients():
self.optimizer.overlapping_partition_gradients_reduce_epilogue()
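The review thread above asks whether every optimizer reaching allreduce_gradients exposes is_gradient_accumulation_boundary. Purely as an illustration of that concern (not part of this PR, and not necessarily the right fix), a guarded version of the hand-off could look like the hypothetical helper below; attribute and method names are taken from the diff.

```python
def forward_gas_boundary_flag(engine):
    """Hypothetical helper (not in the PR) illustrating the reviewer's concern:
    forward the gradient-accumulation boundary flag only when the wrapped
    optimizer actually defines the attribute (e.g. the ZeRO stage 1/2 optimizer)."""
    optimizer = engine.optimizer
    if hasattr(optimizer, "is_gradient_accumulation_boundary"):
        optimizer.is_gradient_accumulation_boundary = \
            engine.is_gradient_accumulation_boundary()
```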
6 changes: 6 additions & 0 deletions deepspeed/runtime/zero/config.py
@@ -39,6 +39,7 @@ def __init__(self, param_dict):
self.gather_fp16_weights_on_model_save = None

self.ignore_unused_parameters = None
self.round_robin_gradients = None

if ZERO_OPTIMIZATION in param_dict.keys():
zero_config_dict = param_dict[ZERO_OPTIMIZATION]
@@ -184,3 +185,8 @@ def _initialize(self, zero_config_dict):
self.legacy_stage1 = get_scalar_param(zero_config_dict,
ZERO_OPTIMIZATION_LEGACY_STAGE1,
ZERO_OPTIMIZATION_LEGACY_STAGE1_DEFAULT)

self.round_robin_gradients = get_scalar_param(
zero_config_dict,
ZERO_OPTIMIZATION_ROUND_ROBIN_GRADIENTS,
ZERO_OPTIMIZATION_ROUND_ROBIN_GRADIENTS_DEFAULT)
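For readers unfamiliar with the helper, get_scalar_param behaves roughly like a dict lookup with a library-provided default; the sketch below is an approximation under that assumption (the real helper lives in DeepSpeed's config utilities and may add validation), using the key and default introduced by this PR.

```python
# Rough sketch of the lookup pattern above, assuming get_scalar_param is essentially
# dict.get() with a default; the real DeepSpeed helper may do more than this.
ZERO_OPTIMIZATION_ROUND_ROBIN_GRADIENTS = "round_robin_gradients"
ZERO_OPTIMIZATION_ROUND_ROBIN_GRADIENTS_DEFAULT = False

def get_scalar_param_sketch(config_dict, key, default_value):
    # Return the user-supplied scalar if present, otherwise the library default.
    return config_dict.get(key, default_value)

zero_config_dict = {"stage": 2, "round_robin_gradients": True}   # illustrative user config
round_robin = get_scalar_param_sketch(zero_config_dict,
                                      ZERO_OPTIMIZATION_ROUND_ROBIN_GRADIENTS,
                                      ZERO_OPTIMIZATION_ROUND_ROBIN_GRADIENTS_DEFAULT)
print(round_robin)  # True; falls back to False when the key is absent
```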
11 changes: 9 additions & 2 deletions deepspeed/runtime/zero/constants.py
@@ -30,7 +30,8 @@
"sub_group_size" : 1000000000000,
"offload_param": {...},
"offload_optimizer": {...},
"ignore_unused_parameters": [true|false]
"ignore_unused_parameters": [true|false],
"round_robin_gradients": [true|false]
}
}
'''
@@ -124,6 +125,10 @@
ZERO_OPTIMIZATION_LEGACY_STAGE1 = "legacy_stage1"
ZERO_OPTIMIZATION_LEGACY_STAGE1_DEFAULT = False

# Stage 2 - partition gradients in a round robin fashion to load-balance reduction and offload copying
ZERO_OPTIMIZATION_ROUND_ROBIN_GRADIENTS = 'round_robin_gradients'
ZERO_OPTIMIZATION_ROUND_ROBIN_GRADIENTS_DEFAULT = False

#yapf: disable
ZERO_OPTIMIZATION_DEFAULT = {
ZERO_OPTIMIZATION_STAGE:
@@ -161,5 +166,7 @@
ZERO_OPTIMIZATION_IGNORE_UNUSED_PARAMETERS:
ZERO_OPTIMIZATION_IGNORE_UNUSED_PARAMETERS_DEFAULT,
ZERO_OPTIMIZATION_LEGACY_STAGE1:
ZERO_OPTIMIZATION_LEGACY_STAGE1_DEFAULT
ZERO_OPTIMIZATION_LEGACY_STAGE1_DEFAULT,
ZERO_OPTIMIZATION_ROUND_ROBIN_GRADIENTS:
ZERO_OPTIMIZATION_ROUND_ROBIN_GRADIENTS_DEFAULT
}
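Putting the new setting in context, a user turns it on from the zero_optimization block of the DeepSpeed config, matching the docstring and constants above. The sketch below is illustrative only: the tiny model, batch size, and learning rate are placeholders, and the config= keyword assumes a DeepSpeed release that accepts a config dict (older releases used config_params=); training is normally launched via the deepspeed launcher.

```python
# Minimal, illustrative way to enable the new option from user code.
import torch
import deepspeed

ds_config = {
    "train_micro_batch_size_per_gpu": 4,                   # placeholder value
    "fp16": {"enabled": True},
    "optimizer": {"type": "Adam", "params": {"lr": 1e-3}},  # placeholder optimizer config
    "zero_optimization": {
        "stage": 2,                      # the option only takes effect at stage 2
        "round_robin_gradients": True    # new key added by this PR (default: False)
    }
}

model = torch.nn.Linear(1024, 1024)
engine, optimizer, _, _ = deepspeed.initialize(model=model,
                                               model_parameters=model.parameters(),
                                               config=ds_config)
```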
18 changes: 13 additions & 5 deletions deepspeed/runtime/zero/stage2.py
@@ -99,12 +99,14 @@ def __init__(self,
gradient_predivide_factor=1.0,
gradient_accumulation_steps=1,
ignore_unused_parameters=True,
partition_grads=True):
partition_grads=True,
round_robin_gradients=False):

if dist.get_rank() == 0:
logger.info(f"Reduce bucket size {reduce_bucket_size}")
logger.info(f"Allgather bucket size {allgather_bucket_size}")
logger.info(f"CPU Offload: {cpu_offload}")
logger.info(f'Round robin gradient partitioning: {round_robin_gradients}')
# The fused optimizer does all the work. We need this layer for two reason:
# 1. maintain same user API from apex.fp16_utils
# 2. keep common stuff here in case we need to add new fused optimizer later
@@ -159,6 +161,7 @@ def __init__(self,
self.gradient_accumulation_steps = gradient_accumulation_steps
self.micro_step_id = 0
self.ignore_unused_parameters = ignore_unused_parameters
self.round_robin_gradients = round_robin_gradients

self.extra_large_param_to_reduce = None

@@ -232,10 +235,15 @@ def __init__(self,
# This ensures that gradients are reduced in a fashion such that ownership round robins among the ranks.
# For example, rather than 3 gradients (g_n+2, g_n+1, g_n) that are reduced consecutively belonging
# to the same rank, instead they will belong to 3 ranks (r_m+2, r_m+1, r_m).
round_robin_tensors, round_robin_indices = self._round_robin_reorder(
self.fp16_groups[i],
dist.get_world_size(group=self.dp_process_group)
)
if self.round_robin_gradients:
round_robin_tensors, round_robin_indices = self._round_robin_reorder(
self.fp16_groups[i],
dist.get_world_size(group=self.dp_process_group)
)
else:
round_robin_tensors = self.fp16_groups[i]
round_robin_indices = list(range(len(self.fp16_groups[i])))

self.round_robin_fp16_groups.append(round_robin_tensors)
self.round_robin_fp16_indices.append(round_robin_indices)
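
As a concrete picture of the reordering described in the comment above, the sketch below round-robins a parameter list across data-parallel ranks; it is an illustrative stand-in, not necessarily the exact body of _round_robin_reorder.

```python
# Illustrative round-robin reorder (a stand-in for _round_robin_reorder, not a copy of it):
# bucket tensors by i % num_partitions, then concatenate the buckets. When the reordered
# list is later partitioned contiguously across ranks, parameters that are adjacent in the
# original (backward) order end up owned by different ranks, load-balancing the reduction.
def round_robin_reorder_sketch(tensor_list, num_partitions):
    buckets = {rank: [] for rank in range(num_partitions)}
    for i, tensor in enumerate(tensor_list):
        buckets[i % num_partitions].append((i, tensor))

    reordered_tensors = []
    new_index_of_original = {}
    for rank in range(num_partitions):
        for original_index, tensor in buckets[rank]:
            new_index_of_original[original_index] = len(reordered_tensors)
            reordered_tensors.append(tensor)
    return reordered_tensors, new_index_of_original

# Example: 6 "gradients" across 3 ranks.
names = [f"g{i}" for i in range(6)]
reordered, index_map = round_robin_reorder_sketch(names, num_partitions=3)
print(reordered)   # ['g0', 'g3', 'g1', 'g4', 'g2', 'g5']
print(index_map)   # {0: 0, 3: 1, 1: 2, 4: 3, 2: 4, 5: 5}
```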
