-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
b9cf9d7
commit 8884896
Showing
4 changed files
with
659 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
env/ | ||
.idea/ | ||
__pycache__/ | ||
__pycache__/ | ||
.DS_Store |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,219 @@ | ||
import asyncio | ||
import json | ||
import time | ||
from .sangoma_utils import AES_encrypt | ||
from .sangoma_connector import ConnectionHandler | ||
|
||
def setG(global_G): | ||
global G | ||
G = global_G | ||
|
||
|
||
|
||
class MainSystemServerLambdaAuthenticator(): | ||
""" | ||
For requests coming in from AWS lambda functions. | ||
Performs basic key authentication on the first message. | ||
""" | ||
|
||
def __init__(self): | ||
self.connection_id_counter = 0 | ||
|
||
async def auth_incoming(self, this_websocket, auth_results, connections): | ||
""" | ||
this_websocket: the current websocket connection object | ||
auth_results: | ||
connections: the connection pool to which the websocket is added if authentication is sucessful | ||
""" | ||
global G | ||
try: | ||
first_message = await this_websocket.recv() #the very first message from the other server needs to have a specific format | ||
#G['logger'].log(f"first_message in MainSystemServerLambdaAuthenticator = |{first_message}|") | ||
D = json.loads(first_message) | ||
if D['authorization_key_used_by_incoming_aws_lambda_fcts'] == G['settings']['apis']['authorization_key_used_by_incoming_aws_lambda_fcts']: | ||
this_connection = ConnectionHandler.connection(this_websocket, use_compression=False, AES_encryptor=None) # wrap this as a connection object | ||
this_connection.id = self.connection_id_counter | ||
this_connection.time_established = time.time() | ||
connections[self.connection_id_counter] = this_connection | ||
auth_results['this_connection'] = this_connection | ||
self.connection_id_counter += 1 | ||
return True | ||
else: | ||
return False | ||
except: | ||
G['logger'].log('Error decoding first authentication message: {first_message}') | ||
return False | ||
|
||
|
||
class WebClientAuthenticator: | ||
""" | ||
an object of this type is instantiated, configured and passed to the ConnectionHandler object | ||
for server role: | ||
- handles the authentication of any incoming connections, | ||
identifies and saves the connection object to the respective group | ||
for client role: | ||
- Handles the authentication for connecting to the server | ||
""" | ||
def __init__(self): | ||
self.connection_counter = 0 #help to create unique ids | ||
|
||
async def auth_incoming(self, this_websocket, auth_results, connections): | ||
|
||
|
||
#print('waiting for incoming message...') | ||
#msg = await this_websocket.recv() | ||
#print(f'msg = {msg}') | ||
|
||
this_connection = ConnectionHandler.connection(this_websocket, use_compression=False, AES_encryptor=None) # wrap this as a connection object | ||
this_connection_id = 'conn_' + str(self.connection_counter) #create a unique id. This can be extended to contain the client user name | ||
self.connection_counter += 1 | ||
connections[this_connection_id] = this_connection | ||
auth_results['this_connection'] = this_connection | ||
print('incoming connection in WebClientAuthenticator accepted. No authentication performed.') | ||
return True | ||
|
||
# -------------------------------------------------- | ||
|
||
|
||
class MainSystemServerAuthenticator: | ||
""" | ||
used to handle the standardized authentication of one of the main system servers. | ||
""" | ||
def __init__(self, use_compression=False, use_encryption=False): | ||
self.authentication_time_threshold = 60 | ||
self.use_compression = use_compression | ||
self.use_encryption=use_encryption | ||
try: | ||
self.AES_encryptor = AES_encrypt(key_hex=G['settings']['communication_encryption_key_hex']) | ||
except: | ||
print('problem creating encryptor with hex key from settings file') | ||
|
||
async def auth_incoming(self, this_websocket, auth_results, connections): | ||
"""returns True if client is authenticated, False if not. | ||
Other properties from the authentication are saved in auth_results. | ||
The connections dictionary is passed that the authentication routine can assign the connection object to the appropriate groups. | ||
The connection object is also returned in the auth_results (implicitly by reference) | ||
For authentication, the beginning of the message after encrypting needs to exactly match and the time difference | ||
between the send time and local system time needs to be lower than a threshold | ||
""" | ||
|
||
raw = await this_websocket.recv() #the very first message from the other server needs to have a specific format | ||
auth_msg = self.AES_encryptor.decrypt(raw) | ||
try: | ||
auth_dict = json.loads(auth_msg[33:]) | ||
self.use_compression = auth_dict['use_compression'] #use the settings specified by the client | ||
self.use_encryption = auth_dict['use_encryption'] | ||
t_diff = abs(auth_dict['time']-time.time()) #the time difference between the time marked in the authentication message and the current system time | ||
if auth_msg[:34] != 'ThisPreSnippetIsForAuthentication{' or t_diff>self.authentication_time_threshold: | ||
return False #not properly authenticated | ||
|
||
#now assign the connection to a specific group | ||
this_connection = ConnectionHandler.connection(this_websocket, use_compression=self.use_compression, AES_encryptor=None) #wrap this as a connection object | ||
if self.use_encryption: | ||
this_connection.AES_encryptor = AES_encrypt(key_hex=G['settings']['communication_encryption_key_hex']) | ||
|
||
#pass some parameters associated with this connection to the outside | ||
auth_results['service_id'] = auth_dict['service_id'] | ||
auth_results['time_connected'] = round(time.time()) | ||
auth_results['this_connection'] = this_connection | ||
|
||
|
||
#send confirmation message to client that connection is accepted | ||
await this_connection.send(json.dumps({ | ||
"authentication_success" : True, | ||
"this_host_role": G["settings"]["this_host_role"] | ||
})) | ||
|
||
#assign this connection into connections[this_host_role][service_id] | ||
try: | ||
if auth_dict['this_host_role'] not in connections: | ||
connections[auth_dict['this_host_role']] = {} | ||
|
||
if auth_dict['service_id'] in connections[auth_dict['this_host_role']]: | ||
print(f'connection with name {auth_dict["service_id"]} already present in connections[{auth_dict["this_host_role"]}]. Rejecting new connection attempt') | ||
return False | ||
|
||
connections[auth_dict['this_host_role']][auth_dict['service_id']] = this_connection #e.g. connections['main_system_server']['Interoffshore'] | ||
|
||
except Exception as exc: | ||
print(f'problem assigning connection from name transmitted by client. Exception: {exc} and auth_dict={str(auth_dict)}') | ||
return False | ||
|
||
return True #authetication successful and connection assigned | ||
|
||
except Exception as exc: | ||
print(f'Error occurred in authentication: {exc}') | ||
return False | ||
|
||
async def auth_me_to_host(self, this_websocket, auth_results, connections): | ||
"""returns True if the authentication was successful. Sets up the connection into the passed 'connections' | ||
dictionary with the key specified by the role of the server (given in its repsonse).""" | ||
try: | ||
first_auth_message = f'ThisPreSnippetIsForAuthentication' + json.dumps({ | ||
"time": round(time.time(), 2), | ||
"this_host_role" : G["settings"]["this_host_role"], | ||
"service_id": G["settings"]["service_id"], | ||
"use_compression" : self.use_compression, #for this channel | ||
"use_encryption" : self.use_encryption | ||
}) | ||
enc = self.AES_encryptor.encrypt(first_auth_message) | ||
await this_websocket.send(enc) # send the authentication message. Now wait for confirmation | ||
except: | ||
print(f'problem constructing or sending auth message in MainSystemServerAuthenticator.auth_me_to_host') | ||
|
||
|
||
#create the connection object | ||
this_connection = ConnectionHandler.connection(this_websocket, use_compression=self.use_compression, AES_encryptor=None) # wrap this as a connection object | ||
if self.use_encryption: | ||
this_connection.AES_encryptor = AES_encrypt(key_hex=G['settings']['communication_encryption_key_hex']) | ||
|
||
#now get a response from the server that the authentication went well | ||
try: | ||
auth_response = json.loads(await this_connection.recv()) | ||
if auth_response['authentication_success'] != True: | ||
return False #not properly authenticated | ||
|
||
# now assign the connection to a specific group | ||
connections[auth_response['this_host_role']] = this_connection | ||
|
||
auth_results['partner_host_role'] = auth_response["this_host_role"] | ||
auth_results['time_connected'] = round(time.time()) | ||
auth_results['this_connection'] = this_connection | ||
|
||
return True | ||
except Exception as exc: | ||
print(f'problem in receiving or decoding authorization response message from server {exc}') | ||
return False | ||
|
||
# -------------------------------------------------- | ||
# Added | ||
class MonitoringServiceAuthenticator(): | ||
|
||
def __init__(self): | ||
self.connection_id_counter = 0 | ||
|
||
async def auth_incoming(self, this_websocket, auth_results, connections): | ||
""" | ||
this_websocket: the current websocket connection object | ||
auth_results: | ||
connections: the connection pool to which the websocket is added if authentication is sucessful | ||
""" | ||
global G | ||
try: | ||
first_message = await this_websocket.recv() # the very first message from the other server needs to have a specific format | ||
# D = json.loads(first_message) | ||
# TODO: Make Authorization check here, Currently accepts any connection | ||
this_connection = ConnectionHandler.connection(this_websocket, use_compression=False, AES_encryptor=None) # wrap this as a connection object | ||
this_connection.id = self.connection_id_counter | ||
this_connection.time_established = time.time() | ||
connections[self.connection_id_counter] = this_connection | ||
auth_results['this_connection'] = this_connection | ||
self.connection_id_counter += 1 | ||
return True | ||
except: | ||
G['logger'].log('Error decoding first authentication message: {first_message}') | ||
return False | ||
|
||
|
||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
import asyncio | ||
import websockets | ||
import time | ||
import json | ||
import random | ||
import zlib #for compression | ||
from .sangoma_utils import delete_element_from_nested_list_dict | ||
|
||
|
||
|
||
class ConnectionHandler: | ||
"""a class to handle all connections, both for the server and the client side. | ||
As a server, execute ConnectionHandler.accept_connections() after instantiating the object. | ||
All active connections are saved in the dictionary self.connections, possibly with subgroupings | ||
to create hierarchical groups | ||
""" | ||
def __init__(self, authenticator, message_manager): | ||
self.connections = {} | ||
self.loop = asyncio.get_event_loop() | ||
self.authenticator = authenticator | ||
self.message_manager = message_manager | ||
|
||
# -------------------------------------------------- | ||
|
||
def accept_connections(self, port, ip_filter = '0.0.0.0'): | ||
try: | ||
loop = asyncio.get_event_loop() | ||
start_server = websockets.serve(self.websocket_handler, ip_filter, port) | ||
asyncio.get_event_loop().run_until_complete(start_server) | ||
print(f' ********* accepting connections on port {port} from {ip_filter} ************** ') | ||
#loop.run_forever() #don't call this here in case multiple accept_connections are called from a higher level. Only call this once all coroutines have been added to the event loop | ||
except Exception as exc: | ||
print(f'problem occurred in ConnectionHandler.accept_connections({ip_filter}, {port}): {exc}') | ||
|
||
# -------------------------------------------------- | ||
|
||
async def connect_to_host(self, host_ip, host_port, attempt_reconnect_if_dropped = False): | ||
print(f'connect_to_host called with {host_ip} and {host_port}') | ||
try: | ||
async with websockets.connect(f'ws://{host_ip}:{host_port}') as websocket: | ||
this_connection = None # allocate that this exists for cleanup | ||
auth_results = {} | ||
if await self.authenticator.auth_me_to_host(this_websocket=websocket, auth_results=auth_results, connections=self.connections): | ||
print('successfully connected and authenticated to host.') | ||
else: | ||
print('authenticated to host not successful. Quitting connection attempt.') | ||
return False | ||
|
||
try: | ||
this_connection = auth_results['this_connection'] | ||
while True: # keep this loop running for this connection as long as it is open | ||
try: | ||
received_data = await this_connection.recv() | ||
asyncio.get_event_loop().create_task(self.message_manager.process_message(received_data)) # just launch it to return to receiving, don't gather anywhere. | ||
except: | ||
print(f'problem in the receive loop for connection {str(auth_results)}') | ||
break | ||
|
||
except Exception as exc: | ||
print(f'problem in receive loop for connection {str(auth_results)}. Exception: {exc}') | ||
finally: #clean up this connection: remove from all occurences in the dictionary | ||
print('------------- connection closed ------------- ') | ||
delete_element_from_nested_list_dict(self.connections, this_connection) #cleans up as long as this is a structure of nested dicts and lists | ||
except Exception as exc2: | ||
print(f'problem in outer connection loop for connection. Exception: {exc2}') | ||
finally: | ||
if attempt_reconnect_if_dropped: | ||
await asyncio.sleep(3) | ||
print(f'attempting reconnecting to {host_ip}:{host_port}') | ||
asyncio.get_event_loop().create_task(self.connect_to_host(host_ip, host_port, attempt_reconnect_if_dropped)) # python 3.7 supports asyncio.create_task(...) | ||
|
||
|
||
# -------------------------------------------------- | ||
|
||
|
||
class connection(): | ||
"""a class wrapping a connection individually. Different encryption and compression methods | ||
may be used on a connection-specific basis.""" | ||
def __init__(self, this_socket, use_compression = False, AES_encryptor = None): | ||
self.id = None | ||
self.this_socket = this_socket | ||
self.AES_encryptor = AES_encryptor | ||
self.use_compression = use_compression | ||
self.zlib_compression_level = 6 #ranges between [0, 9] (0 = no compression), see https://stackabuse.com/python-zlib-library-tutorial/ | ||
self.time_established = None | ||
|
||
async def send(self, msg): | ||
if type(msg) is dict: | ||
msg = json.dumps(msg) | ||
|
||
|
||
if self.use_compression: | ||
msg = zlib.compress(msg.encode('utf-8'), self.zlib_compression_level) | ||
if self.AES_encryptor: | ||
msg = self.AES_encryptor.encrypt(msg) | ||
#print('actually sending: ' + str(msg) ) | ||
try: | ||
await self.this_socket.send(msg) | ||
except: | ||
import traceback | ||
traceback.print_exc() | ||
print(f"problem in ConnectionHandler.connection.send with msg = {msg}") | ||
|
||
|
||
async def recv(self): | ||
msg = await self.this_socket.recv() | ||
#print('actually received: ' + str(msg)) | ||
if self.AES_encryptor: #if this is defined, use encryption | ||
msg = self.AES_encryptor.decrypt(msg) | ||
if self.use_compression: | ||
msg = zlib.decompress(msg).decode('utf-8') | ||
return msg | ||
|
||
# -------------------------------------------------- | ||
|
||
async def websocket_handler(self, this_websocket, path): | ||
"""this function is executed by the server each time a new client attempts to connect. | ||
this_websocket is passed by the websocket routine in start_server. | ||
1) identify the category of the connection | ||
2) verify that this is legitimate | ||
3) save the connection object created from this_websocket into the respective connection group | ||
4) this function keeps running listening on the port for incoming data. | ||
5) cleanup: remove the connection object from all groups if the connection is closed.""" | ||
|
||
#print('---- connection accepted by handler ----------- ') | ||
this_connection = None #allocate that this exists for cleanup | ||
auth_results = {} #the authentication routine will write its results in here | ||
try: | ||
if await self.authenticator.auth_incoming(this_websocket=this_websocket, auth_results=auth_results, connections=self.connections): | ||
print('authentication successful') | ||
else: | ||
print('authentication failed. Exiting websocket handler') | ||
return | ||
except Exception as ex: | ||
print(f'error occurred in authentication: {str(ex)}') | ||
|
||
try: | ||
this_connection = auth_results['this_connection'] #get the connection object from the dictionary passed back from the authenticator, where the connector was created | ||
while True: #keep this loop running for this connection as long as it is open | ||
try: | ||
received_msg_str = await this_connection.recv() | ||
connection_and_msg_str = {'connection': this_connection, 'receivedMsgStr': received_msg_str} #the connection needs to be passed along to know exactly which lambda fct ws connection to return the state to | ||
asyncio.get_event_loop().create_task(self.message_manager.process_message(connection_and_msg_str)) #just launch it to return to receiving, don't gather anywhere. | ||
|
||
except: | ||
#print(f'problem in the receive loop for connection {auth_results}') | ||
break | ||
except Exception as exc: | ||
print(f'problem in connection {str(auth_results)}. Exception: {exc}') | ||
|
||
finally: | ||
# remove this websocket from all places it was saved in the dictionary | ||
#print('------------- connection closed ------------- ') | ||
delete_element_from_nested_list_dict(self.connections, this_connection) | ||
|
||
|
||
|
||
|
Oops, something went wrong.