[Data] Additional args for Data + Train benchmark #37839

Merged · 7 commits · Jul 31, 2023
release/nightly_tests/dataset/multi_node_train_benchmark.py (renamed from image_train_benchmark.py)

@@ -3,6 +3,8 @@
from ray.air import session
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig
from ray.train import DataConfig


import time
import os
@@ -11,14 +13,70 @@
import torch

# This benchmark does the following:
-# 1) Read images with ray.data.read_images()
+# 1) Read files (images or parquet) with ray.data
# 2) Apply preprocessing with map_batches()
# 3) Train TorchTrainer on processed data
# Metrics recorded to the output file are:
# - ray.torchtrainer.fit: Throughput of the final epoch in
# TorchTrainer.fit() (step 3 above)


def parse_args():
import argparse

parser = argparse.ArgumentParser()

parser.add_argument(
"--file-type",
default="image",
type=str,
help="Input file type; choose from: ['image', 'parquet']",
)
parser.add_argument(
"--batch-size",
default=32,
type=int,
help="Batch size to use.",
)
parser.add_argument(
"--num-epochs",
# Use 10 epochs and report the throughput of the last epoch, in case
# there is warmup in the first epoch.
default=10,
type=int,
help="Number of epochs to run. The throughput for the last epoch will be kept.",
)
parser.add_argument(
"--num-workers",
default=1,
type=int,
help="Number of workers.",
)
parser.add_argument(
"--local-shuffle-buffer-size",
default=200,
type=int,
help="Parameter into ds.iter_batches(local_shuffle_buffer_size=...)",
)
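# Note: iter_batches() shuffles rows within a per-worker buffer of at
# least this many rows; a larger buffer improves shuffle quality at the
# cost of memory.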
parser.add_argument(
"--preserve-order",
action="store_true",
default=False,
help="Whether to configure Train with preserve_order flag.",
)
args = parser.parse_args()

if args.file_type == "image":
args.data_root = "s3://air-cuj-imagenet-1gb"
elif args.file_type == "parquet":
args.data_root = "s3://air-example-data-2/10G-image-data-synthetic-raw-parquet"
else:
raise Exception(
f"Unknown file type {args.file_type}; expected one of: ['image', 'parquet']"
)
return args


# Constants and utility methods for image-based benchmarks.
DEFAULT_IMAGE_SIZE = 224
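
For reference, the release test definitions further down invoke the script with these flag combinations (commands copied verbatim from release_tests.yaml):

python multi_node_train_benchmark.py --num-workers 4 --file-type image
python multi_node_train_benchmark.py --num-workers 16 --file-type image --preserve-order
python multi_node_train_benchmark.py --num-workers 4 --file-type parquet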

@@ -54,46 +112,18 @@ def crop_and_flip_image_batch(image_batch):


if __name__ == "__main__":
-import argparse
-
-parser = argparse.ArgumentParser()
-
-parser.add_argument(
-"--data-root",
-default="s3://air-cuj-imagenet-1gb",
-type=str,
-help="Directory path with files.",
-)
-parser.add_argument(
-"--batch-size",
-default=32,
-type=int,
-help="Batch size to use.",
-)
-parser.add_argument(
-"--num-epochs",
-# Use 2 epochs and report the throughput of the last epoch, in case
-# there is warmup in the first epoch.
-default=2,
-type=int,
-help="Number of epochs to run. The throughput for the last epoch will be kept.",
-)
-parser.add_argument(
-"--num-workers",
-default=1,
-type=int,
-help="Number of workers.",
-)
-args = parser.parse_args()

+args = parse_args()
metrics = {}

-ray_dataset = (
-# 1) Read in data with read_images()
-ray.data.read_images(args.data_root)
-# 2) Preprocess data by applying transformation with map_batches()
-.map_batches(crop_and_flip_image_batch)
-)
+# 1) Read in data with read_images() / read_parquet()
+if args.file_type == "image":
+ray_dataset = ray.data.read_images(args.data_root)
+elif args.file_type == "parquet":
+ray_dataset = ray.data.read_parquet(args.data_root)
+else:
+raise Exception(f"Unknown file type {args.file_type}")
+# 2) Preprocess data by applying transformation with map_batches()
+ray_dataset = ray_dataset.map_batches(crop_and_flip_image_batch)

def train_loop_per_worker():
it = session.get_dataset_shard("train")
@@ -102,38 +132,46 @@ def train_loop_per_worker():
num_rows = 0
start_t = time.time()
for batch in it.iter_batches(
-batch_size=args.batch_size, prefetch_batches=10
+batch_size=args.batch_size,
+local_shuffle_buffer_size=args.local_shuffle_buffer_size,
+prefetch_batches=10,
):
-num_rows += len(batch["image"])
+num_rows += args.batch_size
end_t = time.time()
# Record throughput per epoch.
epoch_tput = num_rows / (end_t - start_t)
session.report({"tput": epoch_tput, "epoch": i})

# 3) Train TorchTrainer on processed data
options = DataConfig.default_ingest_options()
options.preserve_order = args.preserve_order
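# Note: preserve_order=True asks Ray Data to produce results in a
# deterministic order, which generally costs some pipeline throughput;
# toggling it lets the benchmark measure that overhead.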

torch_trainer = TorchTrainer(
train_loop_per_worker,
-scaling_config=ScalingConfig(num_workers=args.num_workers),
datasets={"train": ray_dataset},
+scaling_config=ScalingConfig(num_workers=args.num_workers),
+dataset_config=ray.train.DataConfig(
+execution_options=options,
+),
)
result = torch_trainer.fit()

# Report the throughput of the last training epoch.
metrics["ray.TorchTrainer.fit"] = list(result.metrics_dataframe["tput"])[-1]
metrics["ray_TorchTrainer_fit"] = list(result.metrics_dataframe["tput"])[-1]
Contributor:

How is this metrics_dataframe aggregated across the training workers? Is it the sum of the individual tput values from each worker?
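
For context, a hedged sketch of the arithmetic in question, assuming the tput column reflects a single worker's session.report() values rather than a sum across workers (approx_total_tput and the balanced-shard assumption are illustrative, not part of this PR):

# Hypothetical illustration: derive an approximate cluster-wide
# throughput from the single reported per-worker value.
per_worker_tput = list(result.metrics_dataframe["tput"])[-1]
# Assumes evenly balanced dataset shards across the training workers.
approx_total_tput = per_worker_tput * args.num_workers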


# Gather up collected metrics, and write to output JSON file.
metrics_dict = defaultdict(dict)
for label, tput in metrics.items():
metrics_dict[label].update({"THROUGHPUT": tput})

test_name = f"read_images_train{args.num_workers}_cpu"
test_name = f"read_{args.file_type}_train_{args.num_workers}workers"
result_dict = {
test_name: metrics_dict,
"success": 1,
}

test_output_json = os.environ.get(
"TEST_OUTPUT_JSON", "/tmp/train_torch_image_benchmark.json"
"TEST_OUTPUT_JSON", "/tmp/multi_node_train_benchmark.json"
)

with open(test_output_json, "wt") as f:
94 changes: 86 additions & 8 deletions release/release_tests.yaml
@@ -6361,11 +6361,11 @@
cluster_env: app_config.yaml
cluster_compute: single_node_benchmark_compute_gce.yaml

-- name: read_images_train_4_cpu
+- name: read_images_train_4_gpu
group: data-tests
working_dir: nightly_tests/dataset

-frequency: nightly
+frequency: manual
team: data
python: "3.8"
cluster:
@@ -6375,8 +6375,8 @@
cluster_compute: ../../air_tests/air_benchmarks/compute_gpu_2x2_aws.yaml

run:
-timeout: 600
-script: python image_train_benchmark.py --num-workers 4
+timeout: 1800
+script: python multi_node_train_benchmark.py --num-workers 4 --file-type image

variations:
- __suffix__: aws
@@ -6387,11 +6387,11 @@
cluster_env: app_config.yaml
cluster_compute: ../../air_tests/air_benchmarks/compute_gpu_2x2_gce.yaml

-- name: read_images_train_16_cpu
+- name: read_images_train_16_gpu
group: data-tests
working_dir: nightly_tests/dataset

-frequency: nightly
+frequency: manual
team: data
python: "3.8"
cluster:
@@ -6401,8 +6401,86 @@
cluster_compute: ../../air_tests/air_benchmarks/compute_gpu_4x4_aws.yaml

run:
-timeout: 600
-script: python image_train_benchmark.py --num-workers 16
+timeout: 1800
+script: python multi_node_train_benchmark.py --num-workers 16 --file-type image

variations:
- __suffix__: aws
- __suffix__: gce
env: gce
frequency: manual
cluster:
cluster_env: app_config.yaml
cluster_compute: ../../air_tests/air_benchmarks/compute_gpu_4x4_gce.yaml

- name: read_images_train_16_gpu_preserve_order
group: data-tests
working_dir: nightly_tests/dataset

frequency: manual
team: data
python: "3.8"
cluster:
byod:
type: gpu
cluster_env: app_config.yaml
cluster_compute: ../../air_tests/air_benchmarks/compute_gpu_4x4_aws.yaml

run:
timeout: 1800
script: python multi_node_train_benchmark.py --num-workers 16 --file-type image --preserve-order

variations:
- __suffix__: aws
- __suffix__: gce
env: gce
frequency: manual
cluster:
cluster_env: app_config.yaml
cluster_compute: ../../air_tests/air_benchmarks/compute_gpu_4x4_gce.yaml

- name: read_parquet_train_4_gpu
group: data-tests
working_dir: nightly_tests/dataset

frequency: manual
team: data
python: "3.8"
cluster:
byod:
type: gpu
cluster_env: app_config.yaml
cluster_compute: ../../air_tests/air_benchmarks/compute_gpu_2x2_aws.yaml

run:
timeout: 1800
script: python multi_node_train_benchmark.py --num-workers 4 --file-type parquet

variations:
- __suffix__: aws
- __suffix__: gce
env: gce
frequency: manual
cluster:
cluster_env: app_config.yaml
cluster_compute: ../../air_tests/air_benchmarks/compute_gpu_2x2_gce.yaml

- name: read_parquet_train_16_gpu
group: data-tests
working_dir: nightly_tests/dataset

frequency: manual
team: data
python: "3.8"
cluster:
byod:
type: gpu
cluster_env: app_config.yaml
cluster_compute: ../../air_tests/air_benchmarks/compute_gpu_4x4_aws.yaml

run:
timeout: 1800
script: python multi_node_train_benchmark.py --num-workers 16 --file-type parquet

variations:
- __suffix__: aws