
Commit

Merge branch 'master' into compile_collectives
loadams authored Nov 6, 2024
2 parents 615dc47 + d2a4718 commit a91f78a
Showing 10 changed files with 28 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -23,7 +23,7 @@ repos:
- id: trailing-whitespace

- repo: https://github.com/google/yapf
-  rev: v0.32.0
+  rev: v0.40.0
hooks:
- id: yapf

4 changes: 2 additions & 2 deletions deepspeed/__init__.py
@@ -165,8 +165,8 @@ def initialize(args=None,
if hasattr(args, "deepscale_config") and args.deepscale_config is not None:
logger.warning("************ --deepscale_config is deprecated, please use --deepspeed_config ************")
if hasattr(args, "deepspeed_config"):
-            assert (args.deepspeed_config is
-                    None), "Not sure how to proceed, we were given both a deepscale_config and deepspeed_config"
+            assert (args.deepspeed_config
+                    is None), "Not sure how to proceed, we were given both a deepscale_config and deepspeed_config"
args.deepspeed_config = args.deepscale_config
args.deepscale_config = None

12 changes: 6 additions & 6 deletions deepspeed/autotuning/autotuner.py
@@ -248,8 +248,8 @@ def mp_size(self):
return self.autotuning_config.mp_size

def max_train_micro_batch_size_per_gpu(self):
-        if self.max_train_batch_size(
-        ) and self.max_train_batch_size() > 0:  # if the user specifies a max_train_batch_size
+        if self.max_train_batch_size() and self.max_train_batch_size(
+        ) > 0:  # if the user specifies a max_train_batch_size
max_train_micro_batch_size = self.max_train_batch_size() * self.mp_size() // (
self.exp_num_gpus * self.exp_num_nodes) # gradient accumulation steps >=1
return min(self.autotuning_config.max_train_micro_batch_size_per_gpu, max_train_micro_batch_size)
@@ -964,8 +964,8 @@ def get_min_max_micro_batch_size(self, stage, min_micro_batch_size, calculated_m
low = mid + 1
self.update_records(tuning_space_name, exp, metric_val, 1)
used_micro_batch_sizes.append(mid)
-                if prev_metric_val and (
-                    (metric_val - prev_metric_val) / prev_metric_val) < METRIC_PERCENT_DIFF_CONST:
+                if prev_metric_val and ((metric_val - prev_metric_val) /
+                                        prev_metric_val) < METRIC_PERCENT_DIFF_CONST:
logger.info(f"performance plateaus at mbs = {low}")
break
prev_metric_val = metric_val
@@ -1026,8 +1026,8 @@ def get_tuning_micro_batch_size_list(self, min_micro_batch_size, max_micro_batch
# NUM_GPUS=$(( ${NUM_WORKERS} * ${NUM_GPUS_PER_WORKER} ))
# DP_SIZE=$(( ${NUM_GPUS} / (${PP_SIZE} * ${MP_SIZE}) ))
# GRAD_ACC_STEPS=$(( ${TARGET_GLOBAL_BATCH_SIZE} / (${BATCH_SIZE} * ${DP_SIZE}) ))
-        if self.max_train_batch_size(
-        ) and self.max_train_batch_size() > 0:  # if the user specifies a max_train_batch_size
+        if self.max_train_batch_size() and self.max_train_batch_size(
+        ) > 0:  # if the user specifies a max_train_batch_size
max_train_batch_size_per_gpu = self.max_train_batch_size() * self.mp_size() // (self.exp_num_gpus *
self.exp_num_nodes)
else:
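
A side note on the two autotuner hunks above: the re-wrapping does not change the per-GPU cap they compute. A toy illustration with made-up numbers (hypothetical, not taken from this diff):

    # Hypothetical values for illustration only -- not from this commit.
    max_train_batch_size = 1024      # user-specified global batch-size cap
    mp_size = 2                      # model-parallel degree
    exp_num_gpus, exp_num_nodes = 8, 2
    # Same arithmetic as in the hunks above: scale the global cap by model parallelism,
    # then divide it across all GPUs in the experiment (gradient accumulation steps >= 1).
    max_train_micro_batch_size = max_train_batch_size * mp_size // (exp_num_gpus * exp_num_nodes)
    print(max_train_micro_batch_size)  # 1024 * 2 // 16 = 128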
4 changes: 2 additions & 2 deletions deepspeed/elasticity/elastic_agent.py
@@ -160,8 +160,8 @@ def _invoke_run(self, role: str = "default") -> RunResult:
f" Waiting {self._exit_barrier_timeout} seconds for other agents to finish.")
self._exit_barrier()
return run_result
-                elif state in {WorkerState.UNHEALTHY, WorkerState.FAILED
-                               } or len(participants) > len(rdzv_handler._state_holder.state.participants):
+                elif state in {WorkerState.UNHEALTHY, WorkerState.FAILED} or len(participants) > len(
+                        rdzv_handler._state_holder.state.participants):
if self._remaining_restarts > 0:
log.info(f"[{role}] Worker group {state.name}. "
f"{self._remaining_restarts}/{spec.max_restarts} attempts left;"
7 changes: 4 additions & 3 deletions deepspeed/module_inject/replace_module.py
@@ -496,9 +496,10 @@ def conv2d_parallel_shard_weights(model, rank, world_size):
if not dist.is_initialized() or dist.get_rank() == 0:
print("Saving tp-sharded checkpoints")
torch.save(
-                OrderedDict({k: v
-                             for k, v in dict(replaced_module.state_dict()).items()
-                             if transformer_name not in k}), f'{config.save_mp_checkpoint_path}/{non_tp_ckpt_name}')
+                OrderedDict({
+                    k: v
+                    for k, v in dict(replaced_module.state_dict()).items() if transformer_name not in k
+                }), f'{config.save_mp_checkpoint_path}/{non_tp_ckpt_name}')

dtype_reprs = {
torch.float32: 'float32',
4 changes: 2 additions & 2 deletions deepspeed/runtime/config.py
@@ -1012,8 +1012,8 @@ def _do_error_check(self):
self.gradient_accumulation_steps), "DeepSpeedConfig: {} is not defined".format(GRADIENT_ACCUMULATION_STEPS)

if self.zero_enabled:
-            assert (self.zero_optimization_stage <=
-                    ZeroStageEnum.max_stage), "DeepSpeedConfig: Maximum supported ZeRO stage is {}".format(
+            assert (self.zero_optimization_stage
+                    <= ZeroStageEnum.max_stage), "DeepSpeedConfig: Maximum supported ZeRO stage is {}".format(
ZeroStageEnum.max_stage)

if self.fp16_master_weights_and_gradients:
4 changes: 2 additions & 2 deletions deepspeed/runtime/eigenvalue.py
@@ -114,8 +114,8 @@ def compute_eigenvalue(self, module, device=None, scale=1.0):
eigenvalue_current, eigenvalue_previous = 1., 0.

while (i < self.max_iter) and abs(eigenvalue_current) > 0 and (abs(
-                (eigenvalue_current - eigenvalue_previous) / eigenvalue_current) >=
-               self.tol):  # test convergence criteria
+                (eigenvalue_current - eigenvalue_previous) / eigenvalue_current)
+               >= self.tol):  # test convergence criteria
eigenvalue_previous = eigenvalue_current

Hv = torch.autograd.grad(grads, params, grad_outputs=v, only_inputs=True, retain_graph=True)
7 changes: 4 additions & 3 deletions deepspeed/runtime/pipe/engine.py
@@ -640,9 +640,10 @@ def _aggregate_total_loss(self):
self.dp_group_loss = losses[0].clone().detach()
agg_loss = losses[1].clone().detach()
if additional_losses is not None:
-                self.agg_additional_losses = OrderedDict(
-                    {name: losses[2 + i].clone().detach()
-                     for i, name in enumerate(additional_losses.keys())})
+                self.agg_additional_losses = OrderedDict({
+                    name: losses[2 + i].clone().detach()
+                    for i, name in enumerate(additional_losses.keys())
+                })
return agg_loss

def set_dataloader(self, loader):
4 changes: 2 additions & 2 deletions deepspeed/runtime/utils.py
@@ -257,8 +257,8 @@ def has_overflow(self, params, has_moe_params=None):
elif self.mpu is not None:
if self.deepspeed is not None:
using_pipeline = hasattr(self.deepspeed, 'pipeline_enable_backward_allreduce')
-                if (using_pipeline and self.deepspeed.pipeline_enable_backward_allreduce is False) or (
-                        not using_pipeline and self.deepspeed.enable_backward_allreduce is False):
+                if (using_pipeline and self.deepspeed.pipeline_enable_backward_allreduce
+                        is False) or (not using_pipeline and self.deepspeed.enable_backward_allreduce is False):
dist.all_reduce(overflow_gpu, op=dist.ReduceOp.MAX, group=self.mpu.get_data_parallel_group())
dist.all_reduce(overflow_gpu, op=dist.ReduceOp.MAX, group=self.mpu.get_model_parallel_group())
elif self.deepspeed is not None and self.deepspeed.enable_backward_allreduce is False:
6 changes: 3 additions & 3 deletions tests/unit/runtime/zero/test_zero_context.py
@@ -218,9 +218,9 @@ def test_throughput_calculation(self):
engine.tput_timer.stop(global_step=global_step)
duration = engine.tput_timer.end_time - engine.tput_timer.start_time
# step elapsed time is reset after gradient accumulation steps
-        assert engine.tput_timer.step_elapsed_time == (
-            0 if engine.tput_timer.global_step_count != engine.tput_timer.start_step else current_duration +
-            duration)
+        assert engine.tput_timer.step_elapsed_time == (0 if engine.tput_timer.global_step_count
+                                                       != engine.tput_timer.start_step else current_duration +
+                                                       duration)
assert engine.tput_timer.total_elapsed_time == total_duration + duration

def test_ext_param_getattr(self):
