Skip to content

Commit

Permalink
Merge pull request #3317 from iRevive/runtime-metrics
Browse files Browse the repository at this point in the history
Implement `IORuntimeMetrics`
  • Loading branch information
djspiewak authored Nov 26, 2024
2 parents 3ef2c3d + 9b7cd4c commit dfda4ae
Show file tree
Hide file tree
Showing 18 changed files with 571 additions and 138 deletions.
24 changes: 21 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,15 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform)
ProblemFilters.exclude[ReversedMissingMethodProblem]("cats.effect.IOLocal.reset"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("cats.effect.IOLocal.lens"),
// this filter is particulary terrible, because it can also mask real issues :(
ProblemFilters.exclude[DirectMissingMethodProblem]("cats.effect.IOLocal.lens")
ProblemFilters.exclude[DirectMissingMethodProblem]("cats.effect.IOLocal.lens"),
// internal API change, makes CpuStarvationMetrics available on all platforms
ProblemFilters.exclude[MissingClassProblem](
"cats.effect.metrics.JvmCpuStarvationMetrics$NoOpCpuStarvationMetrics"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvationMetrics"),
// package-private classes moved to the `cats.effect.unsafe.metrics` package
ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvation"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvation$"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvationMBean")
) ++ {
if (tlIsScala3.value) {
// Scala 3 specific exclusions
Expand Down Expand Up @@ -839,7 +847,12 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform)
ProblemFilters.exclude[Problem]("cats.effect.CallbackStackOps.*"),
// introduced by #3695, which ported fiber monitoring to Native
// internal API change
ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.ES2021FiberMonitor")
ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.ES2021FiberMonitor"),
// internal API change, makes CpuStarvationMetrics available on all platforms
ProblemFilters.exclude[MissingClassProblem](
"cats.effect.metrics.JsCpuStarvationMetrics"),
ProblemFilters.exclude[MissingClassProblem](
"cats.effect.metrics.JsCpuStarvationMetrics$")
)
},
mimaBinaryIssueFilters ++= {
Expand Down Expand Up @@ -874,7 +887,12 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform)
ProblemFilters.exclude[MissingClassProblem](
"cats.effect.unsafe.PollingExecutorScheduler$SleepTask"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.QueueExecutorScheduler"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.QueueExecutorScheduler$")
ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.QueueExecutorScheduler$"),
// internal API change, makes CpuStarvationMetrics available on all platforms
ProblemFilters.exclude[MissingClassProblem](
"cats.effect.metrics.NativeCpuStarvationMetrics"),
ProblemFilters.exclude[MissingClassProblem](
"cats.effect.metrics.NativeCpuStarvationMetrics$")
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2020-2024 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.unsafe.metrics

import scala.concurrent.ExecutionContext

private[metrics] abstract class IORuntimeMetricsCompanionPlatform {
this: IORuntimeMetrics.type =>

private[unsafe] def apply(ec: ExecutionContext): IORuntimeMetrics = {
val _ = ec
new IORuntimeMetrics {
private[effect] val cpuStarvationSampler: CpuStarvationSampler =
CpuStarvationSampler()

val cpuStarvation: CpuStarvationMetrics =
CpuStarvationMetrics(cpuStarvationSampler)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,6 @@
* limitations under the License.
*/

package cats.effect.metrics
package cats.effect.unsafe.metrics

import cats.effect.IO

import scala.concurrent.duration.FiniteDuration

private[effect] trait CpuStarvationMetrics {
def incCpuStarvationCount: IO[Unit]

def recordClockDrift(drift: FiniteDuration): IO[Unit]
}
private[metrics] trait IORuntimeMetricsPlatform { this: IORuntimeMetrics => }
4 changes: 2 additions & 2 deletions core/js/src/main/scala/cats/effect/IOApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package cats.effect

import cats.effect.metrics.{CpuStarvationWarningMetrics, JsCpuStarvationMetrics}
import cats.effect.metrics.CpuStarvationWarningMetrics
import cats.effect.std.Console
import cats.effect.tracing.TracingConstants._

Expand Down Expand Up @@ -260,7 +260,7 @@ trait IOApp {
val fiber = Spawn[IO]
.raceOutcome[ExitCode, Nothing](
CpuStarvationCheck
.run(runtimeConfig, JsCpuStarvationMetrics(), onCpuStarvationWarn)
.run(runtimeConfig, runtime.metrics.cpuStarvationSampler, onCpuStarvationWarn)
.background
.surround(run(argList)),
keepAlive)
Expand Down
6 changes: 3 additions & 3 deletions core/jvm/src/main/scala/cats/effect/IOApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -451,10 +451,10 @@ trait IOApp {
val queue = this.queue

val fiber =
JvmCpuStarvationMetrics()
.flatMap { cpuStarvationMetrics =>
JvmCpuStarvationMetrics(runtime.metrics.cpuStarvationSampler)
.flatMap { _ =>
CpuStarvationCheck
.run(runtimeConfig, cpuStarvationMetrics, onCpuStarvationWarn)
.run(runtimeConfig, runtime.metrics.cpuStarvationSampler, onCpuStarvationWarn)
.background
}
.surround(ioa)
Expand Down
57 changes: 0 additions & 57 deletions core/jvm/src/main/scala/cats/effect/metrics/CpuStarvation.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,17 @@ package cats.effect.metrics

import cats.effect.{IO, Resource}
import cats.effect.std.Console

import scala.concurrent.duration.FiniteDuration
import cats.effect.unsafe.metrics.{CpuStarvation, CpuStarvationSampler}
import cats.syntax.functor._

import java.io.{PrintWriter, StringWriter}
import java.lang.management.ManagementFactory

import javax.management.{MBeanServer, ObjectName}

private[effect] class JvmCpuStarvationMetrics private (mbean: CpuStarvation)
extends CpuStarvationMetrics {
override def incCpuStarvationCount: IO[Unit] = mbean.incStarvationCount

override def recordClockDrift(drift: FiniteDuration): IO[Unit] = mbean.recordDrift(drift)
}

private[effect] object JvmCpuStarvationMetrics {
private[this] val mBeanObjectName = new ObjectName("cats.effect.metrics:type=CpuStarvation")
private[this] val mBeanObjectName = new ObjectName(
"cats.effect.unsafe.metrics:type=CpuStarvation")

private[this] def warning(th: Throwable) = {
val exceptionWriter = new StringWriter()
Expand All @@ -46,28 +40,18 @@ private[effect] object JvmCpuStarvationMetrics {
|""".stripMargin
}

private[this] class NoOpCpuStarvationMetrics extends CpuStarvationMetrics {
override def incCpuStarvationCount: IO[Unit] = IO.unit

override def recordClockDrift(drift: FiniteDuration): IO[Unit] = IO.unit
}

private[effect] def apply(): Resource[IO, CpuStarvationMetrics] = {
val acquire: IO[(MBeanServer, JvmCpuStarvationMetrics)] = for {
private[effect] def apply(metrics: CpuStarvationSampler): Resource[IO, Unit] = {
val acquire: IO[MBeanServer] = for {
mBeanServer <- IO.delay(ManagementFactory.getPlatformMBeanServer)
mBean <- CpuStarvation()
mBean <- IO.pure(new CpuStarvation(metrics))
// To allow user-defined program to use the compute pool from the beginning,
// here we use `IO.delay` rather than `IO.blocking`.
_ <- IO.delay(mBeanServer.registerMBean(mBean, mBeanObjectName))
} yield (mBeanServer, new JvmCpuStarvationMetrics(mBean))
} yield mBeanServer

Resource
.make(acquire) {
case (mbeanServer, _) => IO.blocking(mbeanServer.unregisterMBean(mBeanObjectName))
}
.map(_._2)
.handleErrorWith[CpuStarvationMetrics] { th =>
Resource.eval(Console[IO].errorln(warning(th))).map(_ => new NoOpCpuStarvationMetrics)
}
.make(acquire)(mbeanServer => IO.blocking(mbeanServer.unregisterMBean(mBeanObjectName)))
.void
.handleErrorWith[Unit](th => Resource.eval(Console[IO].errorln(warning(th))))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@
* limitations under the License.
*/

package cats.effect.metrics
package cats.effect.unsafe.metrics

import cats.effect.IO
private[effect] final class CpuStarvation(
sampler: CpuStarvationSampler
) extends CpuStarvationMBean {

import scala.concurrent.duration.FiniteDuration
def getCpuStarvationCount(): Long =
sampler.cpuStarvationCount()

private[effect] class JsCpuStarvationMetrics extends CpuStarvationMetrics {
override def incCpuStarvationCount: IO[Unit] = IO.unit
def getMaxClockDriftMs(): Long =
sampler.clockDriftMaxMs()

override def recordClockDrift(drift: FiniteDuration): IO[Unit] = IO.unit
}

private[effect] object JsCpuStarvationMetrics {
private[effect] def apply(): CpuStarvationMetrics = new JsCpuStarvationMetrics
def getCurrentClockDriftMs(): Long =
sampler.clockDriftCurrentMs()
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
* limitations under the License.
*/

package cats.effect.metrics
package cats.effect.unsafe.metrics

/**
* An MBean interfaces for monitoring when CPU starvation occurs.
*/
private[metrics] trait CpuStarvationMBean {
private[unsafe] trait CpuStarvationMBean {

/**
* Returns the number of times CPU starvation has occurred.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2020-2024 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.unsafe.metrics

import scala.concurrent.ExecutionContext

private[metrics] abstract class IORuntimeMetricsCompanionPlatform {
this: IORuntimeMetrics.type =>

private[unsafe] def apply(ec: ExecutionContext): IORuntimeMetrics =
new IORuntimeMetrics {
private[effect] val cpuStarvationSampler: CpuStarvationSampler =
CpuStarvationSampler()

val cpuStarvation: CpuStarvationMetrics =
CpuStarvationMetrics(cpuStarvationSampler)

val workStealingThreadPool: Option[WorkStealingPoolMetrics] =
WorkStealingPoolMetrics(ec)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@
* limitations under the License.
*/

package cats.effect.metrics
package cats.effect.unsafe.metrics

import cats.effect.IO
private[metrics] trait IORuntimeMetricsPlatform { this: IORuntimeMetrics =>

import scala.concurrent.duration.FiniteDuration

private[effect] class NativeCpuStarvationMetrics extends CpuStarvationMetrics {
override def incCpuStarvationCount: IO[Unit] = IO.unit

override def recordClockDrift(drift: FiniteDuration): IO[Unit] = IO.unit
}

private[effect] object NativeCpuStarvationMetrics {
private[effect] def apply(): CpuStarvationMetrics = new NativeCpuStarvationMetrics
/**
* Returns work-stealing thread pool metrics.
*
* @example
* {{{
* val runtime: IORuntime = ???
* val totalWorkers = runtime.metrics.workStealingThreadPool.map(_.compute.workerThreadCount())
* }}}
*/
def workStealingThreadPool: Option[WorkStealingPoolMetrics]
}
Loading

0 comments on commit dfda4ae

Please sign in to comment.