Skip to content
This repository was archived by the owner on Jul 7, 2023. It is now read-only.

Commit 031f0ba

Browse files
Add 'Move data into the cloud' example
1 parent da4045b commit 031f0ba

File tree

4 files changed

+103
-0
lines changed

4 files changed

+103
-0
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
"""
This module includes an example preprocessing step.
"""

# BUG FIX: the original `import pandas as DataFrame` bound the *module*
# to the name ``DataFrame``, so the ``data: DataFrame`` annotations below
# referred to the pandas module instead of the DataFrame class.
import pandas as pd
from pandas import DataFrame
def drop_nan_columns(data: DataFrame, max_missing_fraction: float = 0.8) -> DataFrame:
    """
    Drop all columns with more than ``max_missing_fraction`` missing values.

    :param data: Input DataFrame which should be preprocessed.
    :param max_missing_fraction: Fraction of missing values above which a
        column is dropped (default 0.8, i.e. 80 percent).
    :return: DataFrame where columns with more than the given fraction of
        missing values are deleted.
    """
    # BUG FIX: the original passed ``thresh=len(data)*80/100``, which keeps
    # only columns with at least 80% *non-missing* values — i.e. it dropped
    # columns with more than 20% missing, contradicting the docstring.
    # ``thresh`` is the minimum number of non-NA values a column needs to
    # survive: a column is kept iff its missing count <= floor(n * fraction).
    min_non_missing = len(data) - int(len(data) * max_missing_fraction)
    return data.dropna(axis=1, thresh=min_non_missing)
17+
18+
19+
def drop_duplicates(data: DataFrame) -> DataFrame:
    """
    Drop duplicated rows, keeping the first occurrence of each.

    Note: despite the original docstring's claim, duplicated *columns* are
    not removed — only rows are deduplicated.

    :param data: Input DataFrame which should be preprocessed.
    :return: DataFrame without duplicated rows.
    """
    return data.drop_duplicates()
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
docutils==0.15.2
2+
lockfile==0.12.2
3+
luigi==2.8.10
4+
numpy==1.18.0
5+
pandas==0.25.3
6+
python-daemon==2.1.2
7+
python-dateutil==2.8.1
8+
pytz==2019.3
9+
six==1.13.0
10+
tornado==5.1.1
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
"""Preprocessing example to show how luigi works (only one preprocessing step will be executed!)."""
from typing import Generator

import luigi
import pandas as pd
# BUG FIX: was `import pandas as DataFrame`, which bound the pandas
# *module* to the name ``DataFrame``; the annotations below expect the
# DataFrame class.
from pandas import DataFrame
from luigi.contrib.azureblob import AzureBlobTarget, AzureBlobClient

from preprocessing import drop_nan_columns
12+
class Preprocess(luigi.Task):
    """
    Applies general preprocessing steps to all CSV files loaded.
    """

    # URL of the raw CSV file hosted in the gist.
    gist_input_url: str = luigi.Parameter()
    # Connection string for the Azure blob storage account.
    connection_string: str = luigi.Parameter()
    # Name of the blob the preprocessed CSV is written to.
    filename: str = luigi.Parameter()

    def run(self):
        # Fetch the semicolon-separated CSV directly from the gist URL.
        raw_data: DataFrame = pd.read_csv(self.gist_input_url, sep=";")
        cleaned = drop_nan_columns(raw_data)

        # Stream the preprocessed frame into the Azure blob target.
        with self.output().open("w") as target_file:
            cleaned.to_csv(target_file)

    def output(self) -> luigi.Target:
        # The task result is persisted in Azure blob storage.
        # noinspection PyTypeChecker
        blob_client = AzureBlobClient(connection_string=self.connection_string)
        return AzureBlobTarget(
            container=r'clcstoragecontainer',
            blob=self.filename,
            client=blob_client,
        )
38+
39+
40+
class PreprocessAllFiles(luigi.WrapperTask):
    """
    Applies defined preprocessing steps to all files in the selected folder.
    """

    # gist where the CSV files are stored
    gist_url = 'https://gist.githubusercontent.com/falknerdominik/425d72f02bd58cb5d42c3ddc328f505f/raw/4ad926e347d01f45496ded5292af9a5a5d67c850/'
    # connection string obtained for the storage unit via azure
    azure_connection_string = '<INSERT-AZURE-CONNECTION-STRING>'

    def requires(self) -> Generator[luigi.Task, None, None]:
        """Yield one Preprocess task per CSV file hosted in the gist."""
        for filename in ['test_file1.CSV', 'test_file2.CSV']:
            yield Preprocess(
                # BUG FIX: the original interpolated a literal placeholder
                # instead of the loop's filename, so every task pointed at
                # a non-existent URL.  The input URL must be base + file.
                gist_input_url=f'{self.gist_url}{filename}',
                filename=filename,
                connection_string=self.azure_connection_string,
            )
56+
57+
58+
if __name__ == "__main__":
    # Run the wrapper task (and its required Preprocess tasks) with the
    # in-process scheduler; no central luigid daemon is needed.
    luigi.build([PreprocessAllFiles()], local_scheduler=True)
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# 03 - Run Luigi in Kubernetes
2+
3+
This module will walk you through the steps necessary to work with Luigi in Kubernetes.
4+
5+
1. [Move Data into the cloud]()
6+
2. [Use Kubernetes API]()
7+
3. [Inspect results]()

0 commit comments

Comments
 (0)