---
output: github_document
---
```{r, include = FALSE}
knitr::opts_chunk$set(
eval = FALSE,
collapse = TRUE,
comment = "#>",
fig.path = "man/figures/README-",
out.width = "100%"
)
```
# crew.aws.batch: a crew launcher plugin for AWS Batch <img src='man/figures/logo-readme.png' align="right" height="139"/>
[](https://CRAN.R-project.org/package=crew.aws.batch)
[](https://www.repostatus.org/#Active)
[](https://github.com/wlandau/crew.aws.batch/actions?query=workflow%3Acheck)
[](https://app.codecov.io/gh/wlandau/crew.aws.batch)
[](https://github.com/wlandau/crew.aws.batch/actions?query=workflow%3Alint)
[](https://github.com/wlandau/crew.aws.batch/actions?query=workflow%3Apkgdown)
In computationally demanding analysis projects, statisticians and data scientists asynchronously deploy long-running tasks to distributed systems, ranging from traditional clusters to cloud services. The `crew.aws.batch` package extends the [`mirai`](https://github.com/shikokuchuo/mirai)-powered [`crew`](https://wlandau.github.io/crew/) package with a worker launcher plugin for [AWS Batch](https://aws.amazon.com/batch/). Inspiration also comes from the packages [`mirai`](https://github.com/shikokuchuo/mirai), [`future`](https://future.futureverse.org/), [`rrq`](https://mrc-ide.github.io/rrq/), [`clustermq`](https://mschubert.github.io/clustermq/), and [`batchtools`](https://mllg.github.io/batchtools/).
# Installation
Type | Source | Command
---|---|---
Release | CRAN | `install.packages("crew.aws.batch")`
Development | GitHub | `remotes::install_github("wlandau/crew.aws.batch")`
Development | R-universe | `install.packages("crew.aws.batch", repos = "https://wlandau.r-universe.dev")`
# Documentation
Please see <https://wlandau.github.io/crew.aws.batch/> for documentation, including a full function reference and usage tutorial.
# Prerequisites
`crew.aws.batch` launches [AWS Batch](https://aws.amazon.com/batch/) jobs to run [`crew`](https://wlandau.github.io/crew/) workers. This comes with a set of special requirements:
1. Understand [AWS Batch](https://aws.amazon.com/batch/) and its [official documentation](https://docs.aws.amazon.com/batch/).
2. Your [job definitions](https://docs.aws.amazon.com/batch/latest/userguide/job_definitions.html) must each have a [Docker](https://www.docker.com/)-compatible container image with R and `crew.aws.batch` installed. You may wish to inherit from an existing [rocker](https://github.com/rocker-org/rocker-versioned2) image.
3. At minimum, for the launcher plugin to work, your [IAM policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies.html) need permission to [submit](https://docs.aws.amazon.com/batch/latest/APIReference/API_SubmitJob.html) and [terminate](https://docs.aws.amazon.com/batch/latest/APIReference/API_TerminateJob.html) jobs. To appropriately monitor jobs, your policies also need permission to [list](https://docs.aws.amazon.com/batch/latest/APIReference/API_ListJobs.html) and [describe](https://docs.aws.amazon.com/batch/latest/APIReference/API_DescribeJobs.html) jobs. In addition, managing job definitions as described below requires permission to [register](https://docs.aws.amazon.com/batch/latest/APIReference/API_RegisterJobDefinition.html), [deregister](https://docs.aws.amazon.com/batch/latest/APIReference/API_DeregisterJobDefinition.html), and [describe](https://docs.aws.amazon.com/batch/latest/APIReference/API_DescribeJobDefinitions.html) job definitions. To view [CloudWatch](https://aws.amazon.com/cloudwatch/) logs, you need permission to [get log events](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_GetLogEvents.html).
4. In the [compute environment](https://docs.aws.amazon.com/batch/latest/userguide/compute_environments.html), the [security group](https://docs.aws.amazon.com/vpc/latest/userguide/security-groups.html) must permit all inbound and outbound TCP traffic within itself.^[If you already know the TCP port you will supply to the `port` argument of `crew_controller_aws_batch()`, you can restrict the port range to only use that port number.] The controller and the workers must run in this security group so they can communicate within the firewalled local network.^[Please read about the [risks](https://wlandau.github.io/crew/articles/risks.html) and keep TLS encryption turned on (default: `tls = crew_tls(mode = "automatic")`). Please understand and comply with all the security policies of your organization.] If your security group ID is `sg-00000` and belongs to [VPC](https://aws.amazon.com/vpc/) `vpc-00000`, then your inbound and outbound rules may look something like this:
```{r}
client <- paws.compute::ec2()
groups <- client$describe_security_groups(GroupIds = "sg-00000")
str(groups$SecurityGroups[[1L]])
#> List of 8
#> $ Description : chr "Allow TCP traffic on ephemeral ports"
#> $ GroupName : chr "self-pointing-group"
#> $ IpPermissions :List of 1
#> ..$ :List of 7
#> .. ..$ FromPort : num 1024
#> .. ..$ IpProtocol : chr "tcp"
#> .. ..$ IpRanges : list()
#> .. ..$ Ipv6Ranges : list()
#> .. ..$ PrefixListIds : list()
#> .. ..$ ToPort : num 65535
#> .. ..$ UserIdGroupPairs:List of 1
#> .. .. ..$ :List of 7
#> .. .. .. ..$ Description : chr "Accept traffic from other jobs in group."
#> .. .. .. ..$ GroupId : chr "sg-00000"
#> .. .. .. ..$ GroupName : chr(0)
#> .. .. .. ..$ PeeringStatus : chr(0)
#> .. .. .. ..$ UserId : chr "CENSORED"
#> .. .. .. ..$ VpcId : chr(0)
#> .. .. .. ..$ VpcPeeringConnectionId: chr(0)
#> $ OwnerId : chr "CENSORED"
#> $ GroupId : chr "sg-00000"
#> $ IpPermissionsEgress:List of 1
#> ..$ :List of 7
#> .. ..$ FromPort : num 1024
#> .. ..$ IpProtocol : chr "tcp"
#> .. ..$ IpRanges : list()
#> .. ..$ Ipv6Ranges : list()
#> .. ..$ PrefixListIds : list()
#> .. ..$ ToPort : num 65535
#> .. ..$ UserIdGroupPairs:List of 1
#> .. .. ..$ :List of 7
#> .. .. .. ..$ Description : chr "Allow traffic to other jobs in group."
#> .. .. .. ..$ GroupId : chr "sg-00000"
#> .. .. .. ..$ GroupName : chr(0)
#> .. .. .. ..$ PeeringStatus : chr(0)
#> .. .. .. ..$ UserId : chr "CENSORED"
#> .. .. .. ..$ VpcId : chr(0)
#> .. .. .. ..$ VpcPeeringConnectionId: chr(0)
#> $ Tags : list()
#> $ VpcId : chr "vpc-00000"
```
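The `paws.compute` package can also help create such a security group. Below is a minimal, hedged sketch in which the group name, description, VPC ID, and ephemeral port range are placeholders you would adapt: it creates the group and then points its inbound and outbound TCP rules back at the group itself.
```{r}
client <- paws.compute::ec2()
# Create the security group in the VPC (placeholder name and VPC ID).
group <- client$create_security_group(
  GroupName = "self-pointing-group",
  Description = "Allow TCP traffic on ephemeral ports",
  VpcId = "vpc-00000"
)
# A rule that allows TCP traffic on ephemeral ports within the group itself.
rule <- list(
  FromPort = 1024,
  ToPort = 65535,
  IpProtocol = "tcp",
  UserIdGroupPairs = list(list(GroupId = group$GroupId))
)
# Apply the rule to both inbound and outbound traffic.
client$authorize_security_group_ingress(
  GroupId = group$GroupId,
  IpPermissions = list(rule)
)
client$authorize_security_group_egress(
  GroupId = group$GroupId,
  IpPermissions = list(rule)
)
```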
# Managing job definitions
Before submitting jobs, AWS Batch requires a job definition to describe the container image and resource requirements. You can manage job definitions through the AWS web console, the AWS command line interface (CLI), a software development kit (SDK) such as the `paws` R package, or the job definition class in `crew.aws.batch`. To use `crew.aws.batch` for this, first create a job definition object.
```r
definition <- crew_definition_aws_batch(
job_definition = "YOUR_JOB_DEFINITION_NAME",
job_queue = "YOUR_JOB_QUEUE_NAME"
)
```
The job definition may or may not exist at this point. If it does not exist, you can register it with `register()`, an oversimplified limited-scope method which creates container-based job definitions with the `"awslogs"` log driver (for CloudWatch).^[The log group supplied to `crew_monitor_aws_batch()` must be valid. The default is `"/aws/batch/job"`, which may not exist if your system administrator has a custom logging policy.] Below, your container image can be as simple as a Docker Hub identifier (like `"alpine:latest"`) or the full URI of an ECR image.^[For the `crew` controller, you will definitely want an image with R and `crew` installed. For the purposes of testing the monitor, `"alpine:latest"` will work.]
```{r}
definition$register(
image = "AWS_ACCOUNT_ID.dkr.ecr.AWS_REGION.amazonaws.com/ECR_REPOSITORY_NAME:IMAGE_TAG",
platform_capabilities = "EC2",
memory_units = "gigabytes",
memory = 8,
cpus = 2
)
#> # A tibble: 1 × 3
#> name revision arn
#> <chr> <int> <chr>
#> 1 YOUR_JOB_DEFINITION_NAME 81 arn:aws:batch:us-east-1:CENSORED:jo…
```
The `describe()` method shows information about current and past revisions of the job definition. Set `active` to `TRUE` to see just the active revisions.
```{r}
definition$describe(active = TRUE)
#> # A tibble: 2 × 16
#> name arn revision status type scheduling_priority parameters
#> <chr> <chr> <int> <chr> <chr> <dbl> <list>
#> 1 YOUR_JOB_DEFIN… arn:… 82 active cont… 3 <list [0]>
#> 2 YOUR_JOB_DEFIN… arn:… 81 active cont… 3 <list [0]>
#> # ℹ 9 more variables: retry_strategy <list>, container_properties <list>,
#> # timeout <list>, node_properties <list>, tags <list>,
#> # propagate_tags <lgl>, platform_capabilities <chr>,
#> # eks_properties <list>, container_orchestration_type <chr>
```
Use `deregister()` to deregister a revision of a job definition. If a revision number is not supplied, then it defaults to the greatest active revision number.
```{r}
definition$deregister()
#> # A tibble: 1 × 16
#> name arn revision status type scheduling_priority parameters
#> <chr> <chr> <int> <chr> <chr> <dbl> <list>
#> 1 YOUR_JOB_DEFIN… arn:… 81 active cont… 3 <list [0]>
#> # ℹ 9 more variables: retry_strategy <list>, container_properties <list>,
#> # timeout <list>, node_properties <list>, tags <list>,
#> # propagate_tags <lgl>, platform_capabilities <chr>,
#> # eks_properties <list>, container_orchestration_type <chr>
```
# Monitoring and terminating jobs
With `crew.aws.batch`, your `crew` controller automatically submits jobs to AWS Batch. These jobs may fail or linger for any number of reasons, which could impede work and increase costs. So before you use `crew_controller_aws_batch()`, please learn how to monitor and terminate AWS Batch jobs manually.
`crew_monitor_aws_batch()` defines a "monitor" to help you manually list, inspect, and terminate jobs. You will need to supply a job definition name and a job queue name.
```{r}
monitor <- crew_monitor_aws_batch(
job_definition = "YOUR_JOB_DEFINITION_NAME",
job_queue = "YOUR_JOB_QUEUE_NAME"
)
```
You can submit individual AWS Batch jobs to test your computing environment.
```{r}
job1 <- monitor$submit(name = "job1", command = c("echo", "hello\nworld"))
job2 <- monitor$submit(name = "job2", command = c("echo", "job\nsubmitted"))
job2
#> # A tibble: 1 × 3
#> name id arn
#> <chr> <chr> <chr>
#> 1 job2 c38d55ad-4a86-4371-9994-6ea8882f5726 arn:aws:batch:us-east-2:0…
```
Method `status()` checks the status of an individual job.
```{r}
monitor$status(id = job2$id)
#> # A tibble: 1 × 8
#> name id arn status reason created started stopped
#> <chr> <chr> <chr> <chr> <chr> <dttm> <dbl> <dbl>
#> 1 job2 c38d55ad-4a86-43… arn:… runnable EMPTY... 2025-01-30 16:29:00 NA NA
```
The `jobs()` method gets the status of all the jobs within the job queue and job definition you originally supplied to `crew_monitor_aws_batch()`. This may include many more jobs than the ones you submitted during the life cycle of the current `monitor` object.
```{r}
monitor$jobs()[, c("name", "id", "arn", "status", "reason")]
#> # A tibble: 2 × 8
#> name id arn status reason
#> <chr> <chr> <chr> <chr> <chr>
#> 1 job1 653df636-ac74-43… arn:… succeeded Essen…
#> 2 job2 c38d55ad-4a86-43… arn:… runnable EMPTY. Either...
```
The [job state](https://docs.aws.amazon.com/batch/latest/userguide/job_states.html) can be `"submitted"`, `"pending"`, `"runnable"`, `"starting"`, `"running"`, `"succeeded"`, or `"failed"`. The monitor has a method for each job state to get only the jobs with that state.
```{r}
monitor$succeeded()[, c("name", "id", "arn", "status", "reason")]
#> # A tibble: 1 × 8
#> name id arn status reason
#> <chr> <chr> <chr> <chr> <chr>
#> 1 job1 653df636-ac74-43… arn:… succeeded EMPTY. Either...
```
In addition, there is an `active()` method for just states `"submitted"`, `"pending"`, `"runnable"`, `"starting"`, and `"running"`, and there is an `inactive()` method for just the `"succeeded"` and `"failed"` states.
```{r}
monitor$inactive()[, c("name", "id", "arn", "status", "reason")]
#> # A tibble: 1 × 8
#> name id arn status reason
#> <chr> <chr> <chr> <chr> <chr>
#> 1 job1 653df636-ac74-43… arn:… succeeded EMPTY. Either...
```
To terminate a job, use the `terminate()` method. This has the effect of both canceling and terminating the job, although you may not see the change right away if the job is currently `"runnable"`. Manually terminated jobs are listed as failed.
```{r}
monitor$terminate(id = job2$id)
```
To get the CloudWatch logs of a job, use the `log()` method. This method writes the log messages with `writeLines()` and invisibly returns a `tibble` with timestamped results. Optionally set the `path` argument to `nullfile()` to avoid printing lines and just use the invisibly returned `tibble`.
```{r}
log <- monitor$log(id = job1$id)
#> hello
#> world
log
#> # A tibble: 2 × 3
#> message timestamp ingestion_time
#> <chr> <dbl> <dbl>
#> 1 hello 2025-01-30 16:29:00 2025-01-30 16:29:03
#> 2 world 2025-01-30 16:29:00 2025-01-30 16:29:03
```
# Using `crew` with AWS Batch workers
To start using `crew.aws.batch` in earnest, first create a controller object. Supply the names of your job queue and job definition, as well as any optional flags and settings you may need. If you do not already have a job definition, the job definition object described above can help you create one.
```{r}
library(crew.aws.batch)
controller <- crew_controller_aws_batch(
name = "my_workflow", # for informative job names
workers = 16,
tasks_max = 2, # to avoid reaching wall time limits (if any exist)
seconds_launch = 600, # to allow a 10-minute startup window
seconds_idle = 60, # to release resources when they are not needed
processes = NULL, # See the "Asynchronous worker management" section below.
options_aws_batch = crew_options_aws_batch(
job_definition = "YOUR_JOB_DEFINITION_NAME",
job_queue = "YOUR_JOB_QUEUE_NAME",
cpus = 2,
gpus = 0,
# Launch workers with 4 GB memory, then 8 GB if the worker crashes,
# then 16 GB on all subsequent launches. Go back to 4 GB if the worker
# completes all its tasks before exiting.
memory = c(4, 8, 16),
memory_units = "gigabytes"
)
)
controller$start()
```
At this point, usage is exactly the same as basic [`crew`](https://wlandau.github.io/crew/). The `push()` method submits tasks and auto-scales [AWS Batch](https://aws.amazon.com/batch/) workers to meet demand.
```{r}
controller$push(name = "do work", command = do_work())
```
The `pop()` method retrieves available tasks.
```{r}
controller$pop()
#> # A tibble: 1 × 11
#> name command result seconds seed error trace warni…¹ launc…² worker insta…³
#> <chr> <chr> <list> <dbl> <int> <chr> <chr> <chr> <chr> <int> <chr>
#> 1 do work … do_work… <int> 0 1.56e8 NA NA NA 79e71c… 1 7686b2…
#> # … with abbreviated variable names ¹warnings, ²launcher, ³instance
```
Remember to terminate the controller when you are done.
```{r}
controller$terminate()
```
# Asynchronous worker management
HTTP requests to submit and terminate jobs may take up to 1 or 2 seconds, and this overhead may be burdensome if there are many workers. To run these requests asynchronously, set the `processes` argument of `crew_controller_aws_batch()` to the number of local `mirai` daemons you want to process the requests. These processes will start on `controller$start()` and end on `controller$terminate()` or when your local R session ends. `controller$launcher$async$errors()` shows the most recent error messages generated on launch or termination for all workers.
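As a brief sketch, reusing the placeholder job definition and job queue names from the controller example above, a controller with asynchronous worker management might look like this:
```{r}
controller <- crew_controller_aws_batch(
  name = "my_workflow",
  workers = 16,
  processes = 4, # local mirai daemons to handle job submission/termination
  options_aws_batch = crew_options_aws_batch(
    job_definition = "YOUR_JOB_DEFINITION_NAME",
    job_queue = "YOUR_JOB_QUEUE_NAME"
  )
)
controller$start()
# ... push tasks and pop results as usual ...
controller$launcher$async$errors() # latest launch/termination errors, if any
controller$terminate()
```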
# Troubleshooting
Setting `processes = NULL` in `crew_controller_aws_batch()` disables asynchronous job management, so launch and termination errors surface immediately and are easier to see. You may also wish to set `options(paws.log_level = 3L)` to increase the verbosity of `paws` messages.
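For example, a troubleshooting session (with the same placeholder names as above) might turn up `paws` logging and disable asynchronous requests before starting the controller:
```{r}
options(paws.log_level = 3L) # more verbose paws messages
controller <- crew_controller_aws_batch(
  name = "my_workflow",
  processes = NULL, # synchronous requests: errors appear immediately
  options_aws_batch = crew_options_aws_batch(
    job_definition = "YOUR_JOB_DEFINITION_NAME",
    job_queue = "YOUR_JOB_QUEUE_NAME"
  )
)
controller$start()
```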
# Thanks
* [Charlie Gao](https://github.com/shikokuchuo) created [`mirai`](https://github.com/shikokuchuo/mirai) and [`nanonext`](https://github.com/shikokuchuo/nanonext) and graciously accommodated the complicated and demanding feature requests that made `crew` and its ecosystem possible.
* Thanks to [Henrik Bengtsson](https://github.com/HenrikBengtsson), [David Kretch](https://github.com/davidkretch), [Adam Banker](https://github.com/adambanker), and [Michael Schubert](https://github.com/mschubert) for edifying conversations about cloud computing in R.
# Code of Conduct
Please note that the `crew` project is released with a [Contributor Code of Conduct](https://github.com/wlandau/crew/blob/main/CODE_OF_CONDUCT.md). By contributing to this project, you agree to abide by its terms.
# Citation
```{r eval = TRUE, warning = FALSE, comment = ""}
citation("crew.aws.batch")
```