Skip to content

Commit

Permalink
Revert "[runtime env] runtime env inheritance refactor (ray-project#2…
Browse files Browse the repository at this point in the history
…2244)"

This reverts commit 5783cdb.
  • Loading branch information
xwjiang2010 committed Feb 24, 2022
1 parent e15a419 commit 8679377
Show file tree
Hide file tree
Showing 70 changed files with 9,532 additions and 169 deletions.
1 change: 1 addition & 0 deletions doc/source/auto_examples/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:orphan:
Binary file not shown.
Binary file added doc/source/auto_examples/auto_examples_python.zip
Binary file not shown.
259 changes: 259 additions & 0 deletions doc/source/auto_examples/dask_xgboost/dask_xgboost.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%matplotlib inline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n# XGBoost-Ray with Dask\n\nThis notebook includes an example workflow using\n`XGBoost-Ray <https://docs.ray.io/en/latest/xgboost-ray.html>`_ and\n`Dask <https://docs.dask.org/en/latest/>`_ for distributed model training,\nhyperparameter optimization, and prediction.\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Cluster Setup\n\nFirst, we'll set up our Ray Cluster. The provided ``dask_xgboost.yaml``\ncluster config can be used to set up an AWS cluster with 64 CPUs.\n\nThe following steps assume you are in a directory with both\n``dask_xgboost.yaml`` and this file saved as ``dask_xgboost.ipynb``.\n\n**Step 1:** Bring up the Ray cluster.\n\n.. code-block:: bash\n\n $ pip install ray boto3\n $ ray up dask_xgboost.yaml\n\n**Step 2:** Move ``dask_xgboost.ipynb`` to the cluster and start Jupyter.\n\n.. code-block:: bash\n\n $ ray rsync_up dask_xgboost.yaml \"./dask_xgboost.ipynb\" \\\n \"~/dask_xgboost.ipynb\"\n $ ray exec dask_xgboost.yaml --port-forward=9999 \"jupyter notebook \\\n --port=9999\"\n\nYou can then access this notebook at the URL that is output:\n``http://localhost:9999/?token=<token>``\n\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Python Setup\n\nFirst, we'll import all the libraries we'll be using. This step also helps us\nverify that the environment is configured correctly. If any of the imports\nare missing, an exception will be raised.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import argparse\nimport time\n\nimport dask\nimport dask.dataframe as dd\nfrom xgboost_ray import RayDMatrix, RayParams, train, predict\n\nimport ray\nfrom ray import tune\nfrom ray.util.dask import ray_dask_get"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, let's parse some arguments. This will be used for executing the ``.py``\nfile, but not for the ``.ipynb``. If you are using the interactive notebook,\nyou can directly override the arguments manually.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"parser = argparse.ArgumentParser()\nparser.add_argument(\n \"--address\", type=str, default=\"auto\", help=\"The address to use for Ray.\")\nparser.add_argument(\n \"--smoke-test\",\n action=\"store_true\",\n help=\"Read a smaller dataset for quick testing purposes.\")\nparser.add_argument(\n \"--num-actors\",\n type=int,\n default=4,\n help=\"Sets number of actors for training.\")\nparser.add_argument(\n \"--cpus-per-actor\",\n type=int,\n default=6,\n help=\"The number of CPUs per actor for training.\")\nparser.add_argument(\n \"--num-actors-inference\",\n type=int,\n default=16,\n help=\"Sets number of actors for inference.\")\nparser.add_argument(\n \"--cpus-per-actor-inference\",\n type=int,\n default=2,\n help=\"The number of CPUs per actor for inference.\")\n# Ignore -f from ipykernel_launcher\nargs, _ = parser.parse_known_args()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Override these arguments as needed:\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"address = args.address\nsmoke_test = args.smoke_test\nnum_actors = args.num_actors\ncpus_per_actor = args.cpus_per_actor\nnum_actors_inference = args.num_actors_inference\ncpus_per_actor_inference = args.cpus_per_actor_inference"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Connecting to the Ray cluster\nNow, let's connect our Python script to this newly deployed Ray cluster!\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"if not ray.is_initialized():\n ray.init(address=address)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Data Preparation\nWe will use the `HIGGS dataset from the UCI Machine Learning dataset\nrepository <https://archive.ics.uci.edu/ml/datasets/HIGGS>`_. The HIGGS\ndataset consists of 11,000,000 samples and 28 attributes, which is large\nenough size to show the benefits of distributed computation.\n\nWe set the Dask scheduler to ``ray_dask_get`` to use `Dask on Ray\n<https://docs.ray.io/en/latest/data/dask-on-ray.html>`_ backend.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"LABEL_COLUMN = \"label\"\nif smoke_test:\n # Test dataset with only 10,000 records.\n FILE_URL = \"https://ray-ci-higgs.s3.us-west-2.amazonaws.com/simpleHIGGS\" \\\n \".csv\"\nelse:\n # Full dataset. This may take a couple of minutes to load.\n FILE_URL = \"https://archive.ics.uci.edu/ml/machine-learning-databases\" \\\n \"/00280/HIGGS.csv.gz\"\ncolnames = [LABEL_COLUMN] + [\"feature-%02d\" % i for i in range(1, 29)]\ndask.config.set(scheduler=ray_dask_get)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"load_data_start_time = time.time()\n\ndata = dd.read_csv(FILE_URL, names=colnames)\ndata = data[sorted(colnames)]\ndata = data.persist()\n\nload_data_end_time = time.time()\nload_data_duration = load_data_end_time - load_data_start_time\nprint(f\"Dataset loaded in {load_data_duration} seconds.\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"With the connection established, we can now create the Dask dataframe.\n\nWe will split the data into a training set and a evaluation set using a 80-20\nproportion.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"train_df, eval_df = data.random_split([0.8, 0.2])\ntrain_df, eval_df = train_df.persist(), eval_df.persist()\nprint(train_df, eval_df)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Distributed Training\nThe ``train_xgboost`` function contains all of the logic necessary for\ntraining using XGBoost-Ray.\n\nDistributed training can not only speed up the process, but also allow you\nto use datasets that are to large to fit in memory of a single node. With\ndistributed training, the dataset is sharded across different actors\nrunning on separate nodes. Those actors communicate with each other to\ncreate the final model.\n\nFirst, the dataframes are wrapped in ``RayDMatrix`` objects, which handle\ndata sharding across the cluster. Then, the ``train`` function is called.\nThe evaluation scores will be saved to ``evals_result`` dictionary. The\nfunction returns a tuple of the trained model (booster) and the evaluation\nscores.\n\nThe ``ray_params`` variable expects a ``RayParams`` object that contains\nRay-specific settings, such as the number of workers.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def train_xgboost(config, train_df, test_df, target_column, ray_params):\n train_set = RayDMatrix(train_df, target_column)\n test_set = RayDMatrix(test_df, target_column)\n\n evals_result = {}\n\n train_start_time = time.time()\n\n # Train the classifier\n bst = train(\n params=config,\n dtrain=train_set,\n evals=[(test_set, \"eval\")],\n evals_result=evals_result,\n ray_params=ray_params)\n\n train_end_time = time.time()\n train_duration = train_end_time - train_start_time\n print(f\"Total time taken: {train_duration} seconds.\")\n\n model_path = \"model.xgb\"\n bst.save_model(model_path)\n print(\"Final validation error: {:.4f}\".format(\n evals_result[\"eval\"][\"error\"][-1]))\n\n return bst, evals_result"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can now pass our Dask dataframes and run the function. We will use\n``RayParams`` to specify that the number of actors and CPUs to train with.\n\nThe dataset has to be downloaded onto the cluster, which may take a few\nminutes.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# standard XGBoost config for classification\nconfig = {\n \"tree_method\": \"approx\",\n \"objective\": \"binary:logistic\",\n \"eval_metric\": [\"logloss\", \"error\"],\n}\n\nbst, evals_result = train_xgboost(\n config, train_df, eval_df, LABEL_COLUMN,\n RayParams(cpus_per_actor=cpus_per_actor, num_actors=num_actors))\nprint(f\"Results: {evals_result}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Hyperparameter optimization\nIf we are not content with the results obtained with default XGBoost\nparameters, we can use `Ray Tune\n<https://docs.ray.io/en/latest/tune/index.html>`_ for cutting-edge\ndistributed hyperparameter tuning. XGBoost-Ray automatically integrates\nwith Ray Tune, meaning we can use the same training function as before.\n\nIn this workflow, we will tune three hyperparameters - ``eta``, ``subsample``\nand ``max_depth``. We are using `Tune's samplers to define the search\nspace <https://docs.ray.io/en/latest/tune/user-guide.html#search-space-grid-random>`_.\n\nThe experiment configuration is done through ``tune.run``. We set the amount\nof resources each trial (hyperparameter combination) requires by using the\n``get_tune_resources`` method of ``RayParams``. The ``num_samples`` argument\ncontrols how many trials will be ran in total. In the end, the best\ncombination of hyperparameters evaluated during the experiment will be\nreturned.\n\nBy default, Tune will use simple random search. However, Tune also\nprovides various `search algorithms\n<https://docs.ray.io/en/latest/tune/api_docs/suggestion.html>`_ and\n`schedulers <https://docs.ray.io/en/latest/tune/api_docs/schedulers.html>`_\nto further improve the optimization process.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def tune_xgboost(train_df, test_df, target_column):\n # Set XGBoost config.\n config = {\n \"tree_method\": \"approx\",\n \"objective\": \"binary:logistic\",\n \"eval_metric\": [\"logloss\", \"error\"],\n \"eta\": tune.loguniform(1e-4, 1e-1),\n \"subsample\": tune.uniform(0.5, 1.0),\n \"max_depth\": tune.randint(1, 9)\n }\n\n ray_params = RayParams(\n max_actor_restarts=1,\n cpus_per_actor=cpus_per_actor,\n num_actors=num_actors)\n\n tune_start_time = time.time()\n\n analysis = tune.run(\n tune.with_parameters(\n train_xgboost,\n train_df=train_df,\n test_df=test_df,\n target_column=target_column,\n ray_params=ray_params),\n # Use the `get_tune_resources` helper function to set the resources.\n resources_per_trial=ray_params.get_tune_resources(),\n config=config,\n num_samples=10,\n metric=\"eval-error\",\n mode=\"min\")\n\n tune_end_time = time.time()\n tune_duration = tune_end_time - tune_start_time\n print(f\"Total time taken: {tune_duration} seconds.\")\n\n accuracy = 1. - analysis.best_result[\"eval-error\"]\n print(f\"Best model parameters: {analysis.best_config}\")\n print(f\"Best model total accuracy: {accuracy:.4f}\")\n\n return analysis.best_config"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Hyperparameter optimization may take some time to complete.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"tune_xgboost(train_df, eval_df, LABEL_COLUMN)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prediction\nWith the model trained, we can now predict on unseen data. For the\npurposes of this example, we will use the same dataset for prediction as\nfor training.\n\nSince prediction is naively parallelizable, distributing it over multiple\nactors can measurably reduce the amount of time needed.\n\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"inference_df = RayDMatrix(data, ignore=[LABEL_COLUMN, \"partition\"])\nresults = predict(\n bst,\n inference_df,\n ray_params=RayParams(\n cpus_per_actor=cpus_per_actor_inference,\n num_actors=num_actors_inference))\n\nprint(results)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
Loading

0 comments on commit 8679377

Please sign in to comment.