From 9149767f2b8946810e68de412ad42af8379d26b1 Mon Sep 17 00:00:00 2001 From: Ben Burns Date: Sun, 22 Jun 2014 15:58:17 +1200 Subject: [PATCH] Remove old VDE2 version, add rate-limiting --- README.md | 12 ++------ limiter.py | 25 +++++++++++++++ switchedrelay.py | 80 ++++++++++++++++++++++++++++++------------------ tuntapapp.py | 61 ------------------------------------ 4 files changed, 79 insertions(+), 99 deletions(-) create mode 100644 limiter.py delete mode 100644 tuntapapp.py diff --git a/README.md b/README.md index 632b15d..a9cb710 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,6 @@ #WebSockets Proxy -This is two different Python WebSockets servers which handle sending/receivng -ethernet frames. tuntapapp.py works with TAP devices (creates one device per -connection) and vdeapp.py connects to a VDE2 switch. Both are written using -Tornado's awesome websocket server support. +A virtual ethernet switch built using Tornado in Python -Please note that these aren't super awesome, super configurable productioni -ready services. These are just two quick hacks I put together to set up a demo. - -Also, to use vdeapp.py, make sure you've built VDE2 from source and installed -the python bindings in order to enable VdePlug. +Accepts ethernet frames via a WebSocket, or an ethernet TAP device. +Rate limits WebSocket connections to prevent abuse. Could use some cleaning! diff --git a/limiter.py b/limiter.py new file mode 100644 index 0000000..1abf8bb --- /dev/null +++ b/limiter.py @@ -0,0 +1,25 @@ +import time + +class RateLimitingState(object): + def __init__(self, rate, clientip, name): + self.name = name + self.clientip = clientip + self.rate = rate + self.allowance = rate + self.last_check = time.time() + + def do_throttle(self, message): + current = time.time() + time_passed = current - self.last_check + + self.last_check = current + self.allowance += time_passed * self.rate + + if self.allowance > self.rate: + self.allowance = self.rate #throttle + + if self.allowance > 1.0: + self.allowance -= len(message) + return True; + + return False diff --git a/switchedrelay.py b/switchedrelay.py index b00b5c9..3cb4a1e 100644 --- a/switchedrelay.py +++ b/switchedrelay.py @@ -2,29 +2,43 @@ import time import threading import logging +import traceback + from select import poll +from select import POLLIN, POLLOUT, POLLHUP, POLLERR, POLLNVAL from pytun import TunTapDevice, IFF_TAP, IFF_NO_PI -from select import POLLIN, POLLOUT, POLLHUP, POLLERR, POLLNVAL + +from limiter import RateLimitingState import tornado.ioloop import tornado.web import tornado.options +from tornado.concurrent import return_future + from tornado import websocket FORMAT = '%(asctime)-15s %(message)s' RATE = 40980.0 #unit: bytes -PER = 1.0 #unit: seconds BROADCAST = '%s%s%s%s%s%s' % (chr(0xff),chr(0xff),chr(0xff),chr(0xff),chr(0xff),chr(0xff)) +PING_INTERVAL = 30 logger = logging.getLogger('relay') macmap = {} +@return_future +def delay_future(t, callback): + timestamp = time.time() + if timestamp < t: + return + else: + callback(t) + class TunThread(threading.Thread): def __init__(self, *args, **kwargs): super(TunThread, self).__init__(*args, **kwargs) @@ -35,6 +49,9 @@ def __init__(self, *args, **kwargs): self.tun.mtu = 1500 self.tun.up() + def write(self, message): + self.tun.write(message) + def run(self): p = poll() p.register(self.tun, POLLIN) @@ -48,12 +65,10 @@ def run(self): if len(buf): mac = buf[0:6] if mac == BROADCAST or (ord(mac[0]) & 0x1) == 1: - #print 'sending broadcast frame:' - #print ':'.join('{0:02x}'.format(ord(c)) for c in buf) for socket in macmap.values(): def send_message(): try: - socket.write_message(str(buf),binary=True) + socket.rate_limited_downstream(str(buf)) except: pass @@ -62,15 +77,13 @@ def send_message(): elif macmap.get(mac, False): def send_message(): try: - macmap[mac].write_message(str(buf),binary=True) + macmap[mac].rate_limited_downstream(str(buf)) except: pass loop.add_callback(send_message) - else: - print("couldn't find recipient for mac %s from %s " % (':'.join('{0:02x}'.format(ord(c)) for c in mac), ':'.join('{0:02x}'.format(ord(c)) for c in buf[6:12]))) except: - print 'closing due to tun error' + logger.error('closing due to tun error') finally: self.tun.close() @@ -84,7 +97,24 @@ def __init__(self, *args, **kwargs): self.mac = '' self.allowance = RATE #unit: messages self.last_check = time.time() #floating-point, e.g. usec accuracy. Unit: seconds + self.upstream = RateLimitingState(RATE, name='upstream', clientip=self.remote_ip) + self.downstream = RateLimitingState(RATE, name='downstream', clientip=self.remote_ip) + ping_future = delay_future(time.time()+PING_INTERVAL, self.do_ping) + loop.add_future(ping_future, lambda: None) + + def do_ping(self, timestamp): + self.ping(str(timestamp)) + + ping_future = delay_future(time.time()+PING_INTERVAL, self.do_ping) + loop.add_future(ping_future, lambda: None) + + def on_pong(self, data): + pass + + def rate_limited_downstream(self, message): + if self.downstream.do_throttle(message): + self.write_message(message, binary=True) def open(self): self.set_nodelay(True) @@ -105,36 +135,28 @@ def on_message(self, message): dest = message[0:6] try: - #rate limiting algorithm from http://stackoverflow.com/a/668327/203705 - #stolen with love ;-) - current = time.time() - time_passed = current - self.last_check - self.last_check = current - self.allowance += time_passed * (RATE / PER) - if self.allowance > RATE: - self.allowance = RATE #throttle - if self.allowance < 1.0: - return - else: - if dest == BROADCAST or (ord(dest[0]) & 0x1) == 1: + if dest == BROADCAST or (ord(dest[0]) & 0x1) == 1: + if self.upstream.do_throttle(message): for socket in macmap.values(): try: - socket.write_message(str(message),binary=True) + socket.write_message(str(message),binary=True) except: pass - tunthread.tun.write(message) - elif macmap.get(dest, False): + tunthread.write(message) + elif macmap.get(dest, False): + if self.upstream.do_throttle(message): try: macmap[dest].write_message(str(message),binary=True) except: - print('macmap %s not found' % dest) pass - else: - tunthread.tun.write(message) + else: + if self.upstream.do_throttle(message): + tunthread.write(message) - self.allowance -= len(message) except: + tb = traceback.format_exc() + logger.error('%s: error on receive. Closing\n%s' % (self.remote_ip, tb)) try: self.close() except: @@ -159,7 +181,7 @@ def on_close(self): tunthread.start() args = sys.argv - args.append('--log_file_prefix=/var/log/relay/relay-server2.log') + args.append('--log_file_prefix=/var/log/relay/relay-server.log') tornado.options.parse_command_line(args) application.listen(9999) loop = tornado.ioloop.IOLoop.instance() diff --git a/tuntapapp.py b/tuntapapp.py deleted file mode 100644 index 53e189e..0000000 --- a/tuntapapp.py +++ /dev/null @@ -1,61 +0,0 @@ -import threading -from select import select - -import tornado.ioloop -import tornado.web - -from tornado import websocket -from pytun import TunTapDevice, IFF_TAP, IFF_NO_PI - -class TunThread(threading.Thread): - def __init__(self, tun, socket, *args, **kwargs): - super(TunThread, self).__init__(*args, **kwargs) - self.tun = tun - self.socket = socket - self.running = True; - - def run(self): - while(self.running): - readable, writable, excepted = select([self.tun], [], [self.tun], 0.01) - for tun in readable: - buf = tun.read(tun.mtu) - print('read %s byte message from %s' % (len(buf), tun.name)) - print(':'.join('{0:x}'.format(ord(c)) for c in str(buf))) - self.socket.write_message(str(buf), binary=True) - for tun in excepted: - self.running = False - - self.tun.close() - -class MainHandler(websocket.WebSocketHandler): - def __init__(self, *args, **kwargs): - super(MainHandler, self).__init__(*args,**kwargs) - self.thread = None - - def open(self): - self.set_nodelay(True) - self.tun = TunTapDevice(flags= (IFF_TAP | IFF_NO_PI)) - self.tun.addr = '192.168.1.1' - self.tun.dstaddr = '192.168.1.2' - self.tun.netmask = '255.255.255.0' - self.tun.mtu = 1500 - self.tun.up() - self.thread = TunThread(tun=self.tun, socket = self) - self.thread.start() - - def on_message(self, message): - print('wrote %s byte message to %s' % (len(message), self.tun.name)) - print(':'.join('{0:x}'.format(ord(c)) for c in message)) - self.tun.write(message) - - def on_close(self): - print('Closing %s' % self.tun.name) - self.tun.close() - if self.thread is not None: - self.thread.running = False - -application = tornado.web.Application([(r'/', MainHandler)]) - -if __name__ == '__main__': - application.listen(9999) - tornado.ioloop.IOLoop.instance().start()