Commit 2aa39f2

add pipeline scripts

1 parent f6682f6 commit 2aa39f2

8 files changed, +368 -0 lines changed

@@ -0,0 +1,58 @@
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license.

import argparse
import os

import azureml.dataprep as dprep


def get_dict(dict_str):
    # Parse a dict-style argument of the form "{'old': 'new'\; ...}", in which
    # the pipeline escapes the semicolon pair separators with a backslash.
    pairs = dict_str.strip("{}").split(r"\;")
    new_dict = {}
    for pair in pairs:
        key, value = pair.strip("\\").split(":")
        new_dict[key.strip().strip("'")] = value.strip().strip("'")

    return new_dict


print("Cleans the input data")

parser = argparse.ArgumentParser("cleanse")
parser.add_argument("--input_cleanse", type=str, help="raw taxi data")
parser.add_argument("--output_cleanse", type=str, help="cleaned taxi data directory")
parser.add_argument("--useful_columns", type=str, help="useful columns to keep")
parser.add_argument("--columns", type=str, help="rename column pattern")

args = parser.parse_args()

print("Argument 1 (input taxi data path): %s" % args.input_cleanse)
print("Argument 2 (columns to keep): %s" % str(args.useful_columns.strip("[]").split(r"\;")))
print("Argument 3 (column renaming mapping): %s" % str(args.columns.strip("{}").split(r"\;")))
print("Argument 4 (output cleansed taxi data path): %s" % args.output_cleanse)

raw_df = dprep.read_csv(path=args.input_cleanse, header=dprep.PromoteHeadersMode.GROUPED)

# These functions ensure that null data is removed from the data set,
# which helps increase machine learning model accuracy.
# Visit https://docs.microsoft.com/en-us/azure/machine-learning/service/tutorial-data-prep
# for more details.

useful_columns = [s.strip().strip("'") for s in args.useful_columns.strip("[]").split(r"\;")]
columns = get_dict(args.columns)

all_columns = dprep.ColumnSelector(term=".*", use_regex=True)
drop_if_all_null = [all_columns, dprep.ColumnRelationship(dprep.ColumnRelationship.ALL)]

new_df = (raw_df
          .replace_na(columns=all_columns)
          .drop_nulls(*drop_if_all_null)
          .rename_columns(column_pairs=columns)
          .keep_columns(columns=useful_columns))

if args.output_cleanse is not None:
    os.makedirs(args.output_cleanse, exist_ok=True)
    print("%s created" % args.output_cleanse)
    write_df = new_df.write_to_csv(directory_path=dprep.LocalFileOutput(args.output_cleanse))
    write_df.run_local()
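
As a reference for the argument format, here is a minimal sketch of what get_dict above expects, assuming the pipeline passes the rename pattern with backslash-escaped semicolons as pair separators (the column names are illustrative, not taken from the pipeline definition):

# Hypothetical rename pattern; requires get_dict from the script above.
columns_arg = r"{'vendor_id': 'vendor'\; 'trip_distance': 'distance'}"
print(get_dict(columns_arg))
# -> {'vendor_id': 'vendor', 'trip_distance': 'distance'}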
@@ -0,0 +1,55 @@
import argparse
import os

import azureml.dataprep as dprep

print("Filters out coordinates for locations that are outside the city border.",
      "Chain the column filter commands within the filter() function",
      "and define the minimum and maximum bounds for each field.")

parser = argparse.ArgumentParser("filter")
parser.add_argument("--input_filter", type=str, help="merged taxi data directory")
parser.add_argument("--output_filter", type=str, help="taxi data with out-of-city locations filtered out")

args = parser.parse_args()

print("Argument 1 (input taxi data path): %s" % args.input_filter)
print("Argument 2 (output filtered taxi data path): %s" % args.output_filter)

combined_df = dprep.read_csv(args.input_filter + '/part-*')

# These functions filter out coordinates for locations that are outside the city border.
# Visit https://docs.microsoft.com/en-us/azure/machine-learning/service/tutorial-data-prep for more details.

# Convert the lat/long fields to decimal so they can be compared numerically.
decimal_type = dprep.TypeConverter(data_type=dprep.FieldType.DECIMAL)
combined_df = combined_df.set_column_types(type_conversions={
    "pickup_longitude": decimal_type,
    "pickup_latitude": decimal_type,
    "dropoff_longitude": decimal_type,
    "dropoff_latitude": decimal_type
})

# Filter out coordinates for locations that are outside the city border.
# Chain the column filter commands within the filter() function
# and define the minimum and maximum bounds for each field.
latlong_filtered_df = (combined_df
                       .drop_nulls(columns=["pickup_longitude",
                                            "pickup_latitude",
                                            "dropoff_longitude",
                                            "dropoff_latitude"],
                                   column_relationship=dprep.ColumnRelationship(dprep.ColumnRelationship.ANY))
                       .filter(dprep.f_and(dprep.col("pickup_longitude") <= -73.72,
                                           dprep.col("pickup_longitude") >= -74.09,
                                           dprep.col("pickup_latitude") <= 40.88,
                                           dprep.col("pickup_latitude") >= 40.53,
                                           dprep.col("dropoff_longitude") <= -73.72,
                                           dprep.col("dropoff_longitude") >= -74.09,
                                           dprep.col("dropoff_latitude") <= 40.88,
                                           dprep.col("dropoff_latitude") >= 40.53)))

if args.output_filter is not None:
    os.makedirs(args.output_filter, exist_ok=True)
    print("%s created" % args.output_filter)
    write_df = latlong_filtered_df.write_to_csv(directory_path=dprep.LocalFileOutput(args.output_filter))
    write_df.run_local()
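
The bounds above form a rough bounding box around New York City. A plain-Python sketch of the same test applied to a single coordinate pair, with illustrative sample points:

def in_city_bounds(longitude, latitude):
    # Same rectangle as the dprep filter above.
    return -74.09 <= longitude <= -73.72 and 40.53 <= latitude <= 40.88

print(in_city_bounds(-73.99, 40.75))   # True  (midtown Manhattan)
print(in_city_bounds(-118.24, 34.05))  # False (Los Angeles)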
@@ -0,0 +1,29 @@
import argparse
import os

import azureml.dataprep as dprep

print("Merge Green and Yellow taxi data")

parser = argparse.ArgumentParser("merge")
parser.add_argument("--input_green_merge", type=str, help="cleaned green taxi data directory")
parser.add_argument("--input_yellow_merge", type=str, help="cleaned yellow taxi data directory")
parser.add_argument("--output_merge", type=str, help="merged green and yellow taxi data")

args = parser.parse_args()

print("Argument 1 (input green taxi data path): %s" % args.input_green_merge)
print("Argument 2 (input yellow taxi data path): %s" % args.input_yellow_merge)
print("Argument 3 (output merged taxi data path): %s" % args.output_merge)

green_df = dprep.read_csv(args.input_green_merge + '/part-*')
yellow_df = dprep.read_csv(args.input_yellow_merge + '/part-*')

# Append the yellow taxi data to the green taxi data.
combined_df = green_df.append_rows([yellow_df])

if args.output_merge is not None:
    os.makedirs(args.output_merge, exist_ok=True)
    print("%s created" % args.output_merge)
    write_df = combined_df.write_to_csv(directory_path=dprep.LocalFileOutput(args.output_merge))
    write_df.run_local()
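
append_rows here behaves like a row-wise concatenation of the two dataflows. A pandas sketch of the equivalent operation, under illustrative toy values (pandas is not otherwise used by this script):

import pandas as pd

green = pd.DataFrame({"vendor": ["g1"], "cost": [5.0]})
yellow = pd.DataFrame({"vendor": ["y1"], "cost": [7.5]})
combined = pd.concat([green, yellow], ignore_index=True)
print(len(combined))  # 2 rows, same columns as the inputs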
@@ -0,0 +1,47 @@
import argparse
import os

import azureml.dataprep as dprep

print("Replaces undefined values with relevant values and renames columns to meaningful names")

parser = argparse.ArgumentParser("normalize")
parser.add_argument("--input_normalize", type=str, help="combined and converted taxi data")
parser.add_argument("--output_normalize", type=str, help="taxi data with undefined values replaced and columns renamed")

args = parser.parse_args()

print("Argument 1 (input taxi data path): %s" % args.input_normalize)
print("Argument 2 (output normalized taxi data path): %s" % args.output_normalize)

combined_converted_df = dprep.read_csv(args.input_normalize + '/part-*')

# These functions replace undefined values and rename columns to use meaningful names.
# Visit https://docs.microsoft.com/en-us/azure/machine-learning/service/tutorial-data-prep for more details.

replaced_stfor_vals_df = combined_converted_df.replace(columns="store_forward",
                                                       find="0",
                                                       replace_with="N").fill_nulls("store_forward", "N")

replaced_distance_vals_df = replaced_stfor_vals_df.replace(columns="distance",
                                                           find=".00",
                                                           replace_with=0).fill_nulls("distance", 0)

replaced_distance_vals_df = replaced_distance_vals_df.to_number(["distance"])

# Split the pickup and dropoff datetime values into separate date and time columns.
time_split_df = (replaced_distance_vals_df
                 .split_column_by_example(source_column="pickup_datetime")
                 .split_column_by_example(source_column="dropoff_datetime"))

renamed_col_df = (time_split_df
                  .rename_columns(column_pairs={
                      "pickup_datetime_1": "pickup_date",
                      "pickup_datetime_2": "pickup_time",
                      "dropoff_datetime_1": "dropoff_date",
                      "dropoff_datetime_2": "dropoff_time"}))

if args.output_normalize is not None:
    os.makedirs(args.output_normalize, exist_ok=True)
    print("%s created" % args.output_normalize)
    write_df = renamed_col_df.write_to_csv(directory_path=dprep.LocalFileOutput(args.output_normalize))
    write_df.run_local()
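
A small pandas sketch of the store_forward normalization above, with illustrative values, to make the effect concrete (the actual step runs through azureml.dataprep):

import pandas as pd

store_forward = pd.Series(["Y", "0", None, "N"])
store_forward = store_forward.replace("0", "N").fillna("N")
print(list(store_forward))  # ['Y', 'N', 'N', 'N']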
@@ -0,0 +1,88 @@
import argparse
import os

import azureml.dataprep as dprep

print("Transforms the renamed taxi data to the required format")

parser = argparse.ArgumentParser("transform")
parser.add_argument("--input_transform", type=str, help="renamed taxi data")
parser.add_argument("--output_transform", type=str, help="transformed taxi data")

args = parser.parse_args()

print("Argument 1 (input taxi data path): %s" % args.input_transform)
print("Argument 2 (output final transformed taxi data): %s" % args.output_transform)

renamed_df = dprep.read_csv(args.input_transform + '/part-*')

# These functions transform the renamed data into the form finally used for training.
# Visit https://docs.microsoft.com/en-us/azure/machine-learning/service/tutorial-data-prep for more details.

# Split the pickup and dropoff dates further into day-of-the-week, day-of-the-month, and month values.
# To get the day-of-the-week value, use the derive_column_by_example() function. The function takes
# an array of example objects that define the input data and the preferred output, and it
# automatically infers the preferred transformation.
# For the pickup and dropoff time columns, split the time into the hour, minute, and second by using
# the split_column_by_example() function with no example parameter. After the new features are
# generated, use the drop_columns() function to delete the original fields, as the newly
# generated features are preferred. Rename the rest of the fields to use meaningful descriptions.

transformed_features_df = (renamed_df
                           .derive_column_by_example(
                               source_columns="pickup_date",
                               new_column_name="pickup_weekday",
                               example_data=[("2009-01-04", "Sunday"), ("2013-08-22", "Thursday")])
                           .derive_column_by_example(
                               source_columns="dropoff_date",
                               new_column_name="dropoff_weekday",
                               example_data=[("2013-08-22", "Thursday"), ("2013-11-03", "Sunday")])

                           .split_column_by_example(source_column="pickup_time")
                           .split_column_by_example(source_column="dropoff_time")

                           .split_column_by_example(source_column="pickup_time_1")
                           .split_column_by_example(source_column="dropoff_time_1")
                           .drop_columns(columns=[
                               "pickup_date", "pickup_time", "dropoff_date", "dropoff_time",
                               "pickup_date_1", "dropoff_date_1", "pickup_time_1", "dropoff_time_1"])

                           .rename_columns(column_pairs={
                               "pickup_date_2": "pickup_month",
                               "pickup_date_3": "pickup_monthday",
                               "pickup_time_1_1": "pickup_hour",
                               "pickup_time_1_2": "pickup_minute",
                               "pickup_time_2": "pickup_second",
                               "dropoff_date_2": "dropoff_month",
                               "dropoff_date_3": "dropoff_monthday",
                               "dropoff_time_1_1": "dropoff_hour",
                               "dropoff_time_1_2": "dropoff_minute",
                               "dropoff_time_2": "dropoff_second"}))

# Drop the pickup_datetime and dropoff_datetime columns because they're
# no longer needed; granular time features such as hour,
# minute, and second are more useful for model training.
processed_df = transformed_features_df.drop_columns(columns=["pickup_datetime", "dropoff_datetime"])

# Use the type-inference functionality to automatically check the data type of each field.
type_infer = processed_df.builders.set_column_types()
type_infer.learn()

# The inference results look correct based on the data; apply the type conversions to the dataflow.
type_converted_df = type_infer.to_dataflow()

# Before packaging the dataflow, run two final filters on the data set.
# To eliminate incorrectly captured data points, keep only records where
# both the cost and distance values are greater than zero.
# This step significantly improves machine learning model accuracy, because
# data points with a zero cost or distance are major outliers that throw off prediction accuracy.

final_df = type_converted_df.filter(dprep.col("distance") > 0)
final_df = final_df.filter(dprep.col("cost") > 0)

# Write the final dataflow to use for training in the following steps.
if args.output_transform is not None:
    os.makedirs(args.output_transform, exist_ok=True)
    print("%s created" % args.output_transform)
    write_df = final_df.write_to_csv(directory_path=dprep.LocalFileOutput(args.output_transform))
    write_df.run_local()
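
On well-formed yyyy-MM-dd dates, the weekday derivation that derive_column_by_example learns from the two example pairs should match this plain-Python sketch:

from datetime import datetime

def weekday_name(date_str):
    return datetime.strptime(date_str, "%Y-%m-%d").strftime("%A")

print(weekday_name("2009-01-04"))  # Sunday
print(weekday_name("2013-08-22"))  # Thursday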
@@ -0,0 +1,31 @@
import argparse
import os

import azureml.dataprep as dprep

print("Extracts important features from prepared data")

parser = argparse.ArgumentParser("featurization")
parser.add_argument("--input_featurization", type=str, help="input featurization")
parser.add_argument("--useful_columns", type=str, help="columns to use")
parser.add_argument("--output_featurization", type=str, help="output featurization")

args = parser.parse_args()

print("Argument 1 (input training data path): %s" % args.input_featurization)
print("Argument 2 (column features to use): %s" % str(args.useful_columns.strip("[]").split(r"\;")))
print("Argument 3 (output featurized training data path): %s" % args.output_featurization)

dflow_prepared = dprep.read_csv(args.input_featurization + '/part-*')

# These functions extract useful features for training.
# Visit https://docs.microsoft.com/en-us/azure/machine-learning/service/tutorial-auto-train-models for more details.

useful_columns = [s.strip().strip("'") for s in args.useful_columns.strip("[]").split(r"\;")]
dflow = dflow_prepared.keep_columns(useful_columns)

if args.output_featurization is not None:
    os.makedirs(args.output_featurization, exist_ok=True)
    print("%s created" % args.output_featurization)
    write_df = dflow.write_to_csv(directory_path=dprep.LocalFileOutput(args.output_featurization))
    write_df.run_local()
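
A sketch of how the --useful_columns argument parses, assuming the pipeline passes a bracketed list with backslash-escaped semicolons; the column names below are illustrative:

arg = r"['pickup_weekday'\; 'pickup_hour'\; 'distance'\; 'cost']"
cols = [s.strip().strip("'") for s in arg.strip("[]").split(r"\;")]
print(cols)  # ['pickup_weekday', 'pickup_hour', 'distance', 'cost']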
@@ -0,0 +1,12 @@
import os

import pandas as pd


def get_data():
    # Azure ML resolves the data references mounted for this step through
    # AZUREML_DATAREFERENCE_* environment variables; read the training
    # features and labels written by the split step.
    print("In get_data")
    print(os.environ['AZUREML_DATAREFERENCE_output_split_train_x'])
    X_train = pd.read_csv(os.environ['AZUREML_DATAREFERENCE_output_split_train_x'] + "/part-00000", header=0)
    y_train = pd.read_csv(os.environ['AZUREML_DATAREFERENCE_output_split_train_y'] + "/part-00000", header=0)

    return {"X": X_train.values, "y": y_train.values.flatten()}
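
A self-contained sketch of the contract this module satisfies, assuming get_data and the os import above are in scope: write tiny part-00000 files, point the data-reference environment variables at them, and call get_data(). The paths and values are illustrative; in a real run, Azure ML sets these variables.

import tempfile

for name, body in [("output_split_train_x", "distance,passengers\n1.2,1\n"),
                   ("output_split_train_y", "cost\n5.0\n")]:
    d = tempfile.mkdtemp()
    with open(os.path.join(d, "part-00000"), "w") as f:
        f.write(body)
    os.environ["AZUREML_DATAREFERENCE_" + name] = d

print(get_data())  # {"X": 2-D feature array, "y": 1-D label array}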
@@ -0,0 +1,48 @@
import argparse
import os

import azureml.dataprep as dprep
from sklearn.model_selection import train_test_split


def write_output(df, path):
    os.makedirs(path, exist_ok=True)
    print("%s created" % path)
    df.to_csv(path + "/part-00000", index=False)


print("Split the data into train and test")

parser = argparse.ArgumentParser("split")
parser.add_argument("--input_split_features", type=str, help="input split features")
parser.add_argument("--input_split_labels", type=str, help="input split labels")
parser.add_argument("--output_split_train_x", type=str, help="output split train features")
parser.add_argument("--output_split_train_y", type=str, help="output split train labels")
parser.add_argument("--output_split_test_x", type=str, help="output split test features")
parser.add_argument("--output_split_test_y", type=str, help="output split test labels")

args = parser.parse_args()

print("Argument 1 (input taxi data features path): %s" % args.input_split_features)
print("Argument 2 (input taxi data labels path): %s" % args.input_split_labels)
print("Argument 3 (output training features split path): %s" % args.output_split_train_x)
print("Argument 4 (output training labels split path): %s" % args.output_split_train_y)
print("Argument 5 (output test features split path): %s" % args.output_split_test_x)
print("Argument 6 (output test labels split path): %s" % args.output_split_test_y)

x_df = dprep.read_csv(path=args.input_split_features, header=dprep.PromoteHeadersMode.GROUPED).to_pandas_dataframe()
y_df = dprep.read_csv(path=args.input_split_labels, header=dprep.PromoteHeadersMode.GROUPED).to_pandas_dataframe()

# This step splits the input features and labels into train and test sets.
# Visit https://docs.microsoft.com/en-us/azure/machine-learning/service/tutorial-auto-train-models for more details.

x_train, x_test, y_train, y_test = train_test_split(x_df, y_df, test_size=0.2, random_state=223)

# Write the splits only when all four output paths are provided.
output_paths = (args.output_split_train_x, args.output_split_train_y,
                args.output_split_test_x, args.output_split_test_y)
if None not in output_paths:
    write_output(x_train, args.output_split_train_x)
    write_output(y_train, args.output_split_train_y)
    write_output(x_test, args.output_split_test_x)
    write_output(y_test, args.output_split_test_y)
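
A quick sketch of what the 80/20 split produces on a toy frame, assuming train_test_split from the script above is in scope (the data is illustrative; the real inputs come from the upstream featurization steps):

import pandas as pd

toy_x = pd.DataFrame({"distance": range(10)})
toy_y = pd.DataFrame({"cost": range(10)})
a, b, c, d = train_test_split(toy_x, toy_y, test_size=0.2, random_state=223)
print(len(a), len(b), len(c), len(d))  # 8 2 8 2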
