From 7ce2dc3df200b0a0439450aea2d59440df50e62a Mon Sep 17 00:00:00 2001 From: esdeathlove Date: Mon, 13 Feb 2017 18:25:27 +0800 Subject: [PATCH] remove multi thread --- apiconfig.py | 1 - db_transfer.py | 3 - server_pool.py | 165 +++++++++++++--------------------------- shadowsocks/tcprelay.py | 2 +- shadowsocks/udprelay.py | 19 ----- web_transfer.py | 3 - 6 files changed, 53 insertions(+), 140 deletions(-) diff --git a/apiconfig.py b/apiconfig.py index 9b4453bf..58e66cbe 100644 --- a/apiconfig.py +++ b/apiconfig.py @@ -7,7 +7,6 @@ CLOUDSAFE = 1 ANTISSATTACK = 0 AUTOEXEC = 0 -MULTI_THREAD = 0 MU_SUFFIX = 'zhaoj.in' MU_REGEX = '%5m%id.%suffix' diff --git a/db_transfer.py b/db_transfer.py index e2b49757..59b6e732 100644 --- a/db_transfer.py +++ b/db_transfer.py @@ -493,9 +493,6 @@ def del_server_out_of_bound_safe(self, last_rows, rows): logging.error('more than one user use the same port [%s]' % (port,)) continue - if get_config().MULTI_THREAD == 0: - cfg['node_speedlimit'] = 0.00 - if cfg['is_multi_user'] != 0: cfg['users_table'] = md5_users.copy() self.mu_port_list.append(port) diff --git a/server_pool.py b/server_pool.py index 7c44efab..cf440c02 100644 --- a/server_pool.py +++ b/server_pool.py @@ -121,121 +121,60 @@ def new_server(self, port, user_config): port = int(port) ipv6_ok = False - if user_config['node_speedlimit'] == 0.00: - if 'server_ipv6' in self.config: - if port in self.tcp_ipv6_servers_pool: - logging.info("server already at %s:%d" % (self.config['server_ipv6'], port)) - return 'this port server is already running' - else: - a_config = self.config.copy() - a_config.update(user_config) - if len(a_config['server_ipv6']) > 2 and a_config['server_ipv6'][0] == "[" and a_config['server_ipv6'][-1] == "]": - a_config['server_ipv6'] = a_config['server_ipv6'][1:-1] - a_config['server'] = a_config['server_ipv6'] - a_config['server_port'] = port - a_config['max_connect'] = 128 - a_config['method'] = common.to_str(a_config['method']) - try: - logging.info("starting server at [%s]:%d" % (common.to_str(a_config['server']), port)) - - tcp_server = tcprelay.TCPRelay(a_config, self.dns_resolver, False, stat_counter=self.stat_counter) - tcp_server.add_to_loop(self.loop) - self.tcp_ipv6_servers_pool.update({port: tcp_server}) - - - udp_server = udprelay.UDPRelay(a_config, self.dns_resolver, False, stat_counter=self.stat_counter) - udp_server.add_to_loop(self.loop) - self.udp_ipv6_servers_pool.update({port: udp_server}) - - if common.to_str(a_config['server_ipv6']) == "::": - ipv6_ok = True - except Exception as e: - logging.warn("IPV6 %s " % (e,)) - - if 'server' in self.config: - if port in self.tcp_servers_pool: - logging.info("server already at %s:%d" % (common.to_str(self.config['server']), port)) - return 'this port server is already running' - else: - a_config = self.config.copy() - a_config.update(user_config) - a_config['server_port'] = port - a_config['max_connect'] = 128 - a_config['method'] = common.to_str(a_config['method']) - try: - logging.info("starting server at %s:%d" % (common.to_str(a_config['server']), port)) - - tcp_server = tcprelay.TCPRelay(a_config, self.dns_resolver, False) - tcp_server.add_to_loop(self.loop) - self.tcp_servers_pool.update({port: tcp_server}) - - udp_server = udprelay.UDPRelay(a_config, self.dns_resolver, False) - udp_server.add_to_loop(self.loop) - self.udp_servers_pool.update({port: udp_server}) - - except Exception as e: - if not ipv6_ok: - logging.warn("IPV4 %s " % (e,)) - else: - self.dns_resolver_pool[port] = self.dns_resolver = asyncdns.DNSResolver() - self.eventloop_pool[port] = eventloop.EventLoop() - self.thread_pool[port] = MainThread( (self.eventloop_pool[port], self.dns_resolver_pool[port], self.mgr) ) - self.thread_pool[port].start() + if 'server_ipv6' in self.config: + if port in self.tcp_ipv6_servers_pool: + logging.info("server already at %s:%d" % (self.config['server_ipv6'], port)) + return 'this port server is already running' + else: + a_config = self.config.copy() + a_config.update(user_config) + if len(a_config['server_ipv6']) > 2 and a_config['server_ipv6'][0] == "[" and a_config['server_ipv6'][-1] == "]": + a_config['server_ipv6'] = a_config['server_ipv6'][1:-1] + a_config['server'] = a_config['server_ipv6'] + a_config['server_port'] = port + a_config['max_connect'] = 128 + a_config['method'] = common.to_str(a_config['method']) + try: + logging.info("starting server at [%s]:%d" % (common.to_str(a_config['server']), port)) + tcp_server = tcprelay.TCPRelay(a_config, self.dns_resolver, False, stat_counter=self.stat_counter) + tcp_server.add_to_loop(self.loop) + self.tcp_ipv6_servers_pool.update({port: tcp_server}) - if 'server_ipv6' in self.config: - if port in self.tcp_ipv6_servers_pool: - logging.info("server already at %s:%d" % (self.config['server_ipv6'], port)) - return 'this port server is already running' - else: - a_config = self.config.copy() - a_config.update(user_config) - if len(a_config['server_ipv6']) > 2 and a_config['server_ipv6'][0] == "[" and a_config['server_ipv6'][-1] == "]": - a_config['server_ipv6'] = a_config['server_ipv6'][1:-1] - a_config['server'] = a_config['server_ipv6'] - a_config['server_port'] = port - a_config['max_connect'] = 128 - a_config['method'] = common.to_str(a_config['method']) - try: - logging.info("starting server at [%s]:%d" % (common.to_str(a_config['server']), port)) - - tcp_server = tcprelay.TCPRelay(a_config, self.dns_resolver_pool[port], False, stat_counter=self.stat_counter) - tcp_server.add_to_loop(self.eventloop_pool[port]) - self.tcp_ipv6_servers_pool.update({port: tcp_server}) - - udp_server = udprelay.UDPRelay(a_config, self.dns_resolver_pool[port], False, stat_counter=self.stat_counter) - udp_server.add_to_loop(self.eventloop_pool[port]) - self.udp_ipv6_servers_pool.update({port: udp_server}) - - if common.to_str(a_config['server_ipv6']) == "::": - ipv6_ok = True - except Exception as e: - logging.warn("IPV6 %s " % (e,)) - - if 'server' in self.config: - if port in self.tcp_servers_pool: - logging.info("server already at %s:%d" % (common.to_str(self.config['server']), port)) - return 'this port server is already running' - else: - a_config = self.config.copy() - a_config.update(user_config) - a_config['server_port'] = port - a_config['max_connect'] = 128 - a_config['method'] = common.to_str(a_config['method']) - try: - logging.info("starting server at %s:%d" % (common.to_str(a_config['server']), port)) - - tcp_server = tcprelay.TCPRelay(a_config, self.dns_resolver_pool[port], False) - tcp_server.add_to_loop(self.eventloop_pool[port]) - self.tcp_servers_pool.update({port: tcp_server}) - - udp_server = udprelay.UDPRelay(a_config, self.dns_resolver_pool[port], False) - udp_server.add_to_loop(self.eventloop_pool[port]) - self.udp_servers_pool.update({port: udp_server}) - - except Exception as e: - if not ipv6_ok: - logging.warn("IPV4 %s " % (e,)) + + udp_server = udprelay.UDPRelay(a_config, self.dns_resolver, False, stat_counter=self.stat_counter) + udp_server.add_to_loop(self.loop) + self.udp_ipv6_servers_pool.update({port: udp_server}) + + if common.to_str(a_config['server_ipv6']) == "::": + ipv6_ok = True + except Exception as e: + logging.warn("IPV6 %s " % (e,)) + + if 'server' in self.config: + if port in self.tcp_servers_pool: + logging.info("server already at %s:%d" % (common.to_str(self.config['server']), port)) + return 'this port server is already running' + else: + a_config = self.config.copy() + a_config.update(user_config) + a_config['server_port'] = port + a_config['max_connect'] = 128 + a_config['method'] = common.to_str(a_config['method']) + try: + logging.info("starting server at %s:%d" % (common.to_str(a_config['server']), port)) + + tcp_server = tcprelay.TCPRelay(a_config, self.dns_resolver, False) + tcp_server.add_to_loop(self.loop) + self.tcp_servers_pool.update({port: tcp_server}) + + udp_server = udprelay.UDPRelay(a_config, self.dns_resolver, False) + udp_server.add_to_loop(self.loop) + self.udp_servers_pool.update({port: udp_server}) + + except Exception as e: + if not ipv6_ok: + logging.warn("IPV4 %s " % (e,)) return True diff --git a/shadowsocks/tcprelay.py b/shadowsocks/tcprelay.py index 61c7074f..2f24cbc2 100644 --- a/shadowsocks/tcprelay.py +++ b/shadowsocks/tcprelay.py @@ -1371,7 +1371,7 @@ def __init__(self, config, dns_resolver, is_local, stat_callback=None, stat_coun if 'node_speedlimit' not in config: self.bandwidth = 0 else: - self.bandwidth = float(config['node_speedlimit']) * 1024 * 1024 / 8 + self.bandwidth = float(config['node_speedlimit']) * 128 self.protocol_data = obfs.obfs(config['protocol']).init_data() self.obfs_data = obfs.obfs(config['obfs']).init_data() diff --git a/shadowsocks/udprelay.py b/shadowsocks/udprelay.py index 5e9ce67c..ad643d9a 100644 --- a/shadowsocks/udprelay.py +++ b/shadowsocks/udprelay.py @@ -283,9 +283,6 @@ def __init__(self, server, reqid_to_handlers, fd_to_handlers, loop, self._dns_resolver = dns_resolver self._local_id = local_id - self._bytesSent = 0 - self._timeCreated = time.time() - self._is_local = is_local self._stage = STAGE_INIT self._password = config['password'] @@ -386,14 +383,6 @@ def _write_to_sock(self, data, sock, addr = None): if not data or not sock: return False - if float(self._server.bandwidth) > 0: - now = time.time() - connectionDuration = now - self._timeCreated - self._bytesSent += len(data) - requiredDuration = self._bytesSent / self._server.bandwidth - time.sleep(max(requiredDuration - connectionDuration, self._server.latency)) - - uncomplete = False retry = 0 if sock == self._local_sock: @@ -833,8 +822,6 @@ def handle_client(self, addr, cmd, request_id, data): self.handle_stream_sync_status(addr, cmd, request_id, pack_id, max_send_id, data) def handle_event(self, sock, event): - self._bytesSent = 0 - self._timeCreated = time.time() # handle all events in this handler and dispatch them to methods handle = False @@ -1019,12 +1006,6 @@ def __init__(self, config, dns_resolver, is_local, stat_callback=None, stat_coun self._reqid_to_hd = {} self._data_to_write_to_server_socket = [] - self.latency = 0 - if 'node_speedlimit' not in config: - self.bandwidth = 0 - else: - self.bandwidth = float(config['node_speedlimit']) * 1024 * 1024 / 8 - self._timeouts = [] # a list for all the handlers # we trim the timeouts once a while self._timeout_offset = 0 # last checked position for timeout diff --git a/web_transfer.py b/web_transfer.py index d4de3538..e649526c 100644 --- a/web_transfer.py +++ b/web_transfer.py @@ -331,9 +331,6 @@ def del_server_out_of_bound_safe(self, last_rows, rows): logging.error('more than one user use the same port [%s]' % (port,)) continue - if get_config().MULTI_THREAD == 0: - cfg['node_speedlimit'] = 0.00 - if cfg['is_multi_user'] != 0: cfg['users_table'] = md5_users.copy() self.mu_port_list.append(port)