-
Notifications
You must be signed in to change notification settings - Fork 0
/
index_server.py
114 lines (93 loc) · 4.56 KB
/
index_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import socket
from threading import Thread
import datetime
import pickle
import time
class UpdateConnectedNodes(Thread):
def __init__(self, hostname_to_last_message_time, connected_nodes):
Thread.__init__(self)
self.hostname_to_last_message_time = hostname_to_last_message_time
self.connected_nodes = connected_nodes
def run(self):
while True:
now = datetime.datetime.now()
keys = list(self.hostname_to_last_message_time.keys())
for key in keys:
if now - datetime.timedelta(seconds=90) < self.hostname_to_last_message_time[key]:
if self.hostname_to_last_message_time.get(key):
del self.hostname_to_last_message_time[key]
if self.connected_nodes.get(key):
del self.connected_nodes[key]
class HandleGetHostnameSocketConnection(Thread):
def __init__(self, filename_to_hostname, connection):
Thread.__init__(self)
self.filename_to_hostname = filename_to_hostname
self.connection = connection
def run(self):
while True:
message = self.connection.recv(1024)
if not message:
break
message = pickle.loads(message)
if len(message) == 0:
break
print("GetHostname")
print(message)
self.connection.sendall(pickle.dumps({"containers": self.filename_to_hostname.get(message, [])}))
print(pickle.dumps({"containers": self.filename_to_hostname.get(message, [])}))
print({"containers": self.filename_to_hostname.get(message, [])})
self.connection.close()
class HandleGetHostnameSocket(Thread):
def __init__(self, get_hostname_socket, filename_to_hostname):
Thread.__init__(self)
self.get_hostname_socket = get_hostname_socket
self.filename_to_hostname = filename_to_hostname
def run(self):
while True:
connection, address = self.get_hostname_socket.accept()
handle_get_hostname = HandleGetHostnameSocketConnection(self.filename_to_hostname, connection)
handle_get_hostname.start()
class HandleRegistrationSocket(Thread):
def __init__(self, registration_socket, get_hostname_socket, hostname_to_last_message_time, filename_to_hostname, connected_nodes):
Thread.__init__(self)
self.registration_socket = registration_socket
self.get_hostname_socket = get_hostname_socket
self.hostname_to_last_message_time = hostname_to_last_message_time
self.filename_to_hostname = filename_to_hostname
self.connected_nodes = connected_nodes
def run(self):
while True:
data, address = self.registration_socket.recvfrom(4096)
if not data:
break
received = pickle.loads(data)
ip_port = address[0] + ":" + str(received["port"])
self.hostname_to_last_message_time[ip_port] = datetime.datetime.now()
self.connected_nodes[ip_port] = True
print(received)
for filename in received["files"]:
if not self.filename_to_hostname.get(filename):
self.filename_to_hostname[filename] = []
if not ip_port in self.filename_to_hostname[filename]:
self.filename_to_hostname[filename].append(ip_port)
print(self.filename_to_hostname[filename])
self.registration_socket.close()
class IndexServer:
def __init__(self):
self.registration_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.get_hostname_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.registration_socket.bind(('localhost', 4000))
self.get_hostname_socket.bind(('localhost', 4001))
self.get_hostname_socket.listen(5)
self.hostname_to_last_message_time = {}
self.filename_to_hostname = {}
self.connected_nodes = {}
def start(self):
update_connected_nodes = UpdateConnectedNodes(self.hostname_to_last_message_time, self.connected_nodes)
handle_get_hostname_socket = HandleGetHostnameSocket(self.get_hostname_socket, self.filename_to_hostname)
handle_registration_socket = HandleRegistrationSocket(self.registration_socket, self.get_hostname_socket, self.hostname_to_last_message_time, self.filename_to_hostname, self.connected_nodes)
update_connected_nodes.start()
handle_get_hostname_socket.start()
handle_registration_socket.start()
index_server = IndexServer()
index_server.start()