Skip to content

Commit

Permalink
Support multiplex workers in WorkerMetricsCollector.
Browse files Browse the repository at this point in the history
I've changed mapping from workerId->workerProperties to processId->workerProperties, because different workers could have same workerId. Also `workerId` field in blaze.invocations table is now repeated and contains multiple worker ids.

PiperOrigin-RevId: 497136616
Change-Id: I04dba5152b453e55e987dfaf3477e1193fa08577
  • Loading branch information
Googler authored and copybara-github committed Dec 22, 2022
1 parent 78140bf commit d31dd09
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1039,8 +1039,11 @@ message BuildMetrics {

// Information about all workers that were alive during the invocation.
message WorkerMetrics {
// Unique id of worker.
int32 worker_id = 1;
// Deprecated. Use worker_ids instead of this field.
int32 worker_id = 1 [deprecated = true];

// Ids of workers. Could be multiple in case of multiplex workers
repeated uint32 worker_ids = 8;
// Worker process id. If there is no process for worker, equals to zero.
uint32 process_id = 2;
// Mnemonic of running worker.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ ImmutableSet<Integer> collectEvictionCandidates(
.filter(
metric ->
metric.getWorkerStat() != null
&& idleWorkers.contains(metric.getWorkerProperties().getWorkerId()))
&& idleWorkers.containsAll(metric.getWorkerProperties().getWorkerIds()))
.collect(Collectors.toList());

if (idleWorkerMetrics.size() != idleWorkers.size()) {
Expand All @@ -141,7 +141,7 @@ ImmutableSet<Integer> collectEvictionCandidates(
ImmutableSet.Builder<Integer> candidates = ImmutableSet.builder();
int freeMemoryMb = 0;
for (WorkerMetric metric : idleWorkerMetrics) {
candidates.add(metric.getWorkerProperties().getWorkerId());
candidates.addAll(metric.getWorkerProperties().getWorkerIds());
freeMemoryMb += metric.getWorkerStat().getUsedMemoryInKB() / 1000;

if (workerMemeoryUsageMb - freeMemoryMb <= memoryLimitMb) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.google.devtools.build.lib.worker;

import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildMetrics.WorkerMetrics;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildMetrics.WorkerMetrics.WorkerStats;
import java.time.Instant;
Expand Down Expand Up @@ -55,7 +56,7 @@ public static WorkerStat create(int usedMemoryInKB, Instant lastCallTime, Instan
/** Worker properties */
@AutoValue
public abstract static class WorkerProperties {
public abstract int getWorkerId();
public abstract ImmutableList<Integer> getWorkerIds();

public abstract long getProcessId();

Expand All @@ -66,9 +67,13 @@ public abstract static class WorkerProperties {
public abstract boolean isSandboxed();

public static WorkerProperties create(
int workerId, long processId, String mnemonic, boolean isMultiplex, boolean isSandboxed) {
ImmutableList<Integer> workerIds,
long processId,
String mnemonic,
boolean isMultiplex,
boolean isSandboxed) {
return new AutoValue_WorkerMetric_WorkerProperties(
workerId, processId, mnemonic, isMultiplex, isSandboxed);
workerIds, processId, mnemonic, isMultiplex, isSandboxed);
}
}

Expand All @@ -78,7 +83,7 @@ public WorkerMetrics toProto() {

WorkerMetrics.Builder builder =
WorkerMetrics.newBuilder()
.setWorkerId(workerProperties.getWorkerId())
.addAllWorkerIds(workerProperties.getWorkerIds())
.setProcessId((int) workerProperties.getProcessId())
.setMnemonic(workerProperties.getMnemonic())
.setIsSandbox(workerProperties.isSandboxed())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package com.google.devtools.build.lib.worker;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.auto.value.AutoValue;
Expand Down Expand Up @@ -49,13 +48,13 @@ public class WorkerMetricsCollector {
private Clock clock;

/**
* Mapping of worker ids to their metrics. Contains worker ids, which memory usage could be
* measured.
* Mapping of worker process ids to their properties. One process could be mapped to multiple
* workers because of multiplex workers.
*/
private final Map<Integer, WorkerMetric.WorkerProperties> workerIdToWorkerProperties =
private final Map<Long, WorkerMetric.WorkerProperties> processIdToWorkerProperties =
new ConcurrentHashMap<>();

private final Map<Integer, Instant> workerLastCallTime = new ConcurrentHashMap<>();
private final Map<Long, Instant> workerLastCallTime = new ConcurrentHashMap<>();

private MetricsWithTime lastMetrics;

Expand Down Expand Up @@ -206,65 +205,83 @@ public ImmutableList<WorkerMetric> collectMetrics(Duration interval) {
public ImmutableList<WorkerMetric> collectMetrics() {
MemoryCollectionResult memoryCollectionResult =
collectMemoryUsageByPid(
OS.getCurrent(),
workerIdToWorkerProperties.values().stream()
.map(WorkerMetric.WorkerProperties::getProcessId)
.collect(toImmutableSet()));
OS.getCurrent(), ImmutableSet.copyOf(processIdToWorkerProperties.keySet()));

ImmutableMap<Long, Integer> pidToMemoryInKb = memoryCollectionResult.pidToMemoryInKb;
Instant collectionTime = memoryCollectionResult.collectionTime;

ImmutableList.Builder<WorkerMetric> workerMetrics = new ImmutableList.Builder<>();
List<Integer> nonMeasurableWorkerIds = new ArrayList<>();
for (WorkerMetric.WorkerProperties workerProperties : workerIdToWorkerProperties.values()) {
List<Long> nonMeasurableProcessIds = new ArrayList<>();
for (WorkerMetric.WorkerProperties workerProperties : processIdToWorkerProperties.values()) {
Long pid = workerProperties.getProcessId();
Integer workerId = workerProperties.getWorkerId();

WorkerStat workerStats =
WorkerStat.create(
pidToMemoryInKb.getOrDefault(pid, 0),
workerLastCallTime.get(workerId),
collectionTime);
pidToMemoryInKb.getOrDefault(pid, 0), workerLastCallTime.get(pid), collectionTime);

workerMetrics.add(
WorkerMetric.create(
workerProperties, workerStats, /* isMeasurable= */ pidToMemoryInKb.containsKey(pid)));

if (!pidToMemoryInKb.containsKey(pid)) {
nonMeasurableWorkerIds.add(workerId);
nonMeasurableProcessIds.add(pid);
}
}

workerIdToWorkerProperties.keySet().removeAll(nonMeasurableWorkerIds);
processIdToWorkerProperties.keySet().removeAll(nonMeasurableProcessIds);

return updateLastCollectMetrics(workerMetrics.build(), collectionTime).metrics;
}

public void clear() {
workerIdToWorkerProperties.clear();
processIdToWorkerProperties.clear();
workerLastCallTime.clear();
lastMetrics = null;
}

@VisibleForTesting
public Map<Integer, WorkerMetric.WorkerProperties> getWorkerIdToWorkerProperties() {
return workerIdToWorkerProperties;
public Map<Long, WorkerMetric.WorkerProperties> getProcessIdToWorkerProperties() {
return processIdToWorkerProperties;
}

@VisibleForTesting
public Map<Integer, Instant> getWorkerLastCallTime() {
public Map<Long, Instant> getWorkerLastCallTime() {
return workerLastCallTime;
}

/**
* Initializes workerIdToWorkerProperties for workers. If worker metrics already exists for this
* worker, only updates workerLastCallTime.
*/
public void registerWorker(WorkerMetric.WorkerProperties properties) {
int workerId = properties.getWorkerId();
public synchronized void registerWorker(
int workerId, long processId, String mnemonic, boolean isMultiplex, boolean isSandboxed) {
WorkerMetric.WorkerProperties existingWorkerProperties =
processIdToWorkerProperties.get(processId);

workerLastCallTime.put(processId, Instant.ofEpochMilli(clock.currentTimeMillis()));

if (existingWorkerProperties == null) {
processIdToWorkerProperties.put(
processId,
WorkerMetric.WorkerProperties.create(
ImmutableList.of(workerId), processId, mnemonic, isMultiplex, isSandboxed));
return;
}

if (existingWorkerProperties.getWorkerIds().contains(workerId)) {
return;
}

ImmutableList<Integer> updatedWorkerIds =
ImmutableList.<Integer>builder()
.addAll(existingWorkerProperties.getWorkerIds())
.add(workerId)
.build();

workerIdToWorkerProperties.putIfAbsent(workerId, properties);
workerLastCallTime.put(workerId, Instant.ofEpochMilli(clock.currentTimeMillis()));
WorkerMetric.WorkerProperties updatedWorkerProperties =
WorkerMetric.WorkerProperties.create(
updatedWorkerIds, processId, mnemonic, isMultiplex, isSandboxed);
processIdToWorkerProperties.put(processId, updatedWorkerProperties);
}

private synchronized MetricsWithTime updateLastCollectMetrics(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,14 +770,12 @@ private WorkResponse executeRequest(
}

private void initializeMetrics(WorkerKey workerKey, Worker worker) {
WorkerMetric.WorkerProperties properties =
WorkerMetric.WorkerProperties.create(
worker.getWorkerId(),
worker.getProcessId(),
workerKey.getMnemonic(),
workerKey.isMultiplex(),
workerKey.isSandboxed());
this.metricsCollector.registerWorker(properties);
this.metricsCollector.registerWorker(
worker.getWorkerId(),
worker.getProcessId(),
workerKey.getMnemonic(),
workerKey.isMultiplex(),
workerKey.isSandboxed());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ public void testProfilerWorkerMetrics() throws Exception {
ImmutableList.of(
WorkerMetric.create(
WorkerMetric.WorkerProperties.create(
/* workerId= */ 1,
/* workerIds= */ ImmutableList.of(1),
/* processId= */ 1,
/* mnemonic= */ "dummy1",
/* isMultiplex= */ true,
Expand All @@ -294,7 +294,7 @@ public void testProfilerWorkerMetrics() throws Exception {
/* isMeasurable= */ true),
WorkerMetric.create(
WorkerMetric.WorkerProperties.create(
/* workerId= */ 1,
/* workerIds= */ ImmutableList.of(1),
/* processId= */ 1,
/* mnemonic= */ "dummy2",
/* isMultiplex= */ false,
Expand All @@ -303,7 +303,7 @@ public void testProfilerWorkerMetrics() throws Exception {
/* isMeasurable= */ true),
WorkerMetric.create(
WorkerMetric.WorkerProperties.create(
/* workerId= */ 1,
/* workerIds= */ ImmutableList.of(1),
/* processId= */ 1,
/* mnemonic= */ "dummy2",
/* isMultiplex= */ false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,11 @@ public void testGetEvictionCandidates_evictDifferentWorkerKeys() throws Exceptio
private static WorkerMetric.WorkerProperties createWorkerProperties(
int workerId, long processId, String mnemonic) {
return WorkerMetric.WorkerProperties.create(
workerId, processId, mnemonic, /* isMultiplex= */ false, /* isSandboxed= */ false);
ImmutableList.of(workerId),
processId,
mnemonic,
/* isMultiplex= */ false,
/* isSandboxed= */ false);
}

private static WorkerMetric.WorkerStat createWorkerStat(int memoryUsage) {
Expand Down
Loading

0 comments on commit d31dd09

Please sign in to comment.