diff --git a/pom.xml b/pom.xml
index be960d5..f84cd52 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,17 +19,29 @@
     <modelVersion>4.0.0</modelVersion>
     <groupId>io.kyligence</groupId>
-    <artifactId>spark-influxdb-sink_2.11</artifactId>
-    <version>1.0.0</version>
+    <artifactId>spark-influxdb-sink</artifactId>
+    <version>1.1.0</version>
     <packaging>jar</packaging>
     <name>Spark Influxdb Sink</name>

     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <scala.binary.version>2.11</scala.binary.version>
-        <spark.version>2.4.1-kylin-r74</spark.version>
+        <spark.version>2.4.8</spark.version>
     </properties>

+    <profiles>
+        <profile>
+            <id>spark-3.1</id>
+            <properties>
+                <scala.binary.version>2.12</scala.binary.version>
+                <spark.version>3.1.1</spark.version>
+            </properties>
+        </profile>
+    </profiles>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.spark</groupId>
@@ -109,7 +121,7 @@
-                    <scalaCompatVersion>2.11</scalaCompatVersion>
+                    <scalaCompatVersion>${scala.binary.version}</scalaCompatVersion>
@@ -166,10 +178,6 @@
                 <pattern>org.tukaani</pattern>
                 <shadedPattern>kylin_sis_shaded.org.tukaani</shadedPattern>
             </relocation>
-            <relocation>
-                <pattern>org.slf4j</pattern>
-                <shadedPattern>kylin_sis_shaded.org.slf4j</shadedPattern>
-            </relocation>
             <relocation>
                 <pattern>net.logstash</pattern>
                 <shadedPattern>kylin_sis_shaded.net.logstash</shadedPattern>
diff --git a/src/main/java/com/codahale/metrics/influxdb/InfluxDbReporter.java b/src/main/java/com/codahale/metrics/influxdb/InfluxDbReporter.java
index fef738d..4d0dd30 100644
--- a/src/main/java/com/codahale/metrics/influxdb/InfluxDbReporter.java
+++ b/src/main/java/com/codahale/metrics/influxdb/InfluxDbReporter.java
@@ -33,7 +33,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.ConnectException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Locale;
@@ -109,7 +108,7 @@ public InfluxDbReporter build(InfluxDB influxDB, String dataBase) {
private final InfluxDB influxDB;
private final String dataBase;
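+    // Prefix for every measurement name, e.g. "SparkMetrics-Gauge"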
-    private final String profilerName = "DAGScheduler";
+    private final String profilerNamePrefix = "SparkMetrics";
private InfluxDbReporter(MetricRegistry registry,
InfluxDB influxDB,
@@ -134,33 +133,148 @@ public void report(SortedMap<String, Gauge> gauges, SortedMap<String, Counter> c
         final long now = System.currentTimeMillis();
         try {
-            Map<String, Map<String, Gauge>> groupedGauges = groupGauges(gauges);
+            // Accumulate every point for this report cycle into a single batch
+            BatchPoints batchPoints = BatchPoints.database(this.dataBase)
+                    .consistency(InfluxDB.ConsistencyLevel.ONE)
+                    .retentionPolicy("autogen")
+                    .build();
+
+            /* Map<String, Map<String, Gauge>> groupedGauges = groupGauges(gauges);
             for (Map.Entry<String, Map<String, Gauge>> entry : groupedGauges.entrySet()) {
-                reportGaugeGroup(entry.getKey(), entry.getValue(), now);
+                reportGaugeGroup(entry.getKey(), entry.getValue(), now, batchPoints);
+            } */
+            for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
+                reportGauge(entry.getKey(), entry.getValue(), now, batchPoints);
             }
-            this.influxDB.flush();
-            // TODO : support to reporter these types of data
-            /* for (Map.Entry<String, Counter> entry : counters.entrySet()) {
-                reportCounter(timestamp, entry.getKey(), entry.getValue());
+            for (Map.Entry<String, Counter> entry : counters.entrySet()) {
+                reportCounter(entry.getKey(), entry.getValue(), now, batchPoints);
             }
             for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
-                reportHistogram(timestamp, entry.getKey(), entry.getValue());
+                reportHistogram(entry.getKey(), entry.getValue(), now, batchPoints);
             }
             for (Map.Entry<String, Meter> entry : meters.entrySet()) {
-                reportMeter(timestamp, entry.getKey(), entry.getValue());
+                reportMeter(entry.getKey(), entry.getValue(), now, batchPoints);
             }
             for (Map.Entry<String, Timer> entry : timers.entrySet()) {
-                reportTimer(timestamp, entry.getKey(), entry.getValue());
-            } */
+                reportTimer(entry.getKey(), entry.getValue(), now, batchPoints);
+            }
+
+            // Write the whole batch in a single request
+            this.influxDB.write(batchPoints);
         } catch (Exception e) {
             LOGGER.warn("Unable to report to InfluxDB with error '{}'. Discarding data.", e.getMessage());
         }
     }
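+    // One helper per Dropwizard metric type. Each converts a metric into an
+    // InfluxDB Point (measurement "SparkMetrics-<Type>", tagged with the metric
+    // name) and adds it to the shared batch; nothing is written until report()
+    // flushes the whole batch in one call.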
+    private void reportGauge(String name, Gauge gauge, long now, BatchPoints batchPoints) {
+        Map<String, Object> fields = new HashMap<String, Object>();
+        Object gaugeValue = sanitizeGauge(gauge.getValue());
+        fields.put("value", gaugeValue);
+
+        // Point
+        Point point = Point.measurement(profilerNamePrefix + "-Gauge")
+                .time(now, TimeUnit.MILLISECONDS)
+                .fields(fields)
+                .tag("metricsTag", name)
+                .build();
+
+        batchPoints.point(point);
+    }
+
+    private void reportCounter(String name, Counter counter, long now, BatchPoints batchPoints) {
+        Map<String, Object> fields = new HashMap<String, Object>();
+        fields.put("count", counter.getCount());
+
+        // Point
+        Point point = Point.measurement(profilerNamePrefix + "-Counter")
+                .time(now, TimeUnit.MILLISECONDS)
+                .fields(fields)
+                .tag("metricsTag", name)
+                .build();
+
+        batchPoints.point(point);
+    }
+
+ private void reportHistogram(String name, Histogram histogram,
+ long now, BatchPoints batchPoints) {
+ final Snapshot snapshot = histogram.getSnapshot();
+ Map fields = new HashMap();
+ fields.put("count", histogram.getCount());
+ fields.put("min", snapshot.getMin());
+ fields.put("max", snapshot.getMax());
+ fields.put("mean", snapshot.getMean());
+ fields.put("stddev", snapshot.getStdDev());
+ fields.put("p50", snapshot.getMedian());
+ fields.put("p75", snapshot.get75thPercentile());
+ fields.put("p95", snapshot.get95thPercentile());
+ fields.put("p98", snapshot.get98thPercentile());
+ fields.put("p99", snapshot.get99thPercentile());
+ fields.put("p999", snapshot.get999thPercentile());
+
+ // Point
+ Point point = Point.measurement(profilerNamePrefix + "-Histogram")
+ .time(now, TimeUnit.MILLISECONDS)
+ .fields(fields)
+ .tag("metricsTag", name)
+ .build();
+
+ batchPoints.point(point);
+ }
+
+    private void reportMeter(String name, Meter meter, long now, BatchPoints batchPoints) {
+        Map<String, Object> fields = new HashMap<String, Object>();
+        fields.put("count", meter.getCount());
+        fields.put("mean_rate", convertRate(meter.getMeanRate()));
+        fields.put("m1_rate", convertRate(meter.getOneMinuteRate()));
+        fields.put("m5_rate", convertRate(meter.getFiveMinuteRate()));
+        fields.put("m15_rate", convertRate(meter.getFifteenMinuteRate()));
+        fields.put("rate_unit", "events/" + getRateUnit());
+
+        // Point
+        Point point = Point.measurement(profilerNamePrefix + "-Meter")
+                .time(now, TimeUnit.MILLISECONDS)
+                .fields(fields)
+                .tag("metricsTag", name)
+                .build();
+
+        batchPoints.point(point);
+    }
+
+    private void reportTimer(String name, Timer timer, long now, BatchPoints batchPoints) {
+        final Snapshot snapshot = timer.getSnapshot();
+        Map<String, Object> fields = new HashMap<String, Object>();
+        fields.put("count", timer.getCount());
+        fields.put("min", convertDuration(snapshot.getMin()));
+        fields.put("max", convertDuration(snapshot.getMax()));
+        fields.put("mean", convertDuration(snapshot.getMean()));
+        fields.put("stddev", convertDuration(snapshot.getStdDev()));
+        fields.put("p50", convertDuration(snapshot.getMedian()));
+        fields.put("p75", convertDuration(snapshot.get75thPercentile()));
+        fields.put("p95", convertDuration(snapshot.get95thPercentile()));
+        fields.put("p98", convertDuration(snapshot.get98thPercentile()));
+        fields.put("p99", convertDuration(snapshot.get99thPercentile()));
+        fields.put("p999", convertDuration(snapshot.get999thPercentile()));
+        fields.put("mean_rate", convertRate(timer.getMeanRate()));
+        fields.put("m1_rate", convertRate(timer.getOneMinuteRate()));
+        fields.put("m5_rate", convertRate(timer.getFiveMinuteRate()));
+        fields.put("m15_rate", convertRate(timer.getFifteenMinuteRate()));
+        fields.put("rate_unit", "calls/" + getRateUnit());
+        fields.put("duration_unit", getDurationUnit());
+
+        // Point
+        Point point = Point.measurement(profilerNamePrefix + "-Timer")
+                .time(now, TimeUnit.MILLISECONDS)
+                .fields(fields)
+                .tag("metricsTag", name)
+                .build();
+
+        batchPoints.point(point);
+    }
+
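+    // groupGauges(...) and reportGaugeGroup(...) are only used by the
+    // grouped-gauge path that is commented out in report() above.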
     private Map<String, Map<String, Gauge>> groupGauges(SortedMap<String, Gauge> gauges) {
         Map<String, Map<String, Gauge>> groupedGauges = new HashMap<>();
         for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
@@ -201,7 +315,8 @@ private Object sanitizeGauge(Object value) {
return finalValue;
}
-    private void reportGaugeGroup(String name, Map<String, Gauge> gaugeGroup, long now) {
+    private void reportGaugeGroup(String name, Map<String, Gauge> gaugeGroup, long now,
+                                  BatchPoints batchPoints) {
         Map<String, Object> fields = new HashMap<String, Object>();
         for (Map.Entry<String, Gauge> entry : gaugeGroup.entrySet()) {
             Object gaugeValue = sanitizeGauge(entry.getValue().getValue());
@@ -211,19 +326,13 @@ private void reportGaugeGroup(String name, Map<String, Gauge> gaugeGroup, long n
         }
         // Point
-        Point point = Point.measurement(profilerName)
+        Point point = Point.measurement(profilerNamePrefix)
                 .time(now, TimeUnit.MILLISECONDS)
                 .fields(fields)
                 .tag("metricsTag", name)
                 .build();
-        // BatchPoints
-        BatchPoints batchPoints = BatchPoints.database(this.dataBase)
-                .consistency(InfluxDB.ConsistencyLevel.ALL)
-                .retentionPolicy("autogen")
-                .build();
+
         batchPoints.point(point);
-        // Write
-        this.influxDB.write(batchPoints);
     }
protected String sanitize(String name) {
diff --git a/src/main/scala/org/apache/spark/metrics/sink/InfluxdbSink.scala b/src/main/scala/org/apache/spark/metrics/sink/InfluxdbSink.scala
index b7f1d92..5d434db 100644
--- a/src/main/scala/org/apache/spark/metrics/sink/InfluxdbSink.scala
+++ b/src/main/scala/org/apache/spark/metrics/sink/InfluxdbSink.scala
@@ -19,11 +19,10 @@ package org.apache.spark.metrics.sink
import java.util.{Locale, Properties}
import java.util.concurrent.TimeUnit
-
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.influxdb.InfluxDbReporter
-import org.influxdb.InfluxDBFactory
-
+import okhttp3.OkHttpClient
+import org.influxdb.{InfluxDB, InfluxDBFactory}
import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem
@@ -66,7 +65,15 @@ class InfluxdbSink(val property: Properties, val registry: MetricRegistry,
MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
val url: String = "http://" + host + ":" + port
- val influxDB = InfluxDBFactory.connect(url, userName, password)
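+  // Share one OkHttp client with explicit timeouts so a slow or unreachable
+  // InfluxDB endpoint cannot block the reporter indefinitely.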
+  val client = new OkHttpClient.Builder()
+    .connectTimeout(1, TimeUnit.MINUTES)
+    .readTimeout(1, TimeUnit.MINUTES)
+    .writeTimeout(1, TimeUnit.MINUTES)
+    .retryOnConnectionFailure(true)
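+  // Buffer up to 100 points, flushing at least every second; gzip the payload
+  // to reduce network overhead.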
+  val influxDB = InfluxDBFactory.connect(url, userName, password, client)
+    .enableBatch(100, 1000, TimeUnit.MILLISECONDS)
+    .enableGzip()
+    .setLogLevel(InfluxDB.LogLevel.NONE)
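+  // Durations are reported in milliseconds and rates in events per second.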
val reporter: InfluxDbReporter = InfluxDbReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)