DS Benchmarks QoL Improvements (microsoft#2120)
Co-authored-by: Jeff Rasley <jerasley@microsoft.com>
Quentin-Anthony and jeffra authored Jul 22, 2022
1 parent 6392f83 commit 8413b7f
Showing 10 changed files with 260 additions and 80 deletions.
1 change: 1 addition & 0 deletions MANIFEST.in
@@ -3,3 +3,4 @@ recursive-include requirements *.txt
recursive-include deepspeed *.cpp *.h *.cu *.hip *.tr *.cuh *.cc *.json
recursive-include csrc *.cpp *.h *.cu *.tr *.cuh *.cc
recursive-include op_builder *.py
recursive-include benchmarks *.py
Empty file added benchmarks/__init__.py
Empty file.
40 changes: 25 additions & 15 deletions benchmarks/communication/README.md
@@ -15,41 +15,52 @@ Scan across message sizes:
deepspeed all_reduce.py --scan
</pre>

Each individual communication operation's benchmarks have separate benchmarking options. For `all_reduce.py`, for example:
2. Run all available communication benchmarks:

<pre>
deepspeed run_all.py
</pre>

Like the individual benchmarks, `run_all.py` supports scanning arguments for the max message size, bw-unit, etc. Simply pass the desired arguments to `run_all.py` and they'll be propagated to each comm op.

<pre>
usage: ds_bench [-h] [--local_rank LOCAL_RANK] [--trials TRIALS] [--warmup WARMUP] [--maxsize MAXSIZE] [--async-op] [--bw-unit {Gbps,GBps}] [--backend {nccl}] [--dist {deepspeed,torch}] [--scan] [--dtype DTYPE] [--mem-factor MEM_FACTOR] [--debug]
usage: ds_bench [-h] [--local_rank LOCAL_RANK] [--trials TRIALS] [--warmups WARMUPS] [--maxsize MAXSIZE] [--async-op] [--bw-unit {Gbps,GBps}] [--backend {nccl}] [--dist {deepspeed,torch}] [--scan] [--raw] [--all-reduce] [--all-gather] [--all-to-all]
[--pt2pt] [--broadcast] [--dtype DTYPE] [--mem-factor MEM_FACTOR] [--debug]

optional arguments:
-h, --help show this help message and exit
--local_rank LOCAL_RANK
--trials TRIALS Number of timed iterations
--warmup WARMUP Number of warmup (non-timed) iterations
--warmups WARMUPS Number of warmup (non-timed) iterations
--maxsize MAXSIZE Max message size as a power of 2
--async-op Enables non-blocking communication
--bw-unit {Gbps,GBps}
--backend {nccl} Communication library to use
--dist {deepspeed,torch}
Distributed DL framework to use
--scan Enables scanning all message sizes
--raw Print the message size and latency without units
--all-reduce Run all_reduce
--all-gather Run all_gather
--all-to-all Run all_to_all
--pt2pt Run pt2pt
--broadcast Run broadcast
--dtype DTYPE PyTorch tensor dtype
--mem-factor MEM_FACTOR
Proportion of max available GPU memory to use for single-size evals
--debug Enables alltoall debug prints
--debug Enables all_to_all debug prints
</pre>
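
For instance, the flags above can be combined to scan message sizes up to 2^28 bytes, report bandwidth in GB/s, and average 20 timed iterations per size (the specific values here are only illustrative):

<pre>
deepspeed run_all.py --scan --maxsize 28 --bw-unit GBps --trials 20 --warmups 5
</pre>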

2. Run all available communication benchmarks:
Note that `ds_bench` is a pre-packaged wrapper around `run_all.py`. Users can pass the same arguments as well:

<pre>
deepspeed run_all.py
<path to deepspeed>/bin/ds_bench --scan --trials=10
</pre>

Like the individual benchmarks, `run_all.py` supports scanning arguments for the max message size, bw-unit, etc. Simply pass the desired arguments to `run_all.py` and they'll be propagated to each comm op.

Note that `ds_bench` is a pre-packaged wrapper around `run_all.py`. Users can pass the same arguments as well:
Finally, users can choose specific communication operations to run in `run_all.py` or `ds_bench` by passing them as arguments (all operations are run by default). For example:

<pre>
<path to deepspeed>/bin/ds_bench --scan --trials=10
deepspeed run_all.py --scan --all-reduce --all-to-all --broadcast
</pre>


@@ -58,8 +69,7 @@ Note that `ds_bench` is a pre-packaged wrapper around `run_all.py`. Users can pa
To add new communication benchmarks, follow this general procedure:

1. Copy a similar benchmark file (e.g. to add `reduce_scatter`, copy `all_reduce.py` as a template)
2. Add a new bw formula in `utils.get_bw`
3. Add a new maximum tensor element formula in `utils.max_numel`
4. Replace comm op calls in new file with find-replace
5. Find a good default `mem_factor` for use in `run_<collective>_single()` function
6. Add new comm op to `run_all.py`
2. Add a new bw formula in `utils.get_bw`, a new maximum tensor element formula in `utils.max_numel`, and a new arg in `utils.benchmark_parser` (see the sketch after this list)
3. Replace comm op calls in new file with find-replace
4. Find a good default `mem_factor` for use in `run_<collective>_single()` function
5. Add new comm op to `run_all.py`
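
As a concrete illustration of step 2, the sketch below shows what such additions might look like for a hypothetical `reduce_scatter` benchmark. The call signatures mirror how `get_bw` and `max_numel` are invoked by the existing benchmarks, but the function bodies, the `args.world_size` attribute, and the parser helper are assumptions made for illustration, not the actual `benchmarks/communication/utils.py` implementation:

<pre>
# Illustrative sketch only -- not the actual utils.py code.
import argparse
import torch


def get_bw(comm_op, size, duration, args):
    n = args.world_size  # assumption: world size is available on args for this sketch
    tput = size / duration  # algorithmic bandwidth in bytes/sec
    if comm_op == 'reduce_scatter':
        # Like all_gather, reduce_scatter moves (n - 1)/n of the message per rank,
        # the standard bus-bandwidth scaling for gather/scatter collectives.
        busbw = tput * (n - 1) / n
    else:
        raise ValueError(f"unsupported comm_op: {comm_op}")
    if args.bw_unit == 'Gbps':
        return tput * 8 / 1e9, busbw * 8 / 1e9
    return tput / 1e9, busbw / 1e9  # GBps


def max_numel(comm_op, dtype, mem_factor, local_rank, args):
    # Largest element count that fits in the requested fraction of GPU memory.
    element_size = torch.tensor([], dtype=dtype).element_size()
    max_mem = torch.cuda.get_device_properties(local_rank).total_memory
    if comm_op == 'reduce_scatter':
        return int((max_mem * mem_factor) // element_size)
    raise ValueError(f"unsupported comm_op: {comm_op}")


def add_reduce_scatter_flag(parser: argparse.ArgumentParser):
    # Selector flag that benchmark_parser would expose so run_all.py / ds_bench
    # can pick the new op, matching the --all-reduce / --all-gather flags above.
    parser.add_argument("--reduce-scatter", action="store_true", help="Run reduce_scatter")
    return parser
</pre>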
28 changes: 15 additions & 13 deletions benchmarks/communication/all_gather.py
@@ -9,16 +9,16 @@
import math


# Run allgather and print metrics
def timed_allgather(input, output, args):
# Run all_gather and print metrics
def timed_all_gather(input, output, args):
if args.dist == 'torch':
import torch.distributed as dist
elif args.dist == 'deepspeed':
import deepspeed.comm as dist

sync_all()
# Warmup, establish connections, etc.
for i in range(args.warmup):
# Warmups, establish connections, etc.
for i in range(args.warmups):
# use all_gather_base if available
if args.dist == 'torch':
if hasattr(torch.distributed, "_all_gather_base"):
@@ -53,23 +53,25 @@ def timed_allgather(input, output, args):
avg_duration = duration / args.trials
size = input.element_size() * input.nelement()
n = dist.get_world_size()
tput, busbw = get_bw('allgather', size, avg_duration, args)
tput, busbw = get_bw('all_gather', size, avg_duration, args)
tput_str, busbw_str, duration_str = get_metric_strings(args, tput, busbw, avg_duration)
desc = f'{input.nelement()}x{input.element_size()}'

if not args.raw:
size = convert_size(size)

print_rank_0(
f"{convert_size(size):<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}"
)
f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")


def run_allgather(local_rank, args):
def run_all_gather(local_rank, args):
if args.dist == 'torch':
import torch.distributed as dist
elif args.dist == 'deepspeed':
import deepspeed.comm as dist

# Prepare benchmark header
print_header(args, 'allgather')
print_header(args, 'all_gather')
global_rank = dist.get_rank()
world_size = dist.get_world_size()

@@ -103,7 +105,7 @@ def run_allgather(local_rank, args):
sync_all()
break
sync_all()
timed_allgather(input, output, args)
timed_all_gather(input, output, args)
else:
# all_gather_base saves memory
if (args.dist == 'torch'
@@ -115,7 +117,7 @@ def run_allgather(local_rank, args):
mem_factor = args.mem_factor
# Send the biggest message size our GPUs can fit. If you're facing OOM errors, reduce the mem_factor
sync_all()
elements_per_gpu = max_numel(comm_op='allgather',
elements_per_gpu = max_numel(comm_op='all_gather',
dtype=getattr(torch,
args.dtype),
mem_factor=mem_factor,
@@ -143,11 +145,11 @@ def run_allgather(local_rank, args):
return

sync_all()
timed_allgather(input, output, args)
timed_all_gather(input, output, args)


if __name__ == "__main__":
args = benchmark_parser().parse_args()
rank = args.local_rank
init_processes(local_rank=rank, args=args)
run_allgather(local_rank=rank, args=args)
run_all_gather(local_rank=rank, args=args)
26 changes: 14 additions & 12 deletions benchmarks/communication/all_reduce.py
@@ -8,15 +8,15 @@
import math


def timed_allreduce(input, args):
def timed_all_reduce(input, args):
if args.dist == 'torch':
import torch.distributed as dist
elif args.dist == 'deepspeed':
import deepspeed.comm as dist

sync_all()
# Warmup, establish connections, etc.
for i in range(args.warmup):
# Warmups, establish connections, etc.
for i in range(args.warmups):
dist.all_reduce(input, async_op=args.async_op)
sync_all()

@@ -31,23 +31,25 @@ def timed_allreduce(input, args):
avg_duration = duration / args.trials
size = input.element_size() * input.nelement()
n = dist.get_world_size()
tput, busbw = get_bw('allreduce', size, avg_duration, args)
tput, busbw = get_bw('all_reduce', size, avg_duration, args)
tput_str, busbw_str, duration_str = get_metric_strings(args, tput, busbw, avg_duration)
desc = f'{input.nelement()}x{input.element_size()}'

if not args.raw:
size = convert_size(size)

print_rank_0(
f"{convert_size(size):<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}"
)
f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")


def run_allreduce(local_rank, args):
def run_all_reduce(local_rank, args):
if args.dist == 'torch':
import torch.distributed as dist
elif args.dist == 'deepspeed':
import deepspeed.comm as dist

# Prepare benchmark header
print_header(args, 'allreduce')
print_header(args, 'all_reduce')

world_size = dist.get_world_size()
global_rank = dist.get_rank()
@@ -75,11 +77,11 @@ def run_allreduce(local_rank, args):
sync_all()
break
sync_all()
timed_allreduce(input, args)
timed_all_reduce(input, args)
else:
# Send the biggest message size our GPUs can fit. If you're facing OOM errors, reduce the mem_factor
# Don't need output tensor, so we double mem_factor
elements_per_gpu = max_numel(comm_op='allreduce',
elements_per_gpu = max_numel(comm_op='all_reduce',
dtype=getattr(torch,
args.dtype),
mem_factor=args.mem_factor * 2,
@@ -99,11 +101,11 @@ def run_allreduce(local_rank, args):
sync_all()
return
sync_all()
timed_allreduce(input, args)
timed_all_reduce(input, args)


if __name__ == "__main__":
args = benchmark_parser().parse_args()
rank = args.local_rank
init_processes(local_rank=rank, args=args)
run_allreduce(local_rank=rank, args=args)
run_all_reduce(local_rank=rank, args=args)
26 changes: 14 additions & 12 deletions benchmarks/communication/all_to_all.py
@@ -8,15 +8,15 @@
import math


def timed_alltoall(input, output, args):
def timed_all_to_all(input, output, args):
if args.dist == 'torch':
import torch.distributed as dist
elif args.dist == 'deepspeed':
import deepspeed.comm as dist

sync_all()
# Warmup, establish connections, etc.
for i in range(args.warmup):
# Warmups, establish connections, etc.
for i in range(args.warmups):
dist.all_to_all_single(output, input, async_op=args.async_op)
sync_all()

@@ -31,16 +31,18 @@ def timed_alltoall(input, output, args):
avg_duration = duration / args.trials
size = input.element_size() * input.nelement()
n = dist.get_world_size()
tput, busbw = get_bw('alltoall', size, avg_duration, args)
tput, busbw = get_bw('all_to_all', size, avg_duration, args)
tput_str, busbw_str, duration_str = get_metric_strings(args, tput, busbw, avg_duration)
desc = f'{input.nelement()}x{input.element_size()}'

if not args.raw:
size = convert_size(size)

print_rank_0(
f"{convert_size(size):<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}"
)
f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")


def run_alltoall(local_rank, args):
def run_all_to_all(local_rank, args):
if args.dist == 'torch':
import torch.distributed as dist
elif args.dist == 'deepspeed':
@@ -49,7 +51,7 @@ def run_alltoall(local_rank, args):
world_size = dist.get_world_size()
global_rank = dist.get_rank()
# Prepare benchmark header
print_header(args, 'alltoall')
print_header(args, 'all_to_all')

if args.scan:
M_LIST = []
@@ -76,10 +78,10 @@ def run_alltoall(local_rank, args):
sync_all()
break
sync_all()
timed_alltoall(input, output, args)
timed_all_to_all(input, output, args)
else:
# Send the biggest message size our GPUs can fit. If you're facing OOM errors, reduce the mem_factor
elements_per_gpu = max_numel(comm_op='alltoall',
elements_per_gpu = max_numel(comm_op='all_to_all',
dtype=getattr(torch,
args.dtype),
mem_factor=args.mem_factor,
@@ -113,7 +115,7 @@ def run_alltoall(local_rank, args):
print(f"Before AllToAll Input List at rank {global_rank}: {input}")
dist.barrier()

timed_alltoall(input, output, args)
timed_all_to_all(input, output, args)

if args.debug:
for i in range(world_size):
@@ -126,4 +128,4 @@ def run_alltoall(local_rank, args):
args = benchmark_parser().parse_args()
rank = args.local_rank
init_processes(local_rank=rank, args=args)
run_alltoall(local_rank=rank, args=args)
run_all_to_all(local_rank=rank, args=args)