Skip to content

Commit

Permalink
Reuse a single ThreadPoolExecutor for all ThreadedResolvers.
Browse files Browse the repository at this point in the history
The primary motivation is that shutting down a ThreadPoolExecutor takes
100ms in the 2.x backported version of concurrent.futures.  It's also
generally unnecessary to create lots of DNS resolver threads just
because multiple resolver objects are used.

Document ExecutorResolver for public use.
  • Loading branch information
bdarnell committed Jun 1, 2013
1 parent c938106 commit be52944
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 6 deletions.
45 changes: 40 additions & 5 deletions tornado/netutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,27 @@ def close(self):


class ExecutorResolver(Resolver):
def initialize(self, io_loop=None, executor=None):
"""Resolver implementation using a `concurrent.futures.Executor`.
Use this instead of `ThreadedResolver` when you require additional
control over the executor being used.
The executor will be shut down when the resolver is closed unless
``close_resolver=False``; use this if you want to reuse the same
executor elsewhere.
"""
def initialize(self, io_loop=None, executor=None, close_executor=True):
self.io_loop = io_loop or IOLoop.current()
self.executor = executor or dummy_executor
if executor is not None:
self.executor = executor
self.close_executor = close_executor
else:
self.executor = dummy_executor
self.close_executor = False

def close(self):
self.executor.shutdown()
if self.close_executor:
self.executor.shutdown()
self.executor = None

@run_on_executor
Expand Down Expand Up @@ -260,11 +275,31 @@ class ThreadedResolver(ExecutorResolver):
Resolver.configure('tornado.netutil.ThreadedResolver',
num_threads=10)
.. versionchanged:: 3.1
All ``ThreadedResolvers`` share a single thread pool, whose
size is set by the first one to be created.
"""
_threadpool = None
_threadpool_pid = None

def initialize(self, io_loop=None, num_threads=10):
from concurrent.futures import ThreadPoolExecutor
threadpool = ThreadedResolver._create_threadpool(num_threads)
super(ThreadedResolver, self).initialize(
io_loop=io_loop, executor=ThreadPoolExecutor(num_threads))
io_loop=io_loop, executor=threadpool, close_executor=False)

@classmethod
def _create_threadpool(cls, num_threads):
pid = os.getpid()
if cls._threadpool_pid != pid:
# Threads cannot survive after a fork, so if our pid isn't what it
# was when we created the pool then delete it.
cls._threadpool = None
if cls._threadpool is None:
from concurrent.futures import ThreadPoolExecutor
cls._threadpool = ThreadPoolExecutor(num_threads)
cls._threadpool_pid = pid
return cls._threadpool


class OverrideResolver(Resolver):
Expand Down
2 changes: 1 addition & 1 deletion tornado/test/netutil_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def setUp(self):
self.resolver = ThreadedResolver(io_loop=self.io_loop)

def tearDown(self):
self.resolver.executor.shutdown()
self.resolver.close()
super(ThreadedResolverTest, self).tearDown()


Expand Down

0 comments on commit be52944

Please sign in to comment.