
Commit 079c7dc

Merge branch 'master' into 4-1_Pipeline

2 parents b77858f + 1bc7c52

20 files changed: +646 / -40 lines

Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
# Setup Kubernetes

Kubernetes will be used as the cluster for this tutorial. It acts as a managing layer for the deployments. To follow this tutorial, ensure that you have a Kubernetes cluster up and running.

Depending on your situation you can use a local cluster (running on a single node) or a cloud cluster.

## Local Setup

A local cluster can be set up with the `minikube` application. It is a cheap and easy option for testing deployments.

> Be aware that your operating system and hardware need to support virtualization.

To install `minikube` follow the up-to-date instructions (available for Linux / macOS / Windows) provided [here](https://kubernetes.io/docs/tasks/tools/install-minikube/). Specific instructions for Arch Linux can be obtained [here](http://blog.programmableproduction.com/2018/03/08/Archlinux-Setup-Minikube-using-KVM/).

After installing, start the cluster with `minikube start`; this will also configure your local `kubectl` for it. The final output should look similar to this:

```text
😄 minikube v1.6.2 on Arch 18.1.5
✨ Selecting 'kvm2' driver from user configuration (alternates: [none])
🔥 Creating kvm2 VM (CPUs=2, Memory=2000MB, Disk=20000MB) ...
🐳 Preparing Kubernetes v1.17.0 on Docker '19.03.5' ...
💾 Downloading kubeadm v1.17.0
💾 Downloading kubelet v1.17.0
🚜 Pulling images ...
🚀 Launching Kubernetes ...
⌛ Waiting for cluster to come online ...
🏄 Done! kubectl is now configured to use "minikube"
```

### Dashboard

To verify that the cluster started, open the dashboard like this:

```bash
$ minikube dashboard
```

If the browser window does not open, you can obtain the URL via `minikube dashboard --url`.

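As an additional, optional check (not part of the original instructions), you can ask `kubectl` directly whether it can reach the new cluster:

```bash
# The single minikube node should be listed with STATUS "Ready".
kubectl get nodes
```
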
### Config file

If you need to refer to the configuration of your minikube instance, you will find it in the `~/.kube/config` file.

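If you would rather not open the file directly, the same information can be inspected through `kubectl` (an optional addition, assuming the default setup created by minikube):

```bash
# Show which context (cluster) kubectl currently talks to.
kubectl config current-context

# Print only the kubeconfig entries for that active context.
kubectl config view --minify
```
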
## Cloud Setup

Instructions for the cloud setup can be found [here](https://github.com/clc3-CloudComputing/clc3-ws19/tree/master/3%20Kubernetes/exercise%203.1).

TODO: Write down what parameters (URL, port) we need from this and where we get them (screenshot)
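Once `kubectl` is configured for the cloud cluster, one possible way to look up the cluster URL and port mentioned in the TODO above (an assumption on our part, not something the linked exercise prescribes) is:

```bash
# Prints the address (host and port) of the Kubernetes control plane
# for the cluster kubectl is currently configured against.
kubectl cluster-info
```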

setup/02 - virtual environments/README.md renamed to 01 - Setup/02 - virtual environments/README.md

Lines changed: 2 additions & 1 deletion
@@ -10,6 +10,7 @@ $ conda env create -f environment.yml
 
 or install the required packages with: `conda install numpy pandas luigi scikit-learn`
 
+Additionally you need to install the `pykube` and `azure-storage` packages via pip by executing `pip install pykube azure-storage`.
 ## Virtualenv
 
 For a pip environment you can use the `requirements.txt` in this directory to install the dependencies:
@@ -18,4 +19,4 @@ For a pip environment you can use the `requirements.txt` in this directory to in
 pip install -r requirements.txt
 ```
 
-or install the required packages with: `pip install numpy pandas luigi scikit-learn`
+or install the required packages with: `pip install numpy pandas luigi scikit-learn pykube azure-storage`
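
A quick, optional way to confirm that the extra packages are importable in the active environment (this check is not part of the original README):

```bash
# Both imports should succeed silently if the installation worked.
python -c "import pykube, azure.storage.blob; print('ok')"
```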
Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
azure-common==1.1.24
azure-nspkg==3.0.2
azure-storage==0.36.0
certifi==2019.11.28
cffi==1.13.2
chardet==3.0.4
cryptography==2.8
docutils==0.16
httplib2==0.17.0
idna==2.8
joblib==0.14.1
lockfile==0.12.2
luigi==2.8.11
numpy==1.18.1
oauth2client==4.1.3
oauthlib==3.1.0
pandas==1.0.0
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycparser==2.19
pykube==0.15.0
python-daemon==2.2.4
python-dateutil==2.8.1
pytz==2019.3
PyYAML==5.3
requests==2.22.0
requests-oauthlib==1.3.0
rsa==4.0
scikit-learn==0.22.1
scipy==1.4.1
six==1.14.0
tornado==5.1.1
tzlocal==2.0.0
urllib3==1.25.8
File renamed without changes.
Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
# Move data into the 'cloud'

This module will go into some detail on how and why we have to move data into the cloud. It will also explain some of the more intricate parts of the source code in this directory.

## Why

Currently our sample solution only works with local data, meaning it loads local files and writes its output to the local hard drive. In order to run our solution in a cluster we need to provide the same source files from a place the cluster can reach. In addition to that we need a place to store our results.

Because our cluster should be runnable everywhere we will set up some storage in the cloud and use a GitHub Gist to serve our source CSV files. In production you will probably want to serve your source files from a local HDFS cluster or something similar.

## Reading source files

For the luigi example project we have two source files in CSV format. To make it easy for us we will simply serve them from a GitHub Gist, which is publicly available and free to use. The source files can be downloaded [here](https://gist.github.com/falknerdominik/425d72f02bd58cb5d42c3ddc328f505f). If you click on `RAW` (read: show the raw file) you obtain a link that is reachable by anyone.

- [CSV 1](https://gist.githubusercontent.com/falknerdominik/425d72f02bd58cb5d42c3ddc328f505f/raw/4ad926e347d01f45496ded5292af9a5a5d67c850/test_file1.CSV)
- [CSV 2](https://gist.githubusercontent.com/falknerdominik/425d72f02bd58cb5d42c3ddc328f505f/raw/4ad926e347d01f45496ded5292af9a5a5d67c850/test_file2.CSV)

Fortunately for us, pandas supports reading CSV files directly from an http(s) source. This is done by passing the URL to the `pd.read_csv` function:

```python
data_in: DataFrame = pd.read_csv(self.gist_input_url, sep=";")
```

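If you want to try this outside of the luigi task, a minimal standalone version (using CSV 1 from the gist above; the variable names are ours, not part of the pipeline code) looks roughly like this:

```python
import pandas as pd

# Read the first sample file straight from the gist's raw URL.
url = (
    "https://gist.githubusercontent.com/falknerdominik/"
    "425d72f02bd58cb5d42c3ddc328f505f/raw/"
    "4ad926e347d01f45496ded5292af9a5a5d67c850/test_file1.CSV"
)
data_in = pd.read_csv(url, sep=";")
print(data_in.head())
```
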
## Saving the result

Because Kubernetes decides on which node the code is executed, the program cannot make any assumptions about the location or the underlying filesystem structure. Instead, the results are saved to a storage that is reachable from the cluster. There are multiple storage providers (Google Cloud Storage, Dropbox, AWS S3, ...), but for the purposes of this how-to Azure Blob Storage is used (this can be replaced by any option you like - or whichever one you still have credits left for).

### Creating the Storage [Azure]

Before creating a blob storage, you have to create a storage account. You can do this by simply searching for `storage account` and selecting `create`. Then fill out the form as shown below:

![Creating an Azure Storage Account](assets/blob_storage.png)

The storage account determines which plan you use (how much you pay); the actual data will reside in one of several storage types. After that, select your new account and search for `containers`. Add a new one as shown in the screenshot below:

![Creating a Storage Container](assets/storage_container.png)

Selecting the container will show you what's inside of it. A container like this can be used as a simple blob storage. The pipeline will write all results here. For now, take note of the container name (the one you filled in the form above).

To access the storage the pipeline needs a connection string. It contains every detail on how to connect securely to your storage (account name, account key). It can be found inside your storage account in the `Access keys` section; take note of it.

> Be careful with connection strings. Do not commit them to your Git repository, they grant total access (depending on
> your policies) to your storage!

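If you prefer the command line over the portal, the same resources can be created with the Azure CLI. The sketch below assumes an existing resource group and uses placeholder names (`my-resource-group`, `mystorageaccount`, `results`) that are not part of this tutorial:

```bash
# Create the storage account (the pricing tier is chosen via --sku).
az storage account create \
  --name mystorageaccount \
  --resource-group my-resource-group \
  --location westeurope \
  --sku Standard_LRS

# Print the connection string the pipeline needs.
az storage account show-connection-string \
  --name mystorageaccount \
  --resource-group my-resource-group

# Create the blob container the pipeline will write to.
az storage container create \
  --name results \
  --account-name mystorageaccount
```
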
### Saving results with luigi

With the connection string and container name in hand you are well prepared to save the data into your blob storage. In `simple_workflow.py` add these two strings:

```python
azure_connection_string = '<Insert-Connection-String>'
container_name = '<Insert-Container-Name>'
```

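To keep the secret out of the repository (see the warning above), one alternative sketch is to read the connection string from an environment variable instead; the variable name `AZURE_STORAGE_CONNECTION_STRING` is just an example, not something the tutorial defines:

```python
import os

# Read the secret from the environment instead of hard-coding it.
azure_connection_string = os.environ['AZURE_STORAGE_CONNECTION_STRING']
container_name = '<Insert-Container-Name>'
```
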
> Conveniently, luigi supports Azure Blob Storage out-of-the-box via the `luigi.contrib.azureblob.AzureBlobTarget` class. The
> connection is provided using the `luigi.contrib.azureblob.AzureBlobClient` class.

After that, execute the pipeline and the results get saved to your storage. The output should look similar to this:

```text
DEBUG: Checking if PreprocessAllFiles() is complete
DEBUG: Checking if Preprocess(gist_input_url=https://gist.githubusercontent.com/falknerdominik/425d72f02bd58cb5d42c3ddc328f505f/raw/4ad926e347d01f45496ded5292af9a5a5d67c850/test_file1.CSV, connection_string=<CONNECTION_STRING>, filename=test_file1.CSV) is complete
DEBUG: Checking if Preprocess(gist_input_url=https://gist.githubusercontent.com/falknerdominik/425d72f02bd58cb5d42c3ddc328f505f/raw/4ad926e347d01f45496ded5292af9a5a5d67c850/test_file2.CSV, connection_string=<CONNECTION_STRING>, filename=test_file2.CSV) is complete
INFO: Informed scheduler that task PreprocessAllFiles__99914b932b has status PENDING
INFO: Informed scheduler that task Preprocess_DefaultEndpoints_test_file2_CSV_https___gist_git_6ab2dd2a85 has status PENDING
INFO: Informed scheduler that task Preprocess_DefaultEndpoints_test_file1_CSV_https___gist_git_62fd631f6d has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
...
DEBUG:luigi-interface:There are no more tasks to run at this time
INFO: Worker Worker(salt=542199482, workers=1, host=andromeda, username=dfalkner, pid=11595) was stopped. Shutting down Keep-Alive thread
INFO:luigi-interface:Worker Worker(salt=542199482, workers=1, host=andromeda, username=dfalkner, pid=11595) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 3 ran successfully:
    - 2 Preprocess(...)
    - 1 PreprocessAllFiles()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====
```
2 binary image files added (72.9 KB and 23.4 KB).
Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
"""
This module includes an example preprocessing step.
"""

from pandas import DataFrame


def drop_nan_columns(data: DataFrame) -> DataFrame:
    """
    Drop all columns that are missing more than 20 percent of their values.

    :param data: Input DataFrame which should be preprocessed.
    :return: DataFrame where columns missing more than 20 percent of their values are deleted.
    """
    # dropna keeps a column only if it has at least `thresh` non-missing values,
    # i.e. at least 80 percent of the rows must contain a value for that column.
    data = data.dropna(axis=1, thresh=(len(data) * 80) / 100)
    return data


def drop_duplicates(data: DataFrame) -> DataFrame:
    """
    Drop duplicated rows.

    :param data: Input DataFrame which should be preprocessed.
    :return: DataFrame where duplicated rows are removed.
    """
    data = data.drop_duplicates()
    return data
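
A minimal way to try these helpers on toy data (this snippet is an illustration and assumes `preprocess.py` is importable from the current directory; it is not part of the repository):

```python
import numpy as np
import pandas as pd

from preprocess import drop_nan_columns, drop_duplicates

# Toy frame: column "c" is entirely missing and the last row duplicates the first.
df = pd.DataFrame({
    "a": [1, 2, 3, 4, 1],
    "b": ["x", "y", "z", "w", "x"],
    "c": [np.nan] * 5,
})

cleaned = drop_duplicates(drop_nan_columns(df))
print(cleaned)  # column "c" and the duplicated last row are gone
```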
Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
"""Preprocessing example to show how luigi works (only one preprocessing step will be executed!)."""
from typing import Generator

import luigi
import pandas as pd
from luigi.contrib.azureblob import AzureBlobTarget, AzureBlobClient
from pandas import DataFrame

from preprocess import drop_nan_columns


class Preprocess(luigi.Task):
    """
    Applies general preprocessing steps to a single CSV file.
    """
    gist_input_url: str = luigi.Parameter()
    connection_string: str = luigi.Parameter()
    filename: str = luigi.Parameter()
    container_name: str = luigi.Parameter()

    def run(self):
        # read data from the gist url
        data_in: DataFrame = pd.read_csv(self.gist_input_url, sep=";")
        data_preprocessed = drop_nan_columns(data_in)

        # write the preprocessed contents to the azure blob file
        with self.output().open("w") as output_file:
            data_preprocessed.to_csv(output_file)

    def output(self) -> luigi.Target:
        # save the output in the azure blob storage
        # noinspection PyTypeChecker
        return AzureBlobTarget(
            container=self.container_name,
            blob=self.filename,
            client=AzureBlobClient(connection_string=self.connection_string),
        )


class PreprocessAllFiles(luigi.WrapperTask):
    """
    Applies the defined preprocessing steps to all files in the gist.
    """
    # gist where the CSV files are stored
    gist_url = 'https://gist.githubusercontent.com/falknerdominik/425d72f02bd58cb5d42c3ddc328f505f/raw/4ad926e347d01f45496ded5292af9a5a5d67c850/'
    # connection string obtained for the storage account via azure
    azure_connection_string = '<Insert-Connection-String>'
    container_name = '<Insert-Container-Name>'

    def requires(self) -> Generator[luigi.Task, None, None]:
        for filename in ['test_file1.CSV', 'test_file2.CSV']:
            yield Preprocess(
                gist_input_url=f'{self.gist_url}{filename}',
                filename=filename,
                connection_string=self.azure_connection_string,
                container_name=self.container_name,
            )


if __name__ == "__main__":
    luigi.build([PreprocessAllFiles()], local_scheduler=True)
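
Since the module already calls `luigi.build` with the local scheduler, one way to run the whole pipeline (assuming the connection-string and container-name placeholders have been filled in and the dependencies from the requirements file are installed) is simply:

```bash
# Runs PreprocessAllFiles and its two Preprocess sub-tasks without a central luigid.
python simple_workflow.py
```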
