forked from anirudhagar13/chicago-crime-analysis-bigdata
-
Notifications
You must be signed in to change notification settings - Fork 0
/
run.py
96 lines (84 loc) · 2.64 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
'''
Module to run data pipeline
'''
import sys
import datetime

import numpy as np
import pandas as pd

from analytics import *
from ingestion import data_ingestion
from manipulation import data_cleansing, data_extraction
def data_load(file_path):
    '''
    Read the crime CSV dump at file_path into a pandas DataFrame,
    printing the elapsed wall-clock seconds for the read.
    '''
    t0 = datetime.datetime.now()
    frame = pd.read_csv(file_path)
    elapsed = (datetime.datetime.now() - t0).total_seconds()
    print ('Data Acquisition: ', elapsed)
    return frame
def pipeline(csv_dump_path, cb_user, cb_pwd, cb_host, cb_bucket):
    '''
    Run the full data pipeline: load the CSV dump, cleanse it,
    extract/transform it, and ingest the result into Couchbase.

    Parameters:
        csv_dump_path -- path to the crime CSV dump
        cb_user, cb_pwd, cb_host, cb_bucket -- Couchbase credentials
            and target bucket, passed through to data_ingestion

    Each stage prints its elapsed wall-clock seconds, matching the
    format used by data_load.
    '''
    def _timed(label, func, *args):
        # Run one pipeline stage and report its duration; the original
        # start/stop/print boilerplate was duplicated per stage.
        start = datetime.datetime.now()
        result = func(*args)
        elapsed = (datetime.datetime.now() - start).total_seconds()
        print (label + ': ', elapsed)
        return result

    # Getting data from file (data_load prints its own timing)
    crime_data = data_load(csv_dump_path)
    # Cleansing data
    cleansed_data = _timed('Data Cleansing', data_cleansing, crime_data)
    # Data transformation to fit model
    extracted_data = _timed('Data Extraction', data_extraction, cleansed_data)
    # Data ingestion into couchbase
    _timed('Data Ingestion', data_ingestion, extracted_data,
           cb_user, cb_pwd, cb_host, cb_bucket)
def data_analysis(csv_dump_path, bq_id):
    '''
    Perform analytics depending on the business question ID.

    Parameters:
        csv_dump_path -- path to the crime CSV dump
        bq_id -- business question identifier, a string '1'..'7';
            anything else prints an error message

    Loads the data from file, then dispatches to the matching
    analytics routine from the analytics module.
    '''
    # Getting data from file as of now
    crime_data = data_load(csv_dump_path)
    # Dispatch table replaces the long if/elif chain; IDs '1'-'6'
    # take only the loaded data.
    dispatch = {
        '1': time_data_analysis,
        '2': arrest_analysis,
        '3': count_per_year_analysis,
        '4': max_arrests,
        '5': crime_distribution_by_year,
        '6': unsafe_neighbourhood,
    }
    if bq_id in dispatch:
        dispatch[bq_id](crime_data)
    elif bq_id == '7':
        # One-hot encoding, each column is a crime type: 35 columns
        # with only the first type set (same vector as the original
        # hand-written literal). Requires numpy imported as np.
        x_test = np.array([[1] + [0] * 34], np.int32)
        crime_type_prediction(crime_data, x_test)
    else:
        print ('Unknown business question ID.')
if __name__ == '__main__':
    # Dispatch on argument count:
    #   6 args (script + 5): run the ingestion pipeline
    #   7 args (script + 6): run an analytics business question
    # The original indexed sys.argv[2..5] BEFORE checking the count, so
    # too few arguments raised IndexError and printed the catch-all
    # message instead of the explicit argument-count error; it also read
    # the Couchbase arguments it never used on the analytics path.
    try:
        if len(sys.argv) == 6:
            csv_dump_path, cb_user, cb_pwd, cb_host, cb_bucket = sys.argv[1:6]
            pipeline(csv_dump_path, cb_user, cb_pwd, cb_host, cb_bucket)
        elif len(sys.argv) == 7:
            # Couchbase credentials (argv[2..5]) are ignored for analytics.
            data_analysis(sys.argv[1], sys.argv[6])
        else:
            print ('Invalid no of system arguments')
    except Exception as e:
        # Last-resort boundary: report pipeline/analysis failures.
        print ('Insufficient parameters to run data pipeline.', e)