-
Notifications
You must be signed in to change notification settings - Fork 29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Normalize metrics from measurements #45
base: master
Are you sure you want to change the base?
Changes from 1 commit
23cd790
a75646c
87c507a
0f2f138
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ | |
import io.trino.benchto.driver.service.Measurement; | ||
import org.springframework.stereotype.Component; | ||
|
||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.concurrent.CompletableFuture; | ||
|
||
|
@@ -35,7 +36,11 @@ public CompletableFuture<List<Measurement>> loadMeasurements(Measurable measurab | |
{ | ||
List<Measurement> measurements; | ||
if (shouldMeasureDuration(measurable)) { | ||
measurements = ImmutableList.of(measurement("duration", "MILLISECONDS", measurable.getQueryDuration().toMillis())); | ||
measurements = ImmutableList.of(measurement( | ||
"duration", | ||
"MILLISECONDS", | ||
measurable.getQueryDuration().toMillis(), | ||
Collections.singletonMap("scope", "driver"))); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use guava immutable collections (do we have guava as dependency?) |
||
} | ||
else { | ||
measurements = ImmutableList.of(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,9 +13,12 @@ | |
*/ | ||
package io.trino.benchto.driver; | ||
|
||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.google.common.collect.ImmutableList; | ||
import com.google.common.collect.ImmutableMap; | ||
import io.trino.benchto.driver.execution.ExecutionDriver; | ||
import io.trino.benchto.driver.macro.MacroService; | ||
import io.trino.benchto.driver.service.Measurement; | ||
import org.hamcrest.Matcher; | ||
import org.junit.Test; | ||
import org.mockito.ArgumentCaptor; | ||
|
@@ -24,9 +27,12 @@ | |
import org.springframework.test.util.ReflectionTestUtils; | ||
import org.springframework.test.web.client.RequestMatcher; | ||
|
||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
|
||
import static io.trino.benchto.driver.service.Measurement.measurement; | ||
import static org.assertj.core.api.Assertions.assertThat; | ||
import static org.hamcrest.Matchers.containsInAnyOrder; | ||
import static org.hamcrest.Matchers.containsString; | ||
|
@@ -50,8 +56,14 @@ public class DriverAppIntegrationTest | |
",{\"target\":\"network\",\"datapoints\":[[10, 10],[10, 10]]}" + | ||
",{\"target\":\"network_total\",\"datapoints\":[[10, 10],[10, 10]]}" + | ||
"]"; | ||
private static final List<String> GRAPHITE_MEASUREMENT_NAMES = ImmutableList.of( | ||
"cluster-memory_max", "cluster-memory_mean", "cluster-cpu_max", "cluster-cpu_mean", "cluster-network_max", "cluster-network_mean", "cluster-network_total"); | ||
private static final List<Measurement> GRAPHITE_MEASUREMENT_NAMES = ImmutableList.of( | ||
measurement("memory", "PERCENT", 0.0, ImmutableMap.of("scope", "cluster", "aggregate", "max")), | ||
measurement("memory", "PERCENT", 0.0, ImmutableMap.of("scope", "cluster", "aggregate", "mean")), | ||
measurement("cpu", "PERCENT", 0.0, ImmutableMap.of("scope", "cluster", "aggregate", "max")), | ||
measurement("cpu", "PERCENT", 0.0, ImmutableMap.of("scope", "cluster", "aggregate", "mean")), | ||
measurement("network", "BYTES", 0.0, ImmutableMap.of("scope", "cluster", "aggregate", "max")), | ||
measurement("network", "BYTES", 0.0, ImmutableMap.of("scope", "cluster", "aggregate", "mean")), | ||
measurement("network", "BYTES", 0.0, ImmutableMap.of("scope", "cluster", "aggregate", "total"))); | ||
|
||
private static final Matcher<String> ENDED_STATUS_MATCHER = is("ENDED"); | ||
|
||
|
@@ -89,11 +101,11 @@ public void testBenchmark() | |
@Test | ||
public void testConcurrentBenchmark() | ||
{ | ||
ImmutableList<String> concurrentQueryMeasurementName = ImmutableList.of("duration"); | ||
ImmutableList<String> concurrentBenchmarkMeasurementNames = ImmutableList.<String>builder() | ||
List<Measurement> concurrentQueryMeasurement = ImmutableList.of(measurement("duration", "MILLISECONDS", 0.0, ImmutableMap.of("scope", "driver"))); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like a factory method. Maybe something like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree, but I'd rather do this in a separate PR. |
||
List<Measurement> concurrentBenchmarkMeasurements = ImmutableList.<Measurement>builder() | ||
.addAll(GRAPHITE_MEASUREMENT_NAMES) | ||
.add("throughput") | ||
.add("duration") | ||
.add(measurement("throughput", "QUERY_PER_SECOND", 0.0, ImmutableMap.of("scope", "driver"))) | ||
.add(measurement("duration", "MILLISECONDS", 0.0, ImmutableMap.of("scope", "driver"))) | ||
.build(); | ||
|
||
setBenchmark("test_concurrent_benchmark"); | ||
|
@@ -104,10 +116,10 @@ public void testConcurrentBenchmark() | |
verifyBenchmarkStart("test_concurrent_benchmark", "test_concurrent_benchmark"); | ||
for (int i = preWarmRuns; i < allRuns; i++) { | ||
verifyExecutionStarted("test_concurrent_benchmark", i); | ||
verifyExecutionFinished("test_concurrent_benchmark", i, concurrentQueryMeasurementName); | ||
verifyExecutionFinished("test_concurrent_benchmark", i, concurrentQueryMeasurement); | ||
} | ||
verifyGetGraphiteMeasurements(); | ||
verifyBenchmarkFinish("test_concurrent_benchmark", concurrentBenchmarkMeasurementNames); | ||
verifyBenchmarkFinish("test_concurrent_benchmark", concurrentBenchmarkMeasurements); | ||
|
||
verifyComplete(allRuns); | ||
} | ||
|
@@ -146,14 +158,21 @@ private void verifyBenchmarkStart(String benchmarkName, String uniqueBenchmarkNa | |
)).andRespond(withSuccess()); | ||
} | ||
|
||
private void verifyBenchmarkFinish(String uniqueBenchmarkName, List<String> measurementNames) | ||
private void verifyBenchmarkFinish(String uniqueBenchmarkName, List<Measurement> measurements) | ||
{ | ||
restServiceServer.expect(matchAll( | ||
ObjectMapper mapper = new ObjectMapper(); | ||
List<RequestMatcher> matchers = Arrays.asList( | ||
requestTo("http://benchmark-service:8080/v1/benchmark/" + uniqueBenchmarkName + "/BEN_SEQ_ID/finish"), | ||
method(HttpMethod.POST), | ||
jsonPath("$.status", ENDED_STATUS_MATCHER), | ||
jsonPath("$.measurements.[*].name", containsInAnyOrder(measurementNames.toArray())) | ||
)).andRespond(withSuccess()); | ||
jsonPath("$.measurements[*]['name', 'unit', 'attributes']", containsInAnyOrder(measurements.stream().map(item -> { | ||
Map map = mapper.convertValue(item, Map.class); | ||
map.remove("value"); | ||
return map; | ||
}).toArray()))); | ||
restServiceServer | ||
.expect(matchAll(matchers.toArray(new RequestMatcher[0]))) | ||
.andRespond(withSuccess()); | ||
|
||
restServiceServer.expect(matchAll( | ||
requestTo("http://graphite:18088/events/"), | ||
|
@@ -166,13 +185,13 @@ private void verifyBenchmarkFinish(String uniqueBenchmarkName, List<String> meas | |
|
||
private void verifySerialExecution(String uniqueBenchmarkName, String queryName, int executionNumber) | ||
{ | ||
ImmutableList<String> serialQueryMeasurementNames = ImmutableList.<String>builder() | ||
List<Measurement> serialQueryMeasurements = ImmutableList.<Measurement>builder() | ||
.addAll(GRAPHITE_MEASUREMENT_NAMES) | ||
.add("duration") | ||
.add(measurement("duration", "MILLISECONDS", 0.0, ImmutableMap.of("scope", "driver"))) | ||
.build(); | ||
verifySerialExecutionStarted(uniqueBenchmarkName, queryName, executionNumber); | ||
verifyGetGraphiteMeasurements(); | ||
verifySerialExecutionFinished(uniqueBenchmarkName, queryName, executionNumber, serialQueryMeasurementNames); | ||
verifySerialExecutionFinished(uniqueBenchmarkName, queryName, executionNumber, serialQueryMeasurements); | ||
} | ||
|
||
private void verifySerialExecutionStarted(String uniqueBenchmarkName, String queryName, int executionNumber) | ||
|
@@ -196,9 +215,9 @@ private void verifyExecutionStarted(String benchmarkName, int executionNumber) | |
)).andRespond(withSuccess()); | ||
} | ||
|
||
private void verifySerialExecutionFinished(String uniqueBenchmarkName, String queryName, int executionNumber, List<String> measurementNames) | ||
private void verifySerialExecutionFinished(String uniqueBenchmarkName, String queryName, int executionNumber, List<Measurement> measurements) | ||
{ | ||
verifyExecutionFinished(uniqueBenchmarkName, executionNumber, measurementNames); | ||
verifyExecutionFinished(uniqueBenchmarkName, executionNumber, measurements); | ||
|
||
restServiceServer.expect(matchAll( | ||
requestTo("http://graphite:18088/events/"), | ||
|
@@ -209,14 +228,21 @@ private void verifySerialExecutionFinished(String uniqueBenchmarkName, String qu | |
)).andRespond(withSuccess()); | ||
} | ||
|
||
private void verifyExecutionFinished(String uniqueBenchmarkName, int executionNumber, List<String> measurementNames) | ||
private void verifyExecutionFinished(String uniqueBenchmarkName, int executionNumber, List<Measurement> measurements) | ||
{ | ||
restServiceServer.expect(matchAll( | ||
ObjectMapper mapper = new ObjectMapper(); | ||
List<RequestMatcher> matchers = Arrays.asList( | ||
requestTo("http://benchmark-service:8080/v1/benchmark/" + uniqueBenchmarkName + "/BEN_SEQ_ID/execution/" + executionNumber + "/finish"), | ||
method(HttpMethod.POST), | ||
jsonPath("$.status", ENDED_STATUS_MATCHER), | ||
jsonPath("$.measurements.[*].name", containsInAnyOrder(measurementNames.toArray())) | ||
)).andRespond(withSuccess()); | ||
jsonPath("$.measurements[*]['name', 'unit', 'attributes']", containsInAnyOrder(measurements.stream().map(item -> { | ||
Map map = mapper.convertValue(item, Map.class); | ||
map.remove("value"); | ||
return map; | ||
}).toArray()))); | ||
restServiceServer | ||
.expect(matchAll(matchers.toArray(new RequestMatcher[0]))) | ||
.andRespond(withSuccess()); | ||
} | ||
|
||
private void verifyGetGraphiteMeasurements() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,7 +46,7 @@ public void testPrestoClientLoadMetrics() | |
|
||
List<Measurement> measurements = prestoClient.loadMetrics("test_query_id"); | ||
|
||
Map<String, String> attributes = Collections.singletonMap("scope", "prestoQuery"); | ||
Map<String, String> attributes = Collections.singletonMap("scope", "query"); | ||
assertThat(measurements).containsExactly( | ||
Measurement.measurement("analysisTime", "MILLISECONDS", 21.07, attributes), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this will break scripts (jupyther). I think it's fine, but needs to be announced There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not only scripts but our workflows as well. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You mean alerts? |
||
Measurement.measurement("planningTime", "MILLISECONDS", 24.72, attributes), | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,14 +13,19 @@ | |
*/ | ||
package io.trino.benchto.service; | ||
|
||
import com.google.common.collect.ImmutableList; | ||
import io.trino.benchto.service.model.AggregatedMeasurement; | ||
import io.trino.benchto.service.model.BenchmarkRun; | ||
import io.trino.benchto.service.model.BenchmarkRunExecution; | ||
import io.trino.benchto.service.model.Environment; | ||
import io.trino.benchto.service.model.Measurement; | ||
import io.trino.benchto.service.model.MeasurementUnit; | ||
import io.trino.benchto.service.model.Metric; | ||
import io.trino.benchto.service.model.QueryInfo; | ||
import io.trino.benchto.service.model.Status; | ||
import io.trino.benchto.service.repo.BenchmarkRunRepo; | ||
import io.trino.benchto.service.repo.MetricRepo; | ||
import io.trino.benchto.service.rest.requests.FlatMeasurement; | ||
import org.hibernate.Hibernate; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
@@ -36,15 +41,18 @@ | |
import java.time.Instant; | ||
import java.time.ZoneId; | ||
import java.time.ZonedDateTime; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.stream.Collectors; | ||
|
||
import static com.google.common.base.Preconditions.checkArgument; | ||
import static com.google.common.base.Preconditions.checkState; | ||
import static io.trino.benchto.service.model.Status.STARTED; | ||
import static io.trino.benchto.service.utils.BenchmarkUniqueNameUtils.generateBenchmarkUniqueName; | ||
import static io.trino.benchto.service.utils.TimeUtils.currentDateTime; | ||
import static java.util.stream.Collectors.groupingBy; | ||
|
||
@Service | ||
public class BenchmarkService | ||
|
@@ -54,6 +62,9 @@ public class BenchmarkService | |
@Autowired | ||
private BenchmarkRunRepo benchmarkRunRepo; | ||
|
||
@Autowired | ||
private MetricRepo metricRepo; | ||
|
||
@Autowired | ||
private EnvironmentService environmentService; | ||
|
||
|
@@ -83,10 +94,10 @@ public String startBenchmarkRun(String uniqueName, String name, String sequenceI | |
|
||
@Retryable(value = {TransientDataAccessException.class, DataIntegrityViolationException.class}) | ||
@Transactional | ||
public void finishBenchmarkRun(String uniqueName, String sequenceId, Status status, Optional<Instant> endTime, List<Measurement> measurements, Map<String, String> attributes) | ||
public void finishBenchmarkRun(String uniqueName, String sequenceId, Status status, Optional<Instant> endTime, List<FlatMeasurement> measurements, Map<String, String> attributes) | ||
{ | ||
BenchmarkRun benchmarkRun = findBenchmarkRun(uniqueName, sequenceId); | ||
benchmarkRun.getMeasurements().addAll(measurements); | ||
benchmarkRun.getMeasurements().addAll(normalizeMeasurements(measurements)); | ||
benchmarkRun.getAttributes().putAll(attributes); | ||
benchmarkRun.setEnded(fromInstantOrCurrentDateTime(endTime)); | ||
benchmarkRun.setStatus(status); | ||
|
@@ -130,7 +141,7 @@ public void startExecution(String uniqueName, String benchmarkSequenceId, String | |
@Retryable(value = {TransientDataAccessException.class, DataIntegrityViolationException.class}) | ||
@Transactional | ||
public void finishExecution(String uniqueName, String benchmarkSequenceId, String executionSequenceId, Status status, | ||
Optional<Instant> endTime, List<Measurement> measurements, Map<String, String> attributes, String queryInfo) | ||
Optional<Instant> endTime, List<FlatMeasurement> measurements, Map<String, String> attributes, String queryInfo) | ||
{ | ||
BenchmarkRun benchmarkRun = findBenchmarkRun(uniqueName, benchmarkSequenceId); | ||
|
||
|
@@ -140,7 +151,7 @@ public void finishExecution(String uniqueName, String benchmarkSequenceId, Strin | |
|
||
checkState(execution.getStatus() == STARTED, "Wrong execution status: %s", execution.getStatus()); | ||
|
||
execution.getMeasurements().addAll(measurements); | ||
execution.getMeasurements().addAll(normalizeMeasurements(measurements)); | ||
execution.getAttributes().putAll(attributes); | ||
execution.setEnded(fromInstantOrCurrentDateTime(endTime)); | ||
execution.setStatus(status); | ||
|
@@ -159,6 +170,31 @@ public void finishExecution(String uniqueName, String benchmarkSequenceId, Strin | |
LOG.debug("Finishing execution - {}", execution); | ||
} | ||
|
||
private List<Measurement> normalizeMeasurements(List<FlatMeasurement> input) | ||
nineinchnick marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
// use a lookup map to avoid building a complex SQL query that compares attributes list | ||
Map<Metric, Metric> metrics = new HashMap<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tried using it initially but I had the issue where I need to save new metrics and update the reference in the set. I'm not sure how to do that. |
||
metricRepo.findAll().forEach(metric -> metrics.put(metric, metric)); | ||
Map<Metric, List<FlatMeasurement>> groups = input.stream() | ||
.collect(groupingBy(flatMeasurement -> new Metric( | ||
flatMeasurement.getName(), | ||
MeasurementUnit.valueOf(flatMeasurement.getUnit()), | ||
flatMeasurement.getAttributes()))); | ||
ImmutableList.Builder<Measurement> result = ImmutableList.builder(); | ||
for (Map.Entry<Metric, List<FlatMeasurement>> entry : groups.entrySet()) { | ||
Metric metric = metrics.get(entry.getKey()); | ||
if (metric == null) { | ||
metric = metricRepo.save(entry.getKey()); | ||
metrics.put(metric, metric); | ||
} | ||
Metric finalMetric = metric; | ||
result.addAll(entry.getValue().stream() | ||
.map(flatMeasurement -> new Measurement(finalMetric, flatMeasurement.getValue())) | ||
.collect(Collectors.toList())); | ||
} | ||
return result.build(); | ||
} | ||
|
||
@Transactional | ||
public BenchmarkRun findBenchmarkRun(String uniqueName, String sequenceId) | ||
{ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add description in commit message