Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-37193: remove thread objects which finished process its request #23127

Merged
merged 6 commits into from
Dec 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 39 additions & 12 deletions Lib/socketserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,39 @@ def server_close(self):
self.collect_children(blocking=self.block_on_close)


class _Threads(list):
"""
Joinable list of all non-daemon threads.
"""
def append(self, thread):
self.reap()
if thread.daemon:
return
super().append(thread)

def pop_all(self):
self[:], result = [], self[:]
return result

def join(self):
for thread in self.pop_all():
thread.join()

def reap(self):
self[:] = (thread for thread in self if thread.is_alive())


class _NoThreads:
"""
Degenerate version of _Threads.
"""
def append(self, thread):
pass

def join(self):
pass


class ThreadingMixIn:
"""Mix-in class to handle each request in a new thread."""

Expand All @@ -636,9 +669,9 @@ class ThreadingMixIn:
daemon_threads = False
# If true, server_close() waits until all non-daemonic threads terminate.
block_on_close = True
# For non-daemonic threads, list of threading.Threading objects
# Threads object
# used by server_close() to wait for all threads completion.
_threads = None
_threads = _NoThreads()

def process_request_thread(self, request, client_address):
"""Same as in BaseServer but as a thread.
Expand All @@ -655,23 +688,17 @@ def process_request_thread(self, request, client_address):

def process_request(self, request, client_address):
"""Start a new thread to process the request."""
if self.block_on_close:
vars(self).setdefault('_threads', _Threads())
t = threading.Thread(target = self.process_request_thread,
args = (request, client_address))
t.daemon = self.daemon_threads
if not t.daemon and self.block_on_close:
if self._threads is None:
self._threads = []
self._threads.append(t)
self._threads.append(t)
t.start()

def server_close(self):
super().server_close()
if self.block_on_close:
threads = self._threads
self._threads = None
if threads:
for thread in threads:
thread.join()
self._threads.join()


if hasattr(os, "fork"):
Expand Down
23 changes: 23 additions & 0 deletions Lib/test/test_socketserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,13 @@ class MyHandler(socketserver.StreamRequestHandler):
t.join()
s.server_close()

def test_close_immediately(self):
class MyServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass

server = MyServer((HOST, 0), lambda: None)
server.server_close()

def test_tcpserver_bind_leak(self):
# Issue #22435: the server socket wouldn't be closed if bind()/listen()
# failed.
Expand Down Expand Up @@ -491,6 +498,22 @@ def shutdown_request(self, request):
self.assertEqual(server.shutdown_called, 1)
server.server_close()

def test_threads_reaped(self):
"""
In #37193, users reported a memory leak
due to the saving of every request thread. Ensure that
not all threads are kept forever.
"""
class MyServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass

server = MyServer((HOST, 0), socketserver.StreamRequestHandler)
for n in range(10):
with socket.create_connection(server.server_address):
server.handle_request()
self.assertLess(len(server._threads), 10)
server.server_close()


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fixed memory leak in ``socketserver.ThreadingMixIn`` introduced in Python
3.7.