diff --git a/.travis.yml b/.travis.yml index ab1562bd0d912..2e696d54ee7b4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -47,7 +47,7 @@ matrix: - export PATH="$HOME/miniconda/bin:$PATH" - cd doc - pip install -q -r requirements-doc.txt - - pip install -q yapf==0.23.0 + - pip install -q yapf==0.23.0 sphinx-gallery - sphinx-build -W -b html -d _build/doctrees source _build/html - cd .. # Run Python linting, ignore dict vs {} (C408), others are defaults diff --git a/ci/jenkins_tests/run_multi_node_tests.sh b/ci/jenkins_tests/run_multi_node_tests.sh index c0d68af4a1efa..a1f622f323eb8 100755 --- a/ci/jenkins_tests/run_multi_node_tests.sh +++ b/ci/jenkins_tests/run_multi_node_tests.sh @@ -15,6 +15,17 @@ DOCKER_SHA=$($ROOT_DIR/../../build-docker.sh --output-sha --no-cache) SUPPRESS_OUTPUT=$ROOT_DIR/../suppress_output echo "Using Docker image" $DOCKER_SHA +######################## EXAMPLE TESTS ################################# + +$SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ + python /ray/doc/examples/plot_pong_example.py + +$SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ + python /ray/doc/examples/plot_parameter_server.py + +$SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ + python /ray/doc/examples/plot_hyperparameter.py + ######################## RLLIB TESTS ################################# source $ROOT_DIR/run_rllib_tests.sh diff --git a/doc/.gitignore b/doc/.gitignore new file mode 100644 index 0000000000000..6f5e52c0e1820 --- /dev/null +++ b/doc/.gitignore @@ -0,0 +1 @@ +auto_examples/ diff --git a/doc/Makefile b/doc/Makefile index e188f9a06d797..6ee544e11b9ee 100644 --- a/doc/Makefile +++ b/doc/Makefile @@ -6,6 +6,7 @@ SPHINXOPTS = SPHINXBUILD = sphinx-build PAPER = BUILDDIR = _build +AUTOGALLERYDIR= source/auto_examples # User-friendly check for sphinx-build ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1) @@ -49,7 +50,7 @@ help: @echo " coverage to run coverage check of the documentation (if enabled)" clean: - rm -rf $(BUILDDIR)/* + rm -rf $(BUILDDIR)/* $(AUTOGALLERYDIR) html: $(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html diff --git a/doc/examples/README.rst b/doc/examples/README.rst new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/doc/examples/cython/cython_main.py b/doc/examples/cython/cython_main.py index 612ee02494cb4..296b3f67c1ceb 100644 --- a/doc/examples/cython/cython_main.py +++ b/doc/examples/cython/cython_main.py @@ -97,23 +97,12 @@ def example8(): """Cython with blas. 
NOTE: requires scipy""" # See cython_blas.pyx for argument documentation - mat = np.array([[[2.0, 2.0], [2.0, 2.0]], [[2.0, 2.0], [2.0, 2.0]]], - dtype=np.float32) + mat = np.array( + [[[2.0, 2.0], [2.0, 2.0]], [[2.0, 2.0], [2.0, 2.0]]], dtype=np.float32) result = np.zeros((2, 2), np.float32, order="C") - run_func(cyth.compute_kernel_matrix, - "L", - "T", - 2, - 2, - 1.0, - mat, - 0, - 2, - 1.0, - result, - 2 - ) + run_func(cyth.compute_kernel_matrix, "L", "T", 2, 2, 1.0, mat, 0, 2, 1.0, + result, 2) if __name__ == "__main__": diff --git a/doc/examples/cython/setup.py b/doc/examples/cython/setup.py index 56ce4805897cd..86021ea6d7c9d 100644 --- a/doc/examples/cython/setup.py +++ b/doc/examples/cython/setup.py @@ -25,11 +25,10 @@ modules = [os.path.join(pkg_dir, module) for module in modules] setup( - name=pkg_dir, - version="0.0.1", - description="Cython examples for Ray", - packages=[pkg_dir], - ext_modules=cythonize(modules), - install_requires=install_requires, - include_dirs=include_dirs - ) + name=pkg_dir, + version="0.0.1", + description="Cython examples for Ray", + packages=[pkg_dir], + ext_modules=cythonize(modules), + install_requires=install_requires, + include_dirs=include_dirs) diff --git a/doc/examples/hyperopt/hyperopt_adaptive.py b/doc/examples/hyperopt/hyperopt_adaptive.py deleted file mode 100644 index 727fdfcb6e2e0..0000000000000 --- a/doc/examples/hyperopt/hyperopt_adaptive.py +++ /dev/null @@ -1,154 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import argparse -from collections import defaultdict -import numpy as np -import ray - -from tensorflow.examples.tutorials.mnist import input_data - -import objective - -parser = argparse.ArgumentParser(description="Run the hyperparameter " - "optimization example.") -parser.add_argument("--num-starting-segments", default=5, type=int, - help="The number of training segments to start in " - "parallel.") -parser.add_argument("--num-segments", default=10, type=int, - help="The number of additional training segments to " - "perform.") -parser.add_argument("--steps-per-segment", default=20, type=int, - help="The number of steps of training to do per training " - "segment.") -parser.add_argument("--redis-address", default=None, type=str, - help="The Redis address of the cluster.") - - -if __name__ == "__main__": - args = parser.parse_args() - - ray.init(redis_address=args.redis_address) - - # The number of training passes over the dataset to use for network. - steps = args.steps_per_segment - - # Load the mnist data and turn the data into remote objects. - print("Downloading the MNIST dataset. This may take a minute.") - mnist = input_data.read_data_sets("MNIST_data", one_hot=True) - train_images = ray.put(mnist.train.images) - train_labels = ray.put(mnist.train.labels) - validation_images = ray.put(mnist.validation.images) - validation_labels = ray.put(mnist.validation.labels) - - # Keep track of the accuracies that we've seen at different numbers of - # iterations. - accuracies_by_num_steps = defaultdict(lambda: []) - - # Define a method to determine if an experiment looks promising or not. - def is_promising(experiment_info): - accuracies = experiment_info["accuracies"] - total_num_steps = experiment_info["total_num_steps"] - comparable_accuracies = accuracies_by_num_steps[total_num_steps] - if len(comparable_accuracies) == 0: - if len(accuracies) == 1: - # This means that we haven't seen anything finish yet, so keep - # running this experiment. 
- return True - else: - # The experiment is promising if the second half of the - # accuracies are better than the first half of the accuracies. - return (np.mean(accuracies[:len(accuracies) // 2]) < - np.mean(accuracies[len(accuracies) // 2:])) - # Otherwise, continue running the experiment if it is in the top half - # of experiments we've seen so far at this point in time. - return np.mean(accuracy > np.array(comparable_accuracies)) > 0.5 - - # Keep track of all of the experiment segments that we're running. This - # dictionary uses the object ID of the experiment as the key. - experiment_info = {} - # Keep track of the curently running experiment IDs. - remaining_ids = [] - - # Keep track of the best hyperparameters and the best accuracy. - best_hyperparameters = None - best_accuracy = 0 - - # A function for generating random hyperparameters. - def generate_hyperparameters(): - return {"learning_rate": 10 ** np.random.uniform(-5, 5), - "batch_size": np.random.randint(1, 100), - "dropout": np.random.uniform(0, 1), - "stddev": 10 ** np.random.uniform(-5, 5)} - - # Launch some initial experiments. - for _ in range(args.num_starting_segments): - hyperparameters = generate_hyperparameters() - experiment_id = objective.train_cnn_and_compute_accuracy.remote( - hyperparameters, steps, train_images, train_labels, - validation_images, validation_labels) - experiment_info[experiment_id] = {"hyperparameters": hyperparameters, - "total_num_steps": steps, - "accuracies": []} - remaining_ids.append(experiment_id) - - for _ in range(args.num_segments): - # Wait for a segment of an experiment to finish. - ready_ids, remaining_ids = ray.wait(remaining_ids, num_returns=1) - experiment_id = ready_ids[0] - # Get the accuracy and the weights. - accuracy, weights = ray.get(experiment_id) - # Update the experiment info. - previous_info = experiment_info[experiment_id] - previous_info["accuracies"].append(accuracy) - - # Update the best accuracy and best hyperparameters. - if accuracy > best_accuracy: - best_hyperparameters = previous_info["hyperparameters"] - best_accuracy = accuracy - - if is_promising(previous_info): - # If the experiment still looks promising, then continue running - # it. - print("Continuing to run the experiment with hyperparameters {}." - .format(previous_info["hyperparameters"])) - new_hyperparameters = previous_info["hyperparameters"] - new_info = {"hyperparameters": new_hyperparameters, - "total_num_steps": (previous_info["total_num_steps"] + - steps), - "accuracies": previous_info["accuracies"][:]} - starting_weights = weights - else: - # If the experiment does not look promising, start a new - # experiment. - print("Ending the experiment with hyperparameters {}." - .format(previous_info["hyperparameters"])) - new_hyperparameters = generate_hyperparameters() - new_info = {"hyperparameters": new_hyperparameters, - "total_num_steps": steps, - "accuracies": []} - starting_weights = None - - # Start running the next segment. - new_experiment_id = objective.train_cnn_and_compute_accuracy.remote( - new_hyperparameters, steps, train_images, train_labels, - validation_images, validation_labels, weights=starting_weights) - experiment_info[new_experiment_id] = new_info - remaining_ids.append(new_experiment_id) - - # Update the set of all accuracies that we've seen. - accuracies_by_num_steps[previous_info["total_num_steps"]].append( - accuracy) - - # Record the best performing set of hyperparameters. 
- print("""Best accuracy was {:.3} with - learning_rate: {:.2} - batch_size: {} - dropout: {:.2} - stddev: {:.2} - """.format(100 * best_accuracy, - best_hyperparameters["learning_rate"], - best_hyperparameters["batch_size"], - best_hyperparameters["dropout"], - best_hyperparameters["stddev"])) diff --git a/doc/examples/hyperopt/hyperopt_simple.py b/doc/examples/hyperopt/hyperopt_simple.py deleted file mode 100644 index 1a22f7c1d42eb..0000000000000 --- a/doc/examples/hyperopt/hyperopt_simple.py +++ /dev/null @@ -1,100 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import numpy as np -import ray -import argparse - -from tensorflow.examples.tutorials.mnist import input_data - -import objective - -parser = argparse.ArgumentParser(description="Run the hyperparameter " - "optimization example.") -parser.add_argument("--trials", default=2, type=int, - help="The number of random trials to do.") -parser.add_argument("--steps", default=10, type=int, - help="The number of steps of training to do per network.") -parser.add_argument("--redis-address", default=None, type=str, - help="The Redis address of the cluster.") - - -if __name__ == "__main__": - args = parser.parse_args() - - ray.init(redis_address=args.redis_address) - - # The number of sets of random hyperparameters to try. - trials = args.trials - # The number of training passes over the dataset to use for network. - steps = args.steps - - # Load the mnist data and turn the data into remote objects. - print("Downloading the MNIST dataset. This may take a minute.") - mnist = input_data.read_data_sets("MNIST_data", one_hot=True) - train_images = ray.put(mnist.train.images) - train_labels = ray.put(mnist.train.labels) - validation_images = ray.put(mnist.validation.images) - validation_labels = ray.put(mnist.validation.labels) - - # Keep track of the best hyperparameters and the best accuracy. - best_hyperparameters = None - best_accuracy = 0 - # This list holds the object IDs for all of the experiments that we have - # launched and that have not yet been processed. - remaining_ids = [] - # This is a dictionary mapping the object ID of an experiment to the - # hyerparameters used for that experiment. - hyperparameters_mapping = {} - - # A function for generating random hyperparameters. - def generate_hyperparameters(): - return {"learning_rate": 10 ** np.random.uniform(-5, 5), - "batch_size": np.random.randint(1, 100), - "dropout": np.random.uniform(0, 1), - "stddev": 10 ** np.random.uniform(-5, 5)} - - # Randomly generate some hyperparameters, and launch a task for each set. - for i in range(trials): - hyperparameters = generate_hyperparameters() - accuracy_id = objective.train_cnn_and_compute_accuracy.remote( - hyperparameters, steps, train_images, train_labels, - validation_images, validation_labels) - remaining_ids.append(accuracy_id) - # Keep track of which hyperparameters correspond to this experiment. - hyperparameters_mapping[accuracy_id] = hyperparameters - - # Fetch and print the results of the tasks in the order that they complete. - for i in range(trials): - # Use ray.wait to get the object ID of the first task that completes. - ready_ids, remaining_ids = ray.wait(remaining_ids) - # Process the output of this task. 
- result_id = ready_ids[0] - hyperparameters = hyperparameters_mapping[result_id] - accuracy, _ = ray.get(result_id) - print("""We achieve accuracy {:.3}% with - learning_rate: {:.2} - batch_size: {} - dropout: {:.2} - stddev: {:.2} - """.format(100 * accuracy, - hyperparameters["learning_rate"], - hyperparameters["batch_size"], - hyperparameters["dropout"], - hyperparameters["stddev"])) - if accuracy > best_accuracy: - best_hyperparameters = hyperparameters - best_accuracy = accuracy - - # Record the best performing set of hyperparameters. - print("""Best accuracy over {} trials was {:.3} with - learning_rate: {:.2} - batch_size: {} - dropout: {:.2} - stddev: {:.2} - """.format(trials, 100 * best_accuracy, - best_hyperparameters["learning_rate"], - best_hyperparameters["batch_size"], - best_hyperparameters["dropout"], - best_hyperparameters["stddev"])) diff --git a/doc/examples/hyperopt/objective.py b/doc/examples/hyperopt/objective.py deleted file mode 100644 index b531bd2192ec3..0000000000000 --- a/doc/examples/hyperopt/objective.py +++ /dev/null @@ -1,127 +0,0 @@ -# Most of the tensorflow code is adapted from Tensorflow's tutorial on using -# CNNs to train MNIST -# https://www.tensorflow.org/versions/r0.9/tutorials/mnist/pros/index.html#build-a-multilayer-convolutional-network. # noqa: E501 - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import tensorflow as tf - -import ray -import ray.experimental.tf_utils - - -def get_batch(data, batch_index, batch_size): - # This method currently drops data when num_data is not divisible by - # batch_size. - num_data = data.shape[0] - num_batches = num_data // batch_size - batch_index %= num_batches - return data[(batch_index * batch_size):((batch_index + 1) * batch_size)] - - -def weight(shape, stddev): - initial = tf.truncated_normal(shape, stddev=stddev) - return tf.Variable(initial) - - -def bias(shape): - initial = tf.constant(0.1, shape=shape) - return tf.Variable(initial) - - -def conv2d(x, W): - return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding="SAME") - - -def max_pool_2x2(x): - return tf.nn.max_pool( - x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding="SAME") - - -def cnn_setup(x, y, keep_prob, lr, stddev): - first_hidden = 32 - second_hidden = 64 - fc_hidden = 1024 - W_conv1 = weight([5, 5, 1, first_hidden], stddev) - B_conv1 = bias([first_hidden]) - x_image = tf.reshape(x, [-1, 28, 28, 1]) - h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + B_conv1) - h_pool1 = max_pool_2x2(h_conv1) - W_conv2 = weight([5, 5, first_hidden, second_hidden], stddev) - b_conv2 = bias([second_hidden]) - h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2) - h_pool2 = max_pool_2x2(h_conv2) - W_fc1 = weight([7 * 7 * second_hidden, fc_hidden], stddev) - b_fc1 = bias([fc_hidden]) - h_pool2_flat = tf.reshape(h_pool2, [-1, 7 * 7 * second_hidden]) - h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1) - h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob) - W_fc2 = weight([fc_hidden, 10], stddev) - b_fc2 = bias([10]) - y_conv = tf.nn.softmax(tf.matmul(h_fc1_drop, W_fc2) + b_fc2) - cross_entropy = tf.reduce_mean( - -tf.reduce_sum(y * tf.log(y_conv), reduction_indices=[1])) - correct_pred = tf.equal(tf.argmax(y_conv, 1), tf.argmax(y, 1)) - return (tf.train.AdamOptimizer(lr).minimize(cross_entropy), - tf.reduce_mean(tf.cast(correct_pred, tf.float32)), cross_entropy) - - -# Define a remote function that takes a set of hyperparameters as well as the -# data, consructs and trains a network, and 
returns the validation accuracy. -@ray.remote -def train_cnn_and_compute_accuracy(params, - steps, - train_images, - train_labels, - validation_images, - validation_labels, - weights=None): - # Extract the hyperparameters from the params dictionary. - learning_rate = params["learning_rate"] - batch_size = params["batch_size"] - keep = 1 - params["dropout"] - stddev = params["stddev"] - # Create the network and related variables. - with tf.Graph().as_default(): - # Create the input placeholders for the network. - x = tf.placeholder(tf.float32, shape=[None, 784]) - y = tf.placeholder(tf.float32, shape=[None, 10]) - keep_prob = tf.placeholder(tf.float32) - # Create the network. - train_step, accuracy, loss = cnn_setup(x, y, keep_prob, learning_rate, - stddev) - # Do the training and evaluation. - with tf.Session() as sess: - # Use the TensorFlowVariables utility. This is only necessary if we - # want to set and get the weights. - variables = ray.experimental.tf_utils.TensorFlowVariables( - loss, sess) - # Initialize the network weights. - sess.run(tf.global_variables_initializer()) - # If some network weights were passed in, set those. - if weights is not None: - variables.set_weights(weights) - # Do some steps of training. - for i in range(1, steps + 1): - # Fetch the next batch of data. - image_batch = get_batch(train_images, i, batch_size) - label_batch = get_batch(train_labels, i, batch_size) - # Do one step of training. - sess.run( - train_step, - feed_dict={ - x: image_batch, - y: label_batch, - keep_prob: keep - }) - # Training is done, so compute the validation accuracy and the - # current weights and return. - totalacc = accuracy.eval(feed_dict={ - x: validation_images, - y: validation_labels, - keep_prob: 1.0 - }) - new_weights = variables.get_weights() - return float(totalacc), new_weights diff --git a/doc/examples/overview.rst b/doc/examples/overview.rst new file mode 100644 index 0000000000000..a69e9a9db2810 --- /dev/null +++ b/doc/examples/overview.rst @@ -0,0 +1,34 @@ +Examples Overview +================= + +.. customgalleryitem:: + :tooltip: Build a simple parameter server using Ray. + :description: :doc:`/auto_examples/plot_parameter_server` + +.. customgalleryitem:: + :tooltip: Asynchronous Advantage Actor Critic agent using Ray. + :description: :doc:`/auto_examples/plot_example-a3c` + +.. customgalleryitem:: + :tooltip: Simple parallel asynchronous hyperparameter evaluation. + :description: :doc:`/auto_examples/plot_hyperparameter` + +.. customgalleryitem:: + :tooltip: Parallelizing a policy gradient calculation on OpenAI Gym Pong. + :description: :doc:`/auto_examples/plot_pong_example` + +.. customgalleryitem:: + :tooltip: Walkthrough of parallelizing the L-BFGS algorithm. + :description: :doc:`/auto_examples/plot_lbfgs` + +.. customgalleryitem:: + :tooltip: Implementing a simple news reader using Ray. + :description: :doc:`/auto_examples/plot_newsreader` + +.. customgalleryitem:: + :tooltip: Using Ray to train ResNet across multiple GPUs. + :description: :doc:`/auto_examples/plot_resnet` + +.. customgalleryitem:: + :tooltip: Implement a simple streaming application using Ray’s actors. 
+ :description: :doc:`/auto_examples/plot_streaming` diff --git a/doc/source/example-a3c.rst b/doc/examples/plot_example-a3c.rst similarity index 99% rename from doc/source/example-a3c.rst rename to doc/examples/plot_example-a3c.rst index 821cd71869d02..be620f959fb56 100644 --- a/doc/source/example-a3c.rst +++ b/doc/examples/plot_example-a3c.rst @@ -113,7 +113,6 @@ Driver Code Walkthrough The driver manages the coordination among workers and handles updating the global model parameters. The main training script looks like the following. - .. code-block:: python import numpy as np diff --git a/doc/examples/plot_hyperparameter.py b/doc/examples/plot_hyperparameter.py new file mode 100644 index 0000000000000..b77da21844aa6 --- /dev/null +++ b/doc/examples/plot_hyperparameter.py @@ -0,0 +1,178 @@ +""" +Simple Parallel Model Selection +=============================== + +In this example, we'll demonstrate how to quickly write a hyperparameter +tuning script that evaluates a set of hyperparameters in parallel. + +This script will demonstrate how to use two important parts of the Ray API: +using ``ray.remote`` to define remote functions and ``ray.wait`` to wait for +their results to be ready. + +.. important:: For a production-grade implementation of distributed + hyperparameter tuning, use `Tune`_, a scalable hyperparameter + tuning library built using Ray's Actor API. + +.. _`Tune`: https://ray.readthedocs.io/en/latest/tune.html +""" +import os +import numpy as np +from filelock import FileLock + +import torch +import torch.nn as nn +import torch.nn.functional as F +import torch.optim as optim +from torchvision import datasets, transforms + +import ray + +ray.init() + +# The number of sets of random hyperparameters to try. +num_evaluations = 10 + + +# A function for generating random hyperparameters. +def generate_hyperparameters(): + return { + "learning_rate": 10**np.random.uniform(-5, 1), + "batch_size": np.random.randint(1, 100), + "momentum": np.random.uniform(0, 1) + } + + +def get_data_loaders(batch_size): + mnist_transforms = transforms.Compose( + [transforms.ToTensor(), + transforms.Normalize((0.1307, ), (0.3081, ))]) + + # We add FileLock here because multiple workers will want to + # download data, and this may cause overwrites since + # DataLoader is not threadsafe. + with FileLock(os.path.expanduser("~/data.lock")): + train_loader = torch.utils.data.DataLoader( + datasets.MNIST( + "~/data", + train=True, + download=True, + transform=mnist_transforms), + batch_size=batch_size, + shuffle=True) + test_loader = torch.utils.data.DataLoader( + datasets.MNIST("~/data", train=False, transform=mnist_transforms), + batch_size=batch_size, + shuffle=True) + return train_loader, test_loader + + +class ConvNet(nn.Module): + """Simple two layer Convolutional Neural Network.""" + + def __init__(self): + super(ConvNet, self).__init__() + self.conv1 = nn.Conv2d(1, 3, kernel_size=3) + self.fc = nn.Linear(192, 10) + + def forward(self, x): + x = F.relu(F.max_pool2d(self.conv1(x), 3)) + x = x.view(-1, 192) + x = self.fc(x) + return F.log_softmax(x, dim=1) + + +def train(model, optimizer, train_loader, device=torch.device("cpu")): + """Optimize the model with one pass over the data. + + Cuts off at 1024 samples to simplify training. 
+ """ + model.train() + for batch_idx, (data, target) in enumerate(train_loader): + if batch_idx * len(data) > 1024: + return + data, target = data.to(device), target.to(device) + optimizer.zero_grad() + output = model(data) + loss = F.nll_loss(output, target) + loss.backward() + optimizer.step() + + +def test(model, test_loader, device=torch.device("cpu")): + """Checks the validation accuracy of the model. + + Cuts off at 512 samples for simplicity. + """ + model.eval() + correct = 0 + total = 0 + with torch.no_grad(): + for batch_idx, (data, target) in enumerate(test_loader): + if batch_idx * len(data) > 512: + break + data, target = data.to(device), target.to(device) + outputs = model(data) + _, predicted = torch.max(outputs.data, 1) + total += target.size(0) + correct += (predicted == target).sum().item() + + return correct / total + + +@ray.remote +def evaluate_hyperparameters(config): + model = ConvNet() + train_loader, test_loader = get_data_loaders(config["batch_size"]) + optimizer = optim.SGD( + model.parameters(), + lr=config["learning_rate"], + momentum=config["momentum"]) + train(model, optimizer, train_loader) + return test(model, test_loader) + + +# Keep track of the best hyperparameters and the best accuracy. +best_hyperparameters = None +best_accuracy = 0 +# A list holding the object IDs for all of the experiments that we have +# launched but have not yet been processed. +remaining_ids = [] +# A dictionary mapping an experiment's object ID to its hyperparameters. +# hyerparameters used for that experiment. +hyperparameters_mapping = {} + +# Randomly generate sets of hyperparameters and launch a task to test each set. +for i in range(num_evaluations): + hyperparameters = generate_hyperparameters() + accuracy_id = evaluate_hyperparameters.remote(hyperparameters) + remaining_ids.append(accuracy_id) + hyperparameters_mapping[accuracy_id] = hyperparameters + +# Fetch and print the results of the tasks in the order that they complete. +while remaining_ids: + # Use ray.wait to get the object ID of the first task that completes. + done_ids, remaining_ids = ray.wait(remaining_ids) + # There is only one return result by default. + result_id = done_ids[0] + + hyperparameters = hyperparameters_mapping[result_id] + accuracy = ray.get(result_id) + print("""We achieve accuracy {:.3}% with + learning_rate: {:.2} + batch_size: {} + momentum: {:.2} + """.format(100 * accuracy, hyperparameters["learning_rate"], + hyperparameters["batch_size"], hyperparameters["momentum"])) + if accuracy > best_accuracy: + best_hyperparameters = hyperparameters + best_accuracy = accuracy + +# Record the best performing set of hyperparameters. 
+print("""Best accuracy over {} trials was {:.3} with + learning_rate: {:.2} + batch_size: {} + momentum: {:.2} + """.format(num_evaluations, 100 * best_accuracy, + best_hyperparameters["learning_rate"], + best_hyperparameters["batch_size"], + best_hyperparameters["momentum"])) diff --git a/doc/source/example-lbfgs.rst b/doc/examples/plot_lbfgs.rst similarity index 100% rename from doc/source/example-lbfgs.rst rename to doc/examples/plot_lbfgs.rst diff --git a/doc/source/example-newsreader.rst b/doc/examples/plot_newsreader.rst similarity index 100% rename from doc/source/example-newsreader.rst rename to doc/examples/plot_newsreader.rst diff --git a/doc/examples/plot_parameter_server.py b/doc/examples/plot_parameter_server.py new file mode 100644 index 0000000000000..53d8c73b79132 --- /dev/null +++ b/doc/examples/plot_parameter_server.py @@ -0,0 +1,289 @@ +""" +Parameter Server +================ + +The parameter server is a framework for distributed machine learning training. + +In the parameter server framework, a centralized server (or group of server +nodes) maintains global shared parameters of a machine-learning model +(e.g., a neural network) while the data and computation of calculating +updates (i.e., gradient descent updates) are distributed over worker nodes. + +.. image:: ../images/param_actor.png + :align: center + +Parameter servers are a core part of many machine learning applications. This +document walks through how to implement simple synchronous and asynchronous +parameter servers using Ray actors. + +To run the application, first install some dependencies. + +.. code-block:: bash + + pip install torch torchvision filelock + +Let's first define some helper functions and import some dependencies. + +""" +import os +import torch +import torch.nn as nn +import torch.nn.functional as F +from torchvision import datasets, transforms +from filelock import FileLock +import numpy as np + +import ray + + +def get_data_loader(): + """Safely downloads data. Returns training/validation set dataloader.""" + mnist_transforms = transforms.Compose( + [transforms.ToTensor(), + transforms.Normalize((0.1307, ), (0.3081, ))]) + + # We add FileLock here because multiple workers will want to + # download data, and this may cause overwrites since + # DataLoader is not threadsafe. + with FileLock(os.path.expanduser("~/data.lock")): + train_loader = torch.utils.data.DataLoader( + datasets.MNIST( + "~/data", + train=True, + download=True, + transform=mnist_transforms), + batch_size=128, + shuffle=True) + test_loader = torch.utils.data.DataLoader( + datasets.MNIST("~/data", train=False, transform=mnist_transforms), + batch_size=128, + shuffle=True) + return train_loader, test_loader + + +def evaluate(model, test_loader): + """Evaluates the accuracy of the model on a validation dataset.""" + model.eval() + correct = 0 + total = 0 + with torch.no_grad(): + for batch_idx, (data, target) in enumerate(test_loader): + # This is only set to finish evaluation faster. + if batch_idx * len(data) > 1024: + break + outputs = model(data) + _, predicted = torch.max(outputs.data, 1) + total += target.size(0) + correct += (predicted == target).sum().item() + return 100. * correct / total + + +####################################################################### +# Setup: Defining the Neural Network +# ---------------------------------- +# +# We define a small neural network to use in training. We provide +# some helper functions for obtaining data, including getter/setter +# methods for gradients and weights. 
+ + +class ConvNet(nn.Module): + """Small ConvNet for MNIST.""" + + def __init__(self): + super(ConvNet, self).__init__() + self.conv1 = nn.Conv2d(1, 3, kernel_size=3) + self.fc = nn.Linear(192, 10) + + def forward(self, x): + x = F.relu(F.max_pool2d(self.conv1(x), 3)) + x = x.view(-1, 192) + x = self.fc(x) + return F.log_softmax(x, dim=1) + + def get_weights(self): + return {k: v.cpu() for k, v in self.state_dict().items()} + + def set_weights(self, weights): + self.load_state_dict(weights) + + def get_gradients(self): + grads = [] + for p in self.parameters(): + grad = None if p.grad is None else p.grad.data.cpu().numpy() + grads.append(grad) + return grads + + def set_gradients(self, gradients): + for g, p in zip(gradients, self.parameters()): + if g is not None: + p.grad = torch.from_numpy(g) + + +########################################################################### +# Defining the Parameter Server +# ----------------------------- +# +# The parameter server will hold a copy of the model. +# During training, it will: +# +# 1. Receive gradients and apply them to its model. +# +# 2. Send the updated model back to the workers. +# +# The ``@ray.remote`` decorator defines a remote process. It wraps the +# ParameterServer class and allows users to instantiate it as a +# remote actor. + + +@ray.remote +class ParameterServer(object): + def __init__(self, lr): + self.model = ConvNet() + self.optimizer = torch.optim.SGD(self.model.parameters(), lr=lr) + + def apply_gradients(self, *gradients): + summed_gradients = [ + np.stack(gradient_zip).sum(axis=0) + for gradient_zip in zip(*gradients) + ] + self.optimizer.zero_grad() + self.model.set_gradients(summed_gradients) + self.optimizer.step() + return self.model.get_weights() + + def get_weights(self): + return self.model.get_weights() + + +########################################################################### +# Defining the Worker +# ------------------- +# The worker will also hold a copy of the model. During training, it will +# continuously evaluate data and send gradients +# to the parameter server. The worker will synchronize its model with the +# parameter server's model weights. + + +@ray.remote +class DataWorker(object): + def __init__(self): + self.model = ConvNet() + self.data_iterator = iter(get_data_loader()[0]) + + def compute_gradients(self, weights): + self.model.set_weights(weights) + try: + data, target = next(self.data_iterator) + except StopIteration: # When the epoch ends, start a new epoch. + self.data_iterator = iter(get_data_loader()[0]) + data, target = next(self.data_iterator) + self.model.zero_grad() + output = self.model(data) + loss = F.nll_loss(output, target) + loss.backward() + return self.model.get_gradients() + + +########################################################################### +# Synchronous Parameter Server Training +# ------------------------------------- +# We'll now create a synchronous parameter server training scheme. We'll first +# instantiate a process for the parameter server, along with multiple +# workers. + +iterations = 200 +num_workers = 2 + +ray.init(ignore_reinit_error=True) +ps = ParameterServer.remote(1e-2) +workers = [DataWorker.remote() for i in range(num_workers)] + +########################################################################### +# We'll also instantiate a model on the driver process to evaluate the test +# accuracy during training.
+ +model = ConvNet() +test_loader = get_data_loader()[1] + +########################################################################### +# Training alternates between: +# +# 1. Computing the gradients given the current weights from the server. +# 2. Updating the parameter server's weights with the gradients. + +print("Running synchronous parameter server training.") +current_weights = ps.get_weights.remote() +for i in range(iterations): + gradients = [ + worker.compute_gradients.remote(current_weights) for worker in workers + ] + # Calculate update after all gradients are available. + current_weights = ps.apply_gradients.remote(*gradients) + + if i % 10 == 0: + # Evaluate the current model. + model.set_weights(ray.get(current_weights)) + accuracy = evaluate(model, test_loader) + print("Iter {}: \taccuracy is {:.1f}".format(i, accuracy)) + +print("Final accuracy is {:.1f}.".format(accuracy)) +# Clean up Ray resources and processes before the next example. +ray.shutdown() + +########################################################################### +# Asynchronous Parameter Server Training +# -------------------------------------- +# We'll now create an asynchronous parameter server training scheme. We'll +# first instantiate a process for the parameter server, along with multiple +# workers. + +print("Running Asynchronous Parameter Server Training.") + +ray.init(ignore_reinit_error=True) +ps = ParameterServer.remote(1e-2) +workers = [DataWorker.remote() for i in range(num_workers)] + +########################################################################### +# Here, workers will asynchronously compute the gradients given their +# current weights and send these gradients to the parameter server as +# soon as they are ready. When the parameter server finishes applying the +# new gradient, the server will send back a copy of the current weights to the +# worker. The worker will then update the weights and repeat. + +current_weights = ps.get_weights.remote() + +gradients = {} +for worker in workers: + gradients[worker.compute_gradients.remote(current_weights)] = worker + +for i in range(iterations * num_workers): + ready_gradient_list, _ = ray.wait(list(gradients)) + ready_gradient_id = ready_gradient_list[0] + worker = gradients.pop(ready_gradient_id) + + # Apply the ready gradient on the server, then give the freed worker + # the updated weights for its next gradient computation. + current_weights = ps.apply_gradients.remote(*[ready_gradient_id]) + gradients[worker.compute_gradients.remote(current_weights)] = worker + + if i % 10 == 0: + # Evaluate the current model after every 10 updates. + model.set_weights(ray.get(current_weights)) + accuracy = evaluate(model, test_loader) + print("Iter {}: \taccuracy is {:.1f}".format(i, accuracy)) + +print("Final accuracy is {:.1f}.".format(accuracy)) + +############################################################################## +# Final Thoughts +# -------------- +# +# This approach is powerful because it enables you to implement a parameter +# server with a few lines of code as part of a Python application. +# As a result, it becomes simple both to deploy applications that use +# parameter servers and to modify the behavior of the parameter server. +# +# For example, sharding the parameter server, changing the update rule, +# switching between asynchronous and synchronous updates, ignoring +# straggler workers, or any number of other customizations +# will only require a few extra lines of code.
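+#
+# As a rough sketch of the sharding point (an illustration only: the
+# ``ShardServer`` actor, the round-robin split, and the plain SGD step
+# below are assumptions of this sketch, not part of the example above),
+# each actor could own a disjoint subset of the weights, and a worker
+# would reassemble the full weight dict from all shards:
+#
+# .. code-block:: python
+#
+#     @ray.remote
+#     class ShardServer(object):
+#         """Holds and updates one shard (a sub-dict) of the weights."""
+#
+#         def __init__(self, weights, lr):
+#             self.weights = weights  # {name: torch.Tensor}
+#             self.lr = lr
+#
+#         def apply_gradients(self, *grads):
+#             # Each element of grads maps this shard's weight names to
+#             # NumPy gradients; sum them and take a plain SGD step.
+#             for name in self.weights:
+#                 summed = np.sum([g[name] for g in grads], axis=0)
+#                 self.weights[name] -= self.lr * torch.from_numpy(summed)
+#             return self.weights
+#
+#         def get_weights(self):
+#             return self.weights
+#
+#     # Split the ConvNet weights round-robin across two shard servers.
+#     full = ConvNet().get_weights()
+#     names = list(full)
+#     shards = [{n: full[n] for n in names[s::2]} for s in range(2)]
+#     servers = [ShardServer.remote(shard, 1e-2) for shard in shards]
+#
+#     # Reassemble the full weight dict from all shards.
+#     current_weights = {}
+#     for shard in ray.get([s.get_weights.remote() for s in servers]):
+#         current_weights.update(shard)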
diff --git a/doc/examples/plot_pong_example.py b/doc/examples/plot_pong_example.py new file mode 100644 index 0000000000000..9b0ef73dd10e0 --- /dev/null +++ b/doc/examples/plot_pong_example.py @@ -0,0 +1,293 @@ +# flake8: noqa +""" +Learning to Play Pong +===================== + +In this example, we'll train a **very simple** neural network to play Pong using +the OpenAI Gym. + +At a high level, we will use multiple Ray actors to obtain simulation rollouts +and calculate gradients simultaneously. We will then centralize these +gradients and update the neural network. The updated neural network will +then be passed back to each Ray actor for more gradient calculation. + +This application is adapted, with minimal modifications, from +Andrej Karpathy's `source code`_ (see the accompanying `blog post`_). + +To run the application, first install some dependencies. + +.. code-block:: bash + + pip install gym[atari] + +At the moment, on a large machine with 64 physical cores, computing an update +with a batch of size 1 takes about 1 second, a batch of size 10 takes about 2.5 +seconds, and a batch of size 60 takes about 3 seconds. On a cluster with 11 +nodes, each with 18 physical cores, a batch of size 300 takes about 10 seconds. +If the numbers you see differ significantly from these, take a look at the +**Troubleshooting** section at the bottom of this page and consider `submitting +an issue`_. + +.. _`source code`: https://gist.github.com/karpathy/a4166c7fe253700972fcbc77e4ea32c5 +.. _`blog post`: http://karpathy.github.io/2016/05/31/rl/ +.. _`submitting an issue`: https://github.com/ray-project/ray/issues + +**Note** that these times depend on how long the rollouts take, which in turn +depends on how well the policy is doing. For example, a really bad policy will +lose very quickly. As the policy learns, we should expect these numbers to +increase. +""" +import numpy as np +import os +import ray +import time + +import gym + +############################################################################## +# Hyperparameters +# --------------- +# +# Here we'll define a couple of the hyperparameters that are used. + +H = 200 # The number of hidden layer neurons. +gamma = 0.99 # The discount factor for reward. +decay_rate = 0.99 # The decay factor for RMSProp leaky sum of grad^2. +D = 80 * 80 # The input dimensionality: 80x80 grid. +learning_rate = 1e-4 # Magnitude of the update. + +############################################################################# +# Helper Functions +# ---------------- +# +# We first define a few helper functions: +# +# 1. Preprocessing: The ``preprocess`` function will +# preprocess the original 210x160x3 uint8 frame into a one-dimensional 6400 +# float vector. +# +# 2. Reward Processing: The ``process_rewards`` function will calculate +# a discounted reward. This formula states that the "value" of a +# sampled action is the weighted sum of all subsequent rewards, +# but later rewards are exponentially less important. +# +# 3. Rollout: The ``rollout`` function plays an entire game of Pong (until +# either the computer or the RL agent loses). + + +def preprocess(img): + # Crop the image. + img = img[35:195] + # Downsample by factor of 2. + img = img[::2, ::2, 0] + # Erase background (background type 1). + img[img == 144] = 0 + # Erase background (background type 2). + img[img == 109] = 0 + # Set everything else (paddles, ball) to 1.
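+    # (The thresholded frame is a sparse binary image; the ``ravel`` call
+    # below flattens it into the 6400-dimensional input vector D = 80 * 80.)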
+ img[img != 0] = 1 + return img.astype(np.float).ravel() + + +def process_rewards(r): + """Compute discounted reward from a vector of rewards.""" + discounted_r = np.zeros_like(r) + running_add = 0 + for t in reversed(range(0, r.size)): + # Reset the sum, since this was a game boundary (pong specific!). + if r[t] != 0: + running_add = 0 + running_add = running_add * gamma + r[t] + discounted_r[t] = running_add + return discounted_r + + +def rollout(model, env): + """Plays one game in env with model, until the env signals done. + + Returns: + xs: A list of observations. + hs: A list of model hidden states per observation. + dlogps: A list of gradients. + drs: A list of rewards. + + """ + # Reset the game. + observation = env.reset() + # Note that prev_x is used in computing the difference frame. + prev_x = None + xs, hs, dlogps, drs = [], [], [], [] + done = False + while not done: + cur_x = preprocess(observation) + x = cur_x - prev_x if prev_x is not None else np.zeros(D) + prev_x = cur_x + + aprob, h = model.policy_forward(x) + # Sample an action. + action = 2 if np.random.uniform() < aprob else 3 + + # The observation. + xs.append(x) + # The hidden state. + hs.append(h) + y = 1 if action == 2 else 0 # A "fake label". + # The gradient that encourages the action that was taken to be + # taken (see http://cs231n.github.io/neural-networks-2/#losses if + # confused). + dlogps.append(y - aprob) + + observation, reward, done, info = env.step(action) + + # Record reward (has to be done after we call step() to get reward + # for previous action). + drs.append(reward) + return xs, hs, dlogps, drs + + +############################################################################## +# Neural Network +# -------------- +# Here, a neural network is used to define a "policy" +# for playing Pong (that is, a function that chooses an action given a state). +# +# To implement a neural network in NumPy, we need to provide helper functions +# for calculating updates and computing the output of the neural network +# given an input, which in our case is an observation. + + +class Model(object): + """This class holds the neural network weights.""" + + def __init__(self): + self.weights = {} + self.weights["W1"] = np.random.randn(H, D) / np.sqrt(D) + self.weights["W2"] = np.random.randn(H) / np.sqrt(H) + + def policy_forward(self, x): + h = np.dot(self.weights["W1"], x) + h[h < 0] = 0 # ReLU nonlinearity. + logp = np.dot(self.weights["W2"], h) + # Sigmoid "squashing" to a probability. + p = 1.0 / (1.0 + np.exp(-logp)) + # Return probability of taking action 2, and hidden state. + return p, h + + def policy_backward(self, eph, epx, epdlogp): + """Backward pass to calculate gradients. + + Arguments: + eph: Array of intermediate hidden states. + epx: Array of experiences (observations). + epdlogp: Array of logps (output of the last layer, before the + sigmoid). + + """ + dW2 = np.dot(eph.T, epdlogp).ravel() + dh = np.outer(epdlogp, self.weights["W2"]) + # Backprop the ReLU nonlinearity.
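+        # The ReLU derivative is 1 for positive pre-activations and 0
+        # otherwise, so gradients are zeroed wherever the hidden unit
+        # was inactive.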
+ dh[eph <= 0] = 0 + dW1 = np.dot(dh.T, epx) + return {"W1": dW1, "W2": dW2} + + def update(self, grad_buffer, rmsprop_cache, lr, decay): + """Applies the gradients to the model parameters with RMSProp.""" + for k, v in self.weights.items(): + g = grad_buffer[k] + rmsprop_cache[k] = (decay * rmsprop_cache[k] + (1 - decay) * g**2) + self.weights[k] += lr * g / (np.sqrt(rmsprop_cache[k]) + 1e-5) + + +def zero_grads(grad_buffer): + """Reset the batch gradient buffer.""" + for k, v in grad_buffer.items(): + grad_buffer[k] = np.zeros_like(v) + + +############################################################################# +# Parallelizing Gradients +# ----------------------- +# We define an **actor**, which is responsible for taking a model and an env +# and performing a rollout + computing a gradient update. + +ray.init() + + +@ray.remote +class RolloutWorker(object): + def __init__(self): + # Tell numpy to only use one core. If we don't do this, each actor may + # try to use all of the cores and the resulting contention may result + # in no speedup over the serial version. Note that if numpy is using + # OpenBLAS, then you need to set OPENBLAS_NUM_THREADS=1, and you + # probably need to do it from the command line (so it happens before + # numpy is imported). + os.environ["MKL_NUM_THREADS"] = "1" + self.env = gym.make("Pong-v0") + + def compute_gradient(self, model): + # Compute a simulation episode. + xs, hs, dlogps, drs = rollout(model, self.env) + reward_sum = sum(drs) + # Vectorize the arrays. + epx = np.vstack(xs) + eph = np.vstack(hs) + epdlogp = np.vstack(dlogps) + epr = np.vstack(drs) + + # Compute the discounted reward backward through time. + discounted_epr = process_rewards(epr) + # Standardize the rewards to be unit normal (helps control the gradient + # estimator variance). + discounted_epr -= np.mean(discounted_epr) + discounted_epr /= np.std(discounted_epr) + # Modulate the gradient with advantage (the policy gradient magic + # happens right here). + epdlogp *= discounted_epr + return model.policy_backward(eph, epx, epdlogp), reward_sum + + +############################################################################# +# Running +# ------- +# +# This example is easy to parallelize because the network can play many games +# in parallel and no information needs to be shared between the games. + +# +# In the loop, the network repeatedly plays games of Pong and +# records a gradient from each game. Every ``batch_size`` games, the gradients +# are combined together and used to update the network. + +iterations = 20 +batch_size = 4 +model = Model() +actors = [RolloutWorker.remote() for _ in range(batch_size)] + +running_reward = None +# Update buffers that add up gradients over a batch. +grad_buffer = {k: np.zeros_like(v) for k, v in model.weights.items()} +# Update the rmsprop memory. +rmsprop_cache = {k: np.zeros_like(v) for k, v in model.weights.items()} + +for i in range(1, 1 + iterations): + model_id = ray.put(model) + # Launch tasks to compute gradients from multiple rollouts in parallel. + start_time = time.time() + gradient_ids = [ + actor.compute_gradient.remote(model_id) for actor in actors + ] + for batch in range(batch_size): + [grad_id], gradient_ids = ray.wait(gradient_ids) + grad, reward_sum = ray.get(grad_id) + # Accumulate the gradient over batch.
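+        # Summing the per-weight gradients across the batch's rollouts
+        # approximates one large-batch update; the model itself is updated
+        # only once per iteration, after all rollouts have been folded in.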
+ for k in model.weights: + grad_buffer[k] += grad[k] + running_reward = (reward_sum if running_reward is None else + running_reward * 0.99 + reward_sum * 0.01) + end_time = time.time() + print("Batch {} computed {} rollouts in {} seconds, " + "running mean is {}".format(i, batch_size, end_time - start_time, + running_reward)) + model.update(grad_buffer, rmsprop_cache, learning_rate, decay_rate) + zero_grads(grad_buffer) diff --git a/doc/source/example-resnet.rst b/doc/examples/plot_resnet.rst similarity index 100% rename from doc/source/example-resnet.rst rename to doc/examples/plot_resnet.rst diff --git a/doc/source/example-streaming.rst b/doc/examples/plot_streaming.rst similarity index 100% rename from doc/source/example-streaming.rst rename to doc/examples/plot_streaming.rst diff --git a/doc/examples/resnet/cifar_input.py b/doc/examples/resnet/cifar_input.py index d19466561fd19..283a66a274443 100644 --- a/doc/examples/resnet/cifar_input.py +++ b/doc/examples/resnet/cifar_input.py @@ -34,8 +34,8 @@ def build_data(data_path, size, dataset): def load_transform(value): # Convert these examples to dense labels and processed images. record = tf.reshape(tf.decode_raw(value, tf.uint8), [record_bytes]) - label = tf.cast(tf.slice(record, [label_offset], [label_bytes]), - tf.int32) + label = tf.cast( + tf.slice(record, [label_offset], [label_bytes]), tf.int32) # Convert from string to [depth * height * width] to # [depth, height, width]. depth_major = tf.reshape( @@ -44,10 +44,11 @@ def load_transform(value): # Convert from [depth, height, width] to [height, width, depth]. image = tf.cast(tf.transpose(depth_major, [1, 2, 0]), tf.float32) return (image, label) + # Read examples from files in the filename queue. data_files = tf.gfile.Glob(data_path) - data = tf.contrib.data.FixedLengthRecordDataset(data_files, - record_bytes=record_bytes) + data = tf.contrib.data.FixedLengthRecordDataset( + data_files, record_bytes=record_bytes) data = data.map(load_transform) data = data.batch(size) iterator = data.make_one_shot_iterator() @@ -102,8 +103,7 @@ def map_test(image, label): labels = tf.reshape(labels, [batch_size, 1]) indices = tf.reshape(tf.range(0, batch_size, 1), [batch_size, 1]) labels = tf.sparse_to_dense( - tf.concat([indices, labels], 1), - [batch_size, num_classes], 1.0, 0.0) + tf.concat([indices, labels], 1), [batch_size, num_classes], 1.0, 0.0) assert len(images.get_shape()) == 4 assert images.get_shape()[0] == batch_size diff --git a/doc/examples/rl_pong/driver.py b/doc/examples/rl_pong/driver.py deleted file mode 100644 index ab9a634468d07..0000000000000 --- a/doc/examples/rl_pong/driver.py +++ /dev/null @@ -1,213 +0,0 @@ -# This code is copied and adapted from Andrej Karpathy's code for learning to -# play Pong https://gist.github.com/karpathy/a4166c7fe253700972fcbc77e4ea32c5. - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import argparse -import numpy as np -import os -import ray -import time - -import gym - -# Define some hyperparameters. - -# The number of hidden layer neurons. -H = 200 -learning_rate = 1e-4 -# Discount factor for reward. -gamma = 0.99 -# The decay factor for RMSProp leaky sum of grad^2. -decay_rate = 0.99 - -# The input dimensionality: 80x80 grid. -D = 80 * 80 - - -def sigmoid(x): - # Sigmoid "squashing" function to interval [0, 1]. - return 1.0 / (1.0 + np.exp(-x)) - - -def preprocess(img): - """Preprocess 210x160x3 uint8 frame into 6400 (80x80) 1D float vector.""" - # Crop the image. 
- img = img[35:195] - # Downsample by factor of 2. - img = img[::2, ::2, 0] - # Erase background (background type 1). - img[img == 144] = 0 - # Erase background (background type 2). - img[img == 109] = 0 - # Set everything else (paddles, ball) to 1. - img[img != 0] = 1 - return img.astype(np.float).ravel() - - -def discount_rewards(r): - """take 1D float array of rewards and compute discounted reward""" - discounted_r = np.zeros_like(r) - running_add = 0 - for t in reversed(range(0, r.size)): - # Reset the sum, since this was a game boundary (pong specific!). - if r[t] != 0: - running_add = 0 - running_add = running_add * gamma + r[t] - discounted_r[t] = running_add - return discounted_r - - -def policy_forward(x, model): - h = np.dot(model["W1"], x) - h[h < 0] = 0 # ReLU nonlinearity. - logp = np.dot(model["W2"], h) - p = sigmoid(logp) - # Return probability of taking action 2, and hidden state. - return p, h - - -def policy_backward(eph, epx, epdlogp, model): - """backward pass. (eph is array of intermediate hidden states)""" - dW2 = np.dot(eph.T, epdlogp).ravel() - dh = np.outer(epdlogp, model["W2"]) - # Backprop relu. - dh[eph <= 0] = 0 - dW1 = np.dot(dh.T, epx) - return {"W1": dW1, "W2": dW2} - - -@ray.remote -class PongEnv(object): - def __init__(self): - # Tell numpy to only use one core. If we don't do this, each actor may - # try to use all of the cores and the resulting contention may result - # in no speedup over the serial version. Note that if numpy is using - # OpenBLAS, then you need to set OPENBLAS_NUM_THREADS=1, and you - # probably need to do it from the command line (so it happens before - # numpy is imported). - os.environ["MKL_NUM_THREADS"] = "1" - self.env = gym.make("Pong-v0") - - def compute_gradient(self, model): - # Reset the game. - observation = self.env.reset() - # Note that prev_x is used in computing the difference frame. - prev_x = None - xs, hs, dlogps, drs = [], [], [], [] - reward_sum = 0 - done = False - while not done: - cur_x = preprocess(observation) - x = cur_x - prev_x if prev_x is not None else np.zeros(D) - prev_x = cur_x - - aprob, h = policy_forward(x, model) - # Sample an action. - action = 2 if np.random.uniform() < aprob else 3 - - # The observation. - xs.append(x) - # The hidden state. - hs.append(h) - y = 1 if action == 2 else 0 # A "fake label". - # The gradient that encourages the action that was taken to be - # taken (see http://cs231n.github.io/neural-networks-2/#losses if - # confused). - dlogps.append(y - aprob) - - observation, reward, done, info = self.env.step(action) - reward_sum += reward - - # Record reward (has to be done after we call step() to get reward - # for previous action). - drs.append(reward) - - epx = np.vstack(xs) - eph = np.vstack(hs) - epdlogp = np.vstack(dlogps) - epr = np.vstack(drs) - # Reset the array memory. - xs, hs, dlogps, drs = [], [], [], [] - - # Compute the discounted reward backward through time. - discounted_epr = discount_rewards(epr) - # Standardize the rewards to be unit normal (helps control the gradient - # estimator variance). - discounted_epr -= np.mean(discounted_epr) - discounted_epr /= np.std(discounted_epr) - # Modulate the gradient with advantage (the policy gradient magic - # happens right here). 
- epdlogp *= discounted_epr - return policy_backward(eph, epx, epdlogp, model), reward_sum - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Train an RL agent on Pong.") - parser.add_argument( - "--batch-size", - default=10, - type=int, - help="The number of rollouts to do per batch.") - parser.add_argument( - "--redis-address", - default=None, - type=str, - help="The Redis address of the cluster.") - parser.add_argument( - "--iterations", - default=-1, - type=int, - help="The number of model updates to perform. By " - "default, training will not terminate.") - args = parser.parse_args() - batch_size = args.batch_size - - ray.init(redis_address=args.redis_address) - - # Run the reinforcement learning. - - running_reward = None - batch_num = 1 - model = {} - # "Xavier" initialization. - model["W1"] = np.random.randn(H, D) / np.sqrt(D) - model["W2"] = np.random.randn(H) / np.sqrt(H) - # Update buffers that add up gradients over a batch. - grad_buffer = {k: np.zeros_like(v) for k, v in model.items()} - # Update the rmsprop memory. - rmsprop_cache = {k: np.zeros_like(v) for k, v in model.items()} - actors = [PongEnv.remote() for _ in range(batch_size)] - iteration = 0 - while iteration != args.iterations: - iteration += 1 - model_id = ray.put(model) - actions = [] - # Launch tasks to compute gradients from multiple rollouts in parallel. - start_time = time.time() - for i in range(batch_size): - action_id = actors[i].compute_gradient.remote(model_id) - actions.append(action_id) - for i in range(batch_size): - action_id, actions = ray.wait(actions) - grad, reward_sum = ray.get(action_id[0]) - # Accumulate the gradient over batch. - for k in model: - grad_buffer[k] += grad[k] - running_reward = (reward_sum if running_reward is None else - running_reward * 0.99 + reward_sum * 0.01) - end_time = time.time() - print("Batch {} computed {} rollouts in {} seconds, " - "running mean is {}".format(batch_num, batch_size, - end_time - start_time, - running_reward)) - for k, v in model.items(): - g = grad_buffer[k] - rmsprop_cache[k] = ( - decay_rate * rmsprop_cache[k] + (1 - decay_rate) * g**2) - model[k] += learning_rate * g / (np.sqrt(rmsprop_cache[k]) + 1e-5) - # Reset the batch gradient buffer. - grad_buffer[k] = np.zeros_like(v) - batch_num += 1 diff --git a/doc/source/_static/img/thumbnails/default.png b/doc/source/_static/img/thumbnails/default.png new file mode 100644 index 0000000000000..233f8e605efca Binary files /dev/null and b/doc/source/_static/img/thumbnails/default.png differ diff --git a/doc/source/conf.py b/doc/source/conf.py index 63a9716538130..b7f706054dc51 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -12,10 +12,13 @@ # All configuration values have a default; values that are commented out # serve to show the default. +import glob +import shutil import sys import os import urllib -import shlex +sys.path.insert(0, os.path.abspath('.')) +from custom_directives import CustomGalleryItemDirective # These lines added to enable Sphinx to work without installing Ray. import mock @@ -67,13 +70,33 @@ # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. 
extensions = [ - 'sphinx.ext.autodoc', - 'sphinx.ext.viewcode', - 'sphinx.ext.napoleon', - 'sphinx_click.ext', - 'sphinx-jsonschema', + 'sphinx.ext.autodoc', 'sphinx.ext.viewcode', 'sphinx.ext.napoleon', + 'sphinx_click.ext', 'sphinx-jsonschema', 'sphinx_gallery.gen_gallery' ] +sphinx_gallery_conf = { + "examples_dirs": ["../examples"], # path to example scripts + "gallery_dirs": ["auto_examples"], # path where the generated examples are saved + "ignore_pattern": "../examples/doc_code/", + "plot_gallery": "False", + # "filename_pattern": "tutorial.py", + "backreferences_dir": False + # "show_memory": False, + # "min_reported_time": False, +} + +for i in range(len(sphinx_gallery_conf["examples_dirs"])): + gallery_dir = sphinx_gallery_conf["gallery_dirs"][i] + source_dir = sphinx_gallery_conf["examples_dirs"][i] + try: + os.mkdir(gallery_dir) + except OSError: + pass + + # Copy rst files from source dir to gallery dir. + for f in glob.glob(os.path.join(source_dir, '*.rst')): + shutil.copy(f, gallery_dir) + # Add any paths that contain templates here, relative to this directory. templates_path = ['_templates'] @@ -95,7 +118,7 @@ # General information about the project. project = u'Ray' -copyright = u'2016, The Ray Team' +copyright = u'2019, The Ray Team' author = u'The Ray Team' # The version info for the project you're documenting, acts as replacement for @@ -123,6 +146,8 @@ # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. exclude_patterns = ['_build'] +exclude_patterns += sphinx_gallery_conf['examples_dirs'] +exclude_patterns += ["*/README.rst"] # The reST default role (used for this markup: `text`) to use for all # documents. @@ -354,5 +379,10 @@ def update_context(app, pagename, templatename, context, doctree): pagename) +# See also http://searchvoidstar.tumblr.com/post/125486358368/making-pdfs-from-markdown-on-readthedocsorg-using + + def setup(app): app.connect('html-page-context', update_context) + # Custom directives + app.add_directive('customgalleryitem', CustomGalleryItemDirective) diff --git a/doc/source/custom_directives.py b/doc/source/custom_directives.py new file mode 100644 index 0000000000000..4d7e847784e2c --- /dev/null +++ b/doc/source/custom_directives.py @@ -0,0 +1,94 @@ +# Originally from: +# github.com/pytorch/tutorials/blob/60d6ef365e36f3ba82c2b61bf32cc40ac4e86c7b/custom_directives.py # noqa +from docutils.parsers.rst import Directive, directives +from docutils.statemachine import StringList +from docutils import nodes +import os +import sphinx_gallery + +try: + FileNotFoundError +except NameError: + FileNotFoundError = IOError + +GALLERY_TEMPLATE = """ +.. raw:: html + +