---
title: "SDSS 2018: Data Science Workflows"
author: "Jim Harner, West Virginia University"
date: "May 17, 2018"
output:
  slidy_presentation: default
---
## The Data Science Process

The data science workflow, or process, starts with data extraction and ends with a data product. A common version of this process is illustrated by:
![The Data Science Process](workflow-figures/dsciProcess.png)
This diagram is taken from [Doing Data Science](http://shop.oreilly.com/product/0636920028529.do). Notice the flow along the top, where data is collected, processed, cleaned, and explored. Many conflate data science with the box on the right, i.e., machine learning algorithms and statistical models, but it is much more than that. Ultimately, we want to make decisions or to create data products, an iterative process.
## Hadley's Tidyverse

Hadley Wickham gives a more abbreviated and R-specific version of this process in [R for Data Science](http://r4ds.had.co.nz/introduction.html#what-you-will-learn).
It focuses on:
- `tidyr` to make data tidy,
- `dplyr` for transforming the data,
- `ggplot2` for visualization, and other packages (see the sketch below).
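A minimal sketch of this tidy-transform-visualize flow (not evaluated here), assuming the tidyverse packages are installed and using only the built-in `mtcars` data:

```{r, eval=FALSE}
library(tidyverse)

mtcars %>%
  as_tibble(rownames = "model") %>%              # tidy: one row per car, one column per variable
  group_by(cyl) %>%                              # transform with dplyr
  summarise(mean_mpg = mean(mpg)) %>%
  ggplot(aes(x = factor(cyl), y = mean_mpg)) +   # visualize with ggplot2
    geom_col() +
    labs(x = "Cylinders", y = "Mean MPG")
```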
## Minimal Operational Components for Big Data
- R/RStudio: an environment for statistical computing and graphics; an integrated development environment for R.
- PostgreSQL: an object-relational database system.
- Hadoop: an open-source framework for large-scale data storage and distributed computing, built on the MapReduce model.
- Hive: a data warehouse for reading, writing, and managing large datasets residing in distributed storage using SQL.
- Spark: an analytics engine for large-scale data processing.
## Batch Architecture
![](workflow-figures/BatchArch.png)
See O'Reilly's [Fast Data Architectures for Streaming Applications](http://www.oreilly.com/data/free/fast-data-architectures-for-streaming-applications.csp)
## Streaming Architecture
![](workflow-figures/StreamArch.png)
## Commercial Implementations
- [AWS](https://aws.amazon.com): compute, storage, database services, etc., including Elastic Container Service
- [Blue Data](https://www.bluedata.com): Spark and TensorFlow using containers and GPUs
- Cloudera
- Hortonworks
- [H2O](https://www.h2o.ai/): AutoML, AI with GPUs
- [IBM](https://www.ibm.com/analytics/)
- SAS Viya
## Rocker

A Dockerized version of R called [rocker](http://github.com/rocker-org/rocker) is available. The Docker image `verse` adds the `rstudio`, `tidyverse`, and `verse` layers to the `r-ver` base image. The tidyverse stack can be spun up with the following bash command:
```
sudo docker run -p 8787:8787 rocker/verse
```
Docker looks for the image locally and, if it is not found, pulls it from hub.docker.com.
## RSpark

`rspark` extends `rocker` to the Spark ecosystem. The GitHub repo for `rspark` can be found at [https://github.com/jharner/rspark](https://github.com/jharner/rspark). `rspark` consists of containers for RStudio, PostgreSQL, Hadoop, Hive, and Spark. By default, Spark runs with a master and two workers, although it can optionally be run within the RStudio container.
## Workflows and ML Pipelines

Workflows can be developed in many environments, e.g., using the pipe operator (`|`) in UNIX:
```
cat short.csv | ./mapper.R | sort | ./analysis.R
```
The flow is based on standard output (stdout) and standard input (stdin).
This can then be automated using Hadoop's MapReduce.
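As an illustration only (the actual `mapper.R` is not shown here), such a script is simply an executable R file that reads records from stdin and writes results to stdout:

```
#!/usr/bin/env Rscript
# Hypothetical mapper sketch: read CSV records from stdin and emit
# tab-separated key/value pairs on stdout.
con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1)) > 0) {
  fields <- strsplit(line, ",")[[1]]
  cat(fields[1], "\t1\n", sep = "")   # first field as the key, a count of 1 as the value
}
close(con)
```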
## dplyr Workflows

Workflows in R are accomplished using the pipe operator (`%>%`), often in conjunction with `dplyr`.
```{r}
library(dplyr, warn.conflicts = FALSE, quietly = TRUE, verbose = FALSE)
library(RPostgreSQL, quietly = TRUE)
library(ggplot2)

# Connect to the PostgreSQL container and query the Data Expo database.
drv <- dbDriver("PostgreSQL")
con <- dbConnect(drv, host = "postgres", dbname = "dataexpo")

# Average surface temperature by 500-foot elevation band for 1998,
# piped directly into ggplot2.
dbGetQuery(con,
  "select round(l.elevation/500)*500 base_elev, avg(m.surftemp) avg_temp
     from measure_table m
     join location_table l on m.location = l.id
     join date_table d on m.date = d.id
    where d.year = 1998
    group by base_elev
    order by base_elev") %>%
  ggplot(aes(x = base_elev, y = avg_temp)) +
    geom_line() + geom_point() +
    labs(title = "Avg Temperature by Elevation",
         x = "Base Elevation (feet)", y = "Average Temperature (Kelvin)")
```
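The same query can also be expressed with `dplyr` verbs against the database via `dbplyr`. A sketch only, not evaluated here; the table and column names are taken from the SQL above:

```{r, eval=FALSE}
tbl(con, "measure_table") %>%
  inner_join(tbl(con, "location_table"), by = c("location" = "id")) %>%
  inner_join(tbl(con, "date_table"), by = c("date" = "id")) %>%
  filter(year == 1998) %>%
  mutate(base_elev = round(elevation / 500) * 500) %>%
  group_by(base_elev) %>%
  summarise(avg_temp = mean(surftemp, na.rm = TRUE)) %>%
  arrange(base_elev) %>%
  collect()
```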
## dplyr Spark Workflow

```{r}
library(sparklyr)
library(nycflights13)

# Connect to a local Spark instance and copy the flights and airlines
# data frames into Spark DataFrames.
sc <- spark_connect(master = "local")
flights_sdf <- sdf_copy_to(sc, flights, "flights_sdf", overwrite = TRUE)
airlines_sdf <- sdf_copy_to(sc, airlines, "airlines_sdf", overwrite = TRUE)

# The dplyr verbs below are translated to Spark SQL and executed in Spark.
flights_sdf %>%
  left_join(airlines_sdf, by = "carrier") %>%
  select(carrier, name, flight, year:day, arr_delay, dep_delay) %>%
  filter(dep_delay > 1000) %>%
  arrange(desc(dep_delay))
```
## dplyr R Workflow

```{r}
# Pull the Spark DataFrames into local R data frames via DBI, then apply
# the same dplyr workflow in R.
flights_df <- dbGetQuery(sc, "SELECT * FROM flights_sdf")
airlines_df <- dbGetQuery(sc, "SELECT * FROM airlines_sdf")

flights_df %>%
  left_join(airlines_df, by = "carrier") %>%
  select(carrier, name, flight, year:day, arr_delay, dep_delay) %>%
  filter(dep_delay > 1000) %>%
  arrange(desc(dep_delay))
```
## HDFS Spark Regression Workflow

```{r}
library(rhdfs, quietly = TRUE, verbose = FALSE)
hdfs.init()

# Execute the first time only:
# hdfs.mkdir("data")

# Copy the slump data into HDFS if it is not already there.
if (!hdfs.exists("data/slump.csv"))
  hdfs.put("/home/rstudio/rspark-notes/c5_sparkr/s3_sparkr_ml/slump.csv",
           "hdfs://hadoop:9000/user/rstudio/data/")
hdfs.ls("hdfs://hadoop:9000/user/rstudio/data/", recurse = TRUE)

# Read the CSV from HDFS into a Spark DataFrame, split it, and fit a
# linear regression on the training partition.
slump_sdf <- spark_read_csv(sc, "slump_sdf",
                            path = "hdfs://hadoop:9000/user/rstudio/data/slump.csv")
slump_partition <- tbl(sc, "slump_sdf") %>%
  sdf_partition(training = 0.7, test = 0.3, seed = 2)

slump_partition$training %>%
  ml_linear_regression(compressive_strength ~ cement + slag + fly_ash + water +
                         sp + coarse_aggr + fine_aggr + slump + flow) %>%
  tidy()
```
## ML Pipelines

ML pipelines have a more specific API, inspired by the [scikit-learn](http://scikit-learn.org/stable/) project.

The Spark Pipeline API is built on three abstractions:

- Transformer: transforms one DataFrame into another DataFrame, e.g., an ML model is a Transformer that transforms a DataFrame with features into a DataFrame with predictions.
- Estimator: trains an algorithm on a DataFrame to produce a Transformer, e.g., a learning algorithm is an Estimator that trains on a DataFrame and produces a model.
- Pipeline: chains multiple Transformers and Estimators together to specify an ML workflow (see the sketch below).
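A minimal sketch of the Estimator/Transformer distinction in `sparklyr` (not evaluated; `training_sdf` and `test_sdf` are placeholder names):

```{r, eval=FALSE}
lr_estimator <- ml_logistic_regression(sc)       # an Estimator: no data involved yet
# Fitting expects `features` and `label` columns (e.g., produced by ft_r_formula).
lr_model <- ml_fit(lr_estimator, training_sdf)   # fitting yields a Transformer (a model)
ml_transform(lr_model, test_sdf)                 # maps a DataFrame to a DataFrame with predictions
```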
## Preparation for Logistic Regression on Flights Data

```{r}
# Partition the flights DataFrame: small training and testing sets, with the
# remainder set aside.
partitioned_flights <- sdf_partition(
  flights_sdf,
  training = 0.01,
  testing = 0.01,
  rest = 0.98
)

# Feature preparation: drop missing departure delays and recode month and day
# as categorical strings.
flights_transf_sdf <- flights_sdf %>%
  filter(!is.na(dep_delay)) %>%
  mutate(
    month = paste0("m", month),
    day = paste0("d", day)
  ) %>%
  select(dep_delay, sched_dep_time, month, day, distance)
```
## Definition of Logistic Regression Pipeline

```{r}
# Build the pipeline: apply the dplyr feature transformation, binarize the
# label, bucketize scheduled departure time into hours, build the model matrix
# with an R formula, and finish with a logistic regression Estimator.
flights_pipeline <- ml_pipeline(sc) %>%
  ft_dplyr_transformer(
    tbl = flights_transf_sdf
  ) %>%
  ft_binarizer(
    input.col = "dep_delay",
    output.col = "delayed",
    threshold = 15
  ) %>%
  ft_bucketizer(
    input.col = "sched_dep_time",
    output.col = "hours",
    splits = c(400, 800, 1200, 1600, 2000, 2400)
  ) %>%
  ft_r_formula(delayed ~ month + day + hours + distance) %>%
  ml_logistic_regression()
```
## Fitted Logistic Regression

```{r}
# Fit the pipeline on the training partition, apply the fitted pipeline
# (a Transformer) to the testing partition, and tabulate actual vs. predicted.
ml_fit(
  flights_pipeline,
  partitioned_flights$training
) %>%
  ml_transform(
    partitioned_flights$testing
  ) %>%
  group_by(delayed, prediction) %>%
  tally()
```
```{r}
# Disconnect from Spark when finished.
spark_disconnect(sc)
```