---
title: "Dask for parallel data operations"
teaching: 50
exercises: 25
questions:
- "How can I parallelise common library operations across multiple cores and nodes?"
keypoints:
- "Dask will parallelise across as many resources as it is asked to"
- "Dask also has drop-in replacements for many other common operations, e.g. in scikit-learn"
---

While the strategies we've discussed so far can improve the
performance of your software on a single node, there will always be
cases where you need more resources than a single node can
provide. For example, a single iteration of your algorithm may take
longer than the wall-time limits of the machine, or your algorithm may
be sequential, so that you can't split it up and run each step as a
separate job. You may also find that for very large problems the
memory capacity of the node becomes a limiting factor; in that case,
using more than one node is necessary not only to achieve a speedup,
but to run the problem at all.

Once we expand our programs beyond running on a single node, we
have more concerns to consider. The main one is communication
between the nodes: each node needs to know what it is supposed to do,
and to be kept updated with any information from the other nodes that
it needs to keep doing it. A complicating factor is that
communication between nodes is in most cases significantly slower than
communication between processes or threads running on the same node,
so many algorithms pay a significant performance penalty for
running across multiple nodes.

Two commonly-encountered models of parallelism are:

* The co-ordinator–worker pattern, where one process or node
  (the co-ordinator) has a privileged status and allocates work to the
  other processes (the workers). It is conceptually relatively simple
  to adapt an existing program to this pattern: find the expensive
  pieces of computation that have no data dependencies between them,
  and farm those out to workers. In many cases this also makes load
  balancing easier, since a process with a panoptic view of what every
  other process is doing can make sure that each has a roughly similar
  amount of work, avoiding the situation where some workers finish
  much earlier and waste their time idling rather than picking up work
  from processes that are still busy. A disadvantage is that it
  doesn't scale well to large numbers of processes: once you have
  thousands of processes running, the co-ordinator may not be able to
  keep up with giving them work, typically because there is not enough
  bandwidth to send each process the data it needs and receive the
  results. (A minimal single-node sketch of this pattern is shown
  after this list.)
* A more "distributed", "collectivist" or "flat" approach, where all
  processes and nodes have the same status, and each knows which other
  processes it needs to send data to and receive data from. Libraries
  for this approach provide collective operations for the aspects that
  require global communication, such as global sums. The advantage
  here is that the full bandwidth of the network can be used, rather
  than relying on the links to and from a single co-ordinator
  process. The trade-off is that it can be conceptually harder to work
  out how to adapt a program to this approach, and to understand what
  an existing program taking this approach is doing.

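To make the co-ordinator–worker idea concrete, here is a minimal
single-node sketch using only Python's standard library; the work
function and the numbers are invented for illustration. Dask (and MPI)
generalise this pattern to many processes spread across many nodes.

~~~
from concurrent.futures import ProcessPoolExecutor


def expensive_work(item):
    # Stand-in for an expensive computation with no data
    # dependencies between items
    return item ** 2


if __name__ == '__main__':
    work_items = range(20)

    # The main process acts as the co-ordinator: it farms the items
    # out to four worker processes and gathers the results back
    with ProcessPoolExecutor(max_workers=4) as workers:
        results = list(workers.map(expensive_work, work_items))

    print(results)
~~~
{: .language-python}
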
## Introducing Dask

Dask is a library that takes functionality from a number of popular
libraries used for scientific computing in Python, including Numpy,
Pandas, and scikit-learn, and extends them to run in parallel across a
variety of different parallelisation setups. This includes multiple
threads or multiple processes on the same node, as well as using
multiple nodes.
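
For example, Dask's array module mirrors much of the Numpy
interface. The snippet below is a small illustration (not part of
today's exercise) of this drop-in style: the array is split into
chunks, and the final `compute()` call runs the work on whichever Dask
setup is available, in parallel where possible.

~~~
import dask.array as da

# A Numpy-like array, split into 1000x1000 chunks that Dask can
# work on in parallel
x = da.random.random((10000, 10000), chunks=(1000, 1000))

# Operations build up a task graph lazily; compute() executes it
print(x.mean().compute())
~~~
{: .language-python}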

For today, we're going to jump straight to the most advanced case and
look at how we can use it to run across multiple nodes on an HPC
cluster.

While multi-node support is built in to Dask, we will use the
`dask-mpi` package to help Dask interact with Slurm to create the
right number of processes. Since we need to install extra packages, we
first need to load the environment that we created earlier today.

~~~
$ module load anaconda/2019.03
$ source activate scw_test
~~~
{: .language-bash}

Now, since we are going to use the MPI libraries that are installed on
Sunbird, we can't use Conda to install `dask-mpi`, since this will
instead use Anaconda's own version of MPI, which doesn't work
properly. Instead, we need to load the MPI libraries, and then install
`dask-mpi` with `pip`.

~~~
$ module load compiler/intel/2019/5 mpi/intel/2019/5
$ pip install mpi4py dask-mpi dask-ml
~~~
{: .language-bash}
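
As a quick, optional sanity check (not part of the lesson's workflow),
you can confirm that the install picked up the system MPI rather than
Anaconda's by importing the packages and printing the MPI library
version, which should report the Intel MPI we loaded above:

~~~
$ python -c "from mpi4py import MPI; import dask_mpi; print(MPI.Get_library_version())"
~~~
{: .language-bash}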


## The Message Passing Interface

While there isn't time to talk in detail about MPI (the Message
Passing Interface) today, it's worth talking a little about how it
works so that we can better understand how Dask interacts with it.

With MPI, multiple copies of the same program (_ranks_ in the MPI
jargon) are run, and these communicate with each other to share
work. These are usually run with a utility called `mpirun` (or
sometimes `mpiexec`), which starts many copies of a given program on
the specified nodes.

On a cluster, `mpirun` is integrated with the scheduler
(e.g. Slurm), so it knows which nodes to place processes on
automatically. Many clusters provide their own wrappers for `mpirun`
to make this integration work better; in Slurm, this utility is
`srun`. (You may remember we used `srun` with GNU Parallel to place
processes on appropriate nodes; that was a special case where we only
wanted a single process at a time.)

More than one process can run per node; sometimes
it makes sense to run as many processes as there are CPU cores, since
then you don't also need to think about other sorts of parallelism.
Programs that are aware of MPI will then talk to the MPI library
(which `mpirun` makes available to them) to get enough information
about the number and placement of processes to be able to do their
work.
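
To make the idea of ranks more concrete, here is a minimal MPI program
using `mpi4py` (purely illustrative, not part of the Dask example; the
filename `hello_mpi.py` is arbitrary). Every copy runs the same code,
but each sees a different rank number:

~~~
# hello_mpi.py
from mpi4py import MPI

comm = MPI.COMM_WORLD    # the communicator containing every rank
rank = comm.Get_rank()   # this copy's rank: 0, 1, 2, ...
size = comm.Get_size()   # the total number of ranks that were started

print(f"Hello from rank {rank} of {size} on {MPI.Get_processor_name()}")
~~~
{: .language-python}

Launched with something like `srun --ntasks=4 python hello_mpi.py`,
this would print four lines, one from each rank.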

While MPI by default uses a collectivist approach to its
communications, Dask's model is a co-ordinator–worker one. To
translate between these, `dask-mpi` finds the number of ranks from
MPI, and does the following with them:

* the first rank becomes the co-ordinator for the team of workers,
* the second rank continues executing the program in serial, and
* any remaining ranks become workers, and do any work that is
  submitted to them from the second rank.

It then ignores MPI completely, using Dask's built-in communication
methods to communicate between the processes instead. MPI is only used
to create the right number of processes, and to communicate that number
to Dask.

If we are programming in parallel effectively, most of our computation
time will be spent having the workers do the work. If each piece of
work is quite expensive, this may mean that both the co-ordinator
and the process running the serial part of the program sit
idling. Since in general we want each process to use a full node,
these two processes can end up wasting two full nodes of computational
resource that could have been getting us our results
faster. Anticipating this, we can start two more processes than we
have nodes, so that workers also run on the nodes where the
co-ordinator and the serial process sit. For example, with two nodes
we would start four ranks: the co-ordinator shares one node with a
worker, and the serial process shares the other node with a second
worker.

So, to get Dask working with MPI under Slurm, we need two things:

* a Python program that uses `dask-mpi` to get information from MPI
  about the setup it is running under, and
* a job script that uses `srun` to launch the right number of copies
  of that program.

Let's look at these in turn. Firstly, the job script to submit this
workload to the cluster looks like this:

~~~
#!/bin/bash --login
###
# Output file location
#SBATCH --output=dasktest.out.%J
# Dask outputs a lot of debug information to stderr,
# so let's direct that to a separate file
#SBATCH --error=dasktest.err.%J
# For now, let's use two nodes, with one task on each
#SBATCH --nodes=2
#SBATCH --ntasks=2
# We want to use the full node
#SBATCH --cpus-per-task=40
#SBATCH --exclusive
# Time limit for this job
#SBATCH --time=00:10:00
# Specify our current project
# (change this for your own work)
#SBATCH --account=scw1389
# Specify the reservation we have for the training workshop;
# remove this for your own work, and
# replace XX with the code provided by your instructor
#SBATCH --reservation=scw1389_XX
###

# Get MPI and Anaconda ready to use
module load anaconda/2019.03 compiler/intel/2019/5 mpi/intel/2019/5
source activate scw_test

# Get Slurm to run the Python program that uses Dask
srun --overcommit \
     --distribution=cyclic \
     --nodes=${SLURM_NNODES} \
     --ntasks=$((SLURM_NTASKS+2)) \
     --cpus-per-task=${SLURM_CPUS_PER_TASK} \
     python dasktest.py
~~~
{: .language-bash}

While the `#SBATCH` directives are explained inline, the meaning of
the flags in the call to `srun` is:

* `--overcommit`: run more than one MPI rank on a single core/slot
* `--distribution=cyclic`: instead of placing ranks 0 and 1 on the
  same node, then 2 and 3, place ranks 0 and N-2 on the first node,
  1 and N-1 on the second, so that workers overlap with the
  co-ordinator and serial process (a quick way to see this placement
  is sketched below)
* `--nodes=${SLURM_NNODES}` and
  `--cpus-per-task=${SLURM_CPUS_PER_TASK}`: take these values from
  the variables Slurm sets for the job
* `--ntasks=$((SLURM_NTASKS+2))`: take the number of tasks requested
  from Slurm, then add the 2 extra
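
If you want to see where ranks will land before running the full job,
one quick, optional check (run from inside a job or an interactive
allocation, since `srun` needs an allocation) is to print each task's
host using the same placement flags:

~~~
$ srun --overcommit --distribution=cyclic --nodes=2 --ntasks=4 \
      bash -c 'echo "task ${SLURM_PROCID} runs on $(hostname)"'
~~~
{: .language-bash}

With the cyclic distribution, tasks 0 and 2 should report the first
node and tasks 1 and 3 the second.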


Next, the program itself. This is a Dask version of [a Scikit-learn
example][scikit-learn example], making use of the `GridSearchCV` class.

~~~
from os import environ
from datetime import datetime

from dask_mpi import initialize
from distributed import Client

from sklearn import datasets
from sklearn.model_selection import train_test_split

# Get the Dask version of GridSearchCV
from dask_ml.model_selection import GridSearchCV

from sklearn.metrics import classification_report
from sklearn.svm import SVC


def run_test(client):
    print(__doc__)

    # Loading the Digits dataset
    digits = datasets.load_digits()

    # To apply a classifier on this data, we need to flatten the images,
    # turning the data into a (samples, features) matrix:
    n_samples = len(digits.images)
    X = digits.images.reshape((n_samples, -1))
    y = digits.target

    # Split the dataset in two equal parts
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.5, random_state=0)

    # Set the parameters by cross-validation
    tuned_parameters = [{'kernel': ['rbf'], 'gamma': [1e-3, 1e-4],
                         'C': [1, 10, 100, 1000]},
                        {'kernel': ['linear'], 'C': [1, 10, 100, 1000]}]

    scores = ['precision', 'recall']

    for score in scores:
        print("# Tuning hyper-parameters for %s" % score)
        print()

        # scheduler=client makes sure that Dask uses the correct communications
        clf = GridSearchCV(
            SVC(), tuned_parameters, scoring='%s_macro' % score,
            scheduler=client
        )
        clf.fit(X_train, y_train)

        print("Best parameters set found on development set:")
        print()
        print(clf.best_params_)
        print()
        print("Grid scores on development set:")
        print()
        means = clf.cv_results_['mean_test_score']
        stds = clf.cv_results_['std_test_score']
        for mean, std, params in zip(means, stds, clf.cv_results_['params']):
            print("%0.3f (+/-%0.03f) for %r"
                  % (mean, std * 2, params))
        print()

        print("Detailed classification report:")
        print()
        print("The model is trained on the full development set.")
        print("The scores are computed on the full evaluation set.")
        print()
        y_true, y_pred = y_test, clf.predict(X_test)
        print(classification_report(y_true, y_pred))
        print()

    # Note the problem is too easy: the hyperparameter plateau is too flat and
    # the output model is the same for precision and recall with ties in
    # quality.


def main():
    # Work out from the environment how many threads to allocate
    num_threads = int(environ.get(
        'SLURM_CPUS_PER_TASK',
        environ.get('OMP_NUM_THREADS', 1)
    ))

    # Create the Dask workers
    initialize(interface='ib0', nthreads=num_threads)

    # Create the Dask object that will manage the communications
    client = Client()

    start = datetime.now()
    run_test(client=client)
    end = datetime.now()

    print(f"Time taken: {end - start}")


if __name__ == '__main__':
    main()
~~~
{: .language-python}
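
Assuming the Python program above is saved as `dasktest.py` (the name
used in the job script) and the job script as, say,
`dask_gridsearch.sh` (any name will do), the job is submitted in the
usual way:

~~~
$ sbatch dask_gridsearch.sh
~~~
{: .language-bash}

Once the job completes, the grid-search output will be in
`dasktest.out.<jobid>` and Dask's debug messages in
`dasktest.err.<jobid>`, as set by the `--output` and `--error`
directives in the job script.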


[scikit-learn example]: https://scikit-learn.org/stable/auto_examples/model_selection/plot_grid_search_digits.html#sphx-glr-auto-examples-model-selection-plot-grid-search-digits-py