Skip to content

Commit

Permalink
Add OpenTelemetry attributes to the metrics backend (#2327)
Browse files Browse the repository at this point in the history
  • Loading branch information
varshith257 authored Oct 30, 2024
1 parent 8d93049 commit 53b6b89
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package sttp.client4.opentelemetry

import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.common.{AttributeKey, Attributes}
import io.opentelemetry.api.metrics.{DoubleHistogram, LongCounter, LongUpDownCounter, Meter}
import sttp.client4.listener.{ListenerBackend, RequestListener}
import sttp.client4.wrappers.FollowRedirectsBackend
Expand Down Expand Up @@ -108,44 +108,60 @@ private class OpenTelemetryMetricsListener(
private val upAndDownCounter = new ConcurrentHashMap[String, LongUpDownCounter]()

override def beforeRequest(request: GenericRequest[_, _]): Option[Long] = {
updateInProgressCounter(request, 1)
recordHistogram(requestToSizeHistogramMapper(request), request.contentLength)
requestToLatencyHistogramMapper(request).map(_ => clock.millis())
val attributes = createRequestAttributes(request)

updateInProgressCounter(request, 1, attributes)
recordHistogram(requestToSizeHistogramMapper(request), request.contentLength, attributes)
requestToLatencyHistogramMapper(request).map { _ =>
val timestamp = clock.millis()
timestamp
}
}

override def requestSuccessful(request: GenericRequest[_, _], response: Response[_], tag: Option[Long]): Unit = {
val requestAttributes = createRequestAttributes(request)
val responseAttributes = createResponseAttributes(response)

val combinedAttributes = requestAttributes.toBuilder().putAll(responseAttributes).build()

if (response.isSuccess) {
incrementCounter(responseToSuccessCounterMapper(response))
incrementCounter(responseToSuccessCounterMapper(response), combinedAttributes)
} else {
incrementCounter(requestToErrorCounterMapper(response))
incrementCounter(requestToErrorCounterMapper(response), combinedAttributes)
}
recordHistogram(responseToSizeHistogramMapper(response), response.contentLength)
recordHistogram(requestToLatencyHistogramMapper(request), tag.map(clock.millis() - _))
updateInProgressCounter(request, -1)

recordHistogram(responseToSizeHistogramMapper(response), response.contentLength, combinedAttributes)
recordHistogram(requestToLatencyHistogramMapper(request), tag.map(clock.millis() - _), combinedAttributes)
updateInProgressCounter(request, -1, requestAttributes)
}

override def requestException(request: GenericRequest[_, _], tag: Option[Long], e: Exception): Unit =
override def requestException(request: GenericRequest[_, _], tag: Option[Long], e: Exception): Unit = {
val requestAttributes = createRequestAttributes(request)
val errorAttributes = createErrorAttributes(e)

HttpError.find(e) match {
case Some(HttpError(body, statusCode)) =>
requestSuccessful(request, Response(body, statusCode, request.onlyMetadata), tag)
case _ =>
incrementCounter(requestToFailureCounterMapper(request, e))
recordHistogram(requestToLatencyHistogramMapper(request), tag.map(clock.millis() - _))
updateInProgressCounter(request, -1)
incrementCounter(requestToFailureCounterMapper(request, e), errorAttributes)
recordHistogram(requestToLatencyHistogramMapper(request), tag.map(clock.millis() - _), errorAttributes)
updateInProgressCounter(request, -1, requestAttributes)
}
}

private def updateInProgressCounter[R, T](request: GenericRequest[T, R], delta: Long): Unit =
private def updateInProgressCounter[R, T](request: GenericRequest[T, R], delta: Long, attributes: Attributes): Unit =
requestToInProgressCounterMapper(request)
.foreach(config =>
getOrCreateMetric(upAndDownCounter, config, createNewUpDownCounter).add(delta, config.attributes)
)
.foreach(config => getOrCreateMetric(upAndDownCounter, config, createNewUpDownCounter).add(delta, attributes))

private def recordHistogram(config: Option[HistogramCollectorConfig], size: Option[Long]): Unit = config.foreach {
cfg =>
getOrCreateHistogram(histograms, cfg, createNewHistogram).record(size.getOrElse(0L).toDouble, cfg.attributes)
private def recordHistogram(
config: Option[HistogramCollectorConfig],
size: Option[Long],
attributes: Attributes
): Unit = config.foreach { cfg =>
getOrCreateHistogram(histograms, cfg, createNewHistogram).record(size.getOrElse(0L).toDouble, attributes)
}

private def incrementCounter(collectorConfig: Option[CollectorConfig]): Unit =
private def incrementCounter(collectorConfig: Option[CollectorConfig], attributes: Attributes): Unit =
collectorConfig
.foreach(config => getOrCreateMetric(counters, config, createNewCounter).add(1, config.attributes))

Expand Down Expand Up @@ -195,6 +211,40 @@ private class OpenTelemetryMetricsListener(
b = config.description.fold(b)(b.setDescription)
b.build()
}

/*
OpenTelemetry HTTP Client Metrics Spec: Mapping request attributes as per
https://opentelemetry.io/docs/specs/semconv/http/http-metrics/#http-client
* */
private def createRequestAttributes(request: GenericRequest[_, _]): Attributes = {
val attributes = Attributes
.builder()
.put(AttributeKey.stringKey("http.request.method"), request.method.method)
.put(AttributeKey.stringKey("server.address"), request.uri.host.getOrElse("unknown"))
.put(AttributeKey.longKey("server.port"), request.uri.port.getOrElse(80))
.build()

attributes
}

/*
OpenTelemetry HTTP Client Metrics Spec: Mapping response attributes as per
https://opentelemetry.io/docs/specs/semconv/http/http-metrics/#http-client
* */
private def createResponseAttributes(response: Response[_]): Attributes =
Attributes
.builder()
.put(AttributeKey.longKey("http.response.status_code"), response.code.code)
.build()

private def createErrorAttributes(e: Throwable): Attributes = {
val errorType = e match {
case _: java.net.UnknownHostException => "unknown_host"
case _ => e.getClass.getSimpleName
}
Attributes.builder().put("error.type", errorType).build()
}

}

case class CollectorConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.metrics.SdkMeterProvider
import io.opentelemetry.sdk.metrics.data.{HistogramPointData, MetricData}
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader
import io.opentelemetry.api.common.{AttributeKey, Attributes}
import org.scalatest.OptionValues
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
Expand Down Expand Up @@ -166,7 +167,7 @@ class OpenTelemetryMetricsBackendTest extends AnyFlatSpec with Matchers with Opt
getHistogramValue(reader, OpenTelemetryMetricsBackend.DefaultResponseSizeHistogramName).value.getSum shouldBe 50
}

it should "use histogram for request latencies" in {
it should "use histogram for request latencies and validate attributes" in {
// given
val response = ResponseStub("Ok", StatusCode.Ok, "Ok", Seq(Header.contentLength(10)))
val backendStub = SyncBackendStub.whenAnyRequest.thenRespond(response)
Expand All @@ -177,7 +178,8 @@ class OpenTelemetryMetricsBackendTest extends AnyFlatSpec with Matchers with Opt
(0 until 5).foreach(_ => basicRequest.get(uri"http://127.0.0.1/foo").send(backend))

// then
getHistogramValue(reader, OpenTelemetryMetricsBackend.DefaultLatencyHistogramName).map(_.getSum) should not be empty
val metrics = reader.collectAllMetrics().asScala.toList
specTest(metrics, OpenTelemetryMetricsBackend.DefaultLatencyHistogramName)
}

it should "use error counter when http error is thrown" in {
Expand Down Expand Up @@ -238,6 +240,20 @@ class OpenTelemetryMetricsBackendTest extends AnyFlatSpec with Matchers with Opt
getMetricValue(reader, OpenTelemetryMetricsBackend.DefaultErrorCounterName) shouldBe None
}

it should "validate http.client.request.duration semantic conventions" in {
// given
val reader = InMemoryMetricReader.create()
val backend = OpenTelemetryMetricsBackend(stubAlwaysOk, spawnNewOpenTelemetry(reader))

// when
basicRequest.get(uri"http://127.0.0.1/foo").send(backend)

// then
val metrics = reader.collectAllMetrics().asScala.toList
val expectedMetricName = "http.client.request.duration"
specTest(metrics, expectedMetricName)
}

private[this] def getMetricValue(reader: InMemoryMetricReader, name: String): Option[Long] =
reader
.collectAllMetrics()
Expand All @@ -261,4 +277,25 @@ class OpenTelemetryMetricsBackendTest extends AnyFlatSpec with Matchers with Opt
.find(_.getName.equals(name))
.head

private[this] def specTest(metrics: List[MetricData], expectedMetricName: String): Unit = {
val metric = metrics.find(_.getName == expectedMetricName)
assert(
metric.isDefined,
s"$expectedMetricName metric is missing. Available [${metrics.map(_.getName).mkString(", ")}]"
)

val clue = s"[$expectedMetricName] has a mismatched property"

metric.foreach { md =>
assert(md.getName == expectedMetricName, clue)
assert(md.getUnit == "ms", clue)

md.getHistogramData.getPoints.forEach { point =>
val attributes = point.getAttributes
assert(attributes.get(AttributeKey.stringKey("http.request.method")) == "GET")
assert(attributes.get(AttributeKey.stringKey("server.address")) == "127.0.0.1")
assert(attributes.get(AttributeKey.longKey("http.response.status_code")) == 200L)
}
}
}
}

0 comments on commit 53b6b89

Please sign in to comment.