update yapf version and style settings #3098

Merged (1 commit, Mar 27, 2023)
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
@@ -22,8 +22,8 @@ repos:
     - id: requirements-txt-fixer
     - id: trailing-whitespace

-- repo: https://github.com/pre-commit/mirrors-yapf
-  rev: v0.31.0
+- repo: https://github.com/google/yapf
+  rev: v0.32.0
   hooks:
     - id: yapf
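The hook now points at the upstream google/yapf repository instead of the pre-commit mirror, pinned at v0.32.0. A minimal sketch (not part of this PR) for confirming that a local environment resolves the same version, using only the standard library:

import importlib.metadata

# Requires yapf to be installed in the current interpreter (Python 3.8+).
assert importlib.metadata.version("yapf") == "0.32.0"

With the hook updated, `pre-commit run yapf --all-files` reformats the whole tree under the new settings, which is presumably what produced the mechanical changes in the files below.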
4 changes: 2 additions & 2 deletions .style.yapf
@@ -1,3 +1,3 @@
 [style]
-SPLIT_ALL_COMMA_SEPARATED_VALUES = true
-COLUMN_LIMIT = 89
+SPLIT_ALL_COMMA_SEPARATED_VALUES = false
+COLUMN_LIMIT = 119
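These two knobs account for most of the churn in this PR: lines may now run to 119 columns, and yapf no longer forces every comma-separated value onto its own line. A minimal sketch of the effect (assumes yapf v0.32.0; FormatCode is yapf's public formatting API, and the sample statement is illustrative):

from yapf.yapflib.yapf_api import FormatCode

# A ~100-column statement: over the old 89-column limit, under the new 119.
src = "mat = torch.ones(world_size, elements_per_gpu, dtype=getattr(torch, args.dtype), requires_grad=False)\n"

old_style = "{split_all_comma_separated_values: true, column_limit: 89}"
new_style = "{split_all_comma_separated_values: false, column_limit: 119}"

old_out, _ = FormatCode(src, style_config=old_style)  # wraps, one value per line
new_out, _ = FormatCode(src, style_config=new_style)  # left as a single line
print(old_out)
print(new_out)

The single-line result under the new style matches the shape of the joined lines throughout the diffs below.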
1 change: 1 addition & 0 deletions accelerator/abstract_accelerator.py
@@ -5,6 +5,7 @@


 class DeepSpeedAccelerator(ABC):
+
     def __init__(self):
         self._name = None
         self._communication_backend_name = None
5 changes: 2 additions & 3 deletions accelerator/cuda_accelerator.py
@@ -14,6 +14,7 @@


 class CUDA_Accelerator(DeepSpeedAccelerator):
+
     def __init__(self):
         self._name = 'cuda'
         self._communication_backend_name = 'nccl'
@@ -26,9 +27,7 @@ def __init__(self):
         for _, module_name, _ in pkgutil.iter_modules([os.path.dirname(op_builder_module.__file__)]):
             # avoid self references
             if module_name != 'all_ops' and module_name != 'builder':
-                module = importlib.import_module("{}.{}".format(
-                    op_builder_dir,
-                    module_name))
+                module = importlib.import_module("{}.{}".format(op_builder_dir, module_name))
                 for member_name in module.__dir__():
                     if member_name.endswith(
                             'Builder'
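For context, the reflowed import sits inside a discovery loop: every module in the op_builder directory is imported by name and scanned for classes ending in 'Builder'. A standalone sketch of that pattern with a hypothetical package, not DeepSpeed's actual registry code:

import importlib
import os
import pkgutil

import my_ops  # hypothetical plugin package standing in for op_builder

builders = {}
for _, module_name, _ in pkgutil.iter_modules([os.path.dirname(my_ops.__file__)]):
    # Skip aggregator/base modules, mirroring the 'all_ops'/'builder' check above.
    if module_name in ('all_ops', 'builder'):
        continue
    module = importlib.import_module("{}.{}".format("my_ops", module_name))
    for member_name in dir(module):
        if member_name.endswith('Builder'):
            builders[member_name] = getattr(module, member_name)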
9 changes: 2 additions & 7 deletions accelerator/real_accelerator.py
@@ -23,13 +23,8 @@ def _validate_accelerator(accel_obj):
     # accelerator.abstractor_accelerator
     # or deepspeed.accelerator.abstract_accelerator, consider accel_obj
     # is a conforming object
-    if not ((dsa1 != None and isinstance(accel_obj,
-                                         dsa1)) or
-            (dsa2 != None and isinstance(accel_obj,
-                                         dsa2))):
-        raise AssertionError(
-            f'{accel_obj.__class__.__name__} accelerator is not subclass of DeepSpeedAccelerator'
-        )
+    if not ((dsa1 != None and isinstance(accel_obj, dsa1)) or (dsa2 != None and isinstance(accel_obj, dsa2))):
+        raise AssertionError(f'{accel_obj.__class__.__name__} accelerator is not subclass of DeepSpeedAccelerator')

     # TODO: turn off is_available test since this breaks tests
     #assert accel_obj.is_available(), \
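The joined condition preserves the original intent: accept accel_obj if it subclasses the abstract accelerator class imported under either of the two module paths named in the comments. A minimal sketch of that dual-path check; the try/except import scaffolding is an assumption for illustration, not code from this PR:

dsa1 = dsa2 = None
try:
    from accelerator.abstract_accelerator import DeepSpeedAccelerator as dsa1
except ImportError:
    pass
try:
    from deepspeed.accelerator.abstract_accelerator import DeepSpeedAccelerator as dsa2
except ImportError:
    pass

def _validate(accel_obj):
    # `is not None` is the idiomatic spelling of the `!= None` checks above.
    if not ((dsa1 is not None and isinstance(accel_obj, dsa1)) or (dsa2 is not None and isinstance(accel_obj, dsa2))):
        raise AssertionError(f'{accel_obj.__class__.__name__} is not a DeepSpeedAccelerator subclass')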
49 changes: 14 additions & 35 deletions benchmarks/communication/all_gather.py
@@ -22,9 +22,7 @@ def timed_all_gather(input, output, args):
             if hasattr(torch.distributed, "_all_gather_base"):
                 dist._all_gather_base(output, input, group=None, async_op=args.async_op)
             else:
-                output_tensors = list(
-                    torch.chunk(output_tensor,
-                                cdb.get_world_size(group)))
+                output_tensors = list(torch.chunk(output_tensor, cdb.get_world_size(group)))
                 dist.all_gather(output_tensors, input_tensor, group=group, async_op=True)
         elif args.dist == 'deepspeed':
             dist.allgather_fn(output, input, group=None, async_op=args.async_op)
@@ -38,9 +36,7 @@ def timed_all_gather(input, output, args):
             if hasattr(torch.distributed, "_all_gather_base"):
                 dist._all_gather_base(output, input, group=None, async_op=args.async_op)
             else:
-                output_tensors = list(
-                    torch.chunk(output_tensor,
-                                cdb.get_world_size(group)))
+                output_tensors = list(torch.chunk(output_tensor, cdb.get_world_size(group)))
                 dist.all_gather(output_tensors, input_tensor, group=group, async_op=True)
         elif args.dist == 'deepspeed':
             dist.allgather_fn(output, input, group=None, async_op=args.async_op)
@@ -58,8 +54,7 @@ def timed_all_gather(input, output, args):
     if not args.raw:
         size = convert_size(size)

-    print_rank_0(
-        f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")
+    print_rank_0(f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")


 def run_all_gather(local_rank, args):
@@ -84,22 +79,15 @@ def run_all_gather(local_rank, args):
         for M in M_LIST:
             global_rank = dist.get_rank()
             try:
-                mat = torch.ones(world_size,
-                                 M,
-                                 dtype=getattr(
-                                     torch,
-                                     args.dtype)).to(
-                                         get_accelerator().device_name(local_rank))
+                mat = torch.ones(world_size, M,
+                                 dtype=getattr(torch, args.dtype)).to(get_accelerator().device_name(local_rank))
                 sync_all()
                 input = ((mat.mul_(float(global_rank))).view(-1))
                 # Delete original mat to avoid OOM
                 del mat
                 get_accelerator().empty_cache()
                 output = torch.zeros(input.nelement() * world_size,
-                                     dtype=getattr(
-                                         torch,
-                                         args.dtype)).to(
-                                             get_accelerator().device_name(local_rank))
+                                     dtype=getattr(torch, args.dtype)).to(get_accelerator().device_name(local_rank))
             except RuntimeError as e:
                 if 'out of memory' in str(e):
                     if dist.get_rank() == 0:
@@ -110,41 +98,32 @@ def run_all_gather(local_rank, args):
             timed_all_gather(input, output, args)
     else:
         # all_gather_base saves memory
-        if (args.dist == 'torch'
-                and hasattr(torch.distributed,
-                            "_all_gather_base")) or (args.dist == 'deepspeed'
-                                                     and dist.has_allgather_base):
+        if (args.dist == 'torch' and hasattr(torch.distributed, "_all_gather_base")) or (args.dist == 'deepspeed'
+                                                                                         and dist.has_allgather_base):
             mem_factor = args.mem_factor + 0.2
         else:
             mem_factor = args.mem_factor
         # Send the biggest message size our GPUs can fit. If you're facing OOM errors, reduce the mem_factor
         sync_all()
         elements_per_gpu = max_numel(comm_op='all_gather',
-                                     dtype=getattr(torch,
-                                                   args.dtype),
+                                     dtype=getattr(torch, args.dtype),
                                      mem_factor=mem_factor,
                                      local_rank=local_rank,
                                      args=args)
         try:
-            mat = torch.ones(elements_per_gpu,
-                             dtype=getattr(torch,
-                                           args.dtype)).to(
-                                               get_accelerator().device_name(local_rank))
+            mat = torch.ones(elements_per_gpu, dtype=getattr(torch,
+                                                             args.dtype)).to(get_accelerator().device_name(local_rank))
             # multiply each GPU's tensor by the rank to ease debugging
             input = ((mat.mul_(float(global_rank))).view(-1))
             # Delete original mat to avoid OOM
             del mat
             get_accelerator().empty_cache()
-            output = torch.zeros(
-                elements_per_gpu * world_size,
-                dtype=getattr(torch,
-                              args.dtype)).to(get_accelerator().device_name(local_rank))
+            output = torch.zeros(elements_per_gpu * world_size,
+                                 dtype=getattr(torch, args.dtype)).to(get_accelerator().device_name(local_rank))
         except RuntimeError as e:
             if 'out of memory' in str(e):
                 if dist.get_rank() == 0:
-                    print(
-                        'WARNING: Ran out of GPU memory. Try to reduce the --mem-factor argument!'
-                    )
+                    print('WARNING: Ran out of GPU memory. Try to reduce the --mem-factor argument!')
                 sync_all()
                 return
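One idiom that yapf's rewrapping makes prominent in this file and the sibling benchmarks below: the dtype string from the command line is resolved to a real torch dtype with getattr. A minimal sketch (args_dtype stands in for args.dtype and is illustrative):

import torch

args_dtype = "float32"  # e.g. parsed from a --dtype CLI flag

# getattr(torch, "float32") returns torch.float32; an unknown name raises
# AttributeError rather than silently falling back to a default dtype.
dtype = getattr(torch, args_dtype)
mat = torch.ones(4, 8, dtype=dtype)
assert mat.dtype is torch.float32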
24 changes: 7 additions & 17 deletions benchmarks/communication/all_reduce.py
@@ -37,8 +37,7 @@ def timed_all_reduce(input, args):
     if not args.raw:
         size = convert_size(size)

-    print_rank_0(
-        f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")
+    print_rank_0(f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")


 def run_all_reduce(local_rank, args):
@@ -63,12 +62,8 @@ def run_all_reduce(local_rank, args):
         for M in M_LIST:
             global_rank = dist.get_rank()
             try:
-                mat = torch.ones(world_size,
-                                 M,
-                                 dtype=getattr(
-                                     torch,
-                                     args.dtype)).to(
-                                         get_accelerator().device_name(local_rank))
+                mat = torch.ones(world_size, M,
+                                 dtype=getattr(torch, args.dtype)).to(get_accelerator().device_name(local_rank))
                 sync_all()
                 input = ((mat.mul_(float(global_rank))).view(-1))
             except RuntimeError as e:
@@ -83,23 +78,18 @@ def run_all_reduce(local_rank, args):
         # Send the biggest message size our GPUs can fit. If you're facing OOM errors, reduce the mem_factor
         # Don't need output tensor, so we double mem_factor
         elements_per_gpu = max_numel(comm_op='all_reduce',
-                                     dtype=getattr(torch,
-                                                   args.dtype),
+                                     dtype=getattr(torch, args.dtype),
                                      mem_factor=args.mem_factor * 2,
                                      local_rank=local_rank,
                                      args=args)
         try:
-            mat = torch.ones(elements_per_gpu,
-                             dtype=getattr(torch,
-                                           args.dtype)).to(
-                                               get_accelerator().device_name(local_rank))
+            mat = torch.ones(elements_per_gpu, dtype=getattr(torch,
+                                                             args.dtype)).to(get_accelerator().device_name(local_rank))
             input = ((mat.mul_(float(global_rank))).view(-1))
         except RuntimeError as e:
             if 'out of memory' in str(e):
                 if dist.get_rank() == 0:
-                    print(
-                        'WARNING: Ran out of GPU memory. Try to reduce the --mem-factor argument!'
-                    )
+                    print('WARNING: Ran out of GPU memory. Try to reduce the --mem-factor argument!')
                 sync_all()
                 return
         sync_all()
33 changes: 11 additions & 22 deletions benchmarks/communication/all_to_all.py
@@ -37,8 +37,7 @@ def timed_all_to_all(input, output, args):
     if not args.raw:
         size = convert_size(size)

-    print_rank_0(
-        f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")
+    print_rank_0(f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")


 def run_all_to_all(local_rank, args):
@@ -62,12 +61,8 @@ def run_all_to_all(local_rank, args):
         for M in M_LIST:
             global_rank = dist.get_rank()
             try:
-                mat = torch.ones(world_size,
-                                 M,
-                                 dtype=getattr(
-                                     torch,
-                                     args.dtype)).to(
-                                         get_accelerator().device_name(local_rank))
+                mat = torch.ones(world_size, M,
+                                 dtype=getattr(torch, args.dtype)).to(get_accelerator().device_name(local_rank))
                 assert mat.numel() % world_size == 0, f"tensor cannot be divided in {world_size} chunks"
                 sync_all()
                 input = ((mat.mul_(float(global_rank))).view(-1))
@@ -83,31 +78,25 @@ def run_all_to_all(local_rank, args):
     else:
         # Send the biggest message size our GPUs can fit. If you're facing OOM errors, reduce the mem_factor
         elements_per_gpu = max_numel(comm_op='all_to_all',
-                                     dtype=getattr(torch,
-                                                   args.dtype),
+                                     dtype=getattr(torch, args.dtype),
                                      mem_factor=args.mem_factor,
                                      local_rank=local_rank,
                                      args=args)
         try:
-            mat = torch.ones(elements_per_gpu,
-                             dtype=getattr(torch,
-                                           args.dtype)).to(
-                                               get_accelerator().device_name(local_rank))
-            assert mat.numel() % world_size == 0, f"tensor with {mat.numel()} elements cannot be divided in {world_size} chunks"
+            mat = torch.ones(elements_per_gpu, dtype=getattr(torch,
+                                                             args.dtype)).to(get_accelerator().device_name(local_rank))
+            assert mat.numel(
+            ) % world_size == 0, f"tensor with {mat.numel()} elements cannot be divided in {world_size} chunks"
             input = ((mat.mul_(float(global_rank))).view(-1))
             # Delete original mat to avoid OOM
             del mat
             get_accelerator().empty_cache()
-            output = torch.zeros(
-                elements_per_gpu,
-                dtype=getattr(torch,
-                              args.dtype)).to(get_accelerator().device_name(local_rank))
+            output = torch.zeros(elements_per_gpu,
+                                 dtype=getattr(torch, args.dtype)).to(get_accelerator().device_name(local_rank))
         except RuntimeError as e:
             if 'out of memory' in str(e):
                 if dist.get_rank() == 0:
-                    print(
-                        'WARNING: Ran out of GPU memory. Try to reduce the --mem-factor argument!'
-                    )
+                    print('WARNING: Ran out of GPU memory. Try to reduce the --mem-factor argument!')
                 sync_all()
                 return
         sync_all()
24 changes: 7 additions & 17 deletions benchmarks/communication/broadcast.py
@@ -38,8 +38,7 @@ def timed_broadcast(input, args):
     if not args.raw:
         size = convert_size(size)

-    print_rank_0(
-        f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")
+    print_rank_0(f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")


 def run_broadcast(local_rank, args):
@@ -64,12 +63,8 @@ def run_broadcast(local_rank, args):
         for M in M_LIST:
             global_rank = dist.get_rank()
             try:
-                mat = torch.ones(world_size,
-                                 M,
-                                 dtype=getattr(
-                                     torch,
-                                     args.dtype)).to(
-                                         get_accelerator().device_name(local_rank))
+                mat = torch.ones(world_size, M,
+                                 dtype=getattr(torch, args.dtype)).to(get_accelerator().device_name(local_rank))
                 sync_all()
                 input = ((mat.mul_(float(global_rank))).view(-1))
             except RuntimeError as e:
@@ -84,23 +79,18 @@ def run_broadcast(local_rank, args):
         # Send the biggest message size our GPUs can fit. If you're facing OOM errors, reduce the mem_factor
        # Don't need output tensor, so we double mem_factor
         elements_per_gpu = max_numel(comm_op='broadcast',
-                                     dtype=getattr(torch,
-                                                   args.dtype),
+                                     dtype=getattr(torch, args.dtype),
                                      mem_factor=args.mem_factor * 2,
                                      local_rank=local_rank,
                                      args=args)
         try:
-            mat = torch.ones(elements_per_gpu,
-                             dtype=getattr(torch,
-                                           args.dtype)).to(
-                                               get_accelerator().device_name(local_rank))
+            mat = torch.ones(elements_per_gpu, dtype=getattr(torch,
+                                                             args.dtype)).to(get_accelerator().device_name(local_rank))
             input = ((mat.mul_(float(global_rank))).view(-1))
         except RuntimeError as e:
             if 'out of memory' in str(e):
                 if dist.get_rank() == 0:
-                    print(
-                        'WARNING: Ran out of GPU memory. Try to reduce the --mem-factor argument!'
-                    )
+                    print('WARNING: Ran out of GPU memory. Try to reduce the --mem-factor argument!')
                 sync_all()
                 return
         sync_all()
24 changes: 7 additions & 17 deletions benchmarks/communication/pt2pt.py
@@ -56,8 +56,7 @@ def timed_pt2pt(input, args):
     if not args.raw:
         size = convert_size(size)

-    print_rank_0(
-        f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")
+    print_rank_0(f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")


 def run_pt2pt(local_rank, args):
@@ -82,12 +81,8 @@ def run_pt2pt(local_rank, args):
         for M in M_LIST:
             global_rank = dist.get_rank()
             try:
-                mat = torch.ones(world_size,
-                                 M,
-                                 dtype=getattr(
-                                     torch,
-                                     args.dtype)).to(
-                                         get_accelerator().device_name(local_rank))
+                mat = torch.ones(world_size, M,
+                                 dtype=getattr(torch, args.dtype)).to(get_accelerator().device_name(local_rank))
                 sync_all()
                 input = ((mat.mul_(float(global_rank))).view(-1))
             except RuntimeError as e:
@@ -102,23 +97,18 @@ def run_pt2pt(local_rank, args):
         # Send the biggest message size our GPUs can fit. If you're facing OOM errors, reduce the mem_factor
         # Don't need output tensor, so double mem_factor
         elements_per_gpu = max_numel(comm_op='pt2pt',
-                                     dtype=getattr(torch,
-                                                   args.dtype),
+                                     dtype=getattr(torch, args.dtype),
                                      mem_factor=args.mem_factor * 2,
                                      local_rank=local_rank,
                                      args=args)
         try:
-            mat = torch.ones(elements_per_gpu,
-                             dtype=getattr(torch,
-                                           args.dtype)).to(
-                                               get_accelerator().device_name(local_rank))
+            mat = torch.ones(elements_per_gpu, dtype=getattr(torch,
+                                                             args.dtype)).to(get_accelerator().device_name(local_rank))
             input = ((mat.mul_(float(global_rank))).view(-1))
         except RuntimeError as e:
             if 'out of memory' in str(e):
                 if dist.get_rank() == 0:
-                    print(
-                        'WARNING: Ran out of GPU memory. Try to reduce the --mem-factor argument!'
-                    )
+                    print('WARNING: Ran out of GPU memory. Try to reduce the --mem-factor argument!')
                 sync_all()
                 return
         sync_all()