Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop the hanging threads in runner client forcefully after 30 seconds #416

Merged
merged 4 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ducktape/tests/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def to_json(self):
class TestResults(object):
"""Class used to aggregate individual TestResult objects from many tests."""

def __init__(self, session_context, cluster):
def __init__(self, session_context, cluster, client_status):
"""
:type session_context: ducktape.tests.session.SessionContext
"""
Expand All @@ -139,6 +139,7 @@ def __init__(self, session_context, cluster):
# For tracking total run time
self.start_time = -1
self.stop_time = -1
self.client_status = client_status

def append(self, obj):
return self._results.append(obj)
Expand Down Expand Up @@ -222,6 +223,7 @@ def to_json(self):
"num_failed": self.num_failed,
"num_ignored": self.num_ignored,
"parallelism": parallelism,
"client_status": {str(key): value for key, value in self.client_status.items()},
"results": [r for r in self._results]
}
if self.num_flaky:
Expand Down
63 changes: 50 additions & 13 deletions ducktape/tests/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import namedtuple
from collections import namedtuple, defaultdict
import copy
import logging
import multiprocessing
Expand All @@ -37,6 +37,8 @@
from ducktape.utils import persistence
from ducktape.errors import TimeoutError

DEFAULT_MP_JOIN_TIMEOUT = 30


class Receiver(object):
def __init__(self, min_port, max_port):
Expand Down Expand Up @@ -87,7 +89,8 @@ class TestRunner(object):

def __init__(self, cluster, session_context, session_logger, tests, deflake_num,
min_port=ConsoleDefaults.TEST_DRIVER_MIN_PORT,
max_port=ConsoleDefaults.TEST_DRIVER_MAX_PORT):
max_port=ConsoleDefaults.TEST_DRIVER_MAX_PORT,
finish_join_timeout=DEFAULT_MP_JOIN_TIMEOUT):

# Set handler for SIGTERM (aka kill -15)
# Note: it doesn't work to set a handler for SIGINT (Ctrl-C) in this parent process because the
Expand All @@ -106,7 +109,8 @@ def __init__(self, cluster, session_context, session_logger, tests, deflake_num,

self.session_context = session_context
self.max_parallel = session_context.max_parallel
self.results = TestResults(self.session_context, self.cluster)
self.client_report = defaultdict(dict)
self.results = TestResults(self.session_context, self.cluster, client_status=self.client_report)

self.exit_first = self.session_context.exit_first

Expand All @@ -122,6 +126,35 @@ def __init__(self, cluster, session_context, session_logger, tests, deflake_num,
self.active_tests = {}
self.finished_tests = {}
self.test_schedule_log = []
self.finish_join_timeout = finish_join_timeout

def _terminate_process(self, process: multiprocessing.Process):
# use os.kill rather than multiprocessing.terminate for more control
assert process.pid != os.getpid(), "Signal handler should not reach this point in a client subprocess."
if process.is_alive():
os.kill(process.pid, signal.SIGKILL)

def _join_test_process(self, process_key, timeout: int = DEFAULT_MP_JOIN_TIMEOUT):
# waits for process to complete, if it doesn't terminate it
process: multiprocessing.Process = self._client_procs[process_key]
start = time.time()
while time.time() - start <= timeout:
if not process.is_alive():
self.client_report[process_key]["status"] = "FINISHED"
break
time.sleep(.1)
else:
# Note: This can lead to some tmp files being uncleaned, otherwise nothing else should be executed by the
# client after this point.
self._log(logging.ERROR,
f"after waiting {timeout}s, process {process.name} failed to complete. Terminating...")
self._terminate_process(process)
self.client_report[process_key]["status"] = "TERMINATED"
process.join()
self.client_report[process_key]["exitcode"] = process.exitcode
self.client_report[process_key]["runner_end_time"] = time.time()
assert not process.is_alive()
del self._client_procs[process_key]

def _propagate_sigterm(self, signum, frame):
"""Handler SIGTERM and SIGINT by propagating SIGTERM to all client processes.
Expand All @@ -141,12 +174,7 @@ def _propagate_sigterm(self, signum, frame):

self.stop_testing = True
for p in self._client_procs.values():

# this handler should be a noop if we're in a client process, so it's an error if the current pid
# is in self._client_procs
assert p.pid != os.getpid(), "Signal handler should not reach this point in a client subprocess."
if p.is_alive():
os.kill(p.pid, signal.SIGTERM)
self._terminate_process(p)

def who_am_i(self):
"""Human-readable name helpful for logging."""
Expand Down Expand Up @@ -239,7 +267,7 @@ def run_all_tests(self):

# All processes are on the same machine, so treat communication failure as a fatal error
for proc in self._client_procs.values():
proc.terminate()
self._terminate_process(proc)
self._client_procs = {}
raise
except KeyboardInterrupt:
Expand All @@ -248,8 +276,12 @@ def run_all_tests(self):
"Received KeyboardInterrupt. Now waiting for currently running tests to finish...")
self.stop_testing = True

for proc in self._client_procs.values():
proc.join()
# All clients should be cleaned up in their finish block
if self._client_procs:
self._log(logging.WARNING, f"Some clients failed to clean up, waiting 10min to join: {self._client_procs}")
for proc in self._client_procs:
self._join_test_process(proc, self.finish_join_timeout)

self.receiver.close()

return self.results
Expand Down Expand Up @@ -281,6 +313,11 @@ def _run_single_test(self, test_context):

self._client_procs[test_key] = proc
proc.start()
self.client_report[test_key]["status"] = "RUNNING"
self.client_report[test_key]["pid"] = proc.pid
self.client_report[test_key]["name"] = proc.name
self.client_report[test_key]["runner_start_time"] = time.time()


def _preallocate_subcluster(self, test_context):
"""Preallocate the subcluster which will be used to run the test.
Expand Down Expand Up @@ -344,7 +381,7 @@ def _handle_finished(self, event):
del self._test_cluster[test_key]

# Join on the finished test process
self._client_procs[test_key].join()
self._join_test_process(test_key, timeout=self.finish_join_timeout)

# Report partial result summaries - it is helpful to have partial test reports available if the
# ducktape process is killed with a SIGKILL partway through
Expand Down
13 changes: 11 additions & 2 deletions ducktape/tests/runner_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import traceback
from typing import List, Mapping
import zmq
import psutil

from ducktape.services.service import MultiRunServiceIdFactory, service_id_factory
from ducktape.services.service_registry import ServiceRegistry
Expand Down Expand Up @@ -178,12 +179,20 @@ def ready(self):
def send(self, event):
return self.sender.send(event)

def _kill_all_child_processes(self, send_signal=signal.SIGTERM):
current_process = psutil.Process()
# if this client has any children - kill them (for instances background service)
children = current_process.children(recursive=True)
for child in children:
self.logger.warning(f"process {repr(child)} did not terminate on its own, killing with {send_signal}")
child.send_signal(send_signal)

def _sigterm_handler(self, signum, frame):
"""Translate SIGTERM to SIGINT on this process
python will treat SIGINT as a Keyboard exception. Exception handling does the rest.
"""
os.kill(os.getpid(), signal.SIGINT)
self._kill_all_child_processes(signal.SIGINT)

def _collect_test_context(self, directory, file_name, cls_name, method_name, injected_args):
loader = TestLoader(self.session_context, self.logger, injected_args=injected_args, cluster=self.cluster)
Expand Down Expand Up @@ -249,7 +258,6 @@ def run(self):

finally:
stop_time = time.time()

summary = self.process_run_summaries(summaries, test_status)
test_status, summary = self._check_cluster_utilization(test_status, summary)
# convert summary from list to string
Expand All @@ -274,6 +282,7 @@ def run(self):
# Tell the server we are finished
self._do_safely(lambda: self.send(self.message.finished(result=result)),
"Problem sending FINISHED message for " + str(self.test_metadata) + ":\n")
self._kill_all_child_processes()
# Release test_context resources only after creating the result and finishing logging activity
# The Sender object uses the same logger, so we postpone closing until after the finished message is sent
self.test_context.close()
Expand Down