[Data Loading] Round-based per-epoch shuffling data loader for distributed training. #15531

Closed
wants to merge 30 commits
Changes from 1 commit
Commits
30 commits
09a4caa
Test scripts
stephanie-wang Apr 1, 2021
cc18afa
Consolidate data generation and benchmarks, extend benchmarking scrip…
clarkzinzow Apr 7, 2021
d89d963
Misc. updates.
clarkzinzow Apr 8, 2021
2a86123
Send only relevant chunks to reducers.
clarkzinzow Apr 8, 2021
8e73711
Add from-memory shuffle, add throughput stats collection.
clarkzinzow Apr 8, 2021
af2193f
Updates from paired programming session.
clarkzinzow Apr 9, 2021
f48b338
Measure consumer times from the start of the round.
clarkzinzow Apr 9, 2021
3f3fd5c
Measure consumer times from the start of the epoch.
clarkzinzow Apr 10, 2021
4e73f12
Add instrumentation of the shuffle stages, write out trial, round, an…
clarkzinzow Apr 10, 2021
10670b5
Benchmark batch script.
clarkzinzow Apr 11, 2021
863a183
Keep shuffle rounds stable with changing number of trainers, include …
clarkzinzow Apr 12, 2021
332f687
Added new TODOs.
clarkzinzow Apr 13, 2021
31ca63f
Add benchmark results.
clarkzinzow Apr 13, 2021
35baf30
Add multi-epoch and pipeline throttling support to shufflers.
clarkzinzow Apr 14, 2021
5b05a05
Updated benchmark results.
clarkzinzow Apr 15, 2021
45cf38d
Add support for collecting object store stats.
clarkzinzow Apr 15, 2021
4b07ac9
Add support for configurable mappers and reducers, simplify data gene…
clarkzinzow Apr 20, 2021
b98b76c
Make batch consumers plugable.
clarkzinzow Apr 21, 2021
df2a6b5
Add prototype dataset abstraction.
clarkzinzow Apr 21, 2021
1da710f
Fix consumer batching, make stats collection optional, fix pickled pr…
clarkzinzow Apr 22, 2021
299bb6a
Reorged shuffling data loader benchmarks and implementation.
clarkzinzow Apr 27, 2021
d44e268
Added PyTorch iterable dataset integration.
clarkzinzow Apr 27, 2021
c75b1de
Queue object refs instead of actual data.
clarkzinzow Apr 27, 2021
a00ea2a
Moved to random sampling implementation.
clarkzinzow Apr 27, 2021
3f8b432
Consolidated cache map and map stages in from memory shuffle.
clarkzinzow Apr 28, 2021
3e25b4c
Use smart_open for S3 file reading/writing, use snappy compression, u…
clarkzinzow Apr 28, 2021
62ae87e
Refactored backpressure to not require nested shuffle, epoch, and rou…
clarkzinzow Apr 28, 2021
ff90de1
Remove shuffle rounds, optimize pipeline backpressure, remove extra r…
clarkzinzow May 3, 2021
769fd66
Fix formatting.
clarkzinzow May 3, 2021
71b7d2d
Focus on TorchShufflingDataset.
clarkzinzow May 4, 2021
Remove shuffle rounds, optimize pipeline backpressure, remove extra reducer --> trainer shuffle.
clarkzinzow committed May 4, 2021
commit ff90de125cb9d78e0f55cbfc82b889cfa1d3a85d
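
This commit drops the shuffle-round machinery and instead gives every (epoch, trainer) pair its own slot in the batch queue, addressed by a flat index (see the dataset.py changes below). A minimal sketch of that indexing, not part of the diff and using hypothetical values:

num_epochs = 3
num_trainers = 4


def queue_index(epoch: int, rank: int) -> int:
    # Each (epoch, rank) pair gets its own queue slot, which is why the
    # MultiQueue below is now created with num_epochs * num_trainers queues.
    return epoch * num_trainers + rank


assert queue_index(0, 0) == 0
assert queue_index(1, 2) == 6   # epoch 1, trainer rank 2
assert queue_index(num_epochs - 1, num_trainers - 1) == num_epochs * num_trainers - 1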
35 changes: 7 additions & 28 deletions benchmarks/shuffling_data_loader/benchmark.py
@@ -60,13 +60,12 @@
DEFAULT_UTILIZATION_SAMPLE_PERIOD = 5.0


def dummy_batch_consumer(consumer_idx, batches):
def dummy_batch_consumer(consumer_idx, epoch, batches):
pass


def run_trials(
num_epochs,
num_rounds,
filenames,
num_reducers,
num_trainers,
@@ -82,8 +81,7 @@ def run_trials(
shuffle = shuffle_from_disk
else:
print(
"Using from-memory shuffler that caches data in memory between "
"rounds.")
"Using from-memory shuffler.")
if collect_stats:
shuffle = shuffle_from_memory_with_stats
else:
@@ -96,7 +94,6 @@
filenames,
dummy_batch_consumer,
num_epochs,
num_rounds,
num_reducers,
num_trainers,
max_concurrent_epochs,
@@ -113,7 +110,6 @@
filenames,
dummy_batch_consumer,
num_epochs,
num_rounds,
num_reducers,
num_trainers,
max_concurrent_epochs,
@@ -140,9 +136,6 @@
parser.add_argument("--num-epochs", type=int, default=10)
parser.add_argument(
"--max-concurrent-epochs", type=int, default=None)
parser.add_argument("--num-rounds", type=int, default=10)
parser.add_argument(
"--max-concurrent-rounds", type=int, default=None)
parser.add_argument("--batch-size", type=int, default=100)
parser.add_argument("--num-trials", type=int, default=None)
parser.add_argument("--trials-timeout", type=int, default=None)
@@ -158,7 +151,6 @@
parser.add_argument("--use-old-data", action="store_true")
parser.add_argument("--no-stats", action="store_true")
parser.add_argument("--no-epoch-stats", action="store_true")
parser.add_argument("--no-round-stats", action="store_true")
parser.add_argument("--no-consume-stats", action="store_true")
parser.add_argument("--overwrite-stats", action="store_true")
args = parser.parse_args()
@@ -226,15 +218,6 @@ def run_trials(
num_trainers = args.num_trainers
batch_size = args.batch_size

# Calculate the number of shuffle rounds.
num_rounds = args.num_rounds
# TODO(Clark): Handle uneven rounds (remainders).
# num_rounds = max(
# num_rows / num_trainers / batch_size / batches_per_round, 1)
# # Assert even division (no remainders, uneven rounds).
# assert num_rounds % 1 == 0
# num_rounds = int(num_rounds)

num_epochs = args.num_epochs
max_concurrent_epochs = args.max_concurrent_epochs
if max_concurrent_epochs is None or max_concurrent_epochs > num_epochs:
@@ -259,20 +242,18 @@
"From disk shuffler not yet updated for new config.")
if num_trials is not None:
print(f"Running {num_trials} shuffle trials with {num_epochs} epochs, "
f"{num_rounds} rounds, {num_reducers} reducers, {num_trainers} "
f"trainers, and a batch size of {batch_size} over {num_rows} "
"rows.")
f"{num_reducers} reducers, {num_trainers} trainers, and a batch "
f"size of {batch_size} over {num_rows} rows.")
else:
print(f"Running {trials_timeout} seconds of shuffle trials with "
f"{num_epochs} epochs, {num_rounds} rounds, {num_reducers} "
f"reducers, {num_trainers} trainers, and a batch size of "
f"{batch_size} over {num_rows} rows.")
f"{num_epochs} epochs, {num_reducers} reducers, {num_trainers} "
f"trainers, and a batch size of {batch_size} over {num_rows} "
"rows.")
print(f"Shuffling will be pipelined with at most "
f"{max_concurrent_epochs} concurrent epochs.")
collect_stats = not args.no_stats
all_stats = run_trials(
num_epochs,
num_rounds,
filenames,
num_reducers,
num_trainers,
@@ -289,7 +270,6 @@
args.overwrite_stats,
args.stats_dir,
args.no_epoch_stats,
args.no_round_stats,
args.no_consume_stats,
use_from_disk_shuffler,
num_rows,
@@ -298,7 +278,6 @@
num_reducers,
num_trainers,
num_epochs,
num_rounds,
max_concurrent_epochs)
else:
print("Shuffle trials done, no detailed stats collected.")
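
Note that the batch consumer callback now also receives the epoch (see dummy_batch_consumer above). A hypothetical consumer with the same signature, shown only to illustrate the pluggable-consumer interface and mirroring debug_batch_consumer further down in dataset.py:

def logging_batch_consumer(consumer_idx, epoch, batches):
    # Illustrative stand-in for dummy_batch_consumer: report what each
    # trainer receives per epoch instead of discarding it.
    num_batches = len(batches) if batches is not None else 0
    print(f"Trainer {consumer_idx} received {num_batches} batches "
          f"in epoch {epoch}.")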
4 changes: 2 additions & 2 deletions benchmarks/shuffling_data_loader/single-node.yaml
@@ -76,7 +76,7 @@ head_node:
- Key: anyscale-user
Value: "clark@anyscale.com"
- Key: anyscale-expiration
Value: "2021-05-01"
Value: "2021-05-10"

# Additional options in the boto docs.

@@ -114,7 +114,7 @@ cluster_synced_files: []

# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False
file_mounts_sync_continuously: True

# Patterns for files to exclude when running rsync up or rsync down
rsync_exclude:
58 changes: 43 additions & 15 deletions python/ray/experimental/data_loader/dataset.py
@@ -24,8 +24,6 @@ class ShufflingDataset:
num_trainers (int): Number of trainer workers.
batch_size (int): Size of the batches that the iterator should yield.
rank (int): The worker rank of the current process.
num_rounds (optional, int): The number of shuffle rounds that the
shuffler should perform. Default is 2.
num_reducers (optional, int): The number of shuffler reducers. Default
is the number of trainers x the number of cores on the master
(rank 0) worker.
@@ -39,7 +37,6 @@ def __init__(
num_trainers: int,
batch_size: int,
rank: int,
num_rounds: int = 2,
num_reducers: int = None,
max_concurrent_epochs: int = 2,
max_batch_queue_size: int = 100):
@@ -48,7 +45,6 @@ def __init__(

self._batch_size = batch_size

# TODO(Clark): Find way to do this without batch consumer proxy queue.
if rank == 0:
# rank == 0 --> master process
# Create the batch queue. Trainers will consume GPU batches
@@ -60,17 +56,17 @@
# corresponding batch queue, and have each trainer reference their
# single batch queue.
self._batch_queue = MultiQueue(
num_trainers, max_batch_queue_size,
num_epochs * num_trainers, max_batch_queue_size,
name=MULTIQUEUE_ACTOR_NAME, connect=False)
# Kick off shuffle.
# TODO(Clark): Move the shuffle kickoff to an init() method so the
# user can better control when the shuffling starts?
self._shuffle_result = ray.remote(shuffle_from_memory).remote(
filenames,
functools.partial(
batch_consumer, self._batch_queue, batch_size),
batch_consumer, self._batch_queue, batch_size,
num_trainers),
num_epochs,
num_rounds,
num_reducers,
num_trainers,
max_concurrent_epochs,
@@ -79,21 +75,42 @@ def __init__(
# rank != 0 --> worker process
# Connect to the batch queue.
self._batch_queue = MultiQueue(
num_trainers, max_batch_queue_size,
num_epochs * num_trainers, max_batch_queue_size,
name=MULTIQUEUE_ACTOR_NAME, connect=True)
self._shuffle_result = None

self._num_epochs = num_epochs
self._num_trainers = num_trainers
self._rank = rank
self._epoch = None

def set_epoch(self, epoch):
"""
Set the current training epoch. This should be called before
constructing the iterator on this dataset (e.g. before the
enumerate(train_loader) call).

Args:
epoch (int): The epoch number for the training epoch that is about
to start.
"""
self._epoch = epoch

def __iter__(self):
"""
This iterator yields GPU batches from the shuffling queue.
"""
if self._epoch is None:
raise ValueError(
"You must set the epoch on this dataset via set_epoch()"
"before constructing the iterator.")

leftover_batches = None
while True:
# TODO(Clark): Add get_up_to queue method that can fetch up to
# some number of batches in a single RPC.
batches = self._batch_queue.get(self._rank, block=True)
queue_idx = self._epoch * self._num_trainers + self._rank
batches = self._batch_queue.get(queue_idx, block=True)
if batches is None:
break
batches = ray.get(batches)
@@ -118,26 +135,35 @@ def __iter__(self):
# Consume leftover batch.
if leftover_batches is not None:
yield leftover_batches
if self._shuffle_result is not None:
if (
self._epoch == self._num_epochs - 1 and
self._shuffle_result is not None):
ray.get(self._shuffle_result)


def batch_consumer(
queue: MultiQueue,
batch_size: int,
num_trainers: int,
rank: int,
epoch: int,
batches: Iterable[ray.ObjectRef]):
"""
Batch consumer that will be provided to the shuffler.
"""
queue_idx = epoch * num_trainers + rank
print(
f"Sending batch to queue for epoch {epoch} and rank {rank} at index: "
f"{queue_idx}")
if batches is None:
queue.put(rank, None)
queue.put(queue_idx, None)
else:
queue.put_batch(rank, batches)
queue.put_batch(queue_idx, batches)


def debug_batch_consumer(
rank: int,
epoch: int,
batches: Iterable[pd.DataFrame]):
num_batches = len(batches) if batches is not None else 0
print(f"Received {num_batches} batches in consumer {rank}.")
@@ -184,7 +210,7 @@ def chunk(iterable: Iterable[T], n: int) -> Iterator[Tuple[T, ...]]:
num_trainers = 1
batch_size = 20000
rank = 0
num_reducers = 4
num_reducers = 8
print(f"Creating shuffling dataset with {batch_size} batch size, "
f"{num_epochs} epochs, {num_reducers} reducers, and {num_trainers} "
"trainers.")
Expand All @@ -197,6 +223,8 @@ def chunk(iterable: Iterable[T], n: int) -> Iterator[Tuple[T, ...]]:
rank,
num_reducers=num_reducers)

for batch_idx, batch in enumerate(ds):
print(f"Consuming batch {batch_idx}!")
for epoch in range(num_epochs):
ds.set_epoch(epoch)
for batch_idx, batch in enumerate(ds):
print(f"Consuming batch {batch_idx}!")
print("Done consuming batches.")