-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy paththreaded_server.py
executable file
·128 lines (106 loc) · 4.4 KB
/
threaded_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/env python3
from iax2.packet import iax2
from verboselogs import VerboseLogger as getLogger
import importlib
import logging
import socketserver
import threading
import time
import configargparse
import register
import notify
# Module metadata (author, licensing and release information).
__author__ = "Jason Kendall VE3YCA"
__copyright__ = "Copyright 2020-2021, Jason Kendall"
__credits__ = ["Jason Kendall"]
__license__ = "AGPL 3.0 or Later"
__version__ = "1.0.0"
__maintainer__ = "Jason Kendall"
__email__ = "ve3yca@ve3yca.com"
__status__ = "Dev"
# Module-wide logger shared by the request handler and the start-up code.
# `getLogger` is the VerboseLogger class imported above under an alias, so
# this line constructs a logger instance directly.
# NOTE(review): a directly constructed logger is presumably not attached to
# the logging hierarchy that the __main__ section configures through
# basicConfig/coloredlogs - confirm that records below WARNING are emitted.
logger = getLogger('pyIAX-Regserver')
class pyIAX(socketserver.BaseRequestHandler):
    """Request handler that passes each received datagram to the IAX2 parser."""

    def handle(self):
        """Parse one incoming packet and send back the reply, if there is one.

        For datagram servers ``socketserver`` provides ``self.request`` as a
        ``(data, socket)`` pair: index 0 is the received payload and index 1
        is the socket used to answer the sender.
        """
        peer = self.client_address
        reply = self.server.iax2.parse_packet(self.request[0], peer)
        logger.debug(f"Response: {reply}")
        if not reply:
            # The parser produced nothing to send back to this peer.
            return
        self.request[1].sendto(reply, peer)
class ThreadedUDPServer(socketserver.ThreadingMixIn, socketserver.UDPServer):
    """UDP server that carries the shared IAX2 state used by the handlers."""

    def __init__(self, *args, **kwargs):
        """Store the plugin handlers, build the IAX2 parser and start the server.

        The keyword arguments ``register``, ``notify`` and ``args`` are
        required (a missing one raises ``KeyError``). They are removed from
        ``kwargs`` and kept as attributes of the same name; every remaining
        argument is passed on unchanged to ``UDPServer``.
        """
        # Strip the custom keywords first: UDPServer does not accept them.
        for option in ('register', 'notify', 'args'):
            setattr(self, option, kwargs.pop(option))
        # Handlers reach this parser through ``self.server.iax2``.
        self.iax2 = iax2(self.register, self.notify)
        super().__init__(*args, **kwargs)
def _build_parser():
    """Create the command-line/config-file parser with the core server options.

    The built-in help action is disabled (``add_help=False``) and replaced by
    a plain ``-h`` flag, so that ``main`` can let the plugin modules add their
    own options before the help text is printed.
    """
    parser = configargparse.ArgParser(
        default_config_files=['/etc/pyiax-reg/*.conf', './pyiax-reg.conf'],
        description='IAX2 Registration Server',
        add_help=False,
    )
    parser.add_argument('-h', dest='HELP', action='store_true', help='show this help message and exit')
    parser.add_argument('--listen_ip', dest='HOST', metavar='IP', help='The IP to listen on - default: 0.0.0.0', default='0.0.0.0')
    parser.add_argument('--port', dest='PORT', metavar='PORT', type=int, help='The UDP Port to listen on - default: 4569', default=4569)
    parser.add_argument('-v', dest='VERBOSE', action='count', help='Verbose Logs (More is more verbose)', default=0)
    parser.add_argument('-c', dest='COLOR', action='store_true', help='Display Colored logs - default False')
    parser.add_argument('--register', dest="REGISTER", choices=register.__all__, help='Select the registration module to use')
    parser.add_argument('--notify', dest="NOTIFY", choices=notify.__all__, help='Select the notification module to use')
    return parser


def _load_plugin(package, name, parser):
    """Import ``package.name`` and let it add its own options to *parser*.

    A plugin module may expose a ``help(parser)`` function; when it does, the
    function is called with the parser. Returns the imported module.
    """
    module = importlib.import_module(f"{package}.{name}")
    if hasattr(module, "help"):
        module.help(parser)
    return module


def _logging_level(verbosity):
    """Translate the ``-v`` count into a logging level name.

    0 (or less) -> WARNING, 1 -> NOTICE, 2 -> VERBOSE, 3 or more -> DEBUG.
    NOTE(review): NOTICE and VERBOSE are not standard-library level names;
    presumably verboselogs registers them when it is imported - confirm.
    """
    if verbosity >= 3:
        return "DEBUG"
    if verbosity >= 2:
        return "VERBOSE"
    if verbosity >= 1:
        return "NOTICE"
    return "WARNING"


def main():
    """Parse the options, load the plugin modules and serve until interrupted.

    Raises:
        SystemExit: with an error message (non-zero exit status) when no
            registration module was selected.
    """
    parser = _build_parser()
    args, _ = parser.parse_known_args()

    if args.HELP:
        # Let every plugin contribute its options so the help text is complete.
        for module_name in register.__all__:
            _load_plugin("register", module_name, parser)
        for module_name in notify.__all__:
            # Notify modules are loaded on a best-effort basis: a module that
            # fails to load is reported and left out of the help text.
            # Only ``Exception`` is caught so Ctrl-C and SystemExit propagate.
            try:
                _load_plugin("notify", module_name, parser)
            except Exception as exc:
                logger.warning("Skipping notify module %s: %s", module_name, exc)
        parser.parse_known_args()
        parser.print_help()
        return

    if args.REGISTER is None:
        # SystemExit with a message prints it to stderr and exits with
        # status 1, so callers can detect the failure.
        raise SystemExit("Register Argument is not optional")

    # Configure logging for the requested verbosity.
    logging_level = _logging_level(args.VERBOSE)
    if args.COLOR:
        # Imported lazily so coloredlogs is only needed when -c is given.
        import coloredlogs
        coloredlogs.install(level=logging_level)
    else:
        logging.basicConfig(level=logging_level)

    # Each selected plugin may have added options, so parse again before
    # constructing the handler class named after the module.
    register_mod = _load_plugin("register", args.REGISTER, parser)
    args, _ = parser.parse_known_args()
    register_handler = getattr(register_mod, args.REGISTER)(args=args)

    notify_handler = None
    if args.NOTIFY:
        notify_mod = _load_plugin("notify", args.NOTIFY, parser)
        args, _ = parser.parse_known_args()
        notify_handler = getattr(notify_mod, args.NOTIFY)(args=args)

    server = ThreadedUDPServer(
        (args.HOST, args.PORT),
        pyIAX,
        register=register_handler,
        notify=notify_handler,
        args=args,
    )
    server_thread = threading.Thread(target=server.serve_forever, daemon=True)

    try:
        server_thread.start()
        logger.success(f"Server started at {args.HOST} port {args.PORT}")
        # Keep the main thread alive so it can receive KeyboardInterrupt.
        while True:
            time.sleep(100)
    except (KeyboardInterrupt, SystemExit):
        server.shutdown()
        server.server_close()


if __name__ == "__main__":
    main()