Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observable instruments #162

Merged
merged 4 commits into from
Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ object TraceBenchmark {
.setTracerProvider(tracerProvider)
.build()

OtelJava.forSync(otel).flatMap {
OtelJava.forAsync(otel).flatMap {
_.tracerProvider.tracer("trace-benchmark").get
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.typelevel.otel4s.metrics

import cats.Applicative
import cats.effect.Resource

trait Meter[F[_]] {

Expand Down Expand Up @@ -63,6 +64,16 @@ trait Meter[F[_]] {
name: String
): SyncInstrumentBuilder[F, UpDownCounter[F, Long]]

def observableGauge(
name: String
): ObservableInstrumentBuilder[F, Double, ObservableGauge]
def observableCounter(
name: String
): ObservableInstrumentBuilder[F, Long, ObservableCounter]
def observableUpDownCounter(
name: String
): ObservableInstrumentBuilder[F, Long, ObservableUpDownCounter]

}

object Meter {
Expand Down Expand Up @@ -96,5 +107,46 @@ object Meter {
def withDescription(description: String): Self = this
def create: F[UpDownCounter[F, Long]] = F.pure(UpDownCounter.noop)
}
def observableGauge(
name: String
): ObservableInstrumentBuilder[F, Double, ObservableGauge] =
new ObservableInstrumentBuilder[F, Double, ObservableGauge] {
type Self = this.type

def withUnit(unit: String): Self = this
def withDescription(description: String): Self = this
def createWithCallback(
cb: ObservableMeasurement[F, Double] => F[Unit]
): Resource[F, ObservableGauge] =
Resource.pure(new ObservableGauge {})
}

def observableCounter(
name: String
): ObservableInstrumentBuilder[F, Long, ObservableCounter] =
new ObservableInstrumentBuilder[F, Long, ObservableCounter] {
type Self = this.type

def withUnit(unit: String): Self = this
def withDescription(description: String): Self = this
def createWithCallback(
cb: ObservableMeasurement[F, Long] => F[Unit]
): Resource[F, ObservableCounter] =
Resource.pure(new ObservableCounter {})
}

def observableUpDownCounter(
name: String
): ObservableInstrumentBuilder[F, Long, ObservableUpDownCounter] =
new ObservableInstrumentBuilder[F, Long, ObservableUpDownCounter] {
type Self = this.type

def withUnit(unit: String): Self = this
def withDescription(description: String): Self = this
def createWithCallback(
cb: ObservableMeasurement[F, Long] => F[Unit]
): Resource[F, ObservableUpDownCounter] =
Resource.pure(new ObservableUpDownCounter {})
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@

package org.typelevel.otel4s.metrics

trait ObservableCounter[F[_], A]
trait ObservableCounter
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what to do with these. It seems strange that they have these type parameters, when we really only use them in the callback. Now I tried to remove them, and that seems to aid in testing at least.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been thinking about this for a while and I tend to agree.

Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@

package org.typelevel.otel4s.metrics

trait ObservableGauge[F[_], A]
trait ObservableGauge
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package org.typelevel.otel4s.metrics

trait ObservableInstrumentBuilder[F[_], A] {
type Self <: ObservableInstrumentBuilder[F, A]
import cats.effect.Resource

trait ObservableInstrumentBuilder[F[_], A, Instrument] {
type Self <: ObservableInstrumentBuilder[F, A, Instrument]

/** Sets the unit of measure for this instrument.
*
Expand All @@ -43,7 +45,20 @@ trait ObservableInstrumentBuilder[F[_], A] {
*/
def withDescription(description: String): Self

/** Creates an instrument with the given `unit` and `description` (if any).
/** Creates an instrument with the given callback, using `unit` and
* `description` (if any).
*
* The callback will be called when the instrument is being observed.
*
* The callback is expected to abide by the following restrictions:
* - Short-living and (ideally) non-blocking
* - Run in a finite amount of time
* - Safe to call repeatedly, across multiple threads
*
* @param cb
* The callback which observes measurements when invoked
*/
def create: F[A]
def createWithCallback(
cb: ObservableMeasurement[F, A] => F[Unit]
): Resource[F, Instrument]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.typelevel.otel4s.metrics

import org.typelevel.otel4s.Attribute

trait ObservableMeasurement[F[_], A] {

/** Records a value with a set of attributes.
*
* @param value
* the value to record
*
* @param attributes
* the set of attributes to associate with the value
*/
def record(value: A, attributes: Attribute[_]*): F[Unit]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scaladoc:

Suggested change
def record(value: A, attributes: Attribute[_]*): F[Unit]
/** Records a value with a set of attributes.
*
* @param value
* the value to record
*
* @param attributes
* the set of attributes to associate with the value
*/
def record(value: A, attributes: Attribute[_]*): F[Unit]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@christiankjaer it would be nice to preserve laziness via macro. It should be nearly identical to HistogramMacro.
Could you take a look there too?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. The reason why I didn't look at the macro is that I kinda assumed that the observable stuff is for instrumenting from the outside, and the noop is then rarely needed. I will take a look.

Copy link
Contributor

@iRevive iRevive Apr 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

observable stuff is for instrumenting from the outside

Good point. Since the callback is a function ObservableMeasurement[F, A] => F[Unit], it will never be executed if we use a noop implementation. So there's no need to worry about allocations.

So, we can keep it as-is and skip macro.

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@

package org.typelevel.otel4s.metrics

trait ObservableUpDownCounter[F, A]
trait ObservableUpDownCounter
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.typelevel.otel4s
package metrics

import cats.effect.IO
import cats.effect.Ref
import cats.effect.Resource
import munit.CatsEffectSuite

class ObservableSuite extends CatsEffectSuite {

import ObservableSuite._

test("observable test") {

for {
_ <- new InMemoryObservableInstrumentBuilder[Double]
.createWithCallback(instrument =>
instrument.record(2.0) *> instrument.record(3.0)
)
.use { r =>
for {
_ <- r.observations.get.assertEquals(List.empty)
_ <- r.run
_ <- r.observations.get.assertEquals(
List(Record(3.0, Seq.empty), Record(2.0, Seq.empty))
)
} yield ()
}
} yield ()

}

}

object ObservableSuite {

final case class Record[A](value: A, attributes: Seq[Attribute[_]])

final case class InMemoryObservable[A](
callback: ObservableMeasurement[IO, A] => IO[Unit],
observations: Ref[IO, List[Record[A]]]
) {
def run: IO[Unit] =
callback(new ObservableMeasurement[IO, A] {
def record(value: A, attributes: Attribute[_]*): IO[Unit] =
observations.update(Record(value, attributes) :: _)
})
}

class InMemoryObservableInstrumentBuilder[A]
extends ObservableInstrumentBuilder[IO, A, InMemoryObservable[A]] {

type Self =
ObservableInstrumentBuilder[IO, A, InMemoryObservable[A]]

def withUnit(unit: String): Self = this

def withDescription(description: String): Self = this

def createWithCallback(
cb: ObservableMeasurement[IO, A] => IO[Unit]
): Resource[IO, InMemoryObservable[A]] =
Resource
.eval(Ref.of[IO, List[Record[A]]](List.empty))
.map(obs => InMemoryObservable[A](cb, obs))

}

}
10 changes: 5 additions & 5 deletions examples/src/main/scala/KleisliExample.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,26 @@
*/

import cats.data.Kleisli
import cats.effect.Async
import cats.effect.IO
import cats.effect.IOApp
import cats.effect.Resource
import cats.effect.Sync
import cats.mtl.Local
import io.opentelemetry.api.GlobalOpenTelemetry
import org.typelevel.otel4s.java.OtelJava
import org.typelevel.otel4s.trace.Tracer
import org.typelevel.vault.Vault

object KleisliExample extends IOApp.Simple {
def work[F[_]: Sync: Tracer] =
Tracer[F].span("work").surround(Sync[F].delay(println("I'm working")))
def work[F[_]: Async: Tracer] =
Tracer[F].span("work").surround(Async[F].delay(println("I'm working")))

def tracerResource[F[_]](implicit
F: Sync[F],
F: Async[F],
L: Local[F, Vault]
): Resource[F, Tracer[F]] =
Resource
.eval(Sync[F].delay(GlobalOpenTelemetry.get))
.eval(Async[F].delay(GlobalOpenTelemetry.get))
.map(OtelJava.local[F])
.evalMap(_.tracerProvider.get("kleisli-example"))

Expand Down
51 changes: 51 additions & 0 deletions examples/src/main/scala/ObservableExample.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import cats.effect.IO
import cats.effect.IOApp
import cats.effect.Resource
import io.opentelemetry.api.GlobalOpenTelemetry
import org.typelevel.otel4s.java.OtelJava
import org.typelevel.otel4s.metrics.ObservableCounter

import java.lang.management.ManagementFactory
import javax.management.MBeanServer
import javax.management.ObjectName

object ObservableExample extends IOApp.Simple {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! Thanks for the relevant example.


val mbeanServer: MBeanServer = ManagementFactory.getPlatformMBeanServer
val mbeanName = new ObjectName("cats.effect.metrics:type=CpuStarvation")

def meterResource: Resource[IO, ObservableCounter] =
Resource
.eval(IO(GlobalOpenTelemetry.get))
.evalMap(OtelJava.forAsync[IO])
.evalMap(_.meterProvider.get("observable-example"))
.flatMap(
_.observableCounter("cats-effect-runtime-cpu-starvation-count")
.withDescription("CE runtime starvation count")
.createWithCallback(obs =>
IO(
mbeanServer
.getAttribute(mbeanName, "CpuStarvationCount")
.asInstanceOf[Long]
).flatMap(c => obs.record(c))
)
)

def run: IO[Unit] = meterResource.useForever
}
2 changes: 1 addition & 1 deletion examples/src/main/scala/TraceExample.scala
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ object TraceExample extends IOApp.Simple {
def globalOtel4s: Resource[IO, Otel4s[IO]] =
Resource
.eval(IO(GlobalOpenTelemetry.get))
.evalMap(OtelJava.forSync[IO])
.evalMap(OtelJava.forAsync[IO])

/** Run Method
*
Expand Down
2 changes: 1 addition & 1 deletion examples/src/main/scala/TracingExample.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ object TracingExample extends IOApp.Simple {
def globalOtel4s: Resource[IO, Otel4s[IO]] =
Resource
.eval(IO(GlobalOpenTelemetry.get))
.evalMap(OtelJava.forSync[IO])
.evalMap(OtelJava.forAsync[IO])

def run: IO[Unit] = {
globalOtel4s.use { (otel4s: Otel4s[IO]) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ object OtelJava {
* @return
* An effect of an [[org.typelevel.otel4s.Otel4s]] resource.
*/
def forSync[F[_]: LiftIO: Sync](jOtel: JOpenTelemetry): F[Otel4s[F]] =
def forAsync[F[_]: LiftIO: Async](jOtel: JOpenTelemetry): F[Otel4s[F]] =
IOLocal(Vault.empty)
.map { implicit ioLocal: IOLocal[Vault] =>
local[F](jOtel)
Expand All @@ -55,8 +55,8 @@ object OtelJava {

def local[F[_]](
jOtel: JOpenTelemetry
)(implicit F: Sync[F], L: Local[F, Vault]): Otel4s[F] = {
val metrics = Metrics.forSync(jOtel)
)(implicit F: Async[F], L: Local[F, Vault]): Otel4s[F] = {
val metrics = Metrics.forAsync(jOtel)
val traces = Traces.local(jOtel)
new Otel4s[F] {
def propagators: ContextPropagators[F] =
Expand Down Expand Up @@ -85,5 +85,5 @@ object OtelJava {
.make(acquire)(sdk =>
asyncFromCompletableResultCode(Sync[F].delay(sdk.shutdown()))
)
.evalMap(forSync[F])
.evalMap(forAsync[F])
}
Loading