Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance of DefaultLongTaskTimer#takeSnapshot #4001

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@
import io.micrometer.core.instrument.util.TimeUtils;

import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class DefaultLongTaskTimer extends AbstractMeter implements LongTaskTimer {
Expand All @@ -58,6 +56,8 @@ public class DefaultLongTaskTimer extends AbstractMeter implements LongTaskTimer

private final boolean supportsAggregablePercentiles;

private final double[] percentiles;

/**
* Create a {@code DefaultLongTaskTimer} instance.
* @param id ID
Expand Down Expand Up @@ -87,6 +87,17 @@ public DefaultLongTaskTimer(Id id, Clock clock, TimeUnit baseTimeUnit,
this.baseTimeUnit = baseTimeUnit;
this.distributionStatisticConfig = distributionStatisticConfig;
this.supportsAggregablePercentiles = supportsAggregablePercentiles;

double[] percentilesFromConfig = distributionStatisticConfig.getPercentiles();

if (percentilesFromConfig == null) {
this.percentiles = new double[0];
}
else {
this.percentiles = new double[percentilesFromConfig.length];
System.arraycopy(percentilesFromConfig, 0, this.percentiles, 0, percentilesFromConfig.length);
Arrays.sort(this.percentiles);
}
}

@Override
Expand Down Expand Up @@ -126,65 +137,80 @@ public TimeUnit baseTimeUnit() {
return baseTimeUnit;
}

private int percentilesInterpolatableLine(int activeTasksNumber) {
double lineValue = activeTasksNumber * 1.0 / (activeTasksNumber + 1);
for (int i = percentiles.length - 1; i >= 0; --i) {
if (percentiles[i] < lineValue) {
return i + 1;
}
}

return 0;
}

private Snapshot activeTaskDurationsSnapshot() {
Snapshot snapshot = new Snapshot(activeTasks.size() + 1);

StreamSupport.stream(((Iterable<SampleImpl>) activeTasks::descendingIterator).spliterator(), false)
.sequential()
.forEach(task -> {
double duration = task.duration(TimeUnit.NANOSECONDS);
if (duration < 0) {
return;
}
snapshot.append(duration);
});

return snapshot;
}

@Override
public HistogramSnapshot takeSnapshot() {
Queue<Double> percentilesRequested = new ArrayBlockingQueue<>(
distributionStatisticConfig.getPercentiles() == null ? 1
: distributionStatisticConfig.getPercentiles().length);
double[] percentilesRequestedArr = distributionStatisticConfig.getPercentiles();
if (percentilesRequestedArr != null && percentilesRequestedArr.length > 0) {
Arrays.stream(percentilesRequestedArr).sorted().boxed().forEach(percentilesRequested::add);
}
Snapshot snapshot = activeTaskDurationsSnapshot();

NavigableSet<Double> buckets = distributionStatisticConfig.getHistogramBuckets(supportsAggregablePercentiles);

CountAtBucket[] countAtBucketsArr = new CountAtBucket[0];

List<Double> percentilesAboveInterpolatableLine = percentilesRequested.stream()
.filter(p -> p * (activeTasks.size() + 1) > activeTasks.size())
.collect(Collectors.toList());
int percentilesInterpolatableLine = percentilesInterpolatableLine(snapshot.size());

percentilesRequested.removeAll(percentilesAboveInterpolatableLine);
ValueAtPercentile[] valueAtPercentiles = new ValueAtPercentile[percentiles.length];

List<ValueAtPercentile> valueAtPercentiles = new ArrayList<>(percentilesRequested.size());

if (!percentilesRequested.isEmpty() || !buckets.isEmpty()) {
Double percentile = percentilesRequested.poll();
if (percentilesInterpolatableLine > 0 || !buckets.isEmpty()) {
int currentPercentileIdx = 0;
Double bucket = buckets.pollFirst();

List<CountAtBucket> countAtBuckets = new ArrayList<>(buckets.size());

Double priorActiveTaskDuration = null;
double priorActiveTaskDuration = -1;
int count = 0;

// Make snapshot of active task durations
List<Double> youngestToOldestDurations = StreamSupport
.stream(((Iterable<SampleImpl>) activeTasks::descendingIterator).spliterator(), false)
.sequential()
.map(task -> task.duration(TimeUnit.NANOSECONDS))
.collect(Collectors.toList());
for (Double activeTaskDuration : youngestToOldestDurations) {
double[] youngestToOldestDurations = snapshot.durations();
for (int currentTaskDurationIdx = 0; currentTaskDurationIdx < snapshot.size(); ++currentTaskDurationIdx) {
double activeTaskDuration = youngestToOldestDurations[currentTaskDurationIdx];

while (bucket != null && activeTaskDuration > bucket) {
countAtBuckets.add(new CountAtBucket(bucket, count));
bucket = buckets.pollFirst();
}
count++;

if (percentile != null) {
double rank = percentile * (activeTasks.size() + 1);
if (currentPercentileIdx < percentilesInterpolatableLine) {
double percentile = percentiles[currentPercentileIdx];
double rank = percentile * (snapshot.size() + 1);

if (count >= rank) {
double percentileValue = activeTaskDuration;
if (count != rank && priorActiveTaskDuration != null) {
if (count != rank && priorActiveTaskDuration >= 0) {
// interpolate the percentile value when the active task rank
// is non-integral
double priorPercentileValue = priorActiveTaskDuration;
percentileValue = priorPercentileValue
+ ((percentileValue - priorPercentileValue) * (rank - (int) rank));
}

valueAtPercentiles.add(new ValueAtPercentile(percentile, percentileValue));
percentile = percentilesRequested.poll();
valueAtPercentiles[currentPercentileIdx] = new ValueAtPercentile(percentile, percentileValue);
++currentPercentileIdx;
}
}

Expand All @@ -200,21 +226,70 @@ public HistogramSnapshot takeSnapshot() {
countAtBucketsArr = countAtBuckets.toArray(countAtBucketsArr);
}

double duration = duration(TimeUnit.NANOSECONDS);
double max = max(TimeUnit.NANOSECONDS);
double duration = snapshot.total();
double max = snapshot.max();

// we wouldn't need to iterate over all the active tasks just to calculate the
// 100th percentile, which is just the max.
for (Double percentile : percentilesAboveInterpolatableLine) {
valueAtPercentiles.add(new ValueAtPercentile(percentile, max));
for (int currentPercentileIdx = percentilesInterpolatableLine; currentPercentileIdx < percentiles.length; ++currentPercentileIdx) {
valueAtPercentiles[currentPercentileIdx] = new ValueAtPercentile(percentiles[currentPercentileIdx], max);
}

ValueAtPercentile[] valueAtPercentilesArr = valueAtPercentiles.toArray(new ValueAtPercentile[0]);

return new HistogramSnapshot(activeTasks.size(), duration, max, valueAtPercentilesArr, countAtBucketsArr,
return new HistogramSnapshot(activeTasks.size(), duration, max, valueAtPercentiles, countAtBucketsArr,
(ps, scaling) -> ps.print("Summary output for LongTaskTimer histograms is not supported."));
}

static final class Snapshot {

private double[] durations;

private int size;

private double max;

private double total;

Snapshot(int initialCapacity) {
this.size = 0;
this.durations = new double[initialCapacity];
this.max = 0.0;
this.total = 0.0;
}

public void append(double duration) {
if (size >= durations.length) {
double[] newDurations = new double[(durations.length + 1) * 2];
System.arraycopy(durations, 0, newDurations, 0, durations.length);
durations = newDurations;
}

if (duration > max) {
max = duration;
}
total += duration;

durations[size] = duration;
++size;
}

public double[] durations() {
return durations;
}

public int size() {
return size;
}

public double max() {
return max;
}

public double total() {
return total;
}

}

class SampleImpl extends Sample {

private final long startTime;
Expand Down