-
Notifications
You must be signed in to change notification settings - Fork 1
/
pm_DTR.py
147 lines (104 loc) · 5.21 KB
/
pm_DTR.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#
# Script name: bank.py
# NOTE(review): this header appears to be carried over from bank.py. The code
# below actually trains a DecisionTreeRegressor to predict a PM2.5 value
# (column "t107" of train_2/output_pm25_108_HC.csv); update the header to match.
#
# Script purpose: Use PySpark to analyze the Santander Bank Customer Satisfaction data
# and try to predict which customers are happy customers (predict the TARGET value from historical data)
#
# Steps:
# 1. Import the spark-csv library to support the CSV format in PySpark
# 2. Load the training data from HDFS into PySpark and transform it into an RDD
# 3. Analyze the data with PySpark's machine learning library
# 4. Create the prediction model for the testing data and run the prediction
# 5. Write out the result CSV and exit the program
#
# Script owner: Kenie Liu
#
# Last modified date: 160315
#
"""
Santander Bank Customer Satisfaction Practice
"""
from __future__ import print_function
import sys
from pyspark import SparkContext, SQLContext, SparkConf
from pyspark.sql.types import *
from pyspark.sql.functions import lit
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
def select_feature_columns(columns, label):
    """Return the feature column names: every entry of *columns* except *label*.

    The order of *columns* is preserved. The label is excluded by name rather
    than by position, so the value being predicted can never be fed to the
    model as an input feature, whatever the column order of the CSV is.

    >>> select_feature_columns(["t0", "t1", "t107"], "t107")
    ['t0', 't1']
    """
    return [name for name in columns if name != label]


def main(argv=None):
    """Train and evaluate a decision-tree regressor on one PM2.5 training CSV.

    Optional positional arguments are read from ``sys.argv[1:]`` when *argv*
    is None:

    1. the input CSV path (default ``train_2/output_pm25_108_HC.csv``);
    2. the output directory for the (label, prediction) CSV (default ``result``).

    The model is fitted on a 90% training split. Predictions for the held-out
    10% are written to the output directory, and mse/rmse/r2/mae on that split
    are printed.
    """
    args = sys.argv[1:] if argv is None else list(argv)
    segment = 108
    # NOTE(review): assumes columns are named t0 .. t(segment - 1) and the last
    # one is the value to predict; with segment = 108 this is "t107".
    target = "t%d" % (segment - 1)
    input_file = args[0] if len(args) > 0 else "train_2/output_pm25_108_HC.csv"
    output_file = args[1] if len(args) > 1 else "result"
    # Fixed seed so the train/test split (and therefore the metrics) is reproducible.
    split_seed = 42

    sc = SparkContext(appName="PM2.5_Prediction_Test")
    try:
        sqlContext = SQLContext(sc)

        # Load the training data (header row, inferred column types) via spark-csv.
        data = (sqlContext.read.format("com.databricks.spark.csv")
                .options(header="true", inferschema="true")
                .load(input_file))
        # The regressor needs a numeric (double) label column.
        data = data.withColumn(target, data[target].cast(DoubleType()))
        data.printSchema()
        print("Training data size is %d" % data.count())

        # Every column except the label becomes an input feature.
        columns = select_feature_columns(data.columns, target)
        print(columns)
        assembler = VectorAssembler(inputCols=columns, outputCol="indexedFeatures")

        # Hold out 10% of the rows for testing.
        trainingData, testData = data.randomSplit([0.9, 0.1], seed=split_seed)

        dtr = DecisionTreeRegressor(labelCol=target, featuresCol="indexedFeatures")
        pipeline = Pipeline(stages=[assembler, dtr])
        # Fit on the training split only; fitting on the full data would let
        # the model see the test rows and inflate the metrics below.
        model = pipeline.fit(trainingData)
        print(model)

        predictions = model.transform(testData)
        predictions.select("prediction", "indexedFeatures").show(5)
        (predictions.select(target, "prediction")
         .write.mode("overwrite")
         .format("com.databricks.spark.csv")
         .save(output_file))

        # Compute test error for each supported regression metric.
        evaluator = RegressionEvaluator(predictionCol="prediction", labelCol=target)
        metrics = {name: evaluator.evaluate(predictions, {evaluator.metricName: name})
                   for name in ("mse", "rmse", "r2", "mae")}
        print("mse:", metrics["mse"], " rmse:", metrics["rmse"],
              " r2:", metrics["r2"], " mae:", metrics["mae"])

        # stages[1] is the fitted DecisionTreeRegressionModel; printing it
        # shows only a summary, not the full tree.
        treeModel = model.stages[1]
        print(treeModel)
    finally:
        # Always release the SparkContext, even if a step above failed.
        sc.stop()


if __name__ == "__main__":
    main()