Skip to content

Commit ee75235

Browse files
author
root
committed
Initial Commit
1 parent aada0e4 commit ee75235

File tree

7 files changed

+724
-0
lines changed

7 files changed

+724
-0
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
__pycache__
2+
credentials.ini
3+

api/Comment.py

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
import time
2+
import html
3+
from collections import defaultdict
4+
import Parameters
5+
from Helpers import *
6+
7+
8+
class search:
    """Falcon resource for the comment-search endpoint.

    Routes a GET either to a direct Postgres lookup by id (when an `ids`
    query parameter is present) or to an Elasticsearch query, then wraps
    the result with timing/version metadata and writes the JSON body.
    """

    # Query parameters of the request currently being served; set in on_get.
    # NOTE(review): storing per-request state on the resource object is not
    # safe if the framework shares one instance across threads -- confirm
    # the deployment model.
    params = None

    def on_get(self, req, resp):
        """Handle GET: dispatch, time the work, and emit the JSON response."""
        start = time.time()
        self.params = req.params
        if 'ids' in self.params:
            data = self.getIds(self.params['ids'])
        else:
            data = self.doElasticSearch()
        data["metadata"]["execution_time_milliseconds"] = round((time.time() - start) * 1000, 2)
        data["metadata"]["version"] = "v3.0"
        resp.cache_control = ["public", "max-age=2", "s-maxage=2"]
        resp.body = json.dumps(data, sort_keys=True, indent=4, separators=(',', ': '))

    def getIds(self, ids):
        """Fetch comments by base-36 id straight from Postgres.

        Accepts a single id or a list; a leading "t1_" fullname prefix is
        stripped before decoding. Returns {"data": [...], "metadata": {}}.
        """
        if not isinstance(ids, (list, tuple)):
            ids = [ids]
        ids_to_get_from_db = []
        for raw_id in ids:
            raw_id = raw_id.lower()
            if raw_id.startswith("t1_"):
                raw_id = raw_id[3:]
            ids_to_get_from_db.append(base36decode(raw_id))
        rows = pgdb.execute(
            "SELECT * FROM comment WHERE (json->>'id')::bigint IN %s LIMIT 5000",
            tuple(ids_to_get_from_db))
        results = []
        if rows:
            for row in rows:
                comment = row[0]
                comment['id'] = base36encode(comment['id'])
                # A missing parent, or a parent equal to the link, means the
                # comment is top-level: its parent is the submission (t3_).
                # Otherwise the parent is another comment (t1_).
                if comment.get('parent_id') is None or comment['parent_id'] == comment['link_id']:
                    comment['parent_id'] = "t3_" + base36encode(comment['link_id'])
                else:
                    comment['parent_id'] = "t1_" + base36encode(comment['parent_id'])
                if 'subreddit_id' in comment:
                    comment['subreddit_id'] = "t5_" + base36encode(comment['subreddit_id'])
                comment['link_id'] = "t3_" + base36encode(comment['link_id'])
                comment.pop('name', None)
                results.append(comment)
        return {"data": results, "metadata": {}}

    def doElasticSearch(self):
        """Run the ES query built by search() and normalise hits and
        aggregations into the public response shape."""
        response = self.search("http://mars:9200/rc/comments/_search")
        results = []
        data = {}
        for hit in response["data"]["hits"]["hits"]:
            source = hit["_source"]
            source["id"] = base36encode(int(hit["_id"]))
            source["link_id"] = "t3_" + base36encode(source["link_id"])

            if 'parent_id' in source:
                source["parent_id"] = "t1_" + base36encode(source["parent_id"])
            else:
                # Top-level comment: parent is the submission itself.
                source["parent_id"] = source["link_id"]

            source["subreddit_id"] = "t5_" + base36encode(source["subreddit_id"])

            # Flair is stored HTML-escaped; always emit both keys, None when absent.
            for flair_key in ('author_flair_text', 'author_flair_css_class'):
                if flair_key in source:
                    source[flair_key] = html.unescape(source[flair_key])
                else:
                    source[flair_key] = None

            # Optional field projection: keep only requested keys (case-insensitive).
            if 'fields' in self.params:
                if isinstance(self.params['fields'], str):
                    self.params['fields'] = [self.params['fields']]
                self.params['fields'] = [x.lower() for x in self.params['fields']]
                for key in list(source):
                    if key.lower() not in self.params['fields']:
                        source.pop(key, None)

            results.append(source)

        if 'aggregations' in response["data"]:
            aggs = response["data"]["aggregations"]
            data["aggs"] = {}

            if 'subreddit' in aggs:
                for bucket in aggs["subreddit"]["buckets"]:
                    bucket["score"] = bucket["doc_count"] / bucket["bg_count"]
                data["aggs"]["subreddit"] = sorted(
                    aggs["subreddit"]["buckets"],
                    key=lambda k: k['score'], reverse=True)

            if 'author' in aggs:
                for bucket in aggs["author"]["buckets"]:
                    if 'score' in bucket:
                        bucket["score"] = bucket["doc_count"] / bucket["bg_count"]
                data["aggs"]["author"] = aggs["author"]["buckets"]

            if 'created_utc' in aggs:
                for bucket in aggs["created_utc"]["buckets"]:
                    bucket.pop('key_as_string', None)
                    # ES date_histogram keys are epoch milliseconds; API uses seconds.
                    bucket["key"] = int(bucket["key"] / 1000)
                data["aggs"]["created_utc"] = aggs["created_utc"]["buckets"]

            if 'link_id' in aggs:
                ids = []
                for bucket in aggs["link_id"]["buckets"]:
                    if 'score' in bucket:
                        bucket["score"] = bucket["doc_count"] / bucket["bg_count"]
                    ids.append(bucket["key"])
                submission_data = getSubmissionsFromES(ids)
                after = 0
                if "after" in self.params:
                    after = int(self.params["after"])
                newlist = []
                for item in aggs["link_id"]["buckets"]:
                    if item["key"] in submission_data and submission_data[item["key"]]["created_utc"] > after:
                        item["data"] = submission_data[item["key"]]
                        item["data"]["full_link"] = "https://www.reddit.com" + item["data"]["permalink"]
                        newlist.append(item)
                data["aggs"]["link_id"] = newlist

        data["data"] = results
        data["metadata"] = response["metadata"]
        data["metadata"]["results_returned"] = len(response["data"]["hits"]["hits"])
        data["metadata"]["timed_out"] = response["data"]["timed_out"]
        data["metadata"]["total_results"] = response["data"]["hits"]["total"]
        data["metadata"]["shards"] = response["data"]["_shards"]
        return data

    def search(self, uri):
        """Build the ES query from self.params, POST it to `uri` (falling
        back to the jupiter host on connection failure), and return
        {"data": <raw ES response>, "metadata": {...}}.
        """
        nested_dict = lambda: defaultdict(nested_dict)
        q = nested_dict()
        q['query']['bool']['filter'] = []

        if self.params.get('q') is not None:
            sqs = nested_dict()
            sqs['simple_query_string']['query'] = self.params['q']
            sqs['simple_query_string']['fields'] = ['body']
            sqs['simple_query_string']['default_operator'] = 'and'
            q['query']['bool']['filter'].append(sqs)

        self.params, q = Parameters.process(self.params, q)

        min_doc_count = 0
        if self.params.get('min_doc_count') is not None and LooksLikeInt(self.params['min_doc_count']):
            # BUG FIX: original read the undefined name `params` (NameError);
            # also coerce the validated value to int for the ES payload.
            min_doc_count = int(self.params['min_doc_count'])

        if 'aggs' in self.params:
            if isinstance(self.params['aggs'], str):
                self.params['aggs'] = [self.params['aggs']]
            for agg in list(self.params['aggs']):
                agg = agg.lower()
                if agg == 'subreddit':
                    st = q['aggs']['subreddit']['significant_terms']
                    st['field'] = "subreddit.keyword"
                    st['size'] = 1000
                    st['script_heuristic']['script']['lang'] = "painless"
                    st['script_heuristic']['script']['inline'] = "params._subset_freq"
                    st['min_doc_count'] = min_doc_count
                elif agg == 'author':
                    terms = q['aggs']['author']['terms']
                    terms['field'] = 'author.keyword'
                    terms['size'] = 1000
                    terms['order']['_count'] = 'desc'
                elif agg == 'created_utc':
                    dh = q['aggs']['created_utc']['date_histogram']
                    dh['field'] = "created_utc"
                    # .get avoids a KeyError when Parameters.process did not set it.
                    if self.params.get('frequency') is None:
                        self.params['frequency'] = "day"
                    dh['interval'] = self.params['frequency']
                    dh['order']['_key'] = "asc"
                elif agg == 'link_id':
                    terms = q['aggs']['link_id']['terms']
                    terms['field'] = "link_id"
                    terms['size'] = 250
                    terms['order']['_count'] = "desc"

        try:
            # BUG FIX: honour the `uri` argument; the original ignored it and
            # duplicated the hard-coded primary host here.
            response = requests.get(uri, data=json.dumps(q))
        except requests.exceptions.RequestException:
            response = requests.get("http://jupiter:9200/rc/comments/_search", data=json.dumps(q))

        results = {
            'data': json.loads(response.text),
            'metadata': {
                'size': self.params['size'],
                'sort': self.params['sort'],
                'sort_type': self.params['sort_type'],
            },
        }
        if self.params.get('after') is not None:
            results['metadata']['after'] = self.params['after']
        return results

api/DBFunctions.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import psycopg2
2+
import time
3+
from configparser import ConfigParser
4+
5+
# Load the database credentials once at import time; credentials.ini is
# kept out of version control (see .gitignore).
config = ConfigParser()
config.read('credentials.ini')
7+
8+
class pgdb:
    """Thin Postgres wrapper with automatic reconnect-and-retry on failure."""

    def __init__(self):
        self.connect()

    def connect(self):
        """(Re)open the autocommit connection to the 'reddit' database
        using credentials from credentials.ini."""
        DB_PASSWORD = config.get('database', 'password')
        DB_USER = config.get('database', 'user')
        # NOTE(review): consider psycopg2.connect(dbname=..., user=..., ...)
        # keyword form -- string-built DSNs break on quotes in passwords.
        self.db = psycopg2.connect("dbname='reddit' user='" + DB_USER + "' host='jupiter' password='" + DB_PASSWORD + "'")
        self.db.set_session(autocommit=True)

    def execute(self, sql, params):
        """Execute `sql` with `params` bound as a single tuple parameter and
        return all rows, retrying up to 5 times with a one-second pause and
        a reconnect between attempts.
        """
        retries = 5
        while True:
            retries -= 1
            try:
                cur = self.db.cursor()
                cur.execute(sql, (params,))
                rows = cur.fetchall()
                cur.close()
                return rows
            except psycopg2.Error:
                # BUG FIX: was a bare `except:`, which also swallowed
                # KeyboardInterrupt/SystemExit; retry only on database errors.
                if retries <= 0:
                    raise
                time.sleep(1)
                # If reconnecting fails, let that error propagate.
                self.connect()

api/Helpers.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
from collections import defaultdict
2+
import requests
3+
import json
4+
5+
def LooksLikeInt(s):
    """Return True if `s` can be converted with int(), else False."""
    try:
        int(s)
        return True
    except (ValueError, TypeError):
        # BUG FIX: int(None) (or any non-string, non-number) raises
        # TypeError, which the original did not catch and so crashed.
        return False
11+
12+
def base36encode(number, alphabet='0123456789abcdefghijklmnopqrstuvwxyz'):
    """Convert an integer to a base-36 string (lowercase, '-' for negatives).

    Raises TypeError if `number` is not an int.
    """
    # BUG FIX: was isinstance(number, (int, int)) -- a Python 2
    # `(int, long)` remnant with `int` duplicated.
    if not isinstance(number, int):
        raise TypeError('number must be an integer')

    base36 = ''
    sign = ''

    if number < 0:
        sign = '-'
        number = -number

    # Single-digit fast path.
    if 0 <= number < len(alphabet):
        return sign + alphabet[number]

    while number != 0:
        number, i = divmod(number, len(alphabet))
        base36 = alphabet[i] + base36

    return sign + base36
32+
33+
def base36decode(number):
    """Inverse of base36encode: parse a base-36 string into an int."""
    return int(number, 36)
35+
36+
def getSubmissionsFromES(ids):
    """Fetch submissions by base-10 id from Elasticsearch.

    Accepts a single id or a list. Returns a dict keyed by base-10 id with
    each submission's `id` field rewritten to its base-36 form.
    """
    if not isinstance(ids, (list, tuple)):
        ids = [ids]
    # Removed unused local `ids_to_get` from the original.
    nested_dict = lambda: defaultdict(nested_dict)
    q = nested_dict()
    q["query"]["terms"]["id"] = ids
    q["size"] = 1000
    response = requests.get("http://mars:9200/rs/submissions/_search", data=json.dumps(q))
    s = json.loads(response.text)
    results = {}
    for hit in s["hits"]["hits"]:
        source = hit["_source"]
        base_10_id = source["id"]
        source["id"] = base36encode(int(hit["_id"]))
        results[base_10_id] = source
    return results
53+
54+
def getSubmissionsFromPg(ids):
    """Fetch submissions by base-10 id from Postgres.

    Accepts a single id or a list. Returns a dict keyed by base-10 id with
    `id`/`subreddit_id` rewritten to base-36 and the redundant `name`
    field dropped.

    NOTE(review): `pgdb` is not imported in this module -- calling this as
    written raises NameError; confirm where the handle should come from.
    """
    if not isinstance(ids, (list, tuple)):
        ids = [ids]
    # Removed unused locals `ids_to_get_from_db` and `data` from the original.
    rows = pgdb.execute(
        "SELECT * FROM submission WHERE (json->>'id')::int IN %s LIMIT 5000",
        tuple(ids))
    results = {}
    if rows:
        for row in rows:
            submission = row[0]
            base_10_id = submission['id']
            submission['id'] = base36encode(submission['id'])
            if 'subreddit_id' in submission:
                submission['subreddit_id'] = "t5_" + base36encode(submission['subreddit_id'])
            submission.pop('name', None)
            results[base_10_id] = submission
    return results

0 commit comments

Comments
 (0)