From 2c4b6b2cd87697988144925228c50cfb13bafee1 Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Tue, 23 Apr 2024 21:28:38 +0300 Subject: [PATCH] sdk-metrics: add `SdkObservableUpDownCounter` --- .../metrics/SdkObservableUpDownCounter.scala | 143 ++++++++++++++++++ .../SdkObservableUpDownCounterSuite.scala | 143 ++++++++++++++++++ 2 files changed, 286 insertions(+) create mode 100644 sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/SdkObservableUpDownCounter.scala create mode 100644 sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/SdkObservableUpDownCounterSuite.scala diff --git a/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/SdkObservableUpDownCounter.scala b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/SdkObservableUpDownCounter.scala new file mode 100644 index 000000000..c42358b39 --- /dev/null +++ b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/SdkObservableUpDownCounter.scala @@ -0,0 +1,143 @@ +/* + * Copyright 2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.metrics + +import cats.data.NonEmptyList +import cats.effect.Clock +import cats.effect.MonadCancelThrow +import cats.effect.Resource +import cats.effect.std.Console +import cats.syntax.flatMap._ +import cats.syntax.foldable._ +import cats.syntax.functor._ +import org.typelevel.ci.CIString +import org.typelevel.otel4s.metrics.Measurement +import org.typelevel.otel4s.metrics.MeasurementValue +import org.typelevel.otel4s.metrics.ObservableMeasurement +import org.typelevel.otel4s.metrics.ObservableUpDownCounter +import org.typelevel.otel4s.sdk.context.AskContext +import org.typelevel.otel4s.sdk.metrics.internal.CallbackRegistration +import org.typelevel.otel4s.sdk.metrics.internal.InstrumentDescriptor +import org.typelevel.otel4s.sdk.metrics.internal.MeterSharedState + +/** An asynchronous instrument that reports additive values. + * + * @see + * [[https://opentelemetry.io/docs/specs/otel/metrics/api/#asynchronous-updowncounter]] + */ +private object SdkObservableUpDownCounter { + + final case class Builder[ + F[_]: MonadCancelThrow: Clock: Console: AskContext, + A: MeasurementValue + ]( + name: String, + sharedState: MeterSharedState[F], + unit: Option[String] = None, + description: Option[String] = None + ) extends ObservableUpDownCounter.Builder[F, A] { + + def withUnit(unit: String): ObservableUpDownCounter.Builder[F, A] = + copy(unit = Some(unit)) + + def withDescription( + description: String + ): ObservableUpDownCounter.Builder[F, A] = + copy(description = Some(description)) + + def createWithCallback( + cb: ObservableMeasurement[F, A] => F[Unit] + ): Resource[F, ObservableUpDownCounter] = { + val descriptor = makeDescriptor + + val makeCallbackRegistration: F[CallbackRegistration[F]] = + MeasurementValue[A] match { + case MeasurementValue.LongMeasurementValue(cast) => + sharedState + .registerObservableMeasurement[Long](descriptor) + .map { observable => + val runnable = cb { (value, attributes) => + observable.record(cast(value), attributes) + } + + new CallbackRegistration[F]( + NonEmptyList.one(observable), + runnable + ) + } + + case MeasurementValue.DoubleMeasurementValue(cast) => + sharedState + .registerObservableMeasurement[Double](descriptor) + .map { observable => + val runnable = cb { (value, attributes) => + observable.record(cast(value), attributes) + } + + new CallbackRegistration[F]( + NonEmptyList.one(observable), + runnable + ) + } + } + + for { + cr <- Resource.eval(makeCallbackRegistration) + _ <- sharedState.withCallback(cr) + } yield new ObservableUpDownCounter {} + } + + def create( + measurements: F[Iterable[Measurement[A]]] + ): Resource[F, ObservableUpDownCounter] = + createWithCallback { cb => + for { + m <- measurements + _ <- m.toVector.traverse_(m => cb.record(m.value, m.attributes)) + } yield () + } + + def createObserver: F[ObservableMeasurement[F, A]] = { + val descriptor = makeDescriptor + + MeasurementValue[A] match { + case MeasurementValue.LongMeasurementValue(cast) => + sharedState + .registerObservableMeasurement[Long](descriptor) + .map { observable => (value, attributes) => + observable.record(cast(value), attributes) + } + + case MeasurementValue.DoubleMeasurementValue(cast) => + sharedState + .registerObservableMeasurement[Double](descriptor) + .map { observable => (value, attributes) => + observable.record(cast(value), attributes) + } + } + } + + private def makeDescriptor: InstrumentDescriptor.Asynchronous = + InstrumentDescriptor.asynchronous( + name = CIString(name), + description = description, + unit = unit, + instrumentType = InstrumentType.ObservableUpDownCounter + ) + } + +} diff --git a/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/SdkObservableUpDownCounterSuite.scala b/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/SdkObservableUpDownCounterSuite.scala new file mode 100644 index 000000000..6ced69e5e --- /dev/null +++ b/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/SdkObservableUpDownCounterSuite.scala @@ -0,0 +1,143 @@ +/* + * Copyright 2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.metrics + +import cats.effect.IO +import cats.effect.std.Random +import cats.mtl.Ask +import munit.CatsEffectSuite +import munit.ScalaCheckEffectSuite +import org.scalacheck.Gen +import org.scalacheck.effect.PropF +import org.typelevel.otel4s.metrics.Measurement +import org.typelevel.otel4s.metrics.MeasurementValue +import org.typelevel.otel4s.sdk.TelemetryResource +import org.typelevel.otel4s.sdk.common.InstrumentationScope +import org.typelevel.otel4s.sdk.context.Context +import org.typelevel.otel4s.sdk.metrics.data.AggregationTemporality +import org.typelevel.otel4s.sdk.metrics.data.MetricData +import org.typelevel.otel4s.sdk.metrics.data.MetricPoints +import org.typelevel.otel4s.sdk.metrics.exemplar.ExemplarFilter +import org.typelevel.otel4s.sdk.metrics.exemplar.TraceContextLookup +import org.typelevel.otel4s.sdk.metrics.exporter.AggregationTemporalitySelector +import org.typelevel.otel4s.sdk.metrics.exporter.InMemoryMetricReader +import org.typelevel.otel4s.sdk.metrics.exporter.MetricProducer +import org.typelevel.otel4s.sdk.metrics.internal.MeterSharedState +import org.typelevel.otel4s.sdk.metrics.internal.exporter.RegisteredReader +import org.typelevel.otel4s.sdk.metrics.scalacheck.Gens +import org.typelevel.otel4s.sdk.metrics.test.PointDataUtils +import org.typelevel.otel4s.sdk.metrics.view.ViewRegistry + +import scala.concurrent.duration.FiniteDuration + +class SdkObservableUpDownCounterSuite + extends CatsEffectSuite + with ScalaCheckEffectSuite { + + private implicit val askContext: Ask[IO, Context] = Ask.const(Context.root) + + test("record values") { + PropF.forAllF( + Gens.telemetryResource, + Gens.instrumentationScope, + Gens.timeWindow, + Gens.attributes, + Gens.nonEmptyString, + Gen.option(Gen.alphaNumStr), + Gen.option(Gen.alphaNumStr), + Gen.either(Gen.posNum[Long], Gen.double) + ) { (resource, scope, window, attrs, name, unit, description, value) => + def test[A: MeasurementValue](value: A): IO[Unit] = { + val expected = MetricData( + resource, + scope, + name, + description, + unit, + MetricPoints.sum( + points = PointDataUtils.toNumberPoints( + Vector(value), + attrs, + window + ), + monotonic = false, + aggregationTemporality = AggregationTemporality.Cumulative + ) + ) + + for { + reader <- createReader(window.start) + state <- createState(resource, scope, reader, window.start) + + _ <- SdkObservableUpDownCounter + .Builder[IO, A](name, state, unit, description) + .createWithCallback(cb => cb.record(value, attrs)) + .surround( + state.collectAll(reader, window.end) + ) + .assertEquals(Vector(expected)) + + _ <- SdkObservableUpDownCounter + .Builder[IO, A](name, state, unit, description) + .create(IO.pure(Vector(Measurement(value, attrs)))) + .surround( + state.collectAll(reader, window.end) + ) + .assertEquals(Vector(expected)) + } yield () + } + + value match { + case Left(long) => test[Long](long) + case Right(double) => test[Double](double) + } + } + } + + private def createReader(start: FiniteDuration): IO[RegisteredReader[IO]] = { + val inMemory = new InMemoryMetricReader[IO]( + emptyProducer, + AggregationTemporalitySelector.alwaysCumulative + ) + + RegisteredReader.create(start, inMemory) + } + + private def createState( + resource: TelemetryResource, + scope: InstrumentationScope, + reader: RegisteredReader[IO], + start: FiniteDuration + ): IO[MeterSharedState[IO]] = + Random.scalaUtilRandom[IO].flatMap { implicit R: Random[IO] => + MeterSharedState.create[IO]( + resource, + scope, + start, + ExemplarFilter.alwaysOff, + TraceContextLookup.noop, + ViewRegistry(Vector.empty), + Vector(reader) + ) + } + + private def emptyProducer: MetricProducer[IO] = + new MetricProducer[IO] { + def produce: IO[Vector[MetricData]] = IO.pure(Vector.empty) + } + +}