Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Normalize metrics from measurements #45

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add description in commit message

import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -153,8 +152,8 @@ private void addMeanMaxMeasurements(Map<String, double[]> loadedMetrics, List<Me
{
Optional<StatisticalSummary> statistics = getStats(loadedMetrics, metricName);
if (statistics.isPresent()) {
measurements.add(Measurement.measurement(metricName, unit, statistics.get().getMax(), ImmutableMap.of("scope", "cluster", "aggregate", "max")));
measurements.add(Measurement.measurement(metricName, unit, statistics.get().getMean(), ImmutableMap.of("scope", "cluster", "aggregate", "mean")));
measurements.add(Measurement.measurement(metricName, unit, statistics.get().getMax(), ImmutableMap.of("scope", METRIC_SCOPE, "aggregate", "max")));
measurements.add(Measurement.measurement(metricName, unit, statistics.get().getMean(), ImmutableMap.of("scope", METRIC_SCOPE, "aggregate", "mean")));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.benchto.driver.service.Measurement;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

Expand All @@ -35,7 +36,11 @@ public CompletableFuture<List<Measurement>> loadMeasurements(Measurable measurab
{
List<Measurement> measurements;
if (shouldMeasureDuration(measurable)) {
measurements = ImmutableList.of(measurement("duration", "MILLISECONDS", measurable.getQueryDuration().toMillis()));
measurements = ImmutableList.of(measurement(
"duration",
"MILLISECONDS",
measurable.getQueryDuration().toMillis(),
Collections.singletonMap("scope", "driver")));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Guava immutable collections (do we have Guava as a dependency?)

}
else {
measurements = ImmutableList.of();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.trino.benchto.driver.service.Measurement;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

Expand All @@ -34,7 +35,11 @@ public CompletableFuture<List<Measurement>> loadMeasurements(Measurable measurab
{
List<Measurement> measurements;
if (measurable instanceof BenchmarkExecutionResult && measurable.getBenchmark().isConcurrent() && measurable.isSuccessful()) {
measurements = ImmutableList.of(Measurement.measurement("throughput", "QUERY_PER_SECOND", calculateThroughput((BenchmarkExecutionResult) measurable)));
measurements = ImmutableList.of(Measurement.measurement(
"throughput",
"QUERY_PER_SECOND",
calculateThroughput((BenchmarkExecutionResult) measurable),
Collections.singletonMap("scope", "driver")));
}
else {
measurements = emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class PrestoClient
.put("peakTotalMemoryReservation", BYTE)
.put("physicalWrittenDataSize", BYTE)
.build();
public static final String METRIC_SCOPE = "query";

@Autowired
private RestTemplate restTemplate;
Expand Down Expand Up @@ -124,7 +125,7 @@ private URI buildQueryInfoURI(String queryId)
private Measurement parseQueryStatistic(String name, Object statistic, Unit requiredUnit)
{
double value = UnitConverter.parseValueAsUnit(statistic.toString(), requiredUnit);
return measurement(name, UnitConverter.format(requiredUnit), value, Collections.singletonMap("scope", "prestoQuery"));
return measurement(name, UnitConverter.format(requiredUnit), value, Collections.singletonMap("scope", METRIC_SCOPE));
}

@SuppressWarnings("unused")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
*/
package io.trino.benchto.driver;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.benchto.driver.execution.ExecutionDriver;
import io.trino.benchto.driver.macro.MacroService;
import io.trino.benchto.driver.service.Measurement;
import org.hamcrest.Matcher;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
Expand All @@ -24,9 +27,12 @@
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.test.web.client.RequestMatcher;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static io.trino.benchto.driver.service.Measurement.measurement;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
Expand All @@ -50,8 +56,14 @@ public class DriverAppIntegrationTest
",{\"target\":\"network\",\"datapoints\":[[10, 10],[10, 10]]}" +
",{\"target\":\"network_total\",\"datapoints\":[[10, 10],[10, 10]]}" +
"]";
private static final List<String> GRAPHITE_MEASUREMENT_NAMES = ImmutableList.of(
"cluster-memory_max", "cluster-memory_mean", "cluster-cpu_max", "cluster-cpu_mean", "cluster-network_max", "cluster-network_mean", "cluster-network_total");
private static final List<Measurement> GRAPHITE_MEASUREMENT_NAMES = ImmutableList.of(
measurement("memory", "PERCENT", 0.0, ImmutableMap.of("scope", "cluster", "aggregate", "max")),
measurement("memory", "PERCENT", 0.0, ImmutableMap.of("scope", "cluster", "aggregate", "mean")),
measurement("cpu", "PERCENT", 0.0, ImmutableMap.of("scope", "cluster", "aggregate", "max")),
measurement("cpu", "PERCENT", 0.0, ImmutableMap.of("scope", "cluster", "aggregate", "mean")),
measurement("network", "BYTES", 0.0, ImmutableMap.of("scope", "cluster", "aggregate", "max")),
measurement("network", "BYTES", 0.0, ImmutableMap.of("scope", "cluster", "aggregate", "mean")),
measurement("network", "BYTES", 0.0, ImmutableMap.of("scope", "cluster", "aggregate", "total")));

private static final Matcher<String> ENDED_STATUS_MATCHER = is("ENDED");

Expand Down Expand Up @@ -89,11 +101,11 @@ public void testBenchmark()
@Test
public void testConcurrentBenchmark()
{
ImmutableList<String> concurrentQueryMeasurementName = ImmutableList.of("duration");
ImmutableList<String> concurrentBenchmarkMeasurementNames = ImmutableList.<String>builder()
List<Measurement> concurrentQueryMeasurement = ImmutableList.of(measurement("duration", "MILLISECONDS", 0.0, ImmutableMap.of("scope", "driver")));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like a factory method. Maybe something like Measurement.of?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but I'd rather do this in a separate PR.

List<Measurement> concurrentBenchmarkMeasurements = ImmutableList.<Measurement>builder()
.addAll(GRAPHITE_MEASUREMENT_NAMES)
.add("throughput")
.add("duration")
.add(measurement("throughput", "QUERY_PER_SECOND", 0.0, ImmutableMap.of("scope", "driver")))
.add(measurement("duration", "MILLISECONDS", 0.0, ImmutableMap.of("scope", "driver")))
.build();

setBenchmark("test_concurrent_benchmark");
Expand All @@ -104,10 +116,10 @@ public void testConcurrentBenchmark()
verifyBenchmarkStart("test_concurrent_benchmark", "test_concurrent_benchmark");
for (int i = preWarmRuns; i < allRuns; i++) {
verifyExecutionStarted("test_concurrent_benchmark", i);
verifyExecutionFinished("test_concurrent_benchmark", i, concurrentQueryMeasurementName);
verifyExecutionFinished("test_concurrent_benchmark", i, concurrentQueryMeasurement);
}
verifyGetGraphiteMeasurements();
verifyBenchmarkFinish("test_concurrent_benchmark", concurrentBenchmarkMeasurementNames);
verifyBenchmarkFinish("test_concurrent_benchmark", concurrentBenchmarkMeasurements);

verifyComplete(allRuns);
}
Expand Down Expand Up @@ -146,14 +158,21 @@ private void verifyBenchmarkStart(String benchmarkName, String uniqueBenchmarkNa
)).andRespond(withSuccess());
}

private void verifyBenchmarkFinish(String uniqueBenchmarkName, List<String> measurementNames)
private void verifyBenchmarkFinish(String uniqueBenchmarkName, List<Measurement> measurements)
{
restServiceServer.expect(matchAll(
ObjectMapper mapper = new ObjectMapper();
List<RequestMatcher> matchers = Arrays.asList(
requestTo("http://benchmark-service:8080/v1/benchmark/" + uniqueBenchmarkName + "/BEN_SEQ_ID/finish"),
method(HttpMethod.POST),
jsonPath("$.status", ENDED_STATUS_MATCHER),
jsonPath("$.measurements.[*].name", containsInAnyOrder(measurementNames.toArray()))
)).andRespond(withSuccess());
jsonPath("$.measurements[*]['name', 'unit', 'attributes']", containsInAnyOrder(measurements.stream().map(item -> {
Map map = mapper.convertValue(item, Map.class);
map.remove("value");
return map;
}).toArray())));
restServiceServer
.expect(matchAll(matchers.toArray(new RequestMatcher[0])))
.andRespond(withSuccess());

restServiceServer.expect(matchAll(
requestTo("http://graphite:18088/events/"),
Expand All @@ -166,13 +185,13 @@ private void verifyBenchmarkFinish(String uniqueBenchmarkName, List<String> meas

private void verifySerialExecution(String uniqueBenchmarkName, String queryName, int executionNumber)
{
ImmutableList<String> serialQueryMeasurementNames = ImmutableList.<String>builder()
List<Measurement> serialQueryMeasurements = ImmutableList.<Measurement>builder()
.addAll(GRAPHITE_MEASUREMENT_NAMES)
.add("duration")
.add(measurement("duration", "MILLISECONDS", 0.0, ImmutableMap.of("scope", "driver")))
.build();
verifySerialExecutionStarted(uniqueBenchmarkName, queryName, executionNumber);
verifyGetGraphiteMeasurements();
verifySerialExecutionFinished(uniqueBenchmarkName, queryName, executionNumber, serialQueryMeasurementNames);
verifySerialExecutionFinished(uniqueBenchmarkName, queryName, executionNumber, serialQueryMeasurements);
}

private void verifySerialExecutionStarted(String uniqueBenchmarkName, String queryName, int executionNumber)
Expand All @@ -196,9 +215,9 @@ private void verifyExecutionStarted(String benchmarkName, int executionNumber)
)).andRespond(withSuccess());
}

private void verifySerialExecutionFinished(String uniqueBenchmarkName, String queryName, int executionNumber, List<String> measurementNames)
private void verifySerialExecutionFinished(String uniqueBenchmarkName, String queryName, int executionNumber, List<Measurement> measurements)
{
verifyExecutionFinished(uniqueBenchmarkName, executionNumber, measurementNames);
verifyExecutionFinished(uniqueBenchmarkName, executionNumber, measurements);

restServiceServer.expect(matchAll(
requestTo("http://graphite:18088/events/"),
Expand All @@ -209,14 +228,21 @@ private void verifySerialExecutionFinished(String uniqueBenchmarkName, String qu
)).andRespond(withSuccess());
}

private void verifyExecutionFinished(String uniqueBenchmarkName, int executionNumber, List<String> measurementNames)
private void verifyExecutionFinished(String uniqueBenchmarkName, int executionNumber, List<Measurement> measurements)
{
restServiceServer.expect(matchAll(
ObjectMapper mapper = new ObjectMapper();
List<RequestMatcher> matchers = Arrays.asList(
requestTo("http://benchmark-service:8080/v1/benchmark/" + uniqueBenchmarkName + "/BEN_SEQ_ID/execution/" + executionNumber + "/finish"),
method(HttpMethod.POST),
jsonPath("$.status", ENDED_STATUS_MATCHER),
jsonPath("$.measurements.[*].name", containsInAnyOrder(measurementNames.toArray()))
)).andRespond(withSuccess());
jsonPath("$.measurements[*]['name', 'unit', 'attributes']", containsInAnyOrder(measurements.stream().map(item -> {
Map map = mapper.convertValue(item, Map.class);
map.remove("value");
return map;
}).toArray())));
restServiceServer
.expect(matchAll(matchers.toArray(new RequestMatcher[0])))
.andRespond(withSuccess());
}

private void verifyGetGraphiteMeasurements()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void testPrestoClientLoadMetrics()

List<Measurement> measurements = prestoClient.loadMetrics("test_query_id");

Map<String, String> attributes = Collections.singletonMap("scope", "prestoQuery");
Map<String, String> attributes = Collections.singletonMap("scope", "query");
assertThat(measurements).containsExactly(
Measurement.measurement("analysisTime", "MILLISECONDS", 21.07, attributes),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will break scripts (Jupyter). I think it's fine, but it needs to be announced.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not only scripts but our workflows as well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean alerts?

Measurement.measurement("planningTime", "MILLISECONDS", 24.72, attributes),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,19 @@
*/
package io.trino.benchto.service;

import com.google.common.collect.ImmutableList;
import io.trino.benchto.service.model.AggregatedMeasurement;
import io.trino.benchto.service.model.BenchmarkRun;
import io.trino.benchto.service.model.BenchmarkRunExecution;
import io.trino.benchto.service.model.Environment;
import io.trino.benchto.service.model.Measurement;
import io.trino.benchto.service.model.MeasurementUnit;
import io.trino.benchto.service.model.Metric;
import io.trino.benchto.service.model.QueryInfo;
import io.trino.benchto.service.model.Status;
import io.trino.benchto.service.repo.BenchmarkRunRepo;
import io.trino.benchto.service.repo.MetricRepo;
import io.trino.benchto.service.rest.requests.FlatMeasurement;
import org.hibernate.Hibernate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -36,15 +41,18 @@
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static io.trino.benchto.service.model.Status.STARTED;
import static io.trino.benchto.service.utils.BenchmarkUniqueNameUtils.generateBenchmarkUniqueName;
import static io.trino.benchto.service.utils.TimeUtils.currentDateTime;
import static java.util.stream.Collectors.groupingBy;

@Service
public class BenchmarkService
Expand All @@ -54,6 +62,9 @@ public class BenchmarkService
@Autowired
private BenchmarkRunRepo benchmarkRunRepo;

@Autowired
private MetricRepo metricRepo;

@Autowired
private EnvironmentService environmentService;

Expand Down Expand Up @@ -83,10 +94,10 @@ public String startBenchmarkRun(String uniqueName, String name, String sequenceI

@Retryable(value = {TransientDataAccessException.class, DataIntegrityViolationException.class})
@Transactional
public void finishBenchmarkRun(String uniqueName, String sequenceId, Status status, Optional<Instant> endTime, List<Measurement> measurements, Map<String, String> attributes)
public void finishBenchmarkRun(String uniqueName, String sequenceId, Status status, Optional<Instant> endTime, List<FlatMeasurement> measurements, Map<String, String> attributes)
{
BenchmarkRun benchmarkRun = findBenchmarkRun(uniqueName, sequenceId);
benchmarkRun.getMeasurements().addAll(measurements);
benchmarkRun.getMeasurements().addAll(normalizeMeasurements(measurements));
benchmarkRun.getAttributes().putAll(attributes);
benchmarkRun.setEnded(fromInstantOrCurrentDateTime(endTime));
benchmarkRun.setStatus(status);
Expand Down Expand Up @@ -130,7 +141,7 @@ public void startExecution(String uniqueName, String benchmarkSequenceId, String
@Retryable(value = {TransientDataAccessException.class, DataIntegrityViolationException.class})
@Transactional
public void finishExecution(String uniqueName, String benchmarkSequenceId, String executionSequenceId, Status status,
Optional<Instant> endTime, List<Measurement> measurements, Map<String, String> attributes, String queryInfo)
Optional<Instant> endTime, List<FlatMeasurement> measurements, Map<String, String> attributes, String queryInfo)
{
BenchmarkRun benchmarkRun = findBenchmarkRun(uniqueName, benchmarkSequenceId);

Expand All @@ -140,7 +151,7 @@ public void finishExecution(String uniqueName, String benchmarkSequenceId, Strin

checkState(execution.getStatus() == STARTED, "Wrong execution status: %s", execution.getStatus());

execution.getMeasurements().addAll(measurements);
execution.getMeasurements().addAll(normalizeMeasurements(measurements));
execution.getAttributes().putAll(attributes);
execution.setEnded(fromInstantOrCurrentDateTime(endTime));
execution.setStatus(status);
Expand All @@ -159,6 +170,31 @@ public void finishExecution(String uniqueName, String benchmarkSequenceId, Strin
LOG.debug("Finishing execution - {}", execution);
}

/**
 * Converts flat measurements (name, unit, attributes, value) into {@link Measurement}
 * entities that each reference a {@link Metric}.
 *
 * <p>Measurements that map to the same metric share one {@code Metric} instance. A metric
 * that is not found among those returned by {@code metricRepo.findAll()} is saved through
 * {@code metricRepo.save} before it is used.
 *
 * @param input flat measurements to convert
 * @return an immutable list with one {@code Measurement} per input element; elements are
 *         emitted group by group, so the order may differ from {@code input}
 */
private List<Measurement> normalizeMeasurements(List<FlatMeasurement> input)
nineinchnick marked this conversation as resolved.
Show resolved Hide resolved
{
// use a lookup map to avoid building a complex SQL query that compares attributes list
// The map is keyed by the metric itself so that a freshly built (unsaved) Metric can be
// exchanged for the stored instance.
// NOTE(review): this presumably relies on Metric.equals/hashCode covering name, unit and
// attributes - confirm against the Metric class.
Map<Metric, Metric> metrics = new HashMap<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems a Set is more suited to what you're doing here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried using it initially but I had the issue where I need to save new metrics and update the reference in the set. I'm not sure how to do that.

// Load every metric the repository returns into the lookup map.
metricRepo.findAll().forEach(metric -> metrics.put(metric, metric));
// Group the incoming measurements by the metric they describe.
// NOTE(review): if MeasurementUnit is an enum, valueOf throws IllegalArgumentException
// for an unknown unit name - confirm that callers validate the unit.
Map<Metric, List<FlatMeasurement>> groups = input.stream()
.collect(groupingBy(flatMeasurement -> new Metric(
flatMeasurement.getName(),
MeasurementUnit.valueOf(flatMeasurement.getUnit()),
flatMeasurement.getAttributes())));
ImmutableList.Builder<Measurement> result = ImmutableList.builder();
for (Map.Entry<Metric, List<FlatMeasurement>> entry : groups.entrySet()) {
// Reuse the already-known metric when there is one.
Metric metric = metrics.get(entry.getKey());
if (metric == null) {
// Unknown metric: save it and remember the instance returned by the repository.
metric = metricRepo.save(entry.getKey());
metrics.put(metric, metric);
}
// Lambdas can only capture effectively final locals, hence the copy.
Metric finalMetric = metric;
result.addAll(entry.getValue().stream()
.map(flatMeasurement -> new Measurement(finalMetric, flatMeasurement.getValue()))
.collect(Collectors.toList()));
}
return result.build();
}

@Transactional
public BenchmarkRun findBenchmarkRun(String uniqueName, String sequenceId)
{
Expand Down
Loading