Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.exemplar;

import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;

/**
* An interface for constructing an appropriate ExemplarReservoir for a given metric "memory cell".
*/
public interface ExemplarReservoirFactory {
ExemplarReservoir<LongExemplarData> createLongExemplarReservoir();

ExemplarReservoir<DoubleExemplarData> createDoubleExemplarReservoir();

/** An exemplar reservoir that stores no exemplars. */
static ExemplarReservoirFactory noSamples() {
return new ExemplarReservoirFactory() {
@Override
public ExemplarReservoir<LongExemplarData> createLongExemplarReservoir() {
return NoopExemplarReservoir.LONG_INSTANCE;
}

@Override
public ExemplarReservoir<DoubleExemplarData> createDoubleExemplarReservoir() {
return NoopExemplarReservoir.DOUBLE_INSTANCE;
}

@Override
public String toString() {
return "noSamples";
}
};
}

/**
* A reservoir with fixed size that stores the given number of exemplars.
*
* @param clock The clock to use when annotating measurements with time.
* @param size The maximum number of exemplars to preserve.
* @param randomSupplier The random number generator to use for sampling.
*/
static ExemplarReservoirFactory fixedSize(
Clock clock, int size, Supplier<Random> randomSupplier) {
return new ExemplarReservoirFactory() {
@Override
public ExemplarReservoir<LongExemplarData> createLongExemplarReservoir() {
return RandomFixedSizeExemplarReservoir.createLong(clock, size, randomSupplier);
}

@Override
public ExemplarReservoir<DoubleExemplarData> createDoubleExemplarReservoir() {
return RandomFixedSizeExemplarReservoir.createDouble(clock, size, randomSupplier);
}

@Override
public String toString() {
return "fixedSize(" + size + ")";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see this split out into a class so we can provide a more conventional toString implementation:

Suggested change
return "fixedSize(" + size + ")";
return "FixedSizeReservoirFactory(" + size + ")";

}
};
}

/**
* A Reservoir sampler that preserves the latest seen measurement per-histogram bucket.
*
* @param clock The clock to use when annotating measurements with time.
* @param boundaries A list of (inclusive) upper bounds for the histogram. Should be in order from
* lowest to highest.
*/
static ExemplarReservoirFactory histogramBucket(Clock clock, List<Double> boundaries) {
return new ExemplarReservoirFactory() {
@Override
public ExemplarReservoir<LongExemplarData> createLongExemplarReservoir() {
throw new UnsupportedOperationException(
"Cannot create long exemplars for histogram buckets");
}

@Override
public ExemplarReservoir<DoubleExemplarData> createDoubleExemplarReservoir() {
return new HistogramExemplarReservoir(clock, boundaries);
}

@Override
public String toString() {
return "histogramBucket(" + boundaries + ")";
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.view;

import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory;

/**
* An interface which allows customized configuration of aggregators.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public interface AggregationExtension extends Aggregation, AggregatorFactory {
/** Override the exemplar reservoir used for this aggregation. */
AggregationExtension setExemplarReservoirFactory(ExemplarReservoirFactory reservoirFactory);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,40 @@
import io.opentelemetry.sdk.metrics.data.MetricDataType;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.aggregator.DoubleBase2ExponentialHistogramAggregator;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory;

/**
* Exponential bucket histogram aggregation configuration.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class Base2ExponentialHistogramAggregation implements Aggregation, AggregatorFactory {
public final class Base2ExponentialHistogramAggregation implements AggregationExtension {

private static final int DEFAULT_MAX_BUCKETS = 160;
private static final int DEFAULT_MAX_SCALE = 20;

private static final ExemplarReservoirFactory DEFAULT_RESERVOIR =
ExemplarReservoirFactory.fixedSize(
Clock.getDefault(),
Runtime.getRuntime().availableProcessors(),
RandomSupplier.platformDefault());
private static final Aggregation DEFAULT =
new Base2ExponentialHistogramAggregation(DEFAULT_MAX_BUCKETS, DEFAULT_MAX_SCALE);
new Base2ExponentialHistogramAggregation(
DEFAULT_MAX_BUCKETS, DEFAULT_MAX_SCALE, DEFAULT_RESERVOIR);

private final int maxBuckets;
private final int maxScale;
private final ExemplarReservoirFactory reservoirFactory;

private Base2ExponentialHistogramAggregation(int maxBuckets, int maxScale) {
private Base2ExponentialHistogramAggregation(
int maxBuckets, int maxScale, ExemplarReservoirFactory reservoirFactory) {
this.maxBuckets = maxBuckets;
this.maxScale = maxScale;
this.reservoirFactory = reservoirFactory;
}

public static Aggregation getDefault() {
Expand All @@ -60,7 +68,7 @@ public static Aggregation getDefault() {
public static Aggregation create(int maxBuckets, int maxScale) {
checkArgument(maxBuckets >= 1, "maxBuckets must be > 0");
checkArgument(maxScale <= 20 && maxScale >= -10, "maxScale must be -10 <= x <= 20");
return new Base2ExponentialHistogramAggregation(maxBuckets, maxScale);
return new Base2ExponentialHistogramAggregation(maxBuckets, maxScale, DEFAULT_RESERVOIR);
}

@Override
Expand All @@ -71,11 +79,7 @@ public <T extends PointData, U extends ExemplarData> Aggregator<T, U> createAggr
new DoubleBase2ExponentialHistogramAggregator(
() ->
ExemplarReservoir.filtered(
exemplarFilter,
ExemplarReservoir.doubleFixedSizeReservoir(
Clock.getDefault(),
Runtime.getRuntime().availableProcessors(),
RandomSupplier.platformDefault())),
exemplarFilter, reservoirFactory.createDoubleExemplarReservoir()),
maxBuckets,
maxScale);
}
Expand All @@ -99,4 +103,11 @@ public String toString() {
+ maxScale
+ "}";
}

@Override
public AggregationExtension setExemplarReservoirFactory(
ExemplarReservoirFactory reservoirFactory) {
return new Base2ExponentialHistogramAggregation(
this.maxBuckets, this.maxScale, reservoirFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,22 @@
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/**
* Aggregation that selects the specified default based on instrument.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class DefaultAggregation implements Aggregation, AggregatorFactory {
public final class DefaultAggregation implements AggregationExtension {

private static final Aggregation INSTANCE = new DefaultAggregation();
private static final Aggregation INSTANCE = new DefaultAggregation(null);

public static Aggregation getInstance() {
return INSTANCE;
Expand All @@ -33,44 +34,60 @@ public static Aggregation getInstance() {
private static final ThrottlingLogger logger =
new ThrottlingLogger(Logger.getLogger(DefaultAggregation.class.getName()));

private DefaultAggregation() {}
@Nullable private final ExemplarReservoirFactory reservoirFactory;

private static Aggregation resolve(InstrumentDescriptor instrument, boolean withAdvice) {
private DefaultAggregation(@Nullable ExemplarReservoirFactory reservoirFactory) {
this.reservoirFactory = reservoirFactory;
}

private static AggregationExtension resolve(InstrumentDescriptor instrument, boolean withAdvice) {
switch (instrument.getType()) {
case COUNTER:
case UP_DOWN_COUNTER:
case OBSERVABLE_COUNTER:
case OBSERVABLE_UP_DOWN_COUNTER:
return SumAggregation.getInstance();
return (AggregationExtension) SumAggregation.getInstance();
case HISTOGRAM:
if (withAdvice && instrument.getAdvice().getExplicitBucketBoundaries() != null) {
return ExplicitBucketHistogramAggregation.create(
instrument.getAdvice().getExplicitBucketBoundaries());
return (AggregationExtension)
ExplicitBucketHistogramAggregation.create(
instrument.getAdvice().getExplicitBucketBoundaries());
}
return ExplicitBucketHistogramAggregation.getDefault();
return (AggregationExtension) ExplicitBucketHistogramAggregation.getDefault();
case OBSERVABLE_GAUGE:
return LastValueAggregation.getInstance();
return (AggregationExtension) LastValueAggregation.getInstance();
}
logger.log(Level.WARNING, "Unable to find default aggregation for instrument: " + instrument);
return DropAggregation.getInstance();
return (AggregationExtension) DropAggregation.getInstance();
}

@Override
public <T extends PointData, U extends ExemplarData> Aggregator<T, U> createAggregator(
InstrumentDescriptor instrumentDescriptor, ExemplarFilter exemplarFilter) {
return ((AggregatorFactory) resolve(instrumentDescriptor, /* withAdvice= */ true))
if (this.reservoirFactory != null) {
return resolve(instrumentDescriptor, /* withAdvice= */ true)
.setExemplarReservoirFactory(this.reservoirFactory)
.createAggregator(instrumentDescriptor, exemplarFilter);
}
return resolve(instrumentDescriptor, /* withAdvice= */ true)
.createAggregator(instrumentDescriptor, exemplarFilter);
}

@Override
public boolean isCompatibleWithInstrument(InstrumentDescriptor instrumentDescriptor) {
// This should always return true
return ((AggregatorFactory) resolve(instrumentDescriptor, /* withAdvice= */ false))
return resolve(instrumentDescriptor, /* withAdvice= */ false)
.isCompatibleWithInstrument(instrumentDescriptor);
}

@Override
public String toString() {
return "DefaultAggregation";
}

@Override
public AggregationExtension setExemplarReservoirFactory(
ExemplarReservoirFactory reservoirFactory) {
return new DefaultAggregation(reservoirFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory;

/**
* Configuration representing no aggregation.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class DropAggregation implements Aggregation, AggregatorFactory {
public final class DropAggregation implements AggregationExtension {

private static final Aggregation INSTANCE = new DropAggregation();

Expand All @@ -45,4 +45,10 @@ public boolean isCompatibleWithInstrument(InstrumentDescriptor instrumentDescrip
public String toString() {
return "DropAggregation";
}

@Override
public AggregationExtension setExemplarReservoirFactory(
ExemplarReservoirFactory reservoirFactory) {
throw new UnsupportedOperationException("DropAggregation does not allow exemplars");
}
}
Loading