Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add capability to define any impl of OTLP metrics sending #5691

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2022 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.registry.otlp;

import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;

public interface OltpMetricsSender {

void send(ExportMetricsServiceRequest request);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2022 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.registry.otlp;

import io.micrometer.common.util.internal.logging.InternalLogger;
import io.micrometer.common.util.internal.logging.InternalLoggerFactory;
import io.micrometer.core.ipc.http.HttpSender;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;

public class OtlpHttpMetricsSender implements OltpMetricsSender {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(OtlpHttpMetricsSender.class);

// VisibleForTesting
final HttpSender httpSender;

private final OtlpConfig config;

private final String userAgentHeader;

public OtlpHttpMetricsSender(HttpSender httpSender, OtlpConfig config) {
this.httpSender = httpSender;
this.config = config;
this.userAgentHeader = getUserAgentHeader();
}

@Override
public void send(ExportMetricsServiceRequest request) {
HttpSender.Request.Builder httpRequest = this.httpSender.post(config.url())
.withHeader("User-Agent", userAgentHeader)
.withContent("application/x-protobuf", request.toByteArray());
config.headers().forEach(httpRequest::withHeader);
try {
HttpSender.Response response = httpRequest.send();
if (!response.isSuccessful()) {
logger.warn(
"Failed to publish metrics (context: {}). Server responded with HTTP status code {} and body {}",
getConfigurationContext(), response.code(), response.body());
}
}
catch (Throwable e) {
logger.warn("Failed to publish metrics (context: {}) ", getConfigurationContext(), e);
}
}

private String getUserAgentHeader() {
String plainExporter = "Micrometer-OTLP-Exporter-Java";
if (OtlpMeterRegistry.class.getPackage().getImplementationVersion() == null) {
return plainExporter;
}
return plainExporter + "/" + OtlpMeterRegistry.class.getPackage().getImplementationVersion();
}

/**
* Get the configuration context.
* @return A message containing enough information for the log reader to figure out
* what configuration details may have contributed to the failure.
*/
private String getConfigurationContext() {
// While other values may contribute to failures, these two are most common
return "url=" + config.url() + ", resource-attributes=" + config.resourceAttributes();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
import io.micrometer.common.lang.Nullable;
import io.micrometer.common.util.internal.logging.InternalLogger;
import io.micrometer.common.util.internal.logging.InternalLoggerFactory;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.config.NamingConvention;
import io.micrometer.core.instrument.distribution.*;
import io.micrometer.core.instrument.distribution.Histogram;
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
import io.micrometer.core.instrument.internal.DefaultGauge;
import io.micrometer.core.instrument.internal.DefaultLongTaskTimer;
Expand All @@ -36,14 +34,14 @@
import io.micrometer.core.instrument.util.MeterPartition;
import io.micrometer.core.instrument.util.NamedThreadFactory;
import io.micrometer.core.instrument.util.TimeUtils;
import io.micrometer.core.ipc.http.HttpSender;
import io.micrometer.core.ipc.http.HttpUrlConnectionSender;
import io.micrometer.registry.otlp.internal.CumulativeBase2ExponentialHistogram;
import io.micrometer.registry.otlp.internal.DeltaBase2ExponentialHistogram;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.*;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
import io.opentelemetry.proto.resource.v1.Resource;

import java.time.Duration;
Expand All @@ -56,8 +54,7 @@
import java.util.function.ToLongFunction;

/**
* Publishes meters in OTLP (OpenTelemetry Protocol) format. HTTP with Protobuf encoding
* is the only option currently supported.
* Publishes meters in OTLP (OpenTelemetry Protocol) format.
*
* @author Tommy Ludwig
* @author Lenin Jaganathan
Expand All @@ -83,16 +80,14 @@ public class OtlpMeterRegistry extends PushMeterRegistry {

private final OtlpConfig config;

private final HttpSender httpSender;
private final OltpMetricsSender metricsSender;

private final Resource resource;

private final AggregationTemporality aggregationTemporality;

private final TimeUnit baseTimeUnit;

private final String userAgentHeader;

// Time when the last scheduled rollOver has started. Applicable only for delta
// flavour.
private volatile long lastMeterRolloverStartTime = -1;
Expand All @@ -109,27 +104,32 @@ public OtlpMeterRegistry(OtlpConfig config, Clock clock) {
}

/**
* Create an {@code OtlpMeterRegistry} instance.
* Create an {@code OtlpMeterRegistry} instance with an HTTP metrics sender.
* @param config config
* @param clock clock
* @param threadFactory thread factory
* @since 1.14.0
*/
public OtlpMeterRegistry(OtlpConfig config, Clock clock, ThreadFactory threadFactory) {
this(config, clock, threadFactory, new HttpUrlConnectionSender());
this(config, clock, threadFactory, new OtlpHttpMetricsSender(new HttpUrlConnectionSender(), config));
}

// VisibleForTesting
// not public until we decide what we want to expose in public API
// HttpSender may not be a good idea if we will support a non-HTTP transport
OtlpMeterRegistry(OtlpConfig config, Clock clock, ThreadFactory threadFactory, HttpSender httpSender) {
/**
* Create an {@code OtlpMeterRegistry} instance.
* @param config config
* @param clock clock
* @param threadFactory thread factory
* @param metricsSender metrics sender
* @since 1.14.0
*/
public OtlpMeterRegistry(OtlpConfig config, Clock clock, ThreadFactory threadFactory,
OltpMetricsSender metricsSender) {
super(config, clock);
this.config = config;
this.baseTimeUnit = config.baseTimeUnit();
this.httpSender = httpSender;
this.metricsSender = metricsSender;
this.resource = Resource.newBuilder().addAllAttributes(getResourceAttributes()).build();
this.aggregationTemporality = config.aggregationTemporality();
this.userAgentHeader = getUserAgentHeader();
config().namingConvention(NamingConvention.dot);
start(threadFactory);
}
Expand Down Expand Up @@ -178,34 +178,15 @@ protected void publish() {
.build())
.build())
.build();
HttpSender.Request.Builder httpRequest = this.httpSender.post(this.config.url())
.withHeader("User-Agent", this.userAgentHeader)
.withContent("application/x-protobuf", request.toByteArray());
this.config.headers().forEach(httpRequest::withHeader);
HttpSender.Response response = httpRequest.send();
if (!response.isSuccessful()) {
logger.warn(
"Failed to publish metrics (context: {}). Server responded with HTTP status code {} and body {}",
getConfigurationContext(), response.code(), response.body());
}

metricsSender.send(request);
}
catch (Throwable e) {
logger.warn(String.format("Failed to publish metrics to OTLP receiver (context: %s)",
getConfigurationContext()), e);
logger.warn("Failed to publish metrics to OTLP receiver", e);
}
}
}

/**
* Get the configuration context.
* @return A message containing enough information for the log reader to figure out
* what configuration details may have contributed to the failure.
*/
private String getConfigurationContext() {
// While other values may contribute to failures, these two are most common
return "url=" + config.url() + ", resource-attributes=" + config.resourceAttributes();
}

@Override
protected <T> Gauge newGauge(Meter.Id id, @Nullable T obj, ToDoubleFunction<T> valueFunction) {
return new DefaultGauge<>(id, obj, valueFunction);
Expand Down Expand Up @@ -390,12 +371,12 @@ Iterable<KeyValue> getResourceAttributes() {
}

static Histogram getHistogram(Clock clock, DistributionStatisticConfig distributionStatisticConfig,
OtlpConfig otlpConfig) {
OtlpConfig otlpConfig) {
return getHistogram(clock, distributionStatisticConfig, otlpConfig, null);
}

static Histogram getHistogram(final Clock clock, final DistributionStatisticConfig distributionStatisticConfig,
final OtlpConfig otlpConfig, @Nullable final TimeUnit baseTimeUnit) {
final OtlpConfig otlpConfig, @Nullable final TimeUnit baseTimeUnit) {
// While publishing to OTLP, we export either Histogram datapoint (Explicit
// ExponentialBuckets
// or Exponential) / Summary
Expand All @@ -411,14 +392,14 @@ static Histogram getHistogram(final Clock clock, final DistributionStatisticConf
}

return otlpConfig.aggregationTemporality() == AggregationTemporality.DELTA
? new DeltaBase2ExponentialHistogram(otlpConfig.maxScale(), otlpConfig.maxBucketCount(),
minimumExpectedValue, baseTimeUnit, clock, otlpConfig.step().toMillis())
: new CumulativeBase2ExponentialHistogram(otlpConfig.maxScale(), otlpConfig.maxBucketCount(),
minimumExpectedValue, baseTimeUnit);
? new DeltaBase2ExponentialHistogram(otlpConfig.maxScale(), otlpConfig.maxBucketCount(),
minimumExpectedValue, baseTimeUnit, clock, otlpConfig.step().toMillis())
: new CumulativeBase2ExponentialHistogram(otlpConfig.maxScale(), otlpConfig.maxBucketCount(),
minimumExpectedValue, baseTimeUnit);
}

Histogram explicitBucketHistogram = getExplicitBucketHistogram(clock, distributionStatisticConfig,
otlpConfig.aggregationTemporality(), otlpConfig.step().toMillis());
otlpConfig.aggregationTemporality(), otlpConfig.step().toMillis());
if (explicitBucketHistogram != null) {
return explicitBucketHistogram;
}
Expand All @@ -431,22 +412,22 @@ static Histogram getHistogram(final Clock clock, final DistributionStatisticConf
}

static HistogramFlavor histogramFlavor(HistogramFlavor preferredHistogramFlavor,
DistributionStatisticConfig distributionStatisticConfig) {
DistributionStatisticConfig distributionStatisticConfig) {

final double[] serviceLevelObjectiveBoundaries = distributionStatisticConfig
.getServiceLevelObjectiveBoundaries();
if (distributionStatisticConfig.isPublishingHistogram()
&& preferredHistogramFlavor == HistogramFlavor.BASE2_EXPONENTIAL_BUCKET_HISTOGRAM
&& (serviceLevelObjectiveBoundaries == null || serviceLevelObjectiveBoundaries.length == 0)) {
&& preferredHistogramFlavor == HistogramFlavor.BASE2_EXPONENTIAL_BUCKET_HISTOGRAM
&& (serviceLevelObjectiveBoundaries == null || serviceLevelObjectiveBoundaries.length == 0)) {
return HistogramFlavor.BASE2_EXPONENTIAL_BUCKET_HISTOGRAM;
}
return HistogramFlavor.EXPLICIT_BUCKET_HISTOGRAM;
}

@Nullable
private static Histogram getExplicitBucketHistogram(final Clock clock,
final DistributionStatisticConfig distributionStatisticConfig,
final AggregationTemporality aggregationTemporality, final long stepMillis) {
final DistributionStatisticConfig distributionStatisticConfig,
final AggregationTemporality aggregationTemporality, final long stepMillis) {

double[] sloWithPositiveInf = getSloWithPositiveInf(distributionStatisticConfig);
if (AggregationTemporality.isCumulative(aggregationTemporality)) {
Expand All @@ -461,11 +442,11 @@ private static Histogram getExplicitBucketHistogram(final Clock clock,
}
if (AggregationTemporality.isDelta(aggregationTemporality) && stepMillis > 0) {
return new OtlpStepBucketHistogram(clock, stepMillis,
DistributionStatisticConfig.builder()
.serviceLevelObjectives(sloWithPositiveInf)
.build()
.merge(distributionStatisticConfig),
true, false);
DistributionStatisticConfig.builder()
.serviceLevelObjectives(sloWithPositiveInf)
.build()
.merge(distributionStatisticConfig),
true, false);
}

return null;
Expand All @@ -492,11 +473,4 @@ static double[] getSloWithPositiveInf(DistributionStatisticConfig distributionSt
return sloWithPositiveInf;
}

private String getUserAgentHeader() {
if (this.getClass().getPackage().getImplementationVersion() == null) {
return "Micrometer-OTLP-Exporter-Java";
}
return "Micrometer-OTLP-Exporter-Java/" + this.getClass().getPackage().getImplementationVersion();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,23 @@ abstract class OtlpMeterRegistryTest {

protected MockClock clock;

private HttpSender mockHttpSender;

OtlpMeterRegistry registry;

OtlpMeterRegistry registryWithExponentialHistogram;

private OtlpHttpMetricsSender metricsSender;

abstract OtlpConfig otlpConfig();

abstract OtlpConfig exponentialHistogramOtlpConfig();

@BeforeEach
void setUp() {
this.clock = new MockClock();
this.mockHttpSender = mock(HttpSender.class);
this.registry = new OtlpMeterRegistry(otlpConfig(), this.clock,
new NamedThreadFactory("otlp-metrics-publisher"), this.mockHttpSender);
OtlpConfig config = otlpConfig();
this.metricsSender = new OtlpHttpMetricsSender(mock(HttpSender.class), config);
this.registry = new OtlpMeterRegistry(config, this.clock, new NamedThreadFactory("otlp-metrics-publisher"),
metricsSender);
this.registryWithExponentialHistogram = new OtlpMeterRegistry(exponentialHistogramOtlpConfig(), clock);
}

Expand Down Expand Up @@ -148,15 +149,16 @@ void timeGauge() {
@Issue("#5577")
@Test
void httpHeaders() throws Throwable {
HttpSender.Request.Builder builder = HttpSender.Request.build(otlpConfig().url(), this.mockHttpSender);
when(mockHttpSender.post(otlpConfig().url())).thenReturn(builder);
HttpSender.Request.Builder builder = HttpSender.Request.build(otlpConfig().url(),
this.metricsSender.httpSender);
when(metricsSender.httpSender.post(otlpConfig().url())).thenReturn(builder);

when(mockHttpSender.send(isA(HttpSender.Request.class))).thenReturn(new HttpSender.Response(200, ""));
when(metricsSender.httpSender.send(isA(HttpSender.Request.class))).thenReturn(new HttpSender.Response(200, ""));

writeToMetric(TimeGauge.builder("gauge.time", this, TimeUnit.MICROSECONDS, o -> 24).register(registry));
registry.publish();

verify(this.mockHttpSender).send(assertArg(request -> {
verify(this.metricsSender.httpSender).send(assertArg(request -> {
assertThat(request.getRequestHeaders().get("User-Agent")).startsWith("Micrometer-OTLP-Exporter-Java");
assertThat(request.getRequestHeaders()).containsEntry("Content-Type", "application/x-protobuf");
}));
Expand Down