Support reporting Spark metrics to InfluxDB
zzcclp committed Oct 8, 2021
1 parent 4fddfbf commit 8788baa
Showing 3 changed files with 157 additions and 33 deletions.
24 changes: 16 additions & 8 deletions pom.xml
@@ -19,17 +19,29 @@
   <modelVersion>4.0.0</modelVersion>
 
   <groupId>io.kyligence</groupId>
-  <artifactId>spark-influxdb-sink_2.11</artifactId>
-  <version>1.0.0</version>
+  <artifactId>spark-influxdb-sink</artifactId>
+  <version>1.1.0</version>
   <packaging>jar</packaging>
   <name>Spark Influxdb Sink</name>
 
   <properties>
     <encoding>UTF-8</encoding>
     <scala.binary.version>2.11</scala.binary.version>
-    <spark.version>2.4.1-kylin-r74</spark.version>
+    <!-- <spark.version>2.4.1-kylin-r74</spark.version> -->
+    <spark.version>2.4.8</spark.version>
   </properties>
 
+  <profiles>
+    <profile>
+      <id>spark-3.1</id>
+      <properties>
+        <scala.binary.version>2.12</scala.binary.version>
+        <!-- <spark.version>3.1.1-kylin-4.x-r33</spark.version> -->
+        <spark.version>3.1.1</spark.version>
+      </properties>
+    </profile>
+  </profiles>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
@@ -109,7 +121,7 @@
             </goals>
             <configuration>
               <!-- <checkMultipleScalaVersions>false</checkMultipleScalaVersions> -->
-              <scalaCompatVersion>2.11</scalaCompatVersion>
+              <scalaCompatVersion>${scala.binary.version}</scalaCompatVersion>
             </configuration>
           </execution>
           <execution>
@@ -166,10 +178,6 @@
                   <pattern>org.tukaani</pattern>
                   <shadedPattern>kylin_sis_shaded.org.tukaani</shadedPattern>
                 </relocation>
-                <relocation>
-                  <pattern>org.slf4j</pattern>
-                  <shadedPattern>kylin_sis_shaded.org.slf4j</shadedPattern>
-                </relocation>
                 <relocation>
                   <pattern>net.logstash</pattern>
                   <shadedPattern>kylin_sis_shaded.net.logstash</shadedPattern>
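To build against Spark 3.1 / Scala 2.12, the new profile can be activated with standard Maven profile selection; an illustrative invocation (the commit itself documents no build command):

    mvn clean package -Pspark-3.1

Without `-Pspark-3.1`, the defaults above apply (Scala 2.11, Spark 2.4.8).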
151 changes: 130 additions & 21 deletions src/main/java/com/codahale/metrics/influxdb/InfluxDbReporter.java
@@ -33,7 +33,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.ConnectException;
 import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Locale;
@@ -109,7 +108,7 @@ public InfluxDbReporter build(InfluxDB influxDB, String dataBase) {
     private final InfluxDB influxDB;
     private final String dataBase;
 
-    private final String profilerName = "DAGScheduler";
+    private final String profilerNamePrefix = "SparkMetrics";
 
     private InfluxDbReporter(MetricRegistry registry,
                              InfluxDB influxDB,
@@ -134,33 +133,148 @@ public void report(SortedMap<String, Gauge> gauges, SortedMap<String, Counter> c
         final long now = System.currentTimeMillis();
 
         try {
-            Map<String, Map<String, Gauge>> groupedGauges = groupGauges(gauges);
+            // BatchPoints
+            BatchPoints batchPoints = BatchPoints.database(this.dataBase)
+                    .consistency(InfluxDB.ConsistencyLevel.ONE)
+                    .retentionPolicy("autogen")
+                    .build();
+
+            /* Map<String, Map<String, Gauge>> groupedGauges = groupGauges(gauges);
             for (Map.Entry<String, Map<String, Gauge>> entry : groupedGauges.entrySet()) {
-                reportGaugeGroup(entry.getKey(), entry.getValue(), now);
+                reportGaugeGroup(entry.getKey(), entry.getValue(), now, batchPoints);
+            } */
+            for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
+                reportGauge(entry.getKey(), entry.getValue(), now, batchPoints);
             }
+            this.influxDB.flush();
 
-            // TODO : support to reporter these types of data
-            /* for (Map.Entry<String, Counter> entry : counters.entrySet()) {
-                reportCounter(timestamp, entry.getKey(), entry.getValue());
+            for (Map.Entry<String, Counter> entry : counters.entrySet()) {
+                reportCounter(entry.getKey(), entry.getValue(), now, batchPoints);
             }
 
             for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
-                reportHistogram(timestamp, entry.getKey(), entry.getValue());
+                reportHistogram(entry.getKey(), entry.getValue(), now, batchPoints);
             }
 
             for (Map.Entry<String, Meter> entry : meters.entrySet()) {
-                reportMeter(timestamp, entry.getKey(), entry.getValue());
+                reportMeter(entry.getKey(), entry.getValue(), now, batchPoints);
             }
 
             for (Map.Entry<String, Timer> entry : timers.entrySet()) {
-                reportTimer(timestamp, entry.getKey(), entry.getValue());
-            } */
+                reportTimer(entry.getKey(), entry.getValue(), now, batchPoints);
+            }
+
+            // Write
+            this.influxDB.write(batchPoints);
         } catch (Exception e) {
             LOGGER.warn("Unable to report to InfluxDB with error '{}'. Discarding data.", e.getMessage());
         }
     }
+
+    private void reportGauge(String name, Gauge gauge, long now, BatchPoints batchPoints) {
+        Map<String, Object> fields = new HashMap<String, Object>();
+        Object gaugeValue = sanitizeGauge(gauge.getValue());
+        fields.put("value", gaugeValue);
+
+        // Point
+        Point point = Point.measurement(profilerNamePrefix + "-Gauge")
+                .time(now, TimeUnit.MILLISECONDS)
+                .fields(fields)
+                .tag("metricsTag", name)
+                .build();
+
+        batchPoints.point(point);
+    }
+
+    private void reportCounter(String name, Counter counter, long now, BatchPoints batchPoints) {
+        Map<String, Object> fields = new HashMap<String, Object>();
+        fields.put("count", counter.getCount());
+
+        // Point
+        Point point = Point.measurement(profilerNamePrefix + "-Counter")
+                .time(now, TimeUnit.MILLISECONDS)
+                .fields(fields)
+                .tag("metricsTag", name)
+                .build();
+
+        batchPoints.point(point);
+    }
+
+    private void reportHistogram(String name, Histogram histogram,
+                                 long now, BatchPoints batchPoints) {
+        final Snapshot snapshot = histogram.getSnapshot();
+        Map<String, Object> fields = new HashMap<String, Object>();
+        fields.put("count", histogram.getCount());
+        fields.put("min", snapshot.getMin());
+        fields.put("max", snapshot.getMax());
+        fields.put("mean", snapshot.getMean());
+        fields.put("stddev", snapshot.getStdDev());
+        fields.put("p50", snapshot.getMedian());
+        fields.put("p75", snapshot.get75thPercentile());
+        fields.put("p95", snapshot.get95thPercentile());
+        fields.put("p98", snapshot.get98thPercentile());
+        fields.put("p99", snapshot.get99thPercentile());
+        fields.put("p999", snapshot.get999thPercentile());
+
+        // Point
+        Point point = Point.measurement(profilerNamePrefix + "-Histogram")
+                .time(now, TimeUnit.MILLISECONDS)
+                .fields(fields)
+                .tag("metricsTag", name)
+                .build();
+
+        batchPoints.point(point);
+    }
+
+    private void reportMeter(String name, Meter meter, long now, BatchPoints batchPoints) {
+        Map<String, Object> fields = new HashMap<String, Object>();
+        fields.put("count", meter.getCount());
+        fields.put("mean_rate", convertRate(meter.getMeanRate()));
+        fields.put("m1_rate", convertRate(meter.getOneMinuteRate()));
+        fields.put("m5_rate", convertRate(meter.getFiveMinuteRate()));
+        fields.put("m15_rate", convertRate(meter.getFifteenMinuteRate()));
+        fields.put("rate_unit", "events/" + getRateUnit());
+
+        // Point
+        Point point = Point.measurement(profilerNamePrefix + "-Meter")
+                .time(now, TimeUnit.MILLISECONDS)
+                .fields(fields)
+                .tag("metricsTag", name)
+                .build();
+
+        batchPoints.point(point);
+    }
+
+    private void reportTimer(String name, Timer timer, long now, BatchPoints batchPoints) {
+        final Snapshot snapshot = timer.getSnapshot();
+        Map<String, Object> fields = new HashMap<String, Object>();
+        fields.put("count", timer.getCount());
+        fields.put("min", convertDuration(snapshot.getMin()));
+        fields.put("max", convertDuration(snapshot.getMax()));
+        fields.put("mean", convertDuration(snapshot.getMean()));
+        fields.put("stddev", convertDuration(snapshot.getStdDev()));
+        fields.put("p50", convertDuration(snapshot.getMedian()));
+        fields.put("p75", convertDuration(snapshot.get75thPercentile()));
+        fields.put("p95", convertDuration(snapshot.get95thPercentile()));
+        fields.put("p98", convertDuration(snapshot.get98thPercentile()));
+        fields.put("p99", convertDuration(snapshot.get99thPercentile()));
+        fields.put("p999", convertDuration(snapshot.get999thPercentile()));
+        fields.put("mean_rate", convertRate(timer.getMeanRate()));
+        fields.put("m1_rate", convertRate(timer.getOneMinuteRate()));
+        fields.put("m5_rate", convertRate(timer.getFiveMinuteRate()));
+        fields.put("m15_rate", convertRate(timer.getFifteenMinuteRate()));
+        fields.put("rate_unit", "calls/" + getRateUnit());
+        fields.put("duration_unit", getDurationUnit());
+
+        // Point
+        Point point = Point.measurement(profilerNamePrefix + "-Timer")
+                .time(now, TimeUnit.MILLISECONDS)
+                .fields(fields)
+                .tag("metricsTag", name)
+                .build();
+
+        batchPoints.point(point);
+    }
 
     private Map<String, Map<String, Gauge>> groupGauges(SortedMap<String, Gauge> gauges) {
         Map<String, Map<String, Gauge>> groupedGauges = new HashMap<>();
         for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
@@ -201,7 +315,8 @@ private Object sanitizeGauge(Object value) {
         return finalValue;
     }
 
-    private void reportGaugeGroup(String name, Map<String, Gauge> gaugeGroup, long now) {
+    private void reportGaugeGroup(String name, Map<String, Gauge> gaugeGroup, long now,
+                                  BatchPoints batchPoints) {
         Map<String, Object> fields = new HashMap<String, Object>();
         for (Map.Entry<String, Gauge> entry : gaugeGroup.entrySet()) {
             Object gaugeValue = sanitizeGauge(entry.getValue().getValue());
@@ -211,19 +326,13 @@ private void reportGaugeGroup(String name, Map<String, Gauge> gaugeGroup, long n
         }
 
         // Point
-        Point point = Point.measurement(profilerName)
+        Point point = Point.measurement(profilerNamePrefix)
                 .time(now, TimeUnit.MILLISECONDS)
                 .fields(fields)
                 .tag("metricsTag", name)
                 .build();
-        // BatchPoints
-        BatchPoints batchPoints = BatchPoints.database(this.dataBase)
-                .consistency(InfluxDB.ConsistencyLevel.ALL)
-                .retentionPolicy("autogen")
-                .build();
 
         batchPoints.point(point);
-        // Write
-        this.influxDB.write(batchPoints);
     }
 
     protected String sanitize(String name) {
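For context, InfluxDbReporter is a Dropwizard ScheduledReporter, so it is driven by the usual register/start/report cycle. A minimal stand-alone sketch of wiring it up outside Spark (the URL, credentials, database name, and poll period are illustrative placeholders, not values from this commit):

    import java.util.concurrent.TimeUnit;
    import com.codahale.metrics.MetricRegistry;
    import com.codahale.metrics.influxdb.InfluxDbReporter;
    import org.influxdb.InfluxDB;
    import org.influxdb.InfluxDBFactory;

    MetricRegistry registry = new MetricRegistry();
    registry.counter("my.test.counter").inc();  // something to report

    InfluxDB influxDB = InfluxDBFactory.connect("http://localhost:8086", "user", "pass");
    InfluxDbReporter reporter = InfluxDbReporter.forRegistry(registry)
            .convertDurationsTo(TimeUnit.MILLISECONDS)
            .convertRatesTo(TimeUnit.SECONDS)
            .build(influxDB, "spark_metrics");
    reporter.start(10, TimeUnit.SECONDS);  // invokes report(...) above every 10s

Given the measurement names above, the counter would serialize to InfluxDB line protocol roughly as "SparkMetrics-Counter,metricsTag=my.test.counter count=1 <timestamp-ms>" (illustrative).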
15 changes: 11 additions & 4 deletions src/main/scala/org/apache/spark/metrics/sink/InfluxdbSink.scala
@@ -19,11 +19,10 @@ package org.apache.spark.metrics.sink
 
 import java.util.{Locale, Properties}
 import java.util.concurrent.TimeUnit
-
 import com.codahale.metrics.MetricRegistry
 import com.codahale.metrics.influxdb.InfluxDbReporter
-import org.influxdb.InfluxDBFactory
-
+import okhttp3.OkHttpClient
+import org.influxdb.{InfluxDB, InfluxDBFactory}
 import org.apache.spark.SecurityManager
 import org.apache.spark.metrics.MetricsSystem

Expand Down Expand Up @@ -66,7 +65,15 @@ class InfluxdbSink(val property: Properties, val registry: MetricRegistry,
MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)

val url: String = "http://" + host + ":" + port
val influxDB = InfluxDBFactory.connect(url, userName, password)
val client = (new OkHttpClient.Builder())
.connectTimeout(1, TimeUnit.MINUTES)
.readTimeout(1, TimeUnit.MINUTES)
.writeTimeout(1, TimeUnit.MINUTES)
.retryOnConnectionFailure(true)
val influxDB = InfluxDBFactory.connect(url, userName, password, client)
.enableBatch(100, 1000, TimeUnit.MILLISECONDS)
.enableGzip()
.setLogLevel(InfluxDB.LogLevel.NONE)
val reporter: InfluxDbReporter = InfluxDbReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
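For reference, Spark sinks of this kind are normally enabled through conf/metrics.properties. A hypothetical configuration sketch — the class name matches this file, but the host/port/user/password/period key names are assumptions, since the property-reading code falls outside the hunks shown above:

    *.sink.influxdb.class=org.apache.spark.metrics.sink.InfluxdbSink
    *.sink.influxdb.host=localhost
    *.sink.influxdb.port=8086
    *.sink.influxdb.user=admin
    *.sink.influxdb.password=admin
    *.sink.influxdb.period=10
    *.sink.influxdb.unit=seconds

The `*` prefix applies the sink to every Spark component (driver, executors, master, worker).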
