diff --git a/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/aggregation/ExplicitBucketHistogramAggregator.scala b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/aggregation/ExplicitBucketHistogramAggregator.scala index 425b487c6..4c487f514 100644 --- a/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/aggregation/ExplicitBucketHistogramAggregator.scala +++ b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/aggregation/ExplicitBucketHistogramAggregator.scala @@ -123,7 +123,7 @@ private object ExplicitBucketHistogramAggregator { ) private def emptyState(buckets: Int): State = - State(0, Double.MaxValue, -1, 0L, Vector.fill(buckets + 1)(0)) + State(0, Double.MaxValue, Double.MinValue, 0L, Vector.fill(buckets + 1)(0)) private class Accumulator[F[_]: FlatMap, A: MeasurementValue]( stateRef: Ref[F, State], diff --git a/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/exporter/AggregationSelector.scala b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/exporter/AggregationSelector.scala index 38c58b652..2228a15e7 100644 --- a/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/exporter/AggregationSelector.scala +++ b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/exporter/AggregationSelector.scala @@ -23,12 +23,44 @@ trait AggregationSelector { /** Returns preferred [[Aggregation]] for the given [[InstrumentType]]. */ - def select(instrumentType: InstrumentType): Aggregation + final def select(instrumentType: InstrumentType): Aggregation = + instrumentType match { + case synchronous: InstrumentType.Synchronous => + forSynchronous(synchronous) + case asynchronous: InstrumentType.Asynchronous => + forAsynchronous(asynchronous) + } + + /** Returns preferred [[Aggregation]] for the given + * [[InstrumentType.Synchronous]]. + */ + def forSynchronous( + instrumentType: InstrumentType.Synchronous + ): Aggregation with Aggregation.Synchronous + + /** Returns preferred [[Aggregation]] for the given + * [[InstrumentType.Asynchronous]]. + */ + def forAsynchronous( + instrumentType: InstrumentType.Asynchronous + ): Aggregation with Aggregation.Asynchronous } object AggregationSelector { /** Returns [[Aggregation.default]] for all instruments. */ - def default: AggregationSelector = _ => Aggregation.default + def default: AggregationSelector = Default + + private object Default extends AggregationSelector { + def forSynchronous( + instrumentType: InstrumentType.Synchronous + ): Aggregation with Aggregation.Synchronous = + Aggregation.Default + + def forAsynchronous( + instrumentType: InstrumentType.Asynchronous + ): Aggregation with Aggregation.Asynchronous = + Aggregation.Default + } } diff --git a/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/internal/CallbackRegistration.scala b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/internal/CallbackRegistration.scala new file mode 100644 index 000000000..e2a3d67d8 --- /dev/null +++ b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/internal/CallbackRegistration.scala @@ -0,0 +1,54 @@ +/* + * Copyright 2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.metrics.internal + +import cats.data.NonEmptyList +import cats.effect.MonadCancelThrow +import cats.syntax.applicative._ +import cats.syntax.foldable._ +import org.typelevel.otel4s.sdk.metrics.data.TimeWindow +import org.typelevel.otel4s.sdk.metrics.internal.exporter.RegisteredReader + +private[metrics] final class CallbackRegistration[F[_]: MonadCancelThrow]( + measurements: NonEmptyList[SdkObservableMeasurement[F, _]], + callback: F[Unit] +) { + + private val hasStorages: Boolean = + measurements.exists(_.storages.nonEmpty) + + /** Set the active reader on each observable measurement so that measurements + * are only recorded to relevant storages. + * + * @param reader + * the reader to use + * + * @param timeWindow + * the time window of the measurement + */ + def invokeCallback( + reader: RegisteredReader[F], + timeWindow: TimeWindow + ): F[Unit] = + measurements + .traverse_(_.withActiveReader(reader, timeWindow)) + .surround(callback) + .whenA(hasStorages) + + override def toString: String = + s"CallbackRegistration{instrumentDescriptors=${measurements.map(_.descriptor).mkString_("[", ", ", "]")}" +} diff --git a/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/internal/MeterSharedState.scala b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/internal/MeterSharedState.scala new file mode 100644 index 000000000..fe1abdb2c --- /dev/null +++ b/sdk/metrics/src/main/scala/org/typelevel/otel4s/sdk/metrics/internal/MeterSharedState.scala @@ -0,0 +1,257 @@ +/* + * Copyright 2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.metrics.internal + +import cats.effect.Ref +import cats.effect.Resource +import cats.effect.Temporal +import cats.effect.std.Console +import cats.effect.std.Mutex +import cats.effect.std.Random +import cats.syntax.flatMap._ +import cats.syntax.foldable._ +import cats.syntax.functor._ +import cats.syntax.traverse._ +import org.typelevel.otel4s.metrics.MeasurementValue +import org.typelevel.otel4s.sdk.TelemetryResource +import org.typelevel.otel4s.sdk.common.InstrumentationScope +import org.typelevel.otel4s.sdk.context.AskContext +import org.typelevel.otel4s.sdk.metrics.Aggregation +import org.typelevel.otel4s.sdk.metrics.data.MetricData +import org.typelevel.otel4s.sdk.metrics.data.TimeWindow +import org.typelevel.otel4s.sdk.metrics.exemplar.ExemplarFilter +import org.typelevel.otel4s.sdk.metrics.exemplar.TraceContextLookup +import org.typelevel.otel4s.sdk.metrics.internal.exporter.RegisteredReader +import org.typelevel.otel4s.sdk.metrics.internal.storage.MetricStorage +import org.typelevel.otel4s.sdk.metrics.view.View +import org.typelevel.otel4s.sdk.metrics.view.ViewRegistry + +import scala.concurrent.duration.FiniteDuration + +private[metrics] final class MeterSharedState[ + F[_]: Temporal: Random: Console: AskContext +] private ( + mutex: Mutex[F], + viewRegistry: ViewRegistry[F], + resource: TelemetryResource, + val scope: InstrumentationScope, + startTimestamp: FiniteDuration, + exemplarFilter: ExemplarFilter, + traceContextLookup: TraceContextLookup, + callbacks: Ref[F, Vector[CallbackRegistration[F]]], + registries: Map[RegisteredReader[F], MetricStorageRegistry[F]] +) { + + /** Creates a metric storage for the given descriptor of a synchronous + * instrument. + * + * @param descriptor + * a descriptor to create a storage for + * + * @tparam A + * the type of the values to record + */ + def registerMetricStorage[A: MeasurementValue: Numeric]( + descriptor: InstrumentDescriptor.Synchronous + ): F[MetricStorage.Synchronous.Writeable[F, A]] = { + + def make( + reader: RegisteredReader[F], + registry: MetricStorageRegistry[F], + aggregation: Aggregation.Synchronous, + view: Option[View] + ): F[Vector[MetricStorage.Synchronous[F, A]]] = + for { + storage <- MetricStorage.synchronous( + reader, + view, + descriptor, + exemplarFilter, + traceContextLookup, + aggregation + ) + _ <- registry.register(storage) + } yield Vector(storage) + + registries.toVector + .flatTraverse { case (reader, registry) => + def defaultAggregation: Aggregation with Aggregation.Synchronous = + reader.reader.defaultAggregationSelector.forSynchronous( + descriptor.instrumentType + ) + + viewRegistry + .findViews(descriptor, scope) + .flatMap { + case Some(views) => + views.toVector.flatTraverse { view => + view.aggregation.getOrElse(defaultAggregation) match { + case aggregation: Aggregation.Synchronous => + make(reader, registry, aggregation, Some(view)) + + case _ => + Temporal[F].pure( + Vector.empty[MetricStorage.Synchronous[F, A]] + ) + } + } + + case None => + make(reader, registry, defaultAggregation, None) + } + } + .map { storages => + MetricStorage.Synchronous.Writeable.of(storages) + } + } + + /** Creates an observable measurement for the given descriptor of an + * asynchronous instrument. + * + * @param descriptor + * a descriptor to create an observable measurement for + * + * @tparam A + * the type of the values to record + */ + def registerObservableMeasurement[A: MeasurementValue: Numeric]( + descriptor: InstrumentDescriptor.Asynchronous + ): F[SdkObservableMeasurement[F, A]] = { + + def make( + reader: RegisteredReader[F], + registry: MetricStorageRegistry[F], + aggregation: Aggregation.Asynchronous, + view: Option[View] + ): F[Vector[MetricStorage.Asynchronous[F, A]]] = + for { + storage <- MetricStorage.asynchronous( + reader, + view, + descriptor, + aggregation + ) + _ <- registry.register(storage) + } yield Vector(storage) + + registries.toVector + .flatTraverse { case (reader, registry) => + def defaultAggregation: Aggregation with Aggregation.Asynchronous = + reader.reader.defaultAggregationSelector.forAsynchronous( + descriptor.instrumentType + ) + + viewRegistry + .findViews(descriptor, scope) + .flatMap { + case Some(views) => + views.toVector.flatTraverse { view => + view.aggregation.getOrElse(defaultAggregation) match { + case aggregation: Aggregation.Asynchronous => + make(reader, registry, aggregation, Some(view)) + + case _ => + Temporal[F].pure( + Vector.empty[MetricStorage.Asynchronous[F, A]] + ) + } + } + + case None => + make(reader, registry, defaultAggregation, None) + } + } + .flatMap { storages => + SdkObservableMeasurement.create(storages, scope, descriptor) + } + } + + /** Collects all metrics. + * + * @param reader + * the reader to use + * + * @param collectTimestamp + * the timestamp of the collection + */ + def collectAll( + reader: RegisteredReader[F], + collectTimestamp: FiniteDuration + ): F[Vector[MetricData]] = + callbacks.get.flatMap { currentCallbacks => + mutex.lock.surround { + val timeWindow = TimeWindow(startTimestamp, collectTimestamp) + + for { + _ <- currentCallbacks.traverse_(_.invokeCallback(reader, timeWindow)) + storages <- registries.get(reader).foldMapA(_.storages) + result <- storages.traverse { storage => + storage.collect(resource, scope, timeWindow) + } + } yield result.flatten.filter(_.nonEmpty) + } + } + + /** Registers a callback and removes it from the state upon resource + * finalization. + * + * @param callback + * a callback to register + */ + def withCallback(callback: CallbackRegistration[F]): Resource[F, Unit] = + Resource + .make(registerCallback(callback))(_ => removeCallback(callback)) + .void + + private def removeCallback(callback: CallbackRegistration[F]): F[Unit] = + callbacks.update(_.filter(_ != callback)) + + private def registerCallback(callback: CallbackRegistration[F]): F[Unit] = + callbacks.update(_ :+ callback) + +} + +private[metrics] object MeterSharedState { + + def create[F[_]: Temporal: Random: Console: AskContext]( + resource: TelemetryResource, + scope: InstrumentationScope, + startTimestamp: FiniteDuration, + exemplarFilter: ExemplarFilter, + traceContextLookup: TraceContextLookup, + viewRegistry: ViewRegistry[F], + registeredReaders: Vector[RegisteredReader[F]] + ): F[MeterSharedState[F]] = + for { + mutex <- Mutex[F] + callbacks <- Ref.empty[F, Vector[CallbackRegistration[F]]] + registries <- registeredReaders.traverse { reader => + MetricStorageRegistry.create[F].tupleLeft(reader) + } + } yield new MeterSharedState( + mutex, + viewRegistry, + resource, + scope, + startTimestamp, + exemplarFilter, + traceContextLookup, + callbacks, + registries.toMap + ) + +} diff --git a/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/internal/MeterSharedStateSuite.scala b/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/internal/MeterSharedStateSuite.scala new file mode 100644 index 000000000..41d0198e1 --- /dev/null +++ b/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/internal/MeterSharedStateSuite.scala @@ -0,0 +1,234 @@ +/* + * Copyright 2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.metrics.internal + +import cats.data.NonEmptyList +import cats.effect.IO +import cats.effect.std.Random +import cats.mtl.Ask +import munit.CatsEffectSuite +import munit.ScalaCheckEffectSuite +import org.scalacheck.Gen +import org.scalacheck.effect.PropF +import org.typelevel.otel4s.metrics.MeasurementValue +import org.typelevel.otel4s.sdk.TelemetryResource +import org.typelevel.otel4s.sdk.common.InstrumentationScope +import org.typelevel.otel4s.sdk.context.Context +import org.typelevel.otel4s.sdk.metrics.Aggregation +import org.typelevel.otel4s.sdk.metrics.InstrumentType +import org.typelevel.otel4s.sdk.metrics.data.AggregationTemporality +import org.typelevel.otel4s.sdk.metrics.data.MetricData +import org.typelevel.otel4s.sdk.metrics.data.MetricPoints +import org.typelevel.otel4s.sdk.metrics.exemplar.ExemplarFilter +import org.typelevel.otel4s.sdk.metrics.exemplar.TraceContextLookup +import org.typelevel.otel4s.sdk.metrics.exporter.AggregationTemporalitySelector +import org.typelevel.otel4s.sdk.metrics.exporter.InMemoryMetricReader +import org.typelevel.otel4s.sdk.metrics.exporter.MetricProducer +import org.typelevel.otel4s.sdk.metrics.internal.exporter.RegisteredReader +import org.typelevel.otel4s.sdk.metrics.scalacheck.Gens +import org.typelevel.otel4s.sdk.metrics.test.PointDataUtils +import org.typelevel.otel4s.sdk.metrics.view.ViewRegistry + +import scala.concurrent.duration._ + +class MeterSharedStateSuite extends CatsEffectSuite with ScalaCheckEffectSuite { + + test("synchronous instrument: record measurements") { + PropF.forAllF( + Gens.telemetryResource, + Gens.instrumentationScope, + Gens.timeWindow, + Gens.attributes, + Gens.synchronousInstrumentDescriptor, + Gen.either(Gen.long, Gen.double) + ) { (resource, scope, timeWindow, attributes, descriptor, value) => + def test[A: MeasurementValue: Numeric](value: A): IO[Unit] = { + val points = descriptor.instrumentType match { + case InstrumentType.Counter => + MetricPoints.sum( + points = PointDataUtils.toNumberPoints( + Vector(value), + attributes, + timeWindow + ), + monotonic = true, + aggregationTemporality = AggregationTemporality.Cumulative + ) + + case InstrumentType.UpDownCounter => + MetricPoints.sum( + PointDataUtils.toNumberPoints( + Vector(value), + attributes, + timeWindow + ), + monotonic = false, + aggregationTemporality = AggregationTemporality.Cumulative + ) + + case InstrumentType.Histogram => + MetricPoints.histogram( + Vector( + PointDataUtils.toHistogramPoint( + Vector(value), + attributes, + timeWindow, + Aggregation.Defaults.Boundaries + ) + ), + AggregationTemporality.Cumulative + ) + } + + val expected = MetricData( + resource, + scope, + descriptor.name.toString, + descriptor.description, + descriptor.unit, + points + ) + + for { + reader <- createReader(timeWindow.start) + state <- createState(resource, scope, reader, timeWindow.start) + storage <- state.registerMetricStorage[A](descriptor) + _ <- storage.record(value, attributes, Context.root) + metrics <- state.collectAll(reader, timeWindow.end) + } yield assertEquals(metrics, Vector(expected)) + } + + value match { + case Left(long) => test(long) + case Right(double) => test(double) + } + } + } + + test("asynchronous instrument: invoke callback during collection") { + PropF.forAllF( + Gens.telemetryResource, + Gens.instrumentationScope, + Gens.timeWindow, + Gens.attributes, + Gens.asynchronousInstrumentDescriptor, + Gen.either(Gen.long, Gen.double) + ) { (resource, scope, timeWindow, attributes, descriptor, value) => + def test[A: MeasurementValue: Numeric](value: A): IO[Unit] = { + val points = descriptor.instrumentType match { + case InstrumentType.ObservableCounter => + MetricPoints.sum( + points = PointDataUtils.toNumberPoints( + Vector(value), + attributes, + timeWindow + ), + monotonic = true, + aggregationTemporality = AggregationTemporality.Cumulative + ) + + case InstrumentType.ObservableUpDownCounter => + MetricPoints.sum( + PointDataUtils.toNumberPoints( + Vector(value), + attributes, + timeWindow + ), + monotonic = false, + aggregationTemporality = AggregationTemporality.Cumulative + ) + + case InstrumentType.ObservableGauge => + MetricPoints.gauge( + PointDataUtils.toNumberPoints( + Vector(value), + attributes, + timeWindow + ) + ) + } + + val expected = MetricData( + resource, + scope, + descriptor.name.toString, + descriptor.description, + descriptor.unit, + points + ) + + for { + reader <- createReader(timeWindow.start) + state <- createState(resource, scope, reader, timeWindow.start) + measurement <- state.registerObservableMeasurement[A](descriptor) + + registration = new CallbackRegistration[IO]( + NonEmptyList.of(measurement), + measurement.record(value, attributes) + ) + + metrics <- state + .withCallback(registration) + .surround( + state.collectAll(reader, timeWindow.end) + ) + } yield assertEquals(metrics, Vector(expected)) + } + + value match { + case Left(long) => test(long) + case Right(double) => test(double) + } + } + } + + private def createReader(start: FiniteDuration): IO[RegisteredReader[IO]] = { + val inMemory = new InMemoryMetricReader[IO]( + emptyProducer, + AggregationTemporalitySelector.alwaysCumulative + ) + + RegisteredReader.create(start, inMemory) + } + + private def createState( + resource: TelemetryResource, + scope: InstrumentationScope, + reader: RegisteredReader[IO], + start: FiniteDuration + ): IO[MeterSharedState[IO]] = { + implicit val askContext: Ask[IO, Context] = Ask.const(Context.root) + + Random.scalaUtilRandom[IO].flatMap { implicit R: Random[IO] => + MeterSharedState.create[IO]( + resource, + scope, + start, + ExemplarFilter.alwaysOff, + TraceContextLookup.noop, + ViewRegistry(Vector.empty), + Vector(reader) + ) + } + } + + private def emptyProducer: MetricProducer[IO] = + new MetricProducer[IO] { + def produce: IO[Vector[MetricData]] = IO.pure(Vector.empty) + } + +} diff --git a/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/internal/SdkObservableMeasurementSuite.scala b/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/internal/SdkObservableMeasurementSuite.scala index 5abf7bd31..f107ce3b8 100644 --- a/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/internal/SdkObservableMeasurementSuite.scala +++ b/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/internal/SdkObservableMeasurementSuite.scala @@ -22,20 +22,18 @@ import munit.CatsEffectSuite import munit.ScalaCheckEffectSuite import org.scalacheck.Gen import org.scalacheck.effect.PropF -import org.typelevel.otel4s.Attributes import org.typelevel.otel4s.metrics.MeasurementValue import org.typelevel.otel4s.sdk.context.Context import org.typelevel.otel4s.sdk.metrics.Aggregation import org.typelevel.otel4s.sdk.metrics.data.MetricData import org.typelevel.otel4s.sdk.metrics.data.MetricPoints -import org.typelevel.otel4s.sdk.metrics.data.PointData -import org.typelevel.otel4s.sdk.metrics.data.TimeWindow import org.typelevel.otel4s.sdk.metrics.exporter.AggregationTemporalitySelector import org.typelevel.otel4s.sdk.metrics.exporter.InMemoryMetricReader import org.typelevel.otel4s.sdk.metrics.exporter.MetricProducer import org.typelevel.otel4s.sdk.metrics.internal.exporter.RegisteredReader import org.typelevel.otel4s.sdk.metrics.internal.storage.MetricStorage import org.typelevel.otel4s.sdk.metrics.scalacheck.Gens +import org.typelevel.otel4s.sdk.metrics.test.PointDataUtils import org.typelevel.otel4s.sdk.test.InMemoryConsole import scala.concurrent.duration._ @@ -44,9 +42,6 @@ class SdkObservableMeasurementSuite extends CatsEffectSuite with ScalaCheckEffectSuite { - override def scalaCheckInitialSeed = - "JzbNab7748vUF4-GQunGLyY4VoVp0gqbRVsFf27he_I=" - test("log an error when reader is unset") { PropF.forAllF( Gens.instrumentationScope, @@ -116,7 +111,7 @@ class SdkObservableMeasurementSuite descriptor.description, descriptor.unit, MetricPoints.gauge( - toNumberPoints(Vector(value), attributes, timeWindow) + PointDataUtils.toNumberPoints(Vector(value), attributes, timeWindow) ) ) @@ -143,33 +138,6 @@ class SdkObservableMeasurementSuite } } - private def toNumberPoints[A: MeasurementValue]( - values: Vector[A], - attributes: Attributes, - timeWindow: TimeWindow - ): Vector[PointData.NumberPoint] = - MeasurementValue[A] match { - case MeasurementValue.LongMeasurementValue(cast) => - values.map { a => - PointData.longNumber( - timeWindow, - attributes, - Vector.empty, - cast(a) - ) - } - - case MeasurementValue.DoubleMeasurementValue(cast) => - values.map { a => - PointData.doubleNumber( - timeWindow, - attributes, - Vector.empty, - cast(a) - ) - } - } - private def createStorage[A: MeasurementValue: Numeric]( descriptor: InstrumentDescriptor.Asynchronous ): IO[MetricStorage.Asynchronous[IO, A]] = { diff --git a/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/internal/storage/AsynchronousStorageSuite.scala b/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/internal/storage/AsynchronousStorageSuite.scala index e74330c72..c9c0c4520 100644 --- a/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/internal/storage/AsynchronousStorageSuite.scala +++ b/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/internal/storage/AsynchronousStorageSuite.scala @@ -32,7 +32,6 @@ import org.typelevel.otel4s.sdk.metrics.InstrumentType import org.typelevel.otel4s.sdk.metrics.data.AggregationTemporality import org.typelevel.otel4s.sdk.metrics.data.MetricData import org.typelevel.otel4s.sdk.metrics.data.MetricPoints -import org.typelevel.otel4s.sdk.metrics.data.PointData import org.typelevel.otel4s.sdk.metrics.data.TimeWindow import org.typelevel.otel4s.sdk.metrics.exporter.AggregationTemporalitySelector import org.typelevel.otel4s.sdk.metrics.exporter.InMemoryMetricReader @@ -41,6 +40,7 @@ import org.typelevel.otel4s.sdk.metrics.internal.AsynchronousMeasurement import org.typelevel.otel4s.sdk.metrics.internal.InstrumentDescriptor import org.typelevel.otel4s.sdk.metrics.internal.exporter.RegisteredReader import org.typelevel.otel4s.sdk.metrics.scalacheck.Gens +import org.typelevel.otel4s.sdk.metrics.test.PointDataUtils import org.typelevel.otel4s.sdk.metrics.view.View import org.typelevel.otel4s.sdk.test.NoopConsole @@ -71,7 +71,7 @@ class AsynchronousStorageSuite descriptor.description, descriptor.unit, MetricPoints.sum( - toNumberPoints( + PointDataUtils.toNumberPoints( values.take(1), attributes, timeWindow @@ -119,7 +119,11 @@ class AsynchronousStorageSuite descriptor.description, descriptor.unit, MetricPoints.sum( - toNumberPoints(Vector(value), attributes, timeWindow), + PointDataUtils.toNumberPoints( + Vector(value), + attributes, + timeWindow + ), isMonotonic(descriptor), AggregationTemporality.Cumulative ) @@ -170,7 +174,7 @@ class AsynchronousStorageSuite descriptor.description, descriptor.unit, MetricPoints.sum( - toNumberPoints( + PointDataUtils.toNumberPoints( Vector(a), attributes, TimeWindow(Duration.Zero, timeWindow.end) @@ -230,7 +234,11 @@ class AsynchronousStorageSuite descriptor.description, descriptor.unit, MetricPoints.sum( - toNumberPoints(Vector(value), Attributes.empty, timeWindow), + PointDataUtils.toNumberPoints( + Vector(value), + Attributes.empty, + timeWindow + ), isMonotonic(descriptor), AggregationTemporality.Cumulative ) @@ -283,7 +291,7 @@ class AsynchronousStorageSuite descriptor.description, descriptor.unit, MetricPoints.sum( - toNumberPoints(Vector(value), attrs, timeWindow), + PointDataUtils.toNumberPoints(Vector(value), attrs, timeWindow), isMonotonic(descriptor), AggregationTemporality.Cumulative ) @@ -310,33 +318,6 @@ class AsynchronousStorageSuite } } - private def toNumberPoints[A: MeasurementValue]( - values: Vector[A], - attributes: Attributes, - timeWindow: TimeWindow - ): Vector[PointData.NumberPoint] = - MeasurementValue[A] match { - case MeasurementValue.LongMeasurementValue(cast) => - values.map { a => - PointData.longNumber( - timeWindow, - attributes, - Vector.empty, - cast(a) - ) - } - - case MeasurementValue.DoubleMeasurementValue(cast) => - values.map { a => - PointData.doubleNumber( - timeWindow, - attributes, - Vector.empty, - cast(a) - ) - } - } - private def createStorage[A: MeasurementValue: Numeric]( descriptor: InstrumentDescriptor.Asynchronous, producer: MetricProducer[IO] = emptyProducer, diff --git a/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/internal/storage/SynchronousStorageSuite.scala b/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/internal/storage/SynchronousStorageSuite.scala index a3ba04a31..04e92571a 100644 --- a/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/internal/storage/SynchronousStorageSuite.scala +++ b/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/internal/storage/SynchronousStorageSuite.scala @@ -32,7 +32,6 @@ import org.typelevel.otel4s.sdk.metrics.InstrumentType import org.typelevel.otel4s.sdk.metrics.data.AggregationTemporality import org.typelevel.otel4s.sdk.metrics.data.MetricData import org.typelevel.otel4s.sdk.metrics.data.MetricPoints -import org.typelevel.otel4s.sdk.metrics.data.PointData import org.typelevel.otel4s.sdk.metrics.data.TimeWindow import org.typelevel.otel4s.sdk.metrics.exemplar.ExemplarFilter import org.typelevel.otel4s.sdk.metrics.exemplar.TraceContextLookup @@ -42,6 +41,7 @@ import org.typelevel.otel4s.sdk.metrics.exporter.MetricProducer import org.typelevel.otel4s.sdk.metrics.internal.InstrumentDescriptor import org.typelevel.otel4s.sdk.metrics.internal.exporter.RegisteredReader import org.typelevel.otel4s.sdk.metrics.scalacheck.Gens +import org.typelevel.otel4s.sdk.metrics.test.PointDataUtils import org.typelevel.otel4s.sdk.metrics.view.View import org.typelevel.otel4s.sdk.test.NoopConsole @@ -86,7 +86,7 @@ class SynchronousStorageSuite descriptor.description, descriptor.unit, MetricPoints.sum( - toNumberPoints( + PointDataUtils.toNumberPoints( Vector(Vector.fill(repeat)(values).flatten.sum), attributes, timeWindow @@ -139,7 +139,7 @@ class SynchronousStorageSuite descriptor.description, descriptor.unit, MetricPoints.sum( - toNumberPoints( + PointDataUtils.toNumberPoints( Vector(values.sum), attributes, TimeWindow(Duration.Zero, timeWindow.end) @@ -198,7 +198,7 @@ class SynchronousStorageSuite descriptor.description, descriptor.unit, MetricPoints.sum( - toNumberPoints( + PointDataUtils.toNumberPoints( Vector(values.sum), Attributes.empty, TimeWindow(Duration.Zero, timeWindow.end) @@ -265,7 +265,7 @@ class SynchronousStorageSuite descriptor.description, descriptor.unit, MetricPoints.sum( - toNumberPoints( + PointDataUtils.toNumberPoints( Vector(values.sum), attrs, TimeWindow(Duration.Zero, timeWindow.end) @@ -304,33 +304,6 @@ class SynchronousStorageSuite } } - private def toNumberPoints[A: MeasurementValue]( - values: Vector[A], - attributes: Attributes, - timeWindow: TimeWindow - ): Vector[PointData.NumberPoint] = - MeasurementValue[A] match { - case MeasurementValue.LongMeasurementValue(cast) => - values.map { a => - PointData.longNumber( - timeWindow, - attributes, - Vector.empty, - cast(a) - ) - } - - case MeasurementValue.DoubleMeasurementValue(cast) => - values.map { a => - PointData.doubleNumber( - timeWindow, - attributes, - Vector.empty, - cast(a) - ) - } - } - private def createStorage[A: MeasurementValue: Numeric]( descriptor: InstrumentDescriptor.Synchronous, producer: MetricProducer[IO] = emptyProducer, diff --git a/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/test/PointDataUtils.scala b/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/test/PointDataUtils.scala new file mode 100644 index 000000000..b9ceafc6d --- /dev/null +++ b/sdk/metrics/src/test/scala/org/typelevel/otel4s/sdk/metrics/test/PointDataUtils.scala @@ -0,0 +1,91 @@ +/* + * Copyright 2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.sdk.metrics.test + +import org.typelevel.otel4s.Attributes +import org.typelevel.otel4s.metrics.BucketBoundaries +import org.typelevel.otel4s.metrics.MeasurementValue +import org.typelevel.otel4s.sdk.metrics.data.PointData +import org.typelevel.otel4s.sdk.metrics.data.TimeWindow + +object PointDataUtils { + + def toNumberPoints[A: MeasurementValue]( + values: Vector[A], + attributes: Attributes, + timeWindow: TimeWindow + ): Vector[PointData.NumberPoint] = + MeasurementValue[A] match { + case MeasurementValue.LongMeasurementValue(cast) => + values.map { a => + PointData.longNumber( + timeWindow, + attributes, + Vector.empty, + cast(a) + ) + } + + case MeasurementValue.DoubleMeasurementValue(cast) => + values.map { a => + PointData.doubleNumber( + timeWindow, + attributes, + Vector.empty, + cast(a) + ) + } + } + + def toHistogramPoint[A]( + values: Vector[A], + attributes: Attributes, + timeWindow: TimeWindow, + boundaries: BucketBoundaries + )(implicit N: Numeric[A]): PointData.Histogram = { + import N.mkNumericOps + + val stats: Option[PointData.Histogram.Stats] = + Option.when(values.nonEmpty)( + PointData.Histogram.Stats( + sum = values.sum.toDouble, + min = values.min.toDouble, + max = values.max.toDouble, + count = values.size.toLong + ) + ) + + val counts: Vector[Long] = + values.foldLeft(Vector.fill(boundaries.length + 1)(0L)) { + case (acc, value) => + val i = boundaries.boundaries.indexWhere(b => value.toDouble <= b) + val idx = if (i == -1) boundaries.length else i + + acc.updated(idx, acc(idx) + 1L) + } + + PointData.histogram( + timeWindow, + attributes, + Vector.empty, + stats, + boundaries, + counts + ) + } + +}