Skip to content

Commit

Permalink
Reloadable buffer pools (zio#6447)
Browse files Browse the repository at this point in the history
* Reloadable buffer pools

* scheduleBackround

* Fix

* Fiber => Fiber.Runtime
  • Loading branch information
vigoo authored Mar 18, 2022
1 parent 3f29d8c commit 7e5d7df
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 39 deletions.
11 changes: 5 additions & 6 deletions core/jvm/src/main/scala/zio/metrics/jvm/BufferPools.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ final case class BufferPools(

object BufferPools {
@silent("JavaConverters")
val live: ZLayer[Clock with JvmMetricsSchedule, Throwable, BufferPools] =
val live: ZLayer[Clock with JvmMetricsSchedule, Throwable, Reloadable[BufferPools]] =
ZLayer.scoped {
for {
bufferPoolMXBeans <- ZIO.attempt(ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean]).asScala)
Expand Down Expand Up @@ -50,10 +50,9 @@ object BufferPools {
})

schedule <- ZIO.service[JvmMetricsSchedule]
_ <- bufferPoolUsedBytes.launch(schedule.value)
_ <- bufferPoolCapacityBytes.launch(schedule.value)
_ <- bufferPoolUsedBuffers.launch(schedule.value)
// TODO: periodically update the list of pools
_ <- bufferPoolUsedBytes.launch(schedule.updateMetrics)
_ <- bufferPoolCapacityBytes.launch(schedule.updateMetrics)
_ <- bufferPoolUsedBuffers.launch(schedule.updateMetrics)
} yield BufferPools(bufferPoolUsedBytes, bufferPoolCapacityBytes, bufferPoolUsedBuffers)
}
}.reloadableAutoFromConfig[JvmMetricsSchedule](config => config.get.reloadDynamicMetrics)
}
6 changes: 3 additions & 3 deletions core/jvm/src/main/scala/zio/metrics/jvm/ClassLoading.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ object ClassLoading {
)

schedule <- ZIO.service[JvmMetricsSchedule]
_ <- loadedClassCount.launch(schedule.value)
_ <- totalLoadedClassCount.launch(schedule.value)
_ <- unloadedClassCount.launch(schedule.value)
_ <- loadedClassCount.launch(schedule.updateMetrics)
_ <- totalLoadedClassCount.launch(schedule.updateMetrics)
_ <- unloadedClassCount.launch(schedule.updateMetrics)
} yield ClassLoading(loadedClassCount, totalLoadedClassCount, unloadedClassCount)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ trait DefaultJvmMetrics {
lazy val live: ZLayer[
Clock with System,
Throwable,
BufferPools with ClassLoading with GarbageCollector with MemoryAllocation with MemoryPools with Standard with Thread with VersionInfo
Reloadable[
BufferPools
] with ClassLoading with GarbageCollector with MemoryAllocation with MemoryPools with Standard with Thread with VersionInfo
] =
jvmMetricsSchedule >>>
(BufferPools.live ++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ object GarbageCollector {
})

schedule <- ZIO.service[JvmMetricsSchedule]
_ <- gcCollectionSecondsSum.launch(schedule.value)
_ <- gcCollectionSecondsCount.launch(schedule.value)
_ <- gcCollectionSecondsSum.launch(schedule.updateMetrics)
_ <- gcCollectionSecondsCount.launch(schedule.updateMetrics)
} yield GarbageCollector(gcCollectionSecondsSum, gcCollectionSecondsCount)
}
}
17 changes: 15 additions & 2 deletions core/jvm/src/main/scala/zio/metrics/jvm/JvmMetricsSchedule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,21 @@ package zio.metrics.jvm

import zio._

final case class JvmMetricsSchedule(value: Schedule[Any, Any, Unit])
/**
* Configuration for the JVM metrics
*
* @param updateMetrics
* Schedule for periodically updating each JVM metric
* @param reloadDynamicMetrics
* Schedule for regenerating the dynamic JVM metrics such as buffer pool
* metrics
*/
final case class JvmMetricsSchedule(
updateMetrics: Schedule[Any, Any, Any],
reloadDynamicMetrics: Schedule[Any, Any, Any]
)

object JvmMetricsSchedule {
val default: ULayer[JvmMetricsSchedule] = ZLayer.succeed(JvmMetricsSchedule(Schedule.fixed(10.seconds).unit))
val default: ULayer[JvmMetricsSchedule] =
ZLayer.succeed(JvmMetricsSchedule(Schedule.fixed(10.seconds), Schedule.fixed(1.minute)))
}
16 changes: 8 additions & 8 deletions core/jvm/src/main/scala/zio/metrics/jvm/MemoryPools.scala
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,14 @@ object MemoryPools {
})

schedule <- ZIO.service[JvmMetricsSchedule]
_ <- memoryBytesUsed.launch(schedule.value)
_ <- memoryBytesCommitted.launch(schedule.value)
_ <- memoryBytesMax.launch(schedule.value)
_ <- memoryBytesInit.launch(schedule.value)
_ <- poolBytesUsed.launch(schedule.value)
_ <- poolBytesCommitted.launch(schedule.value)
_ <- poolBytesMax.launch(schedule.value)
_ <- poolBytesInit.launch(schedule.value)
_ <- memoryBytesUsed.launch(schedule.updateMetrics)
_ <- memoryBytesCommitted.launch(schedule.updateMetrics)
_ <- memoryBytesMax.launch(schedule.updateMetrics)
_ <- memoryBytesInit.launch(schedule.updateMetrics)
_ <- poolBytesUsed.launch(schedule.updateMetrics)
_ <- poolBytesCommitted.launch(schedule.updateMetrics)
_ <- poolBytesMax.launch(schedule.updateMetrics)
_ <- poolBytesInit.launch(schedule.updateMetrics)
} yield MemoryPools(
memoryBytesUsed,
memoryBytesCommitted,
Expand Down
17 changes: 8 additions & 9 deletions core/jvm/src/main/scala/zio/metrics/jvm/Standard.scala
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,14 @@ object Standard {
residentMemorySize = Metric.gauge("process_resident_memory_bytes")

schedule <- ZIO.service[JvmMetricsSchedule]
_ <- cpuSecondsTotal.launch(schedule.value)
_ <- processStartTime.launch(schedule.value)
_ <- openFdCount.launch(schedule.value).when(getOpenFileDescriptorCount.isAvailable)
_ <- maxFdCount.launch(schedule.value).when(getMaxFileDescriptorCount.isAvailable)
_ <- ZIO
.acquireRelease(
collectMemoryMetricsLinux(virtualMemorySize, residentMemorySize).repeat(schedule.value).forkDaemon
)(_.interrupt)
.when(isLinux)
_ <- cpuSecondsTotal.launch(schedule.updateMetrics)
_ <- processStartTime.launch(schedule.updateMetrics)
_ <- openFdCount.launch(schedule.updateMetrics).when(getOpenFileDescriptorCount.isAvailable)
_ <- maxFdCount.launch(schedule.updateMetrics).when(getMaxFileDescriptorCount.isAvailable)
_ <-
collectMemoryMetricsLinux(virtualMemorySize, residentMemorySize)
.scheduleBackground(schedule.updateMetrics)
.when(isLinux)

} yield Standard(
cpuSecondsTotal,
Expand Down
15 changes: 7 additions & 8 deletions core/jvm/src/main/scala/zio/metrics/jvm/Thread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,13 @@ object Thread {
)

schedule <- ZIO.service[JvmMetricsSchedule]
_ <- threadsCurrent.launch(schedule.value)
_ <- threadsDaemon.launch(schedule.value)
_ <- threadsPeak.launch(schedule.value)
_ <- threadsStartedTotal.launch(schedule.value)
_ <- threadsDeadlocked.launch(schedule.value)
_ <- threadsDeadlockedMonitor.launch(schedule.value)
_ <- ZIO.acquireRelease(refreshThreadStateCounts(threadMXBean).repeat(schedule.value).forkDaemon)(_.interrupt)

_ <- threadsCurrent.launch(schedule.updateMetrics)
_ <- threadsDaemon.launch(schedule.updateMetrics)
_ <- threadsPeak.launch(schedule.updateMetrics)
_ <- threadsStartedTotal.launch(schedule.updateMetrics)
_ <- threadsDeadlocked.launch(schedule.updateMetrics)
_ <- threadsDeadlockedMonitor.launch(schedule.updateMetrics)
_ <- refreshThreadStateCounts(threadMXBean).scheduleBackground(schedule.updateMetrics)
} yield Thread(
threadsCurrent,
threadsDaemon,
Expand Down
9 changes: 9 additions & 0 deletions core/shared/src/main/scala/zio/ZIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2034,6 +2034,15 @@ sealed trait ZIO[-R, +E, +A] extends Serializable with ZIOPlatformSpecific[R, E,
): ZIO[R1 with Clock, E, B] =
Clock.schedule(self)(schedule)

/**
* Runs this effect according to the specified schedule in a new fiber
* attached to the global scope
*/
final def scheduleBackground[R1 <: R, B](schedule: => Schedule[R1, Any, B])(implicit
trace: ZTraceElement
): ZIO[R1 with Clock with Scope, E, Fiber.Runtime[Any, B]] =
ZIO.acquireRelease(ZIO.interruptible(self.schedule(schedule)).forkDaemon)(_.interrupt)

/**
* Runs this effect according to the specified schedule starting from the
* specified input value.
Expand Down

0 comments on commit 7e5d7df

Please sign in to comment.