-
Notifications
You must be signed in to change notification settings - Fork 0
/
rank.py
146 lines (118 loc) · 6.51 KB
/
rank.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import os
os.environ['WORKDIR'] = "/home/ec2-user/Yelp-Challenge/"
from _functools import partial
import sys
from geopy.distance import vincenty
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.types import StructType, StructField, FloatType, StringType, \
ArrayType
from pyspark.sql import functions as F
from os import listdir
from os.path import isfile, join, getsize
import re
def getDistance(x1, y1, x2, y2):
    """Return the Vincenty (ellipsoidal) distance in miles between the
    points (x1, y1) and (x2, y2), given as latitude/longitude degrees."""
    origin = (x1, y1)
    destination = (x2, y2)
    return vincenty(origin, destination).miles
def isUserlocal(input_row, latitude, longitude):
    """Return True when any of the user's cluster centers lies within
    5 miles of the given (latitude, longitude) location."""
    return any(
        getDistance(latitude, longitude, center.latitude, center.longitude) < 5.0
        for center in input_row.cluster_centers
    )
def isBusinessLocalAndRelevant(input_row, latitude, longitude, sub_categories):
    """Return True when the business row is within 5 miles of the given
    location AND at least one of its categories appears in sub_categories."""
    # Guard clause: reject anything farther than 5 miles up front.
    if getDistance(latitude, longitude, input_row.latitude, input_row.longitude) > 5.0:
        return False
    return any(tag in sub_categories for tag in input_row.categories)
class MainApp(object):
def __init__(self):
self.category = cat
self.loc_lat = zipcode.latitude
self.loc_long = zipcode.longitude
pass
def init(self):
#os.environ["SPARK_HOME"] = "/Users/abhinavrungta/Desktop/setups/spark-1.5.2"
# os.environ['AWS_ACCESS_KEY_ID'] = <YOURKEY>
# os.environ['AWS_SECRET_ACCESS_KEY'] = <YOURKEY>
conf = SparkConf()
#conf.setMaster("local[10]")
#conf.setAppName("PySparkShell")
#conf.set("spark.executor.memory", "2g")
#conf.set("spark.driver.memory", "1g")
self.sc = sc
self.sqlContext = sqlContext
#self.sc = SparkContext(conf=conf)
#self.sqlContext = SQLContext(self.sc)
def loadData(self):
category_list = self.sc.textFile(os.environ['WORKDIR'] + "yelp_dataset_challenge_academic_dataset/cat_subcat.csv").map(lambda line: (line.split(',')[0], line.split(',')))
category_schema = StructType([
StructField("category", StringType(), True),
StructField("sub_category", ArrayType(StringType()), True)
])
# self.category_list.registerTempTable("categories_list")
# subcat = self.sqlContext.sql("SELECT sub_category FROM categories_list WHERE category = \"{0}\" LIMIT 1".format(self.category))
category_list = self.sqlContext.createDataFrame(category_list, category_schema)
subcat = category_list.where(category_list.category == self.category).first().sub_category
self.df_business = self.sqlContext.read.json(os.environ['WORKDIR'] + "yelp_dataset_challenge_academic_dataset/yelp_academic_dataset_business.json")
# self.df_business = self.sqlContext.read.json("s3n://ds-emr-spark/data/yelp_academic_dataset_business.json").cache()
self.df_business = self.df_business.select("business_id", "name", "stars", "latitude", "longitude", "categories")
filter_business = partial(isBusinessLocalAndRelevant, latitude = self.loc_lat, longitude = self.loc_long, sub_categories = subcat)
self.df_business = self.df_business.rdd.filter(filter_business)
self.df_business = self.sqlContext.createDataFrame(self.df_business)
self.df_business = self.df_business.select("business_id", "name", "stars")
self.df_business.registerTempTable("business")
schema_2 = StructType([
StructField("latitude", FloatType(), True),
StructField("longitude", FloatType(), True)
])
schema = StructType([
StructField("cluster_centers", ArrayType(schema_2), True),
StructField("user_id", StringType(), True)
])
self.df_user_locations = self.sqlContext.read.json(os.environ['WORKDIR'] + "clustering_models/center.json/dbscan", schema)
filter_users = partial(isUserlocal, latitude = self.loc_lat, longitude = self.loc_long)
self.df_user_locations = self.df_user_locations.rdd.filter(filter_users)
self.df_user_locations = self.sqlContext.createDataFrame(self.df_user_locations)
self.df_user_locations = self.df_user_locations.select("user_id")
self.df_user_locations.registerTempTable("user")
#print "user locations: ", self.self.df_user_locations.count()
self.df_review = self.sqlContext.read.json(os.environ['WORKDIR'] + "yelp_dataset_challenge_academic_dataset/yelp_academic_dataset_review.json")
self.df_review = self.df_review.select("business_id", "user_id", "stars")
self.df_review.registerTempTable("review")
#print "reviews: ", self.self.df_review.count()
self.df_joined = self.sqlContext.sql("SELECT r.user_id AS user_id, r.business_id AS business_id, first(b.name) AS business_name, first(b.stars) as business_stars, avg(r.stars) AS avg_rev_stars FROM review r, business b, user u WHERE r.business_id = b.business_id AND r.user_id = u.user_id GROUP BY r.user_id, r.business_id")
self.df_joined.registerTempTable("joined")
self.df_business.unpersist()
self.df_user_locations.unpersist()
self.df_review.unpersist()
self.df_category_pred = self.loadEliteScorePredictionsForCategory()
self.df_category_pred.registerTempTable("prediction")
self.df_joined = self.sqlContext.sql("SELECT j.*, p.prediction AS elite_score, (j.avg_rev_stars*p.prediction) AS w_score FROM joined j, prediction p WHERE j.user_id = p.user_id")
#print "joined: ", self.self.df_joined.count()
#self.self.df_joined.show()
self.df_category_pred.unpersist()
df_grouped = self.df_joined.groupBy("business_id", "business_name", "business_stars").agg(F.avg("w_score").alias("rank"))
df_grouped = df_grouped.sort("rank", ascending=False)
print df_grouped.count()
df_grouped.show()
self.df_joined.unpersist()
return df_grouped
def loadEliteScorePredictionsForCategory(self):
fileloc = "regression_models/"
filename = "pred_" + re.sub(" ", "_", self.category.lower()) + ".json"
category_file = os.environ['WORKDIR'] + join(fileloc, filename)
#print category_file
if isfile(category_file):
df_category_pred = self.sqlContext.read.json(category_file)
#print self.df_category_pred.count()
#self.df_category_pred.show()
return df_category_pred
def createCheckInDataPerUser(self):
pass
if __name__ == "__main__":
    # Guard the Spark job behind a main check so importing this module for
    # its helper functions does not kick off the whole pipeline.
    app = MainApp()
    app.init()
    app.loadData()