Skip to content

Commit

Permalink
Match concurrency to available CPU bandwidth
Browse files Browse the repository at this point in the history
This change allows ninja to throttle number of parallel tasks based
on feedback from Linux kernel's PSI (Pressure Stall Information)
interfaces.  It extends the "-l" parameter to accept negative values;
"-l-NN" means that ninja should limit concurrency when processes
in current cgroup spend more than NN% of their time stalled on CPU.

E.g., running "ninja -j100 -l-10" on a 32-core machine will quickly
settle on parallelism of 32-34.

This option is designed to make ninja use all CPU bandwidth available
to a cgroup-based container, while not starting excessive number of
processes, which could eat up all RAM.

The motivation for this feature is to automatically reduce
parallelism when the system is about to run out of RAM.
If the system has swap enabled, "ninja -l-10" will dance with
parallelism on the edge of just using a bit of swap.  As soon as
a process starts swapping, CPU "stalled" cycles increase, and
parallelism is reduced.  The same argument works when a process is
waiting for its turn to use IO and/or network.
  • Loading branch information
maxim-kuvyrkov committed Jun 15, 2023
1 parent 36843d3 commit b78e715
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 6 deletions.
42 changes: 38 additions & 4 deletions src/build.cc
Original file line number Diff line number Diff line change
Expand Up @@ -475,10 +475,44 @@ void RealCommandRunner::Abort() {

bool RealCommandRunner::CanRunMore() const {
  // Decide whether it is OK to launch another subprocess right now.
  // Three throttles apply, in order:
  //   1. the -j job limit (config_.parallelism);
  //   2. -l N with N > 0: the system load average must stay below N;
  //   3. -l -N (N negative): the fraction of time processes in our cgroup
  //      spend stalled on CPU -- reported by the kernel's PSI interface
  //      via GetCPUWaitRatio() -- must stay below N percent.
  size_t subproc_number =
      subprocs_.running_.size() + subprocs_.finished_.size();

  if ((int)subproc_number >= config_.parallelism)
    return false;

  // Always allow at least one subprocess so the build makes progress
  // no matter how loaded the system is.
  if (subprocs_.running_.empty())
    return true;

  if (config_.max_load_average > 0.0f) {
    // Positive -l value: classic load-average throttle.
    double loadavg = GetLoadAverage();

    if (loadavg < config_.max_load_average)
      return true;

    if (g_syslimits)
      fprintf(stderr, "\nninja syslimits: loadavg %.0f >= %.0f\n",
              loadavg, config_.max_load_average);

    return false;
  } else if (config_.max_load_average < -0.1f) {
    // Negative -l value: throttle on the CPU stall ("wait") percentage.
    double wait_ratio = GetCPUWaitRatio(subproc_number, config_.parallelism);

    if (wait_ratio < 0.0) {
      // PSI is unavailable on this system.  Warn exactly once instead of
      // spamming stderr on every scheduling decision.
      static bool psi_warning_printed = false;
      if (!psi_warning_printed) {
        psi_warning_printed = true;
        fprintf(stderr, "\nninja syslimits: system does not support PSI\n");
      }
      return false;
    }

    if (wait_ratio < -config_.max_load_average)
      return true;

    if (g_syslimits)
      fprintf(stderr,
              "\nninja syslimits: wait_ratio %.0f >= %.0f; subprocs: %zu\n",
              wait_ratio, -config_.max_load_average, subproc_number);

    return false;
  } else
    return true;
}

bool RealCommandRunner::StartCommand(Edge* edge) {
Expand Down
2 changes: 2 additions & 0 deletions src/debug_flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

bool g_explaining = false;

bool g_syslimits = false;

bool g_keep_depfile = false;

bool g_keep_rsp = false;
Expand Down
2 changes: 2 additions & 0 deletions src/debug_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ extern bool g_keep_rsp;

extern bool g_experimental_statcache;

extern bool g_syslimits;

#endif // NINJA_EXPLAIN_H_
14 changes: 12 additions & 2 deletions src/ninja.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,13 @@ void Usage(const BuildConfig& config) {
"\n"
" -j N run N jobs in parallel (0 means infinity) [default=%d on this system]\n"
" -k N keep going until N jobs fail (0 means infinity) [default=1]\n"
" -l N do not start new jobs if the load average is greater than N\n"
" -l N do not start new jobs if system load is greater than N;\n"
" if N is positive,\n"
" then compare against system load average (absolute value);\n"
" if N is negative,\n"
" then compare against process stalled time (percentage);\n"
" e.g., -l-10 will not start new jobs if existing processes\n"
" spend, on average, 10% of their time waiting for CPU slice;\n"
" -n dry run (don't run commands but act like they succeeded)\n"
"\n"
" -d MODE enable debugging (use '-d list' to list modes)\n"
Expand Down Expand Up @@ -1161,6 +1167,7 @@ bool DebugEnable(const string& name) {
#ifdef _WIN32
" nostatcache don't batch stat() calls per directory and cache them\n"
#endif
" syslimits print notes when parallelism is limited by system pressure\n"
"multiple modes can be enabled via -d FOO -d BAR\n");
return false;
} else if (name == "stats") {
Expand All @@ -1178,11 +1185,14 @@ bool DebugEnable(const string& name) {
} else if (name == "nostatcache") {
g_experimental_statcache = false;
return true;
} else if (name == "syslimits") {
g_syslimits = true;
return true;
} else {
const char* suggestion =
SpellcheckString(name.c_str(),
"stats", "explain", "keepdepfile", "keeprsp",
"nostatcache", NULL);
"nostatcache", "syslimits", NULL);
if (suggestion) {
Error("unknown debug setting '%s', did you mean '%s'?",
name.c_str(), suggestion);
Expand Down
88 changes: 88 additions & 0 deletions src/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#elif defined(_AIX) && !defined(__PASE__)
#include <libperfstat.h>
#elif defined(linux) || defined(__GLIBC__)
#include <fstream>
#include <sys/sysinfo.h>
#include <fstream>
#include <map>
Expand All @@ -59,6 +60,7 @@
#endif

#include "edit_distance.h"
#include "metrics.h"

using namespace std;

Expand Down Expand Up @@ -835,6 +837,92 @@ double GetLoadAverage() {
}
#endif // _WIN32

double GetCPUWaitRatio(size_t subproc_number, int parallelism) {
#if defined(linux) || defined(__GLIBC__)
static double oncpu_ratio = 100.0;
static uint64_t prev_stalled(0);
static int64_t prev_timestamp(0);

// We use kernel's PSI infrastructure to calculate amount of time
// we are waiting for CPU. It would be great to just use 10-second
// average (avg10 below), but, unfortunately, that's too "slow"
// an average to provide satisfactory results. Using avg10 we will
// oscillate too far into overloading and underloading the system.
// Instead, we use raw total stalled count and divide it by time
// elapsed since previous measurement.
//
// The "total" units are microseconds, but documentation does not say
// whether it's cumulative across all CPUs or not. Apparently, it's
// not cumulative. IIUC, on an 8-core system if we have 6 processes
// running at 100% and another 2 stalled at 100% -- then every second
// the "total" stalled count will be increased by 1000000 [microseconds].
// The count will be increased by the same 1000000 [microseconds] if all
// 8 processes are 100% stalled.

ifstream cpupressure("/sys/fs/cgroup/cpu.pressure", ifstream::in);
string token;
uint64_t stalled(0);
while (cpupressure >> token) {
// Extract "total" from
// some avg10=0.01 avg60=4.76 avg300=6.17 total=11527181835
if (token == "some") {
cpupressure >> token; // avg10=
cpupressure >> token; // avg60=
cpupressure >> token; // avg300=
cpupressure >> token; // total=

// Parse total=NUM
token = token.substr(token.find("=") + 1);
stalled = stoull(token);
break;
}
}

if (stalled == 0)
// Unsupported.
return -1.0;

// We could use micro-second HighResTimer(), if we wanted to,
// but milliseconds provide good-enough granularity.
int64_t timestamp = GetTimeMillis();

if (prev_timestamp == 0) {
prev_timestamp = timestamp;
prev_stalled = stalled;
return 0.0f;
}

uint64_t stalled_ticks = stalled - prev_stalled;
uint64_t clock_ticks = 1000 * (timestamp - prev_timestamp);

if (stalled_ticks < clock_ticks) {
// Clock advanced, so update oncpu_ratio with latest measurements.
// Pass new measurements through a simple noise filter.
oncpu_ratio *= ((double) subproc_number
/ (subproc_number + 1));
oncpu_ratio += ((100.0 * (clock_ticks - stalled_ticks) / clock_ticks)
/ (subproc_number + 1));

if (0 < stalled_ticks) {
// Again, to reduce noise in oncpu_ratio we update prev_* values only
// we get a new "stalled" reading.
prev_timestamp = timestamp;
prev_stalled = stalled;
}
} else {
// Clock didn't advance, this usually happens during initial
// startup, when we start config_.parallelism tasks in rapid
// succession. Slightly reduce oncpu_ratio to throttle startup
// of new processes until we get an updated measurement.
oncpu_ratio *= (double) parallelism / (parallelism + 1);
}

return 100.0 - oncpu_ratio;
#else
return -1.0;
#endif
}

string ElideMiddle(const string& str, size_t width) {
switch (width) {
case 0: return "";
Expand Down
4 changes: 4 additions & 0 deletions src/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ int GetProcessorCount();
/// on error.
double GetLoadAverage();

/// @return percentage of time tasks are waiting for CPU.
/// A negative value is returned for unsupported platforms.
double GetCPUWaitRatio(size_t subproc_number, int parallelism);

/// Elide the given string @a str with '...' in the middle if the length
/// exceeds @a width.
std::string ElideMiddle(const std::string& str, size_t width);
Expand Down

0 comments on commit b78e715

Please sign in to comment.