@@ -28,6 +28,9 @@ public static enum StatType { COUNTER, TIMER, GAUGE }
28
28
private final String host ;
29
29
private final int port ;
30
30
31
+ private String prefix = "spark" ;
32
+ private String appPrefix = "spark.app-" ;
33
+
31
34
private boolean prependNewline = false ;
32
35
33
36
private ByteArrayOutputStream outputData ;
@@ -54,6 +57,38 @@ public void connect() throws IllegalStateException, SocketException {
54
57
this .writer = new BufferedWriter (new OutputStreamWriter (outputData ));
55
58
}
56
59
60
+ public void setNamePrefix (String namePrefix ) {
61
+ prefix = namePrefix ;
62
+ appPrefix = namePrefix + ".app-" ;
63
+ }
64
+
65
+ private String buildMetricName (String rawName ) throws IllegalArgumentException {
66
+ rawName = WHITESPACE .matcher (rawName ).replaceAll ("-" );
67
+
68
+ // e.g. spark.worker.executors
69
+ if (!rawName .startsWith (appPrefix )) {
70
+ return rawName ;
71
+ }
72
+
73
+ String [] parts = rawName .split ("\\ ." );
74
+ if (parts .length < 5 ) {
75
+ throw new IllegalArgumentException ("A spark app metric name must contain at least 4 parts: " + rawName );
76
+ }
77
+
78
+ StringBuilder stringBuilder = new StringBuilder (prefix );
79
+ if ("driver" .equals (parts [2 ])) {
80
+ // e.g. spark.app-20141209201233-0145.driver.BlockManager.memory.maxMem_MB
81
+ stringBuilder .append (rawName .substring (rawName .indexOf (".driver." )));
82
+ } else if ("executor" .equals (parts [3 ])) {
83
+ // e.g. spark.app-20141209201027-0139.31.executor.filesystem.file.read_bytes
84
+ stringBuilder .append (rawName .substring (rawName .indexOf (".executor." )));
85
+ } else {
86
+ throw new IllegalArgumentException ("Unrecognized metric name pattern: " + rawName );
87
+ }
88
+
89
+ return stringBuilder .toString ();
90
+ }
91
+
57
92
public void send (String name , String value , StatType statType ) throws IOException {
58
93
String statTypeStr = "" ;
59
94
switch (statType ) {
@@ -67,32 +102,14 @@ public void send(String name, String value, StatType statType) throws IOExceptio
67
102
statTypeStr = "ms" ;
68
103
break ;
69
104
}
70
- name = sanitizeString (name );
71
- String tags = null ;
72
- List <String > parts = new ArrayList <String >(Arrays .asList (name .split ("\\ ." )));
73
- String prefix = parts .remove (0 );
74
- String source = parts .remove (0 );
75
- if (source .equals ("executor" )) { // "spark.executor.0.filesystem.file.largeRead_ops" (ExecutorSource)
76
- String executorId = parts .remove (0 );
77
- tags = String .format ("#executor:%s" , executorId );
78
- while (parts .size () > 3 ) {
79
- parts .remove (parts .size () -1 );
80
- }
81
- name = String .format ("%s.%s.%s" , prefix , source , StringUtils .join (parts , "_" ));
82
- } else if (source .equals ("application" )) { // "spark.application.Apriori.1394489355680.runtime_ms" (ApplicationSource)
83
- String applicationName = parts .remove (0 );
84
- String currentTime = parts .remove (0 );
85
- String metricName = parts .remove (0 );
86
- tags = String .format ("#application:%s" , applicationName );
87
- name = String .format ("%s.%s." , prefix , source ) + metricName ;
88
- } else {
89
- String realSource = parts .remove (0 );
90
- // "spark.OrdersModel.DAGScheduler.stage.failedStages" (DAGSchedulerSource)
91
- // "spark.OrdersModel.BlockManager.memory.maxMem_MB" (BlockManagerSource)
92
- if (realSource .equals ("DAGScheduler" ) || realSource .equals ("BlockManager" )) {
93
- tags = String .format ("#application:%s" , source );
94
- name = String .format ("%s.application.%s." , prefix , realSource ) + String .format ("%s_%s" , parts .toArray ());
95
- }
105
+
106
+ String tags = null ; // TODO: Would be nice to get the job name and job user as tags
107
+
108
+ try {
109
+ name = buildMetricName (name );
110
+ } catch (IllegalArgumentException e ) {
111
+ logger .error ("Error sending to Statsd:" , e );
112
+ return ; // Drop metrics that we can't process so we don't push metrics with app names (e.g. 20141209201233-0145)
96
113
}
97
114
98
115
try {
@@ -129,10 +146,6 @@ public void close() throws IOException {
129
146
this .writer = null ;
130
147
}
131
148
132
- private String sanitizeString (String s ) {
133
- return WHITESPACE .matcher (s ).replaceAll ("-" );
134
- }
135
-
136
149
private DatagramPacket newPacket (ByteArrayOutputStream out ) {
137
150
byte [] dataBuffer ;
138
151
0 commit comments