From be529443b047752b3c75282a8760d4697963777c Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Sat, 1 Jun 2013 17:30:33 -0400 Subject: [PATCH] Reuse a single ThreadPoolExecutor for all ThreadedResolvers. The primary motivation is that shutting down a ThreadPoolExecutor takes 100ms in the 2.x backported version of concurrent.futures. It's also generally unnecessary to create lots of DNS resolver threads just because multiple resolver objects are used. Document ExecutorResolver for public use. --- tornado/netutil.py | 45 ++++++++++++++++++++++++++++++++---- tornado/test/netutil_test.py | 2 +- 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/tornado/netutil.py b/tornado/netutil.py index a7ba082f1c..bbd11485a6 100644 --- a/tornado/netutil.py +++ b/tornado/netutil.py @@ -217,12 +217,27 @@ def close(self): class ExecutorResolver(Resolver): - def initialize(self, io_loop=None, executor=None): + """Resolver implementation using a `concurrent.futures.Executor`. + + Use this instead of `ThreadedResolver` when you require additional + control over the executor being used. + + The executor will be shut down when the resolver is closed unless + ``close_resolver=False``; use this if you want to reuse the same + executor elsewhere. + """ + def initialize(self, io_loop=None, executor=None, close_executor=True): self.io_loop = io_loop or IOLoop.current() - self.executor = executor or dummy_executor + if executor is not None: + self.executor = executor + self.close_executor = close_executor + else: + self.executor = dummy_executor + self.close_executor = False def close(self): - self.executor.shutdown() + if self.close_executor: + self.executor.shutdown() self.executor = None @run_on_executor @@ -260,11 +275,31 @@ class ThreadedResolver(ExecutorResolver): Resolver.configure('tornado.netutil.ThreadedResolver', num_threads=10) + + .. versionchanged:: 3.1 + All ``ThreadedResolvers`` share a single thread pool, whose + size is set by the first one to be created. """ + _threadpool = None + _threadpool_pid = None + def initialize(self, io_loop=None, num_threads=10): - from concurrent.futures import ThreadPoolExecutor + threadpool = ThreadedResolver._create_threadpool(num_threads) super(ThreadedResolver, self).initialize( - io_loop=io_loop, executor=ThreadPoolExecutor(num_threads)) + io_loop=io_loop, executor=threadpool, close_executor=False) + + @classmethod + def _create_threadpool(cls, num_threads): + pid = os.getpid() + if cls._threadpool_pid != pid: + # Threads cannot survive after a fork, so if our pid isn't what it + # was when we created the pool then delete it. + cls._threadpool = None + if cls._threadpool is None: + from concurrent.futures import ThreadPoolExecutor + cls._threadpool = ThreadPoolExecutor(num_threads) + cls._threadpool_pid = pid + return cls._threadpool class OverrideResolver(Resolver): diff --git a/tornado/test/netutil_test.py b/tornado/test/netutil_test.py index 245ef096ff..cf587bcbd8 100644 --- a/tornado/test/netutil_test.py +++ b/tornado/test/netutil_test.py @@ -53,7 +53,7 @@ def setUp(self): self.resolver = ThreadedResolver(io_loop=self.io_loop) def tearDown(self): - self.resolver.executor.shutdown() + self.resolver.close() super(ThreadedResolverTest, self).tearDown()