Skip to content

HADOOP-15230. fixes GraphiteSink to support point tags in correct format. #340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,166 +42,173 @@
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class GraphiteSink implements MetricsSink, Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(GraphiteSink.class);
private static final String SERVER_HOST_KEY = "server_host";
private static final String SERVER_PORT_KEY = "server_port";
private static final String METRICS_PREFIX = "metrics_prefix";
private String metricsPrefix = null;
private Graphite graphite = null;

@Override
public void init(SubsetConfiguration conf) {
// Get Graphite host configurations.
final String serverHost = conf.getString(SERVER_HOST_KEY);
final int serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY));

// Get Graphite metrics graph prefix.
metricsPrefix = conf.getString(METRICS_PREFIX);
if (metricsPrefix == null)
metricsPrefix = "";

graphite = new Graphite(serverHost, serverPort);
graphite.connect();
private static final Logger LOG =
LoggerFactory.getLogger(GraphiteSink.class);
private static final String SERVER_HOST_KEY = "server_host";
private static final String SERVER_PORT_KEY = "server_port";
private static final String METRICS_PREFIX = "metrics_prefix";
private String metricsPrefix = null;
private Graphite graphite = null;

@Override
public void init(SubsetConfiguration conf) {
// Get Graphite host configurations.
final String serverHost = conf.getString(SERVER_HOST_KEY);
final int serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY));

// Get Graphite metrics graph prefix.
metricsPrefix = conf.getString(METRICS_PREFIX);
if (metricsPrefix == null) {
metricsPrefix = "";
}
graphite = new Graphite(serverHost, serverPort);
graphite.connect();
}

@Override
public void putMetrics(MetricsRecord record) {
StringBuilder lines = new StringBuilder();
StringBuilder metricsPathPrefix = new StringBuilder();
StringBuilder pointTags = new StringBuilder("");

// Configure the hierarchical place to display the graph.
metricsPathPrefix.append(metricsPrefix).append(".")
.append(record.context()).append(".").append(record.name());

// collect point tags to be appended at the end of the metric name
for (MetricsTag tag : record.tags()) {
if (tag.value() != null && tag.value().trim().length() > 0) {
pointTags.append(";");
pointTags.append(tag.name().replace(' ', '_'));
pointTags.append("=");
pointTags.append(tag.value().replace(' ', '_'));
}
}

@Override
public void putMetrics(MetricsRecord record) {
StringBuilder lines = new StringBuilder();
StringBuilder metricsPathPrefix = new StringBuilder();

// Configure the hierarchical place to display the graph.
metricsPathPrefix.append(metricsPrefix).append(".")
.append(record.context()).append(".").append(record.name());

for (MetricsTag tag : record.tags()) {
if (tag.value() != null) {
metricsPathPrefix.append(".")
.append(tag.name())
.append("=")
.append(tag.value());
}
}

// The record timestamp is in milliseconds while Graphite expects an epoc time in seconds.
long timestamp = record.timestamp() / 1000L;

// Collect datapoints.
for (AbstractMetric metric : record.metrics()) {
lines.append(
metricsPathPrefix.toString() + "."
+ metric.name().replace(' ', '.')).append(" ")
.append(metric.value()).append(" ").append(timestamp)
.append("\n");
}

try {
graphite.write(lines.toString());
} catch (Exception e) {
LOG.warn("Error sending metrics to Graphite", e);
try {
graphite.close();
} catch (Exception e1) {
throw new MetricsException("Error closing connection to Graphite", e1);
}
}
// The record timestamp is in milliseconds while Graphite expects an epoc
// time in seconds.
long timestamp = record.timestamp() / 1000L;

// Collect datapoints.
for (AbstractMetric metric : record.metrics()) {
lines.append(
metricsPathPrefix.toString() + "."
+ metric.name().replace(' ', '.')).append(pointTags.toString())
.append(" ")
.append(metric.value()).append(" ").append(timestamp)
.append("\n");
}

@Override
public void flush() {
try {
graphite.write(lines.toString());
} catch (Exception e) {
LOG.warn("Error sending metrics to Graphite", e);
try {
graphite.flush();
} catch (Exception e) {
LOG.warn("Error flushing metrics to Graphite", e);
try {
graphite.close();
} catch (Exception e1) {
throw new MetricsException("Error closing connection to Graphite", e1);
}
graphite.close();
} catch (Exception e1) {
throw new MetricsException("Error closing connection to Graphite", e1);
}
}

@Override
public void close() throws IOException {
graphite.close();
}

@Override
public void flush() {
try {
graphite.flush();
} catch (Exception e) {
LOG.warn("Error flushing metrics to Graphite", e);
try {
graphite.close();
} catch (Exception e1) {
throw new MetricsException("Error closing connection to Graphite", e1);
}
}
}

@Override
public void close() throws IOException {
graphite.close();
}

/**
* internal class for managing connection and writing
* metrics to Graphite server.
*/
public static class Graphite {
private final static int MAX_CONNECTION_FAILURES = 5;
private String serverHost;
private int serverPort;
private Writer writer = null;
private Socket socket = null;
private int connectionFailures = 0;

public Graphite(String serverHost, int serverPort) {
this.serverHost = serverHost;
this.serverPort = serverPort;
}

public static class Graphite {
private final static int MAX_CONNECTION_FAILURES = 5;

private String serverHost;
private int serverPort;
private Writer writer = null;
private Socket socket = null;
private int connectionFailures = 0;

public Graphite(String serverHost, int serverPort) {
this.serverHost = serverHost;
this.serverPort = serverPort;
public void connect() {
if (isConnected()) {
throw new MetricsException("Already connected to Graphite");
}

public void connect() {
if (isConnected()) {
throw new MetricsException("Already connected to Graphite");
}
if (tooManyConnectionFailures()) {
// return silently (there was ERROR in logs when we reached limit for the first time)
return;
}
try {
// Open a connection to Graphite server.
socket = new Socket(serverHost, serverPort);
if (tooManyConnectionFailures()) {
// return silently (there was ERROR in logs when we reached limit for
// the first time)
return;
}
try {
// Open a connection to Graphite server.
socket = new Socket(serverHost, serverPort);
writer = new OutputStreamWriter(socket.getOutputStream(),
StandardCharsets.UTF_8);
} catch (Exception e) {
connectionFailures++;
if (tooManyConnectionFailures()) {
// first time when connection limit reached, report to logs
LOG.error("Too many connection failures, would not try to connect again.");
}
throw new MetricsException("Error creating connection, "
+ serverHost + ":" + serverPort, e);
StandardCharsets.UTF_8);
} catch (Exception e) {
connectionFailures++;
if (tooManyConnectionFailures()) {
// first time when connection limit reached, report to logs
LOG.error("Too many connection failures, would not try to "
+ "connect again.");
}
throw new MetricsException("Error creating connection, "
+ serverHost + ":" + serverPort, e);
}
}

public void write(String msg) throws IOException {
if (!isConnected()) {
connect();
}
if (isConnected()) {
writer.write(msg);
}
public void write(String msg) throws IOException {
if (!isConnected()) {
connect();
}

public void flush() throws IOException {
if (isConnected()) {
writer.flush();
}
if (isConnected()) {
writer.write(msg);
}
}

public boolean isConnected() {
return socket != null && socket.isConnected() && !socket.isClosed();
public void flush() throws IOException {
if (isConnected()) {
writer.flush();
}
}

public void close() throws IOException {
try {
if (writer != null) {
writer.close();
}
} catch (IOException ex) {
if (socket != null) {
socket.close();
}
} finally {
socket = null;
writer = null;
}
}
public boolean isConnected() {
return socket != null && socket.isConnected() && !socket.isClosed();
}

private boolean tooManyConnectionFailures() {
return connectionFailures > MAX_CONNECTION_FAILURES;
public void close() throws IOException {
try {
if (writer != null) {
writer.close();
}
} catch (IOException ex) {
if (socket != null) {
socket.close();
}
} finally {
socket = null;
writer = null;
}

}

private boolean tooManyConnectionFailures() {
return connectionFailures > MAX_CONNECTION_FAILURES;
}
}
}
Loading