From 32ce8262e3096f78e53aaac6fa3dc7eb8d1431fb Mon Sep 17 00:00:00 2001
From: Luca Canali
Date: Tue, 26 Sep 2023 13:42:35 +0200
Subject: [PATCH] Add plugin to use Pyroscope agents for monitoring Spark workloads

---
 README.md                                    | 63 +++++++++++++---
 build.sbt                                    |  9 ++-
 src/main/scala/ch/cern/PyroscopePlugin.scala | 77 ++++++++++++++++++++
 3 files changed, 136 insertions(+), 13 deletions(-)
 create mode 100644 src/main/scala/ch/cern/PyroscopePlugin.scala

diff --git a/README.md b/README.md
index 0705b85..7b84e1b 100644
--- a/README.md
+++ b/README.md
@@ -2,17 +2,16 @@
 ![SparkPlugins CI](https://github.com/cerndb/SparkPlugins/workflows/SparkPlugins%20CI/badge.svg?branch=master&event=push)
 [![Maven Central](https://maven-badges.herokuapp.com/maven-central/ch.cern.sparkmeasure/spark-plugins_2.12/badge.svg)](https://maven-badges.herokuapp.com/maven-central/ch.cern.sparkmeasure/spark-plugins_2.12)
 
-This repository contains code and examples of how to use Apache Spark Plugins.
+This repository contains code and examples of how to use Apache Spark Plugins.
 Spark plugins provide an interface, and related configuration, for injecting custom code on executors as they are initialized.
-Spark plugins can also be used to implement custom extensions to the Spark metrics system.
 
 ### Motivations
 - One important use case for deploying Spark Plugins is extending Spark instrumentation with custom metrics.
 - Other use cases include running custom actions when the executors start up, typically useful for integrating with
-  external systems.
-- This repo provides code and examples of plugins applied to measuring Spark on K8S,
-Spark I/O from cloud Filesystems, OS metrics, and custom application metrics.
+  external systems, such as monitoring systems.
+- This repo provides code and examples of plugins for monitoring Spark on cluster resources (YARN, K8S, Standalone),
+  including Spark I/O from cloud Filesystems, OS metrics, custom application metrics, and integrations with external systems like Pyroscope.
 - Note: The code in this repo is for Spark 3.x.
   For Spark 2.x, see instead [Executor Plugins for Spark 2.4](https://github.com/cerndb/SparkExecutorPlugins2.4)
 
@@ -21,9 +20,9 @@ Spark I/O from cloud Filesystems, OS metrics, and custom application metrics.
   and can be used to run custom code at the startup of Spark executors and driver.
 - Plugins basic configuration: `--conf spark.plugins=<list of plugin classes>`
 - Plugin JARs need to be made available to Spark executors
-  - you can distribute the plugin code to the executors using `--jars` and `--packages`.
-  - for K8S you can also consider making the jars available directly in the container image.
-- Most of the Plugins described in this repo are intended to extend the Spark Metrics System.
+  - you can distribute the plugin code to the executors using `--jars` and `--packages`
+  - for K8S you can also consider making the jars available directly in the container image
+- Most of the Plugins described in this repo are intended to extend the Spark Metrics System
 - See the details on the Spark metrics system at [Spark Monitoring documentation](https://spark.apache.org/docs/latest/monitoring.html#metrics).
 - You can find the metrics generated by the plugins in the Spark metrics system stream under the namespace `namespace=plugin.<Plugin Class Name>`
 
@@ -32,7 +31,7 @@ Spark I/O from cloud Filesystems, OS metrics, and custom application metrics.
 ### Related Work and Spark Performance Dashboard
 - Spark Performance Dashboard - a solution to ingest and visualize Spark metrics
-  - Link to the repo on [how to deploy a Spark Performance Dashboard using Spark metrics](https://github.com/cerndb/spark-dashboard)
+  - link to the repo on [how to deploy a Spark Performance Dashboard using Spark metrics](https://github.com/cerndb/spark-dashboard)
 - DATA+AI summit 2020 talk [What is New with Apache Spark Performance Monitoring in Spark 3.0](https://databricks.com/session_eu20/what-is-new-with-apache-spark-performance-monitoring-in-spark-3-0)
 - DATA+AI summit 2021 talk [Monitor Apache Spark 3 on Kubernetes using Metrics and Plugins](https://databricks.com/session_na21/monitor-apache-spark-3-on-kubernetes-using-metrics-and-plugins)
 
@@ -71,6 +70,52 @@ Spark I/O from cloud Filesystems, OS metrics, and custom application metrics.
 ---
 ## Plugins in this Repository
+### Plugin for integrating with Pyroscope
+[Grafana Pyroscope](https://grafana.com/oss/pyroscope/) is a tool for continuous profiling and Flame Graph visualization. This plugin integrates Apache Spark with Pyroscope.
+For details see:
+[How to profile Spark with Pyroscope](https://github.com/LucaCanali/Miscellaneous/blob/master/Spark_Notes/Tools_Spark_Pyroscope_FlameGraph.md)
+
+The following example shows how to put the configuration together and start Spark on a cluster with continuous
+profiling and Flame Graph visualization via Pyroscope:
+1. Start Pyroscope
+- Download from https://github.com/grafana/pyroscope/releases
+- CLI start: `./pyroscope -server.http-listen-port 5040`
+- Or use Docker: `docker run -it -p 5040:4040 grafana/pyroscope`
+
+2. Start Spark (spark-shell, PySpark, or spark-submit); update the package versions below to the latest available:
+```
+bin/spark-shell --master yarn \
+  --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.3,io.pyroscope:agent:0.12.0 \
+  --conf spark.plugins=ch.cern.PyroscopePlugin \
+  --conf spark.pyroscope.server="http://<pyroscope_hostname>:5040" # match the server and port used when starting Pyroscope
+```
+
+**Spark configurations:**
+This plugin adds the following configurations:
+```
+ --conf spark.pyroscope.server -> default "http://localhost:4040", set to match the server name and port used by Pyroscope
+ --conf spark.pyroscope.applicationName -> default spark.conf.get("spark.app.id")
+ --conf spark.pyroscope.eventType -> default ITIMER, possible values ITIMER, CPU, WALL, ALLOC, LOCK
+```
+
+**Example:**
+This is how to set the configuration programmatically (using PySpark):
+```
+from pyspark.sql import SparkSession
+
+# Build the Spark session
+spark = (SparkSession.builder
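+    # note: the executor sizing below is illustrative,
+    # adjust memory, cores, and instances to your own cluster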
+    .appName("Instrumented app")
+    .master("yarn")
+    .config("spark.executor.memory", "16g")
+    .config("spark.executor.cores", "4")
+    .config("spark.executor.instances", 2)
+    .config("spark.jars.packages", "ch.cern.sparkmeasure:spark-plugins_2.12:0.3,io.pyroscope:agent:0.12.0")
+    .config("spark.plugins", "ch.cern.PyroscopePlugin")
+    .config("spark.pyroscope.server", "http://<pyroscope_hostname>:5040")
+    .getOrCreate()
+    )
+```
+
 ### OS metrics instrumentation with cgroups, for Spark on Kubernetes
 - [CgroupMetrics](src/main/scala/ch/cern/CgroupMetrics.scala)
   - Configure with: `--conf spark.plugins=ch.cern.CgroupMetrics`
diff --git a/build.sbt b/build.sbt
index f340192..f43290e 100644
--- a/build.sbt
+++ b/build.sbt
@@ -3,14 +3,15 @@ name := "spark-plugins"
 version := "0.3-SNAPSHOT"
 isSnapshot := true
 
-scalaVersion := "2.12.17"
-crossScalaVersions := Seq("2.12.17", "2.13.8")
+scalaVersion := "2.12.18"
+crossScalaVersions := Seq("2.12.18", "2.13.8")
 
 licenses += ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0"))
 
-libraryDependencies += "io.dropwizard.metrics" % "metrics-core" % "4.2.15"
+libraryDependencies += "io.dropwizard.metrics" % "metrics-core" % "4.2.19"
 libraryDependencies += "org.apache.hadoop" % "hadoop-client-api" % "3.3.4"
-libraryDependencies += "org.apache.spark" %% "spark-core" % "3.4.1"
+libraryDependencies += "io.pyroscope" % "agent" % "0.12.0"
+libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.0"
 
 // publishing to Sonatype Nexus repository and Maven
 publishMavenStyle := true
diff --git a/src/main/scala/ch/cern/PyroscopePlugin.scala b/src/main/scala/ch/cern/PyroscopePlugin.scala
new file mode 100644
index 0000000..40f35a7
--- /dev/null
+++ b/src/main/scala/ch/cern/PyroscopePlugin.scala
@@ -0,0 +1,77 @@
+package ch.cern
+
+import io.pyroscope.javaagent.PyroscopeAgent
+import io.pyroscope.javaagent.EventType
+import io.pyroscope.javaagent.config.Config
+import io.pyroscope.http.Format
+
+import java.net.InetAddress
+import java.util.{Map => JMap}
+import scala.collection.JavaConverters._
+
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
+import org.apache.spark.SparkContext
+
+/**
+ * Use this plugin to configure the Pyroscope Java agent data collection on Spark executors.
+ * See https://grafana.com/docs/pyroscope/latest/configure-client/language-sdks/java/
+ *
+ * This plugin adds the following configurations:
+ * --conf spark.pyroscope.server -> default "http://localhost:4040", set to match the server name and port used by Pyroscope
+ * --conf spark.pyroscope.applicationName -> default spark.conf.get("spark.app.id")
+ * --conf spark.pyroscope.eventType -> default ITIMER, possible values ITIMER, CPU, WALL, ALLOC, LOCK
+ */
+class PyroscopePlugin extends SparkPlugin {
+
+  // Return the plugin's driver-side component.
+  override def driverPlugin(): DriverPlugin = {
+    new DriverPlugin() {
+      override def init(sc: SparkContext, myContext: PluginContext): JMap[String, String] = {
+        // nothing to initialize on the driver, return an empty extra-configuration map
+        Map.empty[String, String].asJava
+      }
+    }
+  }
+
+  // Return the plugin's executor-side component.
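+  // Note: init() below runs once in each executor JVM at startup; the profiles
+  // are tagged with executorId and hostname labels, so flame graphs can be
+  // filtered per executor in the Pyroscope UI.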
+  // This implements an executor plugin to set up and start the Pyroscope agent
+  override def executorPlugin(): ExecutorPlugin = {
+    new ExecutorPlugin() {
+      override def init(myContext: PluginContext, extraConf: JMap[String, String]): Unit = {
+
+        // Pyroscope server URL, set this to match your Pyroscope server endpoint
+        val pyroscopeServerUrl = myContext.conf.get("spark.pyroscope.server", "http://localhost:4040")
+
+        // This will be used for the application name.
+        // Note: in local mode spark.app.id is null, we use "local" to handle the case
+        val pyroscopeApplicationName = myContext.conf.get("spark.pyroscope.applicationName",
+          myContext.conf.get("spark.app.id", "local"))
+
+        val executorId = myContext.executorID
+        val localHostname = InetAddress.getLocalHost.getHostName
+
+        // The event type to profile: default ITIMER, possible values ITIMER, CPU, WALL, ALLOC, LOCK
+        val pyroscopeEventType = myContext.conf.get("spark.pyroscope.eventType", "ITIMER")
+
+        val eventType = pyroscopeEventType.toUpperCase match {
+          case "ITIMER" => EventType.ITIMER
+          case "CPU" => EventType.CPU
+          case "WALL" => EventType.WALL
+          case "ALLOC" => EventType.ALLOC
+          case "LOCK" => EventType.LOCK
+          case _ => throw new IllegalArgumentException(s"Invalid event type: $pyroscopeEventType")
+        }
+
+        // Configure and start the Pyroscope Java agent in this executor JVM
+        PyroscopeAgent.start(
+          new Config.Builder()
+            .setApplicationName(pyroscopeApplicationName)
+            .setProfilingEvent(eventType)
+            .setFormat(Format.JFR)
+            .setServerAddress(pyroscopeServerUrl)
+            .setLabels(Map("executorId" -> executorId.toString, "hostname" -> localHostname).asJava)
+            .build()
+        )
+      }
+    }
+  }
+
+}
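+
+// Usage sketch (illustrative): how to enable this plugin programmatically with
+// SparkConf; the Pyroscope server URL and the event type below are placeholder
+// values, adjust them to your environment.
+//
+//   import org.apache.spark.{SparkConf, SparkContext}
+//
+//   val conf = new SparkConf()
+//     .setAppName("Instrumented app")
+//     .set("spark.plugins", "ch.cern.PyroscopePlugin")
+//     .set("spark.pyroscope.server", "http://<pyroscope_hostname>:5040")
+//     .set("spark.pyroscope.eventType", "WALL") // e.g. wall-clock profiling
+//   val sc = new SparkContext(conf)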