[Data Loading] Round-based per-epoch shuffling data loader for distributed training. #15531

Closed
wants to merge 30 commits
Changes from 1 commit
Commits (30)
09a4caa
Test scripts
stephanie-wang Apr 1, 2021
cc18afa
Consolidate data generation and benchmarks, extend benchmarking scrip…
clarkzinzow Apr 7, 2021
d89d963
Misc. updates.
clarkzinzow Apr 8, 2021
2a86123
Send only relevant chunks to reducers.
clarkzinzow Apr 8, 2021
8e73711
Add from-memory shuffle, add throughput stats collection.
clarkzinzow Apr 8, 2021
af2193f
Updates from paired programming session.
clarkzinzow Apr 9, 2021
f48b338
Measure consumer times from the start of the round.
clarkzinzow Apr 9, 2021
3f3fd5c
Measure consumer times from the start of the epoch.
clarkzinzow Apr 10, 2021
4e73f12
Add instrumentation of the shuffle stages, write out trial, round, an…
clarkzinzow Apr 10, 2021
10670b5
Benchmark batch script.
clarkzinzow Apr 11, 2021
863a183
Keep shuffle rounds stable with changing number of trainers, include …
clarkzinzow Apr 12, 2021
332f687
Added new TODOs.
clarkzinzow Apr 13, 2021
31ca63f
Add benchmark results.
clarkzinzow Apr 13, 2021
35baf30
Add multi-epoch and pipeline throttling support to shufflers.
clarkzinzow Apr 14, 2021
5b05a05
Updated benchmark results.
clarkzinzow Apr 15, 2021
45cf38d
Add support for collecting object store stats.
clarkzinzow Apr 15, 2021
4b07ac9
Add support for configurable mappers and reducers, simplify data gene…
clarkzinzow Apr 20, 2021
b98b76c
Make batch consumers plugable.
clarkzinzow Apr 21, 2021
df2a6b5
Add prototype dataset abstraction.
clarkzinzow Apr 21, 2021
1da710f
Fix consumer batching, make stats collection optional, fix pickled pr…
clarkzinzow Apr 22, 2021
299bb6a
Reorged shuffling data loader benchmarks and implementation.
clarkzinzow Apr 27, 2021
d44e268
Added PyTorch iterable dataset integration.
clarkzinzow Apr 27, 2021
c75b1de
Queue object refs instead of actual data.
clarkzinzow Apr 27, 2021
a00ea2a
Moved to random sampling implementation.
clarkzinzow Apr 27, 2021
3f8b432
Consolidated cache map and map stages in from memory shuffle.
clarkzinzow Apr 28, 2021
3e25b4c
Use smart_open for S3 file reading/writing, use snappy compression, u…
clarkzinzow Apr 28, 2021
62ae87e
Refactored backpressure to not require nested shuffle, epoch, and rou…
clarkzinzow Apr 28, 2021
ff90de1
Remove shuffle rounds, optimize pipeline backpressure, remove extra r…
clarkzinzow May 3, 2021
769fd66
Fix formatting.
clarkzinzow May 3, 2021
71b7d2d
Focus on TorchShufflingDataset.
clarkzinzow May 4, 2021
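
Note: the last few commits above converge on a PyTorch integration (the iterable-dataset integration from d44e268 and the TorchShufflingDataset focus in 71b7d2d). That class's actual constructor is not part of the commit shown below, so the following is only a generic sketch of how an IterableDataset-style shuffling loader is typically consumed; the ShuffledBatchDataset class and its batch_queue argument are hypothetical stand-ins, not the PR's API.

# Generic sketch only: TorchShufflingDataset's real API is not shown in this
# commit, so the class and queue below are hypothetical stand-ins.
from torch.utils.data import DataLoader, IterableDataset


class ShuffledBatchDataset(IterableDataset):
    """Yields pre-shuffled batches handed over by the shuffle stages."""

    def __init__(self, batch_queue):
        # e.g. a queue fed by the reducers with already-shuffled batches
        self.batch_queue = batch_queue

    def __iter__(self):
        while True:
            batch = self.batch_queue.get()
            if batch is None:  # sentinel: the epoch is finished
                return
            yield batch


# Batches are already formed upstream, so batch_size=None passes them through:
#   loader = DataLoader(ShuffledBatchDataset(queue), batch_size=None)
#   for batch in loader:
#       train_step(batch)
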
Fix formatting.
clarkzinzow committed May 4, 2021
commit 769fd665621ccfc35a329dec720c4a8e904340e2
128 changes: 44 additions & 84 deletions benchmarks/shuffling_data_loader/benchmark.py
@@ -9,12 +9,11 @@
from ray.experimental.data_loader.shuffle import (
shuffle_from_disk, shuffle_from_memory_with_stats,
shuffle_from_memory_no_stats)
from ray.experimental.data_loader.stats import (
process_stats, human_readable_size)
from ray.experimental.data_loader.stats import (process_stats,
human_readable_size)

from ray.experimental.data_loader.data_generation import generate_data


# TODOs:
# - [DONE] Add support for multiple epochs in a single trial.
# - [DONE] Add task graph for from memory shuffler to external doc.
@@ -31,7 +30,6 @@
# - Explore streaming implementation of cache map stage, where we sample and
# pop one round partition at a time.


# TODOs:
# - [DONE] Instrument profiling:
# - Get some basic metrics: disk read time, shuffle time between map and
@@ -53,7 +51,6 @@
# - is arrived at iteratively, can vary across models
# 4M rows/group, 256k rows/batch -> 170MB/file


DEFAULT_DATA_DIR = "/mnt/disk0/benchmark_scratch"
DEFAULT_STATS_DIR = "./results"

@@ -64,24 +61,21 @@ def dummy_batch_consumer(consumer_idx, epoch, batches):
pass


def run_trials(
num_epochs,
filenames,
num_reducers,
num_trainers,
max_concurrent_epochs,
utilization_sample_period,
collect_stats=True,
use_from_disk_shuffler=False,
num_trials=None,
trials_timeout=None):
def run_trials(num_epochs,
filenames,
num_reducers,
num_trainers,
max_concurrent_epochs,
utilization_sample_period,
collect_stats=True,
use_from_disk_shuffler=False,
num_trials=None,
trials_timeout=None):
if use_from_disk_shuffler:
print(
"Using from-disk shuffler that loads data from disk each round.")
print("Using from-disk shuffler that loads data from disk each round.")
shuffle = shuffle_from_disk
else:
print(
"Using from-memory shuffler.")
print("Using from-memory shuffler.")
if collect_stats:
shuffle = shuffle_from_memory_with_stats
else:
@@ -91,13 +85,8 @@ def run_trials(
for trial in range(num_trials):
print(f"Starting trial {trial}.")
stats, store_stats = shuffle(
filenames,
dummy_batch_consumer,
num_epochs,
num_reducers,
num_trainers,
max_concurrent_epochs,
utilization_sample_period)
filenames, dummy_batch_consumer, num_epochs, num_reducers,
num_trainers, max_concurrent_epochs, utilization_sample_period)
duration = stats.duration if collect_stats else stats
print(f"Trial {trial} done after {duration} seconds.")
all_stats.append((stats, store_stats))
@@ -107,13 +96,8 @@ def run_trials(
while timeit.default_timer() - start < trials_timeout:
print(f"Starting trial {trial}.")
stats, store_stats = shuffle(
filenames,
dummy_batch_consumer,
num_epochs,
num_reducers,
num_trainers,
max_concurrent_epochs,
utilization_sample_period)
filenames, dummy_batch_consumer, num_epochs, num_reducers,
num_trainers, max_concurrent_epochs, utilization_sample_period)
duration = stats.duration if collect_stats else stats
print(f"Trial {trial} done after {duration} seconds.")
all_stats.append((stats, store_stats))
@@ -125,17 +109,15 @@ def run_trials(


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Shuffling data loader")
parser.add_argument("--num-rows", type=int, default=4*(10**11))
parser = argparse.ArgumentParser(description="Shuffling data loader")
parser.add_argument("--num-rows", type=int, default=4 * (10**11))
parser.add_argument("--num-files", type=int, default=100)
parser.add_argument("--max-row-group-skew", type=float, default=0.0)
parser.add_argument("--num-row-groups-per-file", type=int, default=1)
parser.add_argument("--num-reducers", type=int, default=5)
parser.add_argument("--num-trainers", type=int, default=5)
parser.add_argument("--num-epochs", type=int, default=10)
parser.add_argument(
"--max-concurrent-epochs", type=int, default=None)
parser.add_argument("--max-concurrent-epochs", type=int, default=None)
parser.add_argument("--batch-size", type=int, default=100)
parser.add_argument("--num-trials", type=int, default=None)
parser.add_argument("--trials-timeout", type=int, default=None)
@@ -192,26 +174,20 @@ def run_trials(
num_files = args.num_files
max_row_group_skew = args.max_row_group_skew
if not args.use_old_data:
print(
f"Generating {num_rows} rows over {num_files} files, with "
f"{num_row_groups_per_file} row groups per file and at most "
f"{100 * max_row_group_skew:.1f}% row group skew.")
filenames, num_bytes = generate_data(
num_rows,
num_files,
num_row_groups_per_file,
max_row_group_skew,
data_dir)
print(
f"Generated {len(filenames)} files containing {num_rows} rows "
f"with {num_row_groups_per_file} row groups per file, totalling "
f"{human_readable_size(num_bytes)}.")
print(f"Generating {num_rows} rows over {num_files} files, with "
f"{num_row_groups_per_file} row groups per file and at most "
f"{100 * max_row_group_skew:.1f}% row group skew.")
filenames, num_bytes = generate_data(num_rows, num_files,
num_row_groups_per_file,
max_row_group_skew, data_dir)
print(f"Generated {len(filenames)} files containing {num_rows} rows "
f"with {num_row_groups_per_file} row groups per file, totalling "
f"{human_readable_size(num_bytes)}.")
else:
filenames = [
os.path.join(
data_dir,
f"input_data_{file_index}.parquet.snappy")
for file_index in range(num_files)]
os.path.join(data_dir, f"input_data_{file_index}.parquet.snappy")
for file_index in range(num_files)
]
print("Not generating input data, using existing data instead.")

num_reducers = args.num_reducers
@@ -252,42 +228,26 @@ def run_trials(
print(f"Shuffling will be pipelined with at most "
f"{max_concurrent_epochs} concurrent epochs.")
collect_stats = not args.no_stats
all_stats = run_trials(
num_epochs,
filenames,
num_reducers,
num_trainers,
max_concurrent_epochs,
utilization_sample_period,
collect_stats,
use_from_disk_shuffler,
num_trials,
trials_timeout)
all_stats = run_trials(num_epochs, filenames, num_reducers, num_trainers,
max_concurrent_epochs, utilization_sample_period,
collect_stats, use_from_disk_shuffler, num_trials,
trials_timeout)

if collect_stats:
process_stats(
all_stats,
args.overwrite_stats,
args.stats_dir,
args.no_epoch_stats,
args.no_consume_stats,
use_from_disk_shuffler,
num_rows,
num_row_groups_per_file,
batch_size,
num_reducers,
num_trainers,
num_epochs,
max_concurrent_epochs)
process_stats(all_stats, args.overwrite_stats, args.stats_dir,
args.no_epoch_stats, args.no_consume_stats,
use_from_disk_shuffler, num_rows,
num_row_groups_per_file, batch_size, num_reducers,
num_trainers, num_epochs, max_concurrent_epochs)
else:
print("Shuffle trials done, no detailed stats collected.")
times, _ = zip(*all_stats)
mean = np.mean(times)
std = np.std(times)
throughput_std = np.std(
[num_epochs * num_rows / time for time in times])
batch_throughput_std = np.std([
(num_epochs * num_rows / batch_size) / time for time in times])
batch_throughput_std = np.std(
[(num_epochs * num_rows / batch_size) / time for time in times])
print(f"\nMean over {len(times)} trials: {mean:.3f}s +- {std}")
print(f"Mean throughput over {len(times)} trials: "
f"{num_epochs * num_rows / mean:.2f} rows/s +- "
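
For reference, the reformatted run_trials signature above can also be driven directly from Python instead of through the argparse CLI. The sketch below is a minimal, hypothetical invocation that assumes the Parquet input files already exist under the default scratch directory (mirroring the script's --use-old-data path); the concrete counts, the sample period, and the import path are illustrative assumptions, not values taken from this PR.

# Minimal sketch of calling run_trials() directly; all concrete values and the
# module import path are illustrative assumptions.
import os

import ray

from benchmark import run_trials  # i.e. benchmarks/shuffling_data_loader/benchmark.py

ray.init()

data_dir = "/mnt/disk0/benchmark_scratch"  # DEFAULT_DATA_DIR in the script
filenames = [
    os.path.join(data_dir, f"input_data_{file_index}.parquet.snappy")
    for file_index in range(100)
]

all_stats = run_trials(
    num_epochs=10,
    filenames=filenames,
    num_reducers=5,
    num_trainers=5,
    max_concurrent_epochs=2,
    utilization_sample_period=5.0,  # seconds between utilization samples (assumed)
    collect_stats=True,
    use_from_disk_shuffler=False,
    num_trials=3,
)
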
51 changes: 17 additions & 34 deletions python/ray/experimental/data_loader/data_generation.py
@@ -6,63 +6,46 @@

import ray


#
# Data generation utilities for the shuffling data loader.
#


def generate_data(
num_rows,
num_files,
num_row_groups_per_file,
max_row_group_skew,
data_dir):
def generate_data(num_rows, num_files, num_row_groups_per_file,
max_row_group_skew, data_dir):
results = []
# TODO(Clark): Generate skewed row groups according to max_row_group_skew.
for file_index, global_row_index in enumerate(
range(0, num_rows, num_rows // num_files)):
num_rows_in_file = min(
num_rows // num_files, num_rows - global_row_index)
num_rows_in_file = min(num_rows // num_files,
num_rows - global_row_index)
results.append(
generate_file.remote(
file_index,
global_row_index,
num_rows_in_file,
num_row_groups_per_file,
data_dir))
generate_file.remote(file_index, global_row_index,
num_rows_in_file, num_row_groups_per_file,
data_dir))
filenames, data_sizes = zip(*ray.get(results))
return filenames, sum(data_sizes)


@ray.remote
def generate_file(
file_index,
global_row_index,
num_rows_in_file,
num_row_groups_per_file,
data_dir):
def generate_file(file_index, global_row_index, num_rows_in_file,
num_row_groups_per_file, data_dir):
# TODO(Clark): Generate skewed row groups according to max_row_group_skew.
# TODO(Clark): Optimize this data generation to reduce copies and
# progressively write smaller buffers to the Parquet file.
buffs = []
for group_index, group_global_row_index in enumerate(
range(
0,
num_rows_in_file,
num_rows_in_file // num_row_groups_per_file)):
num_rows_in_group = min(
num_rows_in_file // num_row_groups_per_file,
num_rows_in_file - group_global_row_index)
range(0, num_rows_in_file,
num_rows_in_file // num_row_groups_per_file)):
num_rows_in_group = min(num_rows_in_file // num_row_groups_per_file,
num_rows_in_file - group_global_row_index)
buffs.append(
generate_row_group(
group_index,
group_global_row_index,
num_rows_in_group))
generate_row_group(group_index, group_global_row_index,
num_rows_in_group))
df = pd.concat(buffs)
data_size = df.memory_usage(deep=True).sum()
filename = os.path.join(
data_dir, f"input_data_{file_index}.parquet.snappy")
filename = os.path.join(data_dir,
f"input_data_{file_index}.parquet.snappy")
df.to_parquet(
open(filename, "wb"),
engine="pyarrow",
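
The generate_data and generate_file helpers above are ordinary Ray tasks, so they can also be exercised on their own. A minimal sketch, assuming a local Ray cluster and a writable scratch directory; the row and file counts are illustrative:

# Minimal sketch of generating a small synthetic dataset; paths and sizes are
# illustrative assumptions, not values from the PR.
import ray

from ray.experimental.data_loader.data_generation import generate_data
from ray.experimental.data_loader.stats import human_readable_size

ray.init()

filenames, num_bytes = generate_data(
    num_rows=1_000_000,
    num_files=10,
    num_row_groups_per_file=1,
    max_row_group_skew=0.0,  # skew generation is still a TODO in the code above
    data_dir="/tmp/benchmark_scratch",
)
print(f"Wrote {len(filenames)} files totalling {human_readable_size(num_bytes)}.")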