Skip to content

Commit

Permalink
sdk-metrics: add SdkObservableUpDownCounter
Browse files Browse the repository at this point in the history
  • Loading branch information
iRevive committed Apr 23, 2024
1 parent 3feb3ac commit 2c4b6b2
Show file tree
Hide file tree
Showing 2 changed files with 286 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright 2024 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.typelevel.otel4s.sdk.metrics

import cats.data.NonEmptyList
import cats.effect.Clock
import cats.effect.MonadCancelThrow
import cats.effect.Resource
import cats.effect.std.Console
import cats.syntax.flatMap._
import cats.syntax.foldable._
import cats.syntax.functor._
import org.typelevel.ci.CIString
import org.typelevel.otel4s.metrics.Measurement
import org.typelevel.otel4s.metrics.MeasurementValue
import org.typelevel.otel4s.metrics.ObservableMeasurement
import org.typelevel.otel4s.metrics.ObservableUpDownCounter
import org.typelevel.otel4s.sdk.context.AskContext
import org.typelevel.otel4s.sdk.metrics.internal.CallbackRegistration
import org.typelevel.otel4s.sdk.metrics.internal.InstrumentDescriptor
import org.typelevel.otel4s.sdk.metrics.internal.MeterSharedState

/** An asynchronous instrument that reports additive values.
*
* @see
* [[https://opentelemetry.io/docs/specs/otel/metrics/api/#asynchronous-updowncounter]]
*/
private object SdkObservableUpDownCounter {

final case class Builder[
F[_]: MonadCancelThrow: Clock: Console: AskContext,
A: MeasurementValue
](
name: String,
sharedState: MeterSharedState[F],
unit: Option[String] = None,
description: Option[String] = None
) extends ObservableUpDownCounter.Builder[F, A] {

def withUnit(unit: String): ObservableUpDownCounter.Builder[F, A] =
copy(unit = Some(unit))

def withDescription(
description: String
): ObservableUpDownCounter.Builder[F, A] =
copy(description = Some(description))

def createWithCallback(
cb: ObservableMeasurement[F, A] => F[Unit]
): Resource[F, ObservableUpDownCounter] = {
val descriptor = makeDescriptor

val makeCallbackRegistration: F[CallbackRegistration[F]] =
MeasurementValue[A] match {
case MeasurementValue.LongMeasurementValue(cast) =>
sharedState
.registerObservableMeasurement[Long](descriptor)
.map { observable =>
val runnable = cb { (value, attributes) =>
observable.record(cast(value), attributes)
}

new CallbackRegistration[F](
NonEmptyList.one(observable),
runnable
)
}

case MeasurementValue.DoubleMeasurementValue(cast) =>
sharedState
.registerObservableMeasurement[Double](descriptor)
.map { observable =>
val runnable = cb { (value, attributes) =>
observable.record(cast(value), attributes)
}

new CallbackRegistration[F](
NonEmptyList.one(observable),
runnable
)
}
}

for {
cr <- Resource.eval(makeCallbackRegistration)
_ <- sharedState.withCallback(cr)
} yield new ObservableUpDownCounter {}
}

def create(
measurements: F[Iterable[Measurement[A]]]
): Resource[F, ObservableUpDownCounter] =
createWithCallback { cb =>
for {
m <- measurements
_ <- m.toVector.traverse_(m => cb.record(m.value, m.attributes))
} yield ()
}

def createObserver: F[ObservableMeasurement[F, A]] = {
val descriptor = makeDescriptor

MeasurementValue[A] match {
case MeasurementValue.LongMeasurementValue(cast) =>
sharedState
.registerObservableMeasurement[Long](descriptor)
.map { observable => (value, attributes) =>
observable.record(cast(value), attributes)
}

case MeasurementValue.DoubleMeasurementValue(cast) =>
sharedState
.registerObservableMeasurement[Double](descriptor)
.map { observable => (value, attributes) =>
observable.record(cast(value), attributes)
}
}
}

private def makeDescriptor: InstrumentDescriptor.Asynchronous =
InstrumentDescriptor.asynchronous(
name = CIString(name),
description = description,
unit = unit,
instrumentType = InstrumentType.ObservableUpDownCounter
)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright 2024 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.typelevel.otel4s.sdk.metrics

import cats.effect.IO
import cats.effect.std.Random
import cats.mtl.Ask
import munit.CatsEffectSuite
import munit.ScalaCheckEffectSuite
import org.scalacheck.Gen
import org.scalacheck.effect.PropF
import org.typelevel.otel4s.metrics.Measurement
import org.typelevel.otel4s.metrics.MeasurementValue
import org.typelevel.otel4s.sdk.TelemetryResource
import org.typelevel.otel4s.sdk.common.InstrumentationScope
import org.typelevel.otel4s.sdk.context.Context
import org.typelevel.otel4s.sdk.metrics.data.AggregationTemporality
import org.typelevel.otel4s.sdk.metrics.data.MetricData
import org.typelevel.otel4s.sdk.metrics.data.MetricPoints
import org.typelevel.otel4s.sdk.metrics.exemplar.ExemplarFilter
import org.typelevel.otel4s.sdk.metrics.exemplar.TraceContextLookup
import org.typelevel.otel4s.sdk.metrics.exporter.AggregationTemporalitySelector
import org.typelevel.otel4s.sdk.metrics.exporter.InMemoryMetricReader
import org.typelevel.otel4s.sdk.metrics.exporter.MetricProducer
import org.typelevel.otel4s.sdk.metrics.internal.MeterSharedState
import org.typelevel.otel4s.sdk.metrics.internal.exporter.RegisteredReader
import org.typelevel.otel4s.sdk.metrics.scalacheck.Gens
import org.typelevel.otel4s.sdk.metrics.test.PointDataUtils
import org.typelevel.otel4s.sdk.metrics.view.ViewRegistry

import scala.concurrent.duration.FiniteDuration

class SdkObservableUpDownCounterSuite
extends CatsEffectSuite
with ScalaCheckEffectSuite {

private implicit val askContext: Ask[IO, Context] = Ask.const(Context.root)

test("record values") {
PropF.forAllF(
Gens.telemetryResource,
Gens.instrumentationScope,
Gens.timeWindow,
Gens.attributes,
Gens.nonEmptyString,
Gen.option(Gen.alphaNumStr),
Gen.option(Gen.alphaNumStr),
Gen.either(Gen.posNum[Long], Gen.double)
) { (resource, scope, window, attrs, name, unit, description, value) =>
def test[A: MeasurementValue](value: A): IO[Unit] = {
val expected = MetricData(
resource,
scope,
name,
description,
unit,
MetricPoints.sum(
points = PointDataUtils.toNumberPoints(
Vector(value),
attrs,
window
),
monotonic = false,
aggregationTemporality = AggregationTemporality.Cumulative
)
)

for {
reader <- createReader(window.start)
state <- createState(resource, scope, reader, window.start)

_ <- SdkObservableUpDownCounter
.Builder[IO, A](name, state, unit, description)
.createWithCallback(cb => cb.record(value, attrs))
.surround(
state.collectAll(reader, window.end)
)
.assertEquals(Vector(expected))

_ <- SdkObservableUpDownCounter
.Builder[IO, A](name, state, unit, description)
.create(IO.pure(Vector(Measurement(value, attrs))))
.surround(
state.collectAll(reader, window.end)
)
.assertEquals(Vector(expected))
} yield ()
}

value match {
case Left(long) => test[Long](long)
case Right(double) => test[Double](double)
}
}
}

private def createReader(start: FiniteDuration): IO[RegisteredReader[IO]] = {
val inMemory = new InMemoryMetricReader[IO](
emptyProducer,
AggregationTemporalitySelector.alwaysCumulative
)

RegisteredReader.create(start, inMemory)
}

private def createState(
resource: TelemetryResource,
scope: InstrumentationScope,
reader: RegisteredReader[IO],
start: FiniteDuration
): IO[MeterSharedState[IO]] =
Random.scalaUtilRandom[IO].flatMap { implicit R: Random[IO] =>
MeterSharedState.create[IO](
resource,
scope,
start,
ExemplarFilter.alwaysOff,
TraceContextLookup.noop,
ViewRegistry(Vector.empty),
Vector(reader)
)
}

private def emptyProducer: MetricProducer[IO] =
new MetricProducer[IO] {
def produce: IO[Vector[MetricData]] = IO.pure(Vector.empty)
}

}

0 comments on commit 2c4b6b2

Please sign in to comment.