Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 41 additions & 24 deletions bin/wfbench
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2021-2025 The WfCommons Team.
Expand All @@ -20,13 +20,17 @@ import re
import json
import logging
import pandas as pd
import psutil

from io import StringIO
from filelock import FileLock
from pathos.helpers import mp as multiprocessing
from typing import List, Optional


# Largest signed 32-bit integer (2**31 - 1); used as the upper bound for the
# sample and op counts handed to stress-ng.
int32_max = 2147483647


# Configure logging
logging.basicConfig(
level=logging.INFO, # Change this to control the verbosity
Expand All @@ -39,6 +43,16 @@ logging.basicConfig(
# Absolute path of the directory containing this script (symlinks resolved)
this_dir = pathlib.Path(__file__).resolve().parent


def kill_process_and_children(proc):
    """
    Forcefully kill a process together with all of its descendants.

    Descendants are killed first, then the process itself. A process that has
    already exited is skipped, so one vanished child does not prevent the
    remaining children (or the parent) from being killed.

    :param proc: Handle of the process to kill; only its ``pid`` attribute is read.
    :type proc: subprocess.Popen
    """
    try:
        parent = psutil.Process(proc.pid)
        # Snapshot the whole process tree before killing anything
        descendants = parent.children(recursive=True)
    except psutil.NoSuchProcess:
        return  # Process is already dead

    for target in (*descendants, parent):
        try:
            target.kill()
        except psutil.NoSuchProcess:
            # Exited between enumeration and kill; keep going with the rest
            continue

def log_info(msg: str):
"""
Log an info message to stderr
Expand Down Expand Up @@ -165,34 +179,38 @@ def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue,
:rtype: List
"""
total_mem = f"{total_mem}B" if total_mem else f"{100.0 / os.cpu_count()}%"
cpu_work_per_thread = int(cpu_work / cpu_threads)

cpu_procs = []
mem_procs = []
cpu_prog = [f"{this_dir.joinpath('cpu-benchmark')}", f"{cpu_work_per_thread}"]
cpu_work_per_thread = int(1000000 * cpu_work / (16384 * cpu_threads))
cpu_samples = min(cpu_work_per_thread, int32_max)
cpu_ops = (cpu_work_per_thread + int32_max - 1) // int32_max
if cpu_ops > int32_max:
log_info("Exceeded maximum number of cpu work.")
cpu_ops = int32_max

cpu_proc = None
mem_proc = None

cpu_prog = ["stress-ng", "--monte-carlo", f"{cpu_threads}",
"--monte-carlo-method", "pi",
"--monte-carlo-rand", "lcg",
"--monte-carlo-samples", f"{cpu_samples}",
"--monte-carlo-ops", f"{cpu_ops}"]
mem_prog = ["stress-ng", "--vm", f"{mem_threads}",
"--vm-bytes", f"{total_mem}", "--vm-keep"]

for i in range(cpu_threads):
cpu_proc = subprocess.Popen(cpu_prog, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if cpu_threads > 0:
cpu_proc = subprocess.Popen(cpu_prog, preexec_fn=os.setsid)

# NOTE: might be a good idea to use psutil to set the affinity (works across platforms)
if core:
os.sched_setaffinity(cpu_proc.pid, {core})
cpu_procs.append(cpu_proc)

# Start a thread to monitor the progress of each CPU benchmark process
monitor_thread = multiprocessing.Process(target=monitor_progress, args=(cpu_proc, cpu_queue))
monitor_thread.start()

if mem_threads > 0:
# NOTE: add a check to use creationflags=subprocess.CREATE_NEW_PROCESS_GROUP for Windows
mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid)
if core:
os.sched_setaffinity(mem_proc.pid, {core})
mem_procs.append(mem_proc)

return cpu_procs, mem_procs
return cpu_proc, mem_proc


def io_read_benchmark_user_input_data_size(inputs,
Expand Down Expand Up @@ -446,22 +464,22 @@ def main():
log_debug(f"{args.name} acquired core {core}")

mem_threads=int(10 - 10 * args.percent_cpu)
cpu_procs, mem_procs = cpu_mem_benchmark(cpu_queue=cpu_queue,
cpu_proc, mem_proc = cpu_mem_benchmark(cpu_queue=cpu_queue,
cpu_threads=int(10 * args.percent_cpu),
mem_threads=mem_threads,
cpu_work=sys.maxsize if args.time else int(args.cpu_work),
cpu_work=int32_max**2 if args.time else int(args.cpu_work),
core=core,
total_mem=mem_bytes)

procs.extend(cpu_procs)
procs.append(cpu_proc)
if args.time:
time.sleep(int(args.time))
for proc in procs:
if isinstance(proc, multiprocessing.Process):
if proc.is_alive():
proc.terminate()
elif isinstance(proc, subprocess.Popen):
proc.terminate()
kill_process_and_children(proc)
else:
for proc in procs:
if isinstance(proc, subprocess.Popen):
Expand All @@ -470,11 +488,10 @@ def main():
# io_proc.terminate()
io_proc.join()

for mem_proc in mem_procs:
try:
os.kill(mem_proc.pid, signal.SIGKILL) # Force kill if SIGTERM fails
except subprocess.TimeoutExpired:
log_debug("Memory process did not terminate; force-killing.")
try:
os.kill(mem_proc.pid, signal.SIGKILL) # Force kill if SIGTERM fails
except subprocess.TimeoutExpired:
log_debug("Memory process did not terminate; force-killing.")
# As a fallback, use pkill if any remaining instances are stuck
subprocess.Popen(["pkill", "-f", "stress-ng"]).wait()

Expand Down
Loading