Skip to content

Commit

Permalink
Save metrics normalized from measurements
Browse files Browse the repository at this point in the history
  • Loading branch information
nineinchnick committed Sep 16, 2022
1 parent 57d2b8b commit f7bbcef
Show file tree
Hide file tree
Showing 13 changed files with 506 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -153,8 +152,8 @@ private void addMeanMaxMeasurements(Map<String, double[]> loadedMetrics, List<Me
{
Optional<StatisticalSummary> statistics = getStats(loadedMetrics, metricName);
if (statistics.isPresent()) {
measurements.add(Measurement.measurement(metricName, unit, statistics.get().getMax(), ImmutableMap.of("scope", "cluster", "aggregate", "max")));
measurements.add(Measurement.measurement(metricName, unit, statistics.get().getMean(), ImmutableMap.of("scope", "cluster", "aggregate", "mean")));
measurements.add(Measurement.measurement(metricName, unit, statistics.get().getMax(), ImmutableMap.of("scope", METRIC_SCOPE, "aggregate", "max")));
measurements.add(Measurement.measurement(metricName, unit, statistics.get().getMean(), ImmutableMap.of("scope", METRIC_SCOPE, "aggregate", "mean")));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.benchto.driver.service.Measurement;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

Expand All @@ -35,7 +36,11 @@ public CompletableFuture<List<Measurement>> loadMeasurements(Measurable measurab
{
List<Measurement> measurements;
if (shouldMeasureDuration(measurable)) {
measurements = ImmutableList.of(measurement("duration", "MILLISECONDS", measurable.getQueryDuration().toMillis()));
measurements = ImmutableList.of(measurement(
"duration",
"MILLISECONDS",
measurable.getQueryDuration().toMillis(),
Collections.singletonMap("scope", "driver")));
}
else {
measurements = ImmutableList.of();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.trino.benchto.driver.service.Measurement;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

Expand All @@ -34,7 +35,11 @@ public CompletableFuture<List<Measurement>> loadMeasurements(Measurable measurab
{
List<Measurement> measurements;
if (measurable instanceof BenchmarkExecutionResult && measurable.getBenchmark().isConcurrent() && measurable.isSuccessful()) {
measurements = ImmutableList.of(Measurement.measurement("throughput", "QUERY_PER_SECOND", calculateThroughput((BenchmarkExecutionResult) measurable)));
measurements = ImmutableList.of(Measurement.measurement(
"throughput",
"QUERY_PER_SECOND",
calculateThroughput((BenchmarkExecutionResult) measurable),
Collections.singletonMap("scope", "driver")));
}
else {
measurements = emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class PrestoClient
.put("peakTotalMemoryReservation", BYTE)
.put("physicalWrittenDataSize", BYTE)
.build();
public static final String METRIC_SCOPE = "query";

@Autowired
private RestTemplate restTemplate;
Expand Down Expand Up @@ -124,7 +125,7 @@ private URI buildQueryInfoURI(String queryId)
private Measurement parseQueryStatistic(String name, Object statistic, Unit requiredUnit)
{
double value = UnitConverter.parseValueAsUnit(statistic.toString(), requiredUnit);
return measurement(name, UnitConverter.format(requiredUnit), value, Collections.singletonMap("scope", "prestoQuery"));
return measurement(name, UnitConverter.format(requiredUnit), value, Collections.singletonMap("scope", METRIC_SCOPE));
}

@SuppressWarnings("unused")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
*/
package io.trino.benchto.driver;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.benchto.driver.execution.ExecutionDriver;
import io.trino.benchto.driver.macro.MacroService;
import io.trino.benchto.driver.service.Measurement;
import org.hamcrest.Matcher;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
Expand All @@ -24,9 +27,12 @@
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.test.web.client.RequestMatcher;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static io.trino.benchto.driver.service.Measurement.measurement;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
Expand All @@ -50,8 +56,14 @@ public class DriverAppIntegrationTest
",{\"target\":\"network\",\"datapoints\":[[10, 10],[10, 10]]}" +
",{\"target\":\"network_total\",\"datapoints\":[[10, 10],[10, 10]]}" +
"]";
private static final List<String> GRAPHITE_MEASUREMENT_NAMES = ImmutableList.of(
"cluster-memory_max", "cluster-memory_mean", "cluster-cpu_max", "cluster-cpu_mean", "cluster-network_max", "cluster-network_mean", "cluster-network_total");
private static final List<Measurement> GRAPHITE_MEASUREMENT_NAMES = ImmutableList.of(
measurement("memory", "PERCENT", 0.0, ImmutableMap.of("scope", "cluster", "aggregate", "max")),
measurement("memory", "PERCENT", 0.0, ImmutableMap.of("scope", "cluster", "aggregate", "mean")),
measurement("cpu", "PERCENT", 0.0, ImmutableMap.of("scope", "cluster", "aggregate", "max")),
measurement("cpu", "PERCENT", 0.0, ImmutableMap.of("scope", "cluster", "aggregate", "mean")),
measurement("network", "BYTES", 0.0, ImmutableMap.of("scope", "cluster", "aggregate", "max")),
measurement("network", "BYTES", 0.0, ImmutableMap.of("scope", "cluster", "aggregate", "mean")),
measurement("network", "BYTES", 0.0, ImmutableMap.of("scope", "cluster", "aggregate", "total")));

private static final Matcher<String> ENDED_STATUS_MATCHER = is("ENDED");

Expand Down Expand Up @@ -89,11 +101,11 @@ public void testBenchmark()
@Test
public void testConcurrentBenchmark()
{
ImmutableList<String> concurrentQueryMeasurementName = ImmutableList.of("duration");
ImmutableList<String> concurrentBenchmarkMeasurementNames = ImmutableList.<String>builder()
List<Measurement> concurrentQueryMeasurement = ImmutableList.of(measurement("duration", "MILLISECONDS", 0.0, ImmutableMap.of("scope", "driver")));
List<Measurement> concurrentBenchmarkMeasurements = ImmutableList.<Measurement>builder()
.addAll(GRAPHITE_MEASUREMENT_NAMES)
.add("throughput")
.add("duration")
.add(measurement("throughput", "QUERY_PER_SECOND", 0.0, ImmutableMap.of("scope", "driver")))
.add(measurement("duration", "MILLISECONDS", 0.0, ImmutableMap.of("scope", "driver")))
.build();

setBenchmark("test_concurrent_benchmark");
Expand All @@ -104,10 +116,10 @@ public void testConcurrentBenchmark()
verifyBenchmarkStart("test_concurrent_benchmark", "test_concurrent_benchmark");
for (int i = preWarmRuns; i < allRuns; i++) {
verifyExecutionStarted("test_concurrent_benchmark", i);
verifyExecutionFinished("test_concurrent_benchmark", i, concurrentQueryMeasurementName);
verifyExecutionFinished("test_concurrent_benchmark", i, concurrentQueryMeasurement);
}
verifyGetGraphiteMeasurements();
verifyBenchmarkFinish("test_concurrent_benchmark", concurrentBenchmarkMeasurementNames);
verifyBenchmarkFinish("test_concurrent_benchmark", concurrentBenchmarkMeasurements);

verifyComplete(allRuns);
}
Expand Down Expand Up @@ -146,14 +158,21 @@ private void verifyBenchmarkStart(String benchmarkName, String uniqueBenchmarkNa
)).andRespond(withSuccess());
}

private void verifyBenchmarkFinish(String uniqueBenchmarkName, List<String> measurementNames)
private void verifyBenchmarkFinish(String uniqueBenchmarkName, List<Measurement> measurements)
{
restServiceServer.expect(matchAll(
ObjectMapper mapper = new ObjectMapper();
List<RequestMatcher> matchers = Arrays.asList(
requestTo("http://benchmark-service:8080/v1/benchmark/" + uniqueBenchmarkName + "/BEN_SEQ_ID/finish"),
method(HttpMethod.POST),
jsonPath("$.status", ENDED_STATUS_MATCHER),
jsonPath("$.measurements.[*].name", containsInAnyOrder(measurementNames.toArray()))
)).andRespond(withSuccess());
jsonPath("$.measurements[*]['name', 'unit', 'attributes']", containsInAnyOrder(measurements.stream().map(item -> {
Map map = mapper.convertValue(item, Map.class);
map.remove("value");
return map;
}).toArray())));
restServiceServer
.expect(matchAll(matchers.toArray(new RequestMatcher[0])))
.andRespond(withSuccess());

restServiceServer.expect(matchAll(
requestTo("http://graphite:18088/events/"),
Expand All @@ -166,13 +185,13 @@ private void verifyBenchmarkFinish(String uniqueBenchmarkName, List<String> meas

private void verifySerialExecution(String uniqueBenchmarkName, String queryName, int executionNumber)
{
ImmutableList<String> serialQueryMeasurementNames = ImmutableList.<String>builder()
List<Measurement> serialQueryMeasurements = ImmutableList.<Measurement>builder()
.addAll(GRAPHITE_MEASUREMENT_NAMES)
.add("duration")
.add(measurement("duration", "MILLISECONDS", 0.0, ImmutableMap.of("scope", "driver")))
.build();
verifySerialExecutionStarted(uniqueBenchmarkName, queryName, executionNumber);
verifyGetGraphiteMeasurements();
verifySerialExecutionFinished(uniqueBenchmarkName, queryName, executionNumber, serialQueryMeasurementNames);
verifySerialExecutionFinished(uniqueBenchmarkName, queryName, executionNumber, serialQueryMeasurements);
}

private void verifySerialExecutionStarted(String uniqueBenchmarkName, String queryName, int executionNumber)
Expand All @@ -196,9 +215,9 @@ private void verifyExecutionStarted(String benchmarkName, int executionNumber)
)).andRespond(withSuccess());
}

private void verifySerialExecutionFinished(String uniqueBenchmarkName, String queryName, int executionNumber, List<String> measurementNames)
private void verifySerialExecutionFinished(String uniqueBenchmarkName, String queryName, int executionNumber, List<Measurement> measurements)
{
verifyExecutionFinished(uniqueBenchmarkName, executionNumber, measurementNames);
verifyExecutionFinished(uniqueBenchmarkName, executionNumber, measurements);

restServiceServer.expect(matchAll(
requestTo("http://graphite:18088/events/"),
Expand All @@ -209,14 +228,21 @@ private void verifySerialExecutionFinished(String uniqueBenchmarkName, String qu
)).andRespond(withSuccess());
}

private void verifyExecutionFinished(String uniqueBenchmarkName, int executionNumber, List<String> measurementNames)
private void verifyExecutionFinished(String uniqueBenchmarkName, int executionNumber, List<Measurement> measurements)
{
restServiceServer.expect(matchAll(
ObjectMapper mapper = new ObjectMapper();
List<RequestMatcher> matchers = Arrays.asList(
requestTo("http://benchmark-service:8080/v1/benchmark/" + uniqueBenchmarkName + "/BEN_SEQ_ID/execution/" + executionNumber + "/finish"),
method(HttpMethod.POST),
jsonPath("$.status", ENDED_STATUS_MATCHER),
jsonPath("$.measurements.[*].name", containsInAnyOrder(measurementNames.toArray()))
)).andRespond(withSuccess());
jsonPath("$.measurements[*]['name', 'unit', 'attributes']", containsInAnyOrder(measurements.stream().map(item -> {
Map map = mapper.convertValue(item, Map.class);
map.remove("value");
return map;
}).toArray())));
restServiceServer
.expect(matchAll(matchers.toArray(new RequestMatcher[0])))
.andRespond(withSuccess());
}

private void verifyGetGraphiteMeasurements()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void testPrestoClientLoadMetrics()

List<Measurement> measurements = prestoClient.loadMetrics("test_query_id");

Map<String, String> attributes = Collections.singletonMap("scope", "prestoQuery");
Map<String, String> attributes = Collections.singletonMap("scope", "query");
assertThat(measurements).containsExactly(
Measurement.measurement("analysisTime", "MILLISECONDS", 21.07, attributes),
Measurement.measurement("planningTime", "MILLISECONDS", 24.72, attributes),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,19 @@
*/
package io.trino.benchto.service;

import com.google.common.collect.ImmutableList;
import io.trino.benchto.service.model.AggregatedMeasurement;
import io.trino.benchto.service.model.BenchmarkRun;
import io.trino.benchto.service.model.BenchmarkRunExecution;
import io.trino.benchto.service.model.Environment;
import io.trino.benchto.service.model.Measurement;
import io.trino.benchto.service.model.MeasurementUnit;
import io.trino.benchto.service.model.Metric;
import io.trino.benchto.service.model.QueryInfo;
import io.trino.benchto.service.model.Status;
import io.trino.benchto.service.repo.BenchmarkRunRepo;
import io.trino.benchto.service.repo.MetricRepo;
import io.trino.benchto.service.rest.requests.FlatMeasurement;
import org.hibernate.Hibernate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -36,15 +41,18 @@
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static io.trino.benchto.service.model.Status.STARTED;
import static io.trino.benchto.service.utils.BenchmarkUniqueNameUtils.generateBenchmarkUniqueName;
import static io.trino.benchto.service.utils.TimeUtils.currentDateTime;
import static java.util.stream.Collectors.groupingBy;

@Service
public class BenchmarkService
Expand All @@ -54,6 +62,9 @@ public class BenchmarkService
@Autowired
private BenchmarkRunRepo benchmarkRunRepo;

@Autowired
private MetricRepo metricRepo;

@Autowired
private EnvironmentService environmentService;

Expand Down Expand Up @@ -83,10 +94,10 @@ public String startBenchmarkRun(String uniqueName, String name, String sequenceI

@Retryable(value = {TransientDataAccessException.class, DataIntegrityViolationException.class})
@Transactional
public void finishBenchmarkRun(String uniqueName, String sequenceId, Status status, Optional<Instant> endTime, List<Measurement> measurements, Map<String, String> attributes)
public void finishBenchmarkRun(String uniqueName, String sequenceId, Status status, Optional<Instant> endTime, List<FlatMeasurement> measurements, Map<String, String> attributes)
{
BenchmarkRun benchmarkRun = findBenchmarkRun(uniqueName, sequenceId);
benchmarkRun.getMeasurements().addAll(measurements);
benchmarkRun.getMeasurements().addAll(normalizeMeasurements(measurements));
benchmarkRun.getAttributes().putAll(attributes);
benchmarkRun.setEnded(fromInstantOrCurrentDateTime(endTime));
benchmarkRun.setStatus(status);
Expand Down Expand Up @@ -130,7 +141,7 @@ public void startExecution(String uniqueName, String benchmarkSequenceId, String
@Retryable(value = {TransientDataAccessException.class, DataIntegrityViolationException.class})
@Transactional
public void finishExecution(String uniqueName, String benchmarkSequenceId, String executionSequenceId, Status status,
Optional<Instant> endTime, List<Measurement> measurements, Map<String, String> attributes, String queryInfo)
Optional<Instant> endTime, List<FlatMeasurement> measurements, Map<String, String> attributes, String queryInfo)
{
BenchmarkRun benchmarkRun = findBenchmarkRun(uniqueName, benchmarkSequenceId);

Expand All @@ -140,7 +151,7 @@ public void finishExecution(String uniqueName, String benchmarkSequenceId, Strin

checkState(execution.getStatus() == STARTED, "Wrong execution status: %s", execution.getStatus());

execution.getMeasurements().addAll(measurements);
execution.getMeasurements().addAll(normalizeMeasurements(measurements));
execution.getAttributes().putAll(attributes);
execution.setEnded(fromInstantOrCurrentDateTime(endTime));
execution.setStatus(status);
Expand All @@ -159,6 +170,31 @@ public void finishExecution(String uniqueName, String benchmarkSequenceId, Strin
LOG.debug("Finishing execution - {}", execution);
}

private List<Measurement> normalizeMeasurements(List<FlatMeasurement> input)
{
// use a lookup map to avoid building a complex SQL query that compares attributes list
Map<Metric, Metric> metrics = new HashMap<>();
metricRepo.findAll().forEach(metric -> metrics.put(metric, metric));
Map<Metric, List<FlatMeasurement>> groups = input.stream()
.collect(groupingBy(flatMeasurement -> new Metric(
flatMeasurement.getName(),
MeasurementUnit.valueOf(flatMeasurement.getUnit()),
flatMeasurement.getAttributes())));
ImmutableList.Builder<Measurement> result = ImmutableList.builder();
for (Map.Entry<Metric, List<FlatMeasurement>> entry : groups.entrySet()) {
Metric metric = metrics.get(entry.getKey());
if (metric == null) {
metric = metricRepo.save(entry.getKey());
metrics.put(metric, metric);
}
Metric finalMetric = metric;
result.addAll(entry.getValue().stream()
.map(flatMeasurement -> new Measurement(finalMetric, flatMeasurement.getValue()))
.collect(Collectors.toList()));
}
return result.build();
}

@Transactional
public BenchmarkRun findBenchmarkRun(String uniqueName, String sequenceId)
{
Expand Down
Loading

0 comments on commit f7bbcef

Please sign in to comment.