Skip to content

Commit

Permalink
bpo-44336: Prevent tests hanging on child process handles on Windows (G…
Browse files Browse the repository at this point in the history
…H-26578)

Replace the child process `typeperf.exe` with a daemon thread that reads the performance counters directly.  This prevents the issues that arise from inherited handles in grandchild processes (see issue37531 for discussion).

We only use the load tracker when running tests in multiprocess mode. This prevents inadvertent interactions with tests expecting a single threaded environment.  Displaying load is really only helpful for buildbots running in multiprocess mode anyway.
  • Loading branch information
jkloth authored Mar 22, 2022
1 parent 9ac2de9 commit 19058b9
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 185 deletions.
44 changes: 22 additions & 22 deletions Lib/test/libregrtest/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,24 @@ def run_tests(self):

if self.ns.use_mp:
from test.libregrtest.runtest_mp import run_tests_multiprocess
run_tests_multiprocess(self)
# If we're on windows and this is the parent runner (not a worker),
# track the load average.
if sys.platform == 'win32' and self.worker_test_name is None:
from test.libregrtest.win_utils import WindowsLoadTracker

try:
self.win_load_tracker = WindowsLoadTracker()
except PermissionError as error:
# Standard accounts may not have access to the performance
# counters.
print(f'Failed to create WindowsLoadTracker: {error}')

try:
run_tests_multiprocess(self)
finally:
if self.win_load_tracker is not None:
self.win_load_tracker.close()
self.win_load_tracker = None
else:
self.run_tests_sequential()

Expand Down Expand Up @@ -695,28 +712,11 @@ def _main(self, tests, kwargs):
self.list_cases()
sys.exit(0)

# If we're on windows and this is the parent runner (not a worker),
# track the load average.
if sys.platform == 'win32' and self.worker_test_name is None:
from test.libregrtest.win_utils import WindowsLoadTracker

try:
self.win_load_tracker = WindowsLoadTracker()
except FileNotFoundError as error:
# Windows IoT Core and Windows Nano Server do not provide
# typeperf.exe for x64, x86 or ARM
print(f'Failed to create WindowsLoadTracker: {error}')
self.run_tests()
self.display_result()

try:
self.run_tests()
self.display_result()

if self.ns.verbose2 and self.bad:
self.rerun_failed_tests()
finally:
if self.win_load_tracker is not None:
self.win_load_tracker.close()
self.win_load_tracker = None
if self.ns.verbose2 and self.bad:
self.rerun_failed_tests()

self.finalize()

Expand Down
258 changes: 95 additions & 163 deletions Lib/test/libregrtest/win_utils.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
import _overlapped
import _thread
import _winapi
import math
import msvcrt
import os
import subprocess
import uuid
import struct
import winreg
from test.support import os_helper
from test.libregrtest.utils import print_warning


# Max size of asynchronous reads
BUFSIZE = 8192
# Seconds per measurement
SAMPLING_INTERVAL = 1
# Exponential damping factor to compute exponentially weighted moving average
Expand All @@ -19,174 +14,111 @@
# Initialize the load using the arithmetic mean of the first NVALUE values
# of the Processor Queue Length
NVALUE = 5
# Windows registry subkey of HKEY_LOCAL_MACHINE where the counter names
# of typeperf are registered
COUNTER_REGISTRY_KEY = (r"SOFTWARE\Microsoft\Windows NT\CurrentVersion"
r"\Perflib\CurrentLanguage")


class WindowsLoadTracker():
"""
This class asynchronously interacts with the `typeperf` command to read
the system load on Windows. Multiprocessing and threads can't be used
here because they interfere with the test suite's cases for those
modules.
This class asynchronously reads the performance counters to calculate
the system load on Windows. A "raw" thread is used here to prevent
interference with the test suite's cases for the threading module.
"""

def __init__(self):
# Pre-flight test for access to the performance data;
# `PermissionError` will be raised if not allowed
winreg.QueryInfoKey(winreg.HKEY_PERFORMANCE_DATA)

self._values = []
self._load = None
self._buffer = ''
self._popen = None
self.start()

def start(self):
# Create a named pipe which allows for asynchronous IO in Windows
pipe_name = r'\\.\pipe\typeperf_output_' + str(uuid.uuid4())

open_mode = _winapi.PIPE_ACCESS_INBOUND
open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
open_mode |= _winapi.FILE_FLAG_OVERLAPPED

# This is the read end of the pipe, where we will be grabbing output
self.pipe = _winapi.CreateNamedPipe(
pipe_name, open_mode, _winapi.PIPE_WAIT,
1, BUFSIZE, BUFSIZE, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
)
# The write end of the pipe which is passed to the created process
pipe_write_end = _winapi.CreateFile(
pipe_name, _winapi.GENERIC_WRITE, 0, _winapi.NULL,
_winapi.OPEN_EXISTING, 0, _winapi.NULL
)
# Open up the handle as a python file object so we can pass it to
# subprocess
command_stdout = msvcrt.open_osfhandle(pipe_write_end, 0)

# Connect to the read end of the pipe in overlap/async mode
overlap = _winapi.ConnectNamedPipe(self.pipe, overlapped=True)
overlap.GetOverlappedResult(True)

# Spawn off the load monitor
counter_name = self._get_counter_name()
command = ['typeperf', counter_name, '-si', str(SAMPLING_INTERVAL)]
self._popen = subprocess.Popen(' '.join(command),
stdout=command_stdout,
cwd=os_helper.SAVEDCWD)

# Close our copy of the write end of the pipe
os.close(command_stdout)

def _get_counter_name(self):
# accessing the registry to get the counter localization name
with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, COUNTER_REGISTRY_KEY) as perfkey:
counters = winreg.QueryValueEx(perfkey, 'Counter')[0]

# Convert [key1, value1, key2, value2, ...] list
# to {key1: value1, key2: value2, ...} dict
counters = iter(counters)
counters_dict = dict(zip(counters, counters))

# System counter has key '2' and Processor Queue Length has key '44'
system = counters_dict['2']
process_queue_length = counters_dict['44']
return f'"\\{system}\\{process_queue_length}"'

def close(self, kill=True):
if self._popen is None:
self._running = _overlapped.CreateEvent(None, True, False, None)
self._stopped = _overlapped.CreateEvent(None, True, False, None)

_thread.start_new_thread(self._update_load, (), {})

def _update_load(self,
# localize module access to prevent shutdown errors
_wait=_winapi.WaitForSingleObject,
_signal=_overlapped.SetEvent):
# run until signaled to stop
while _wait(self._running, 1000):
self._calculate_load()
# notify stopped
_signal(self._stopped)

def _calculate_load(self,
# localize module access to prevent shutdown errors
_query=winreg.QueryValueEx,
_hkey=winreg.HKEY_PERFORMANCE_DATA,
_unpack=struct.unpack_from):
# get the 'System' object
data, _ = _query(_hkey, '2')
# PERF_DATA_BLOCK {
# WCHAR Signature[4] 8 +
# DWOWD LittleEndian 4 +
# DWORD Version 4 +
# DWORD Revision 4 +
# DWORD TotalByteLength 4 +
# DWORD HeaderLength = 24 byte offset
# ...
# }
obj_start, = _unpack('L', data, 24)
# PERF_OBJECT_TYPE {
# DWORD TotalByteLength
# DWORD DefinitionLength
# DWORD HeaderLength
# ...
# }
data_start, defn_start = _unpack('4xLL', data, obj_start)
data_base = obj_start + data_start
defn_base = obj_start + defn_start
# find the 'Processor Queue Length' counter (index=44)
while defn_base < data_base:
# PERF_COUNTER_DEFINITION {
# DWORD ByteLength
# DWORD CounterNameTitleIndex
# ... [7 DWORDs/28 bytes]
# DWORD CounterOffset
# }
size, idx, offset = _unpack('LL28xL', data, defn_base)
defn_base += size
if idx == 44:
counter_offset = data_base + offset
# the counter is known to be PERF_COUNTER_RAWCOUNT (DWORD)
processor_queue_length, = _unpack('L', data, counter_offset)
break
else:
return

self._load = None

if kill:
self._popen.kill()
self._popen.wait()
self._popen = None

def __del__(self):
self.close()

def _parse_line(self, line):
# typeperf outputs in a CSV format like this:
# "07/19/2018 01:32:26.605","3.000000"
# (date, process queue length)
tokens = line.split(',')
if len(tokens) != 2:
raise ValueError

value = tokens[1]
if not value.startswith('"') or not value.endswith('"'):
raise ValueError
value = value[1:-1]
return float(value)

def _read_lines(self):
overlapped, _ = _winapi.ReadFile(self.pipe, BUFSIZE, True)
bytes_read, res = overlapped.GetOverlappedResult(False)
if res != 0:
return ()

output = overlapped.getbuffer()
output = output.decode('oem', 'replace')
output = self._buffer + output
lines = output.splitlines(True)

# bpo-36670: typeperf only writes a newline *before* writing a value,
# not after. Sometimes, the written line in incomplete (ex: only
# timestamp, without the process queue length). Only pass the last line
# to the parser if it's a valid value, otherwise store it in
# self._buffer.
try:
self._parse_line(lines[-1])
except ValueError:
self._buffer = lines.pop(-1)
# We use an exponentially weighted moving average, imitating the
# load calculation on Unix systems.
# https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
# https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
if self._load is not None:
self._load = (self._load * LOAD_FACTOR_1
+ processor_queue_length * (1.0 - LOAD_FACTOR_1))
elif len(self._values) < NVALUE:
self._values.append(processor_queue_length)
else:
self._buffer = ''
self._load = sum(self._values) / len(self._values)

return lines
def close(self, kill=True):
self.__del__()
return

def __del__(self,
# localize module access to prevent shutdown errors
_wait=_winapi.WaitForSingleObject,
_close=_winapi.CloseHandle,
_signal=_overlapped.SetEvent):
if self._running is not None:
# tell the update thread to quit
_signal(self._running)
# wait for the update thread to signal done
_wait(self._stopped, -1)
# cleanup events
_close(self._running)
_close(self._stopped)
self._running = self._stopped = None

def getloadavg(self):
if self._popen is None:
return None

returncode = self._popen.poll()
if returncode is not None:
self.close(kill=False)
return None

try:
lines = self._read_lines()
except BrokenPipeError:
self.close()
return None

for line in lines:
line = line.rstrip()

# Ignore the initial header:
# "(PDH-CSV 4.0)","\\\\WIN\\System\\Processor Queue Length"
if 'PDH-CSV' in line:
continue

# Ignore blank lines
if not line:
continue

try:
processor_queue_length = self._parse_line(line)
except ValueError:
print_warning("Failed to parse typeperf output: %a" % line)
continue

# We use an exponentially weighted moving average, imitating the
# load calculation on Unix systems.
# https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
# https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
if self._load is not None:
self._load = (self._load * LOAD_FACTOR_1
+ processor_queue_length * (1.0 - LOAD_FACTOR_1))
elif len(self._values) < NVALUE:
self._values.append(processor_queue_length)
else:
self._load = sum(self._values) / len(self._values)

return self._load

0 comments on commit 19058b9

Please sign in to comment.