Skip to content

Commit 0d0e065

Browse files
authored
Merge pull request #3 from zorroblue/crawler
Added crawler
2 parents 0e2e98f + c00c321 commit 0d0e065

File tree

10 files changed

+3800
-2651
lines changed

10 files changed

+3800
-2651
lines changed

CONTRIBUTING.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,6 @@
4747

4848
python -m grpc_tools.protoc -I./protos --python_out=. --grpc_python_out=. protos/search.proto
4949

50+
#### Defaults:
51+
52+
The master server runs on port 50051 while the replica runs on port 50052. The database name of the master is 'masterdb' while the replica's is 'replicadb'.

crawler.py

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
from concurrent import futures
2+
import time
3+
import math
4+
5+
from argparse import ArgumentParser
6+
import argparse
7+
8+
import grpc
9+
import search_pb2
10+
import search_pb2_grpc
11+
12+
import json
13+
import logging
14+
15+
from utils import querydb, init_logger, parse_level
16+
from data.generatedata import generate_indices
17+
18+
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
19+
20+
21+
def build_parser():
    """Build the command line parser for the crawler.

    All options are optional: --master and --replica default to local
    host:port addresses, --logging defaults to DEBUG.
    """
    parser = ArgumentParser()
    parser.add_argument('--master', dest='master',
                        default='localhost:50051',
                        help='Master IP address',
                        required=False)
    parser.add_argument('--replica', dest='replica',
                        default='localhost:50052',
                        help='Replica IP address',
                        required=False)
    parser.add_argument('--logging', dest='logging_level',
                        default='DEBUG',
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
                        help='Logging level',
                        required=False)
    return parser
39+
40+
41+
def write_to_master_replica(master, replica, logging_level, lower=51, higher=70):
42+
# initialize logger
43+
logger = init_logger('crawler', logging_level)
44+
# generate data
45+
indices = generate_indices('pending', lower, higher)
46+
indices = json.dumps(indices)
47+
# initiate 2 phase commit protocol
48+
49+
# PHASE 1
50+
logger.debug("Initiate 2 phase commit protocol")
51+
print "Phase 1: Prepare"
52+
logger.debug("Starting Phase 1: Prepare")
53+
54+
master_vote = None
55+
replica_vote = None
56+
57+
master_channel = grpc.insecure_channel(master)
58+
master_stub = search_pb2_grpc.DatabaseWriteStub(master_channel)
59+
60+
request = search_pb2.CommitRequest(data=indices)
61+
62+
replica_channel = grpc.insecure_channel(replica)
63+
replica_stub = search_pb2_grpc.DatabaseWriteStub(replica_channel)
64+
65+
try:
66+
logger.info("Send COMMIT_REQUEST to master")
67+
master_vote = master_stub.QueryToCommit(request)
68+
print "Master Status: ", master_vote.status
69+
if master_vote.status == 1:
70+
logger.info("Received AGREED from master")
71+
else:
72+
logger.info("Received ABORT from master")
73+
except Exception as e:
74+
print e.code()
75+
logger.error("Master not reachable due to "+ str(e.code()))
76+
77+
try:
78+
logger.info("Send COMMIT_REQUEST to backup")
79+
replica_vote = replica_stub.QueryToCommit(request)
80+
print "Backup Status: ", replica_vote.status
81+
if replica_vote.status == 1:
82+
logger.info("Received AGREED from backup")
83+
else:
84+
logger.info("Received ABORT from backup")
85+
except Exception as e:
86+
print e.code()
87+
logger.error("Backup not reachable due to "+ str(e.code()))
88+
89+
# PHASE 2
90+
print "Phase 2: Prepare"
91+
logger.debug("Starting Phase 2: Commit")
92+
93+
if master_vote == None or replica_vote == None or replica_vote.status == 0 or master_vote.status == 0:
94+
try:
95+
request = search_pb2.CommitStatusUpdate(code=search_pb2.ROLL_BACK)
96+
try:
97+
logger.info("Sending ROLL_BACK to master")
98+
master_ack = master_stub.CommitPhase(request)
99+
except Exception as e:
100+
print str(e)
101+
logger.info("Master not able to receive ROLL_BACK due to "+ str(e.code()))
102+
103+
try:
104+
logger.info("Sending ROLL_BACK to replica")
105+
replica_ack = replica_stub.CommitPhase(request)
106+
except Exception as e:
107+
print str(e)
108+
logger.info("Replica not able to receive ROLL_BACK due to "+ str(e.code()))
109+
110+
except Exception as e:
111+
print e.code()
112+
logger.info("Rolled back transaction")
113+
return False
114+
115+
# Commit Phase
116+
request = search_pb2.CommitStatusUpdate(code=search_pb2.COMMIT)
117+
master_ack_received = False
118+
replica_ack_received = False
119+
retries = 0
120+
121+
while (not master_ack_received or not replica_ack_received) and retries < 3:
122+
if retries > 0:
123+
logger.info("Retrying")
124+
if not master_ack_received:
125+
try:
126+
logger.info("Sending COMMIT to master")
127+
master_ack = master_stub.CommitPhase(request)
128+
master_ack_received = True
129+
except Exception as e:
130+
logger.info("Master failed to receive due to "+ str(e.code()))
131+
if not replica_ack_received:
132+
try:
133+
logger.info("Sending COMMIT to backup")
134+
replica_ack = replica_stub.CommitPhase(request)
135+
replica_ack_received = True
136+
except Exception as e:
137+
logger.info("Backup failed to receive due to "+ str(e.code()))
138+
retries+=1
139+
print master_ack_received, replica_ack_received
140+
141+
142+
if retries < 3:
143+
logger.info("COMMIT")
144+
# TODO rollback
145+
# not doing it now to progress further in the work
146+
# PS if one of them fails, they will resync with the crawler. The crawler is aware of this as COMMIT isn't written till then.
147+
148+
print "Phase 2:"
149+
print "Master status", master_ack.status
150+
print "Replica status", replica_ack.status
151+
return master_ack.status == 1 and replica_ack.status == 1
152+
153+
154+
def main():
155+
parser = build_parser()
156+
options = parser.parse_args()
157+
master = options.master
158+
replica = options.replica
159+
logging_level = parse_level(options.logging_level)
160+
status = write_to_master_replica(master=master, replica=replica, logging_level=logging_level)
161+
if status:
162+
print "Write successful"
163+
164+
if __name__ == '__main__':
165+
main()

data/__init__.py

Whitespace-only changes.

data/generatedata.py

Lines changed: 46 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,48 +2,69 @@
22
from random_words import RandomWords
33
import random
44
from datetime import datetime
5+
import json
6+
import pkg_resources
57

68
random.seed(datetime.now())
7-
rw = RandomWords()
8-
9-
f = open("synonyms.txt")
109

11-
indices = {}
12-
linecount = 0
1310

1411
def generate_random_urls(word):
    '''Generate random urls for a word

    Picks a random number (2 to 6) of random words that start with the
    same first letter as the given word, and turns each one into a
    https://www.<word>.com url.
    '''
    picker = RandomWords()
    count = random.randint(2, 6)
    candidates = picker.random_words(count=count, letter=word[0])
    return ["https://www." + w + ".com" for w in candidates]
2725

28-
for line in f:
29-
words = line.strip().split(",")
30-
for i in range(len(words)):
31-
words[i] = words[i].strip()
32-
linecount += 1
33-
# pick only first 50 sets
34-
if linecount > 50:
35-
break
36-
for word in words:
37-
word = word.strip()
38-
indices[word] = {}
39-
indices[word]['sim_words'] = words
40-
indices[word]['urls'] = generate_random_urls(word)
26+
def generate_indices(status='committed', lower_range=1, upper_range=50):
    """Build index entries from the bundled synonyms.txt resource.

    Each line of synonyms.txt is a comma-separated set of synonyms; only
    the lines numbered lower_range..upper_range (1-based, inclusive) are
    used.  Every non-empty word in a selected set becomes one entry
    carrying its status, its synonym list, random urls and its own name.

    Args:
        status: value stored in each entry's 'status' field.
        lower_range: first line (1-based) of synonyms.txt to use.
        upper_range: last line (1-based) of synonyms.txt to use.

    Returns:
        A list of entry dicts ready for JSON serialization.
    """
    resource_package = __name__  # resolve synonyms.txt relative to this package
    resource_path = 'synonyms.txt'  # Do not use os.path.join(), see pkg_resources docs
    f = pkg_resources.resource_stream(resource_package, resource_path)

    indices = {}
    linecount = 0
    try:
        for line in f:
            words = [w.strip() for w in line.strip().split(",")]
            linecount += 1
            # pick only the sets on lines [lower_range, upper_range]
            if linecount < lower_range:
                continue
            if linecount > upper_range:
                break
            for word in words:
                if len(word) == 0:
                    continue
                indices[word] = {}
                indices[word]['status'] = status
                indices[word]['sim_words'] = words
                indices[word]['urls'] = generate_random_urls(word)
    finally:
        # BUG FIX: the stream was previously never closed (resource leak)
        f.close()

    json_list = []
    for word in indices:
        indices[word]['name'] = word
        json_list.append(indices[word])
    return json_list
57+
58+
def main():
    """Generate the default index entries and dump them to indices.json."""
    # BUG FIX: generate_indices() returns a LIST of entry dicts (with the
    # 'name' field already attached), not a dict keyed by word. The old
    # code iterated it as a dict (indices[word]['name']) which raises
    # TypeError: list indices must be integers. Serialize it directly.
    json_list = generate_indices()

    with open('indices.json', 'w') as fp:
        json.dump(json_list, fp, sort_keys=True, indent=4)


if __name__ == '__main__':
    main()

0 commit comments

Comments
 (0)