Skip to content

Commit

Permalink
Add lastCallTime to the WorkerMetrics.
Browse files Browse the repository at this point in the history
Every new worker acquiring updates `lastCallTime`.

PiperOrigin-RevId: 471539520
Change-Id: If56782ca027a2f0e2f120733aef00d260fcdf304
  • Loading branch information
Googler authored and copybara-github committed Sep 1, 2022
1 parent 3818688 commit d233c89
Show file tree
Hide file tree
Showing 14 changed files with 212 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,8 @@ message BuildMetrics {
int64 collect_time_in_ms = 1;
// RSS size of worker process.
int32 worker_memory_in_kb = 2;
// Epoch unix time of last action started on specific worker.
int64 last_action_start_time_in_ms = 3;
}

// Combined workers statistics.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ private MetricsCollector(
this.numAnalyses = numAnalyses;
this.numBuilds = numBuilds;
env.getEventBus().register(this);
WorkerMetricsCollector.instance().setClock(env.getClock());
}

static void installInEnv(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,8 @@ ProfilerStartedEvent initProfiler(
+ " will be omitted in merged actions."));
}
Profiler profiler = Profiler.instance();
WorkerMetricsCollector workerMetricsCollector = WorkerMetricsCollector.instance();
workerMetricsCollector.setClock(clock);
profiler.start(
profiledTasks,
out,
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/google/devtools/build/lib/worker/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/actions:file_metadata",
"//src/main/java/com/google/devtools/build/lib/actions:localhost_capacity",
"//src/main/java/com/google/devtools/build/lib/actions:resource_manager",
"//src/main/java/com/google/devtools/build/lib/clock",
"//src/main/java/com/google/devtools/build/lib/events",
"//src/main/java/com/google/devtools/build/lib/exec:bin_tools",
"//src/main/java/com/google/devtools/build/lib/exec:runfiles_tree_updater",
Expand Down Expand Up @@ -166,6 +167,7 @@ java_library(
],
deps = [
"//src/main/java/com/google/devtools/build/lib/buildeventstream/proto:build_event_stream_java_proto",
"//src/main/java/com/google/devtools/build/lib/clock",
"//src/main/java/com/google/devtools/build/lib/util:os",
"//third_party:auto_value",
"//third_party:flogger",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ public static WorkerMetric create(
public abstract static class WorkerStat {
public abstract int getUsedMemoryInKB();

public abstract Instant getTimestamp();
public abstract Instant getLastCallTime();

public static WorkerStat create(int usedMemoryInKB, Instant timestamp) {
return new AutoValue_WorkerMetric_WorkerStat(usedMemoryInKB, timestamp);
public abstract Instant getCollectTime();

public static WorkerStat create(int usedMemoryInKB, Instant lastCallTime, Instant collectTime) {
return new AutoValue_WorkerMetric_WorkerStat(usedMemoryInKB, lastCallTime, collectTime);
}
}

Expand Down Expand Up @@ -86,8 +88,9 @@ public WorkerMetrics toProto() {
if (workerStat != null) {
WorkerStats stats =
WorkerMetrics.WorkerStats.newBuilder()
.setCollectTimeInMs(workerStat.getTimestamp().toEpochMilli())
.setCollectTimeInMs(workerStat.getCollectTime().toEpochMilli())
.setWorkerMemoryInKb(workerStat.getUsedMemoryInKB())
.setLastActionStartTimeInMs(workerStat.getLastCallTime().toEpochMilli())
.build();
builder.addWorkerStats(stats);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@
package com.google.devtools.build.lib.worker;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.GoogleLogger;
import com.google.devtools.build.lib.clock.Clock;
import com.google.devtools.build.lib.util.OS;
import com.google.devtools.build.lib.worker.WorkerMetric.WorkerStat;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
Expand All @@ -43,10 +48,17 @@ public class WorkerMetricsCollector {
/** The metrics collector (a static singleton instance). Inactive by default. */
private static final WorkerMetricsCollector instance = new WorkerMetricsCollector();

/** Mapping of worker ids to their metrics. */
private Clock clock;

/**
* Mapping of worker ids to their metrics. Contains worker ids, which memory usage could be
* measured.
*/
private final Map<Integer, WorkerMetric.WorkerProperties> workerIdToWorkerProperties =
new ConcurrentHashMap<>();

private final Map<Integer, Instant> workerLastCallTime = new ConcurrentHashMap<>();

private MetricsWithTime lastMetrics = new MetricsWithTime(ImmutableList.of(), Instant.EPOCH);

private WorkerMetricsCollector() {}
Expand All @@ -55,17 +67,23 @@ public static WorkerMetricsCollector instance() {
return instance;
}

// Collects process stats for each worker
@VisibleForTesting
public Map<Long, WorkerMetric.WorkerStat> collectStats(OS os, List<Long> processIds) {
public void setClock(Clock clock) {
this.clock = clock;
}

/**
* Collects memory usage of all ancestors of processes by pid. If a pid does not allow collecting
* memory usage, it is silently ignored.
*/
MemoryCollectionResult collectMemoryUsageByPid(OS os, ImmutableSet<Long> processIds) {
// TODO(b/181317827): Support Windows.
if (os != OS.LINUX && os != OS.DARWIN) {
return new HashMap<>();
return new MemoryCollectionResult(
ImmutableMap.of(), Instant.ofEpochMilli(clock.currentTimeMillis()));
}

Map<Long, Long> pidsToWorkerPid = getSubprocesses(processIds);
Instant now = Instant.now();
Map<Long, Integer> psMemory = collectDataFromPs(pidsToWorkerPid.keySet());
ImmutableMap<Long, Long> pidsToWorkerPid = getSubprocesses(processIds);
ImmutableMap<Long, Integer> psMemory = collectDataFromPs(pidsToWorkerPid.keySet());

Map<Long, Integer> sumMemory = new HashMap<>();
psMemory.forEach(
Expand All @@ -78,20 +96,16 @@ public Map<Long, WorkerMetric.WorkerStat> collectStats(OS os, List<Long> process
sumMemory.put(parent, parentMemory + memory);
});

Map<Long, WorkerMetric.WorkerStat> pidResults = new HashMap<>();
sumMemory.forEach(
(parent, memory) -> pidResults.put(parent, WorkerMetric.WorkerStat.create(memory, now)));

return pidResults;
return new MemoryCollectionResult(
ImmutableMap.copyOf(sumMemory), Instant.ofEpochMilli(clock.currentTimeMillis()));
}

/**
* For each parent process collects pids of all descendants. Stores them into the map, where key
* is the descendant pid and the value is parent pid.
*/
@VisibleForTesting
public Map<Long, Long> getSubprocesses(List<Long> parents) {
Map<Long, Long> subprocessesToProcess = new HashMap<>();
ImmutableMap<Long, Long> getSubprocesses(ImmutableSet<Long> parents) {
ImmutableMap.Builder<Long, Long> subprocessesToProcess = ImmutableMap.builder();
for (Long pid : parents) {
Optional<ProcessHandle> processHandle = ProcessHandle.of(pid);

Expand All @@ -105,22 +119,21 @@ public Map<Long, Long> getSubprocesses(List<Long> parents) {
}
}

return subprocessesToProcess;
return subprocessesToProcess.buildKeepingLast();
}

// Collects memory usage for every process
private Map<Long, Integer> collectDataFromPs(Collection<Long> pids) {
private ImmutableMap<Long, Integer> collectDataFromPs(Collection<Long> pids) {
BufferedReader psOutput;
try {
psOutput =
new BufferedReader(
new InputStreamReader(this.buildPsProcess(pids).getInputStream(), UTF_8));
new BufferedReader(new InputStreamReader(buildPsProcess(pids).getInputStream(), UTF_8));
} catch (IOException e) {
logger.atWarning().withCause(e).log("Error while executing command for pids: %s", pids);
return new HashMap<>();
return ImmutableMap.of();
}

HashMap<Long, Integer> processMemory = new HashMap<>();
ImmutableMap.Builder<Long, Integer> processMemory = ImmutableMap.builder();

try {
// The output of the above ps command looks similar to this:
Expand Down Expand Up @@ -151,7 +164,7 @@ private Map<Long, Integer> collectDataFromPs(Collection<Long> pids) {
logger.atWarning().withCause(e).log("Error while parsing psOutput: %s", psOutput);
}

return processMemory;
return processMemory.buildOrThrow();
}

@VisibleForTesting
Expand All @@ -167,8 +180,8 @@ public Process buildPsProcess(Collection<Long> processIds) throws IOException {
* returns previously collected metrics;
*/
public ImmutableList<WorkerMetric> collectMetrics(Duration interval) {
Instant now = Instant.now();
if (Duration.between(this.lastMetrics.time, now).compareTo(interval) < 0) {
Instant now = Instant.ofEpochMilli(clock.currentTimeMillis());
if (Duration.between(lastMetrics.time, now).compareTo(interval) < 0) {
return lastMetrics.metrics;
}

Expand All @@ -177,59 +190,70 @@ public ImmutableList<WorkerMetric> collectMetrics(Duration interval) {

// TODO(wilwell): add exception if we couldn't collect the metrics.
public ImmutableList<WorkerMetric> collectMetrics() {
Map<Long, WorkerMetric.WorkerStat> workerStats =
collectStats(
MemoryCollectionResult memoryCollectionResult =
collectMemoryUsageByPid(
OS.getCurrent(),
this.workerIdToWorkerProperties.values().stream()
workerIdToWorkerProperties.values().stream()
.map(WorkerMetric.WorkerProperties::getProcessId)
.collect(toImmutableList()));
.collect(toImmutableSet()));

ImmutableMap<Long, Integer> pidToMemoryInKb = memoryCollectionResult.pidToMemoryInKb;
Instant collectionTime = memoryCollectionResult.collectionTime;

ImmutableList.Builder<WorkerMetric> workerMetrics = new ImmutableList.Builder<>();
List<Integer> nonMeasurableWorkerIds = new ArrayList<>();
for (WorkerMetric.WorkerProperties workerProperties :
this.workerIdToWorkerProperties.values()) {
for (WorkerMetric.WorkerProperties workerProperties : workerIdToWorkerProperties.values()) {
Long pid = workerProperties.getProcessId();
Integer workerId = workerProperties.getWorkerId();
if (workerStats.containsKey(pid)) {
workerMetrics.add(
WorkerMetric.create(workerProperties, workerStats.get(pid), /* isMeasurable= */ true));
} else {
workerMetrics.add(
WorkerMetric.create(
workerProperties, /* workerStat= */ null, /* isMeasurable= */ false));

WorkerStat workerStats =
WorkerStat.create(
pidToMemoryInKb.getOrDefault(pid, 0),
workerLastCallTime.get(workerId),
collectionTime);

workerMetrics.add(
WorkerMetric.create(
workerProperties, workerStats, /* isMeasurable= */ pidToMemoryInKb.containsKey(pid)));

if (!pidToMemoryInKb.containsKey(pid)) {
nonMeasurableWorkerIds.add(workerId);
}
}

workerIdToWorkerProperties.keySet().removeAll(nonMeasurableWorkerIds);

return updateLastCollectMetrics(workerMetrics.build(), Instant.now()).metrics;
return updateLastCollectMetrics(workerMetrics.build(), collectionTime).metrics;
}

public void clear() {
this.workerIdToWorkerProperties.clear();
workerIdToWorkerProperties.clear();
}

@VisibleForTesting
public Map<Integer, WorkerMetric.WorkerProperties> getWorkerIdToWorkerProperties() {
return workerIdToWorkerProperties;
}

@VisibleForTesting
public Map<Integer, Instant> getWorkerLastCallTime() {
return workerLastCallTime;
}

/**
* Initializes workerIdToWorkerProperties for workers. If worker metrics already exists for this
* worker, does nothing.
* worker, only updates workerLastCallTime.
*/
public void registerWorker(WorkerMetric.WorkerProperties properties) {
if (workerIdToWorkerProperties.containsKey(properties.getWorkerId())) {
return;
}
int workerId = properties.getWorkerId();

workerIdToWorkerProperties.put(properties.getWorkerId(), properties);
workerIdToWorkerProperties.putIfAbsent(workerId, properties);
workerLastCallTime.put(workerId, Instant.ofEpochMilli(clock.currentTimeMillis()));
}

private synchronized MetricsWithTime updateLastCollectMetrics(
ImmutableList<WorkerMetric> metrics, Instant time) {
this.lastMetrics = new MetricsWithTime(metrics, time);
lastMetrics = new MetricsWithTime(metrics, time);
return lastMetrics;
}

Expand All @@ -243,5 +267,16 @@ public MetricsWithTime(ImmutableList<WorkerMetric> metrics, Instant time) {
}
}

static class MemoryCollectionResult {
public final ImmutableMap<Long, Integer> pidToMemoryInKb;
public final Instant collectionTime;

public MemoryCollectionResult(
ImmutableMap<Long, Integer> pidToMemoryInKb, Instant collectionTime) {
this.pidToMemoryInKb = pidToMemoryInKb;
this.collectionTime = collectionTime;
}
}

// TODO(b/238416583) Add deregister function
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ public void registerSpawnStrategies(
RunfilesTreeUpdater.INSTANCE,
env.getOptions().getOptions(WorkerOptions.class),
WorkerMetricsCollector.instance(),
env.getXattrProvider());
env.getXattrProvider(),
env.getClock());
ExecutionOptions executionOptions =
checkNotNull(env.getOptions().getOptions(ExecutionOptions.class));
registryBuilder.registerStrategy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.google.devtools.build.lib.actions.Spawns;
import com.google.devtools.build.lib.actions.UserExecException;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.clock.Clock;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.exec.BinTools;
import com.google.devtools.build.lib.exec.RunfilesTreeUpdater;
Expand Down Expand Up @@ -111,7 +112,8 @@ public WorkerSpawnRunner(
RunfilesTreeUpdater runfilesTreeUpdater,
WorkerOptions workerOptions,
WorkerMetricsCollector workerMetricsCollector,
XattrProvider xattrProvider) {
XattrProvider xattrProvider,
Clock clock) {
this.helpers = helpers;
this.execRoot = execRoot;
this.workers = checkNotNull(workers);
Expand All @@ -124,6 +126,7 @@ public WorkerSpawnRunner(
this.workerOptions = workerOptions;
this.resourceManager.setWorkerPool(workers);
this.metricsCollector = workerMetricsCollector;
this.metricsCollector.setClock(clock);
}

@Override
Expand Down
2 changes: 2 additions & 0 deletions src/test/java/com/google/devtools/build/lib/metrics/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ java_test(
"//src/main/java/com/google/devtools/build/lib/actions",
"//src/main/java/com/google/devtools/build/lib/analysis:view_creation_failed_exception",
"//src/main/java/com/google/devtools/build/lib/buildeventstream/proto:build_event_stream_java_proto",
"//src/main/java/com/google/devtools/build/lib/clock",
"//src/main/java/com/google/devtools/build/lib/metrics:event",
"//src/main/java/com/google/devtools/build/lib/metrics:memory-use-recorder",
"//src/main/java/com/google/devtools/build/lib/metrics:metrics_module",
"//src/main/java/com/google/devtools/build/lib/profiler",
"//src/main/java/com/google/devtools/build/lib/util:os",
"//src/main/java/com/google/devtools/build/lib/worker:worker_metric",
"//src/test/java/com/google/devtools/build/lib/buildtool/util",
"//third_party:guava",
"//third_party:junit4",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildMetrics.BuildGraphMetrics;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildMetrics.CumulativeMetrics;
import com.google.devtools.build.lib.buildtool.util.BuildIntegrationTestCase;
import com.google.devtools.build.lib.clock.JavaClock;
import com.google.devtools.build.lib.profiler.MemoryProfiler;
import com.google.devtools.build.lib.runtime.BlazeModule;
import com.google.devtools.build.lib.runtime.BlazeRuntime;
import com.google.devtools.build.lib.runtime.CommandEnvironment;
import com.google.devtools.build.lib.util.OS;
import com.google.devtools.build.lib.worker.WorkerMetricsCollector;
import java.util.List;
import org.junit.After;
import org.junit.Assume;
Expand Down Expand Up @@ -79,6 +81,11 @@ public void writeTrivialFooTarget() throws Exception {
")");
}

@Before
public void setUpWorkerMetricsCollecto() {
WorkerMetricsCollector.instance().setClock(new JavaClock());
}

@After
public void resetProfilers() throws Exception {
MemoryProfiler.instance().stop();
Expand Down
Loading

0 comments on commit d233c89

Please sign in to comment.