Skip to content
This repository has been archived by the owner on Nov 25, 2020. It is now read-only.

Limit the size of a single datagram and send multiple datagrams if you h... #7

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 34 additions & 12 deletions src/main/java/com/bealetech/metrics/reporting/StatsdReporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@

public class StatsdReporter extends AbstractPollingReporter implements MetricProcessor<Long> {

private static final int MAX_UDPDATAGRAM_LENGTH = 1300; // In reality, usually closer to 1500

public static enum StatType { COUNTER, TIMER, GAUGE }

private static final Logger LOG = LoggerFactory.getLogger(StatsdReporter.class);
Expand All @@ -41,8 +43,12 @@ public static enum StatType { COUNTER, TIMER, GAUGE }
protected final MetricPredicate predicate;
protected final Locale locale = Locale.US;
protected final Clock clock;

protected final UDPSocketProvider socketProvider;
protected DatagramSocket currentSocket = null;

protected final VirtualMachineMetrics vm;

protected Writer writer;
protected ByteArrayOutputStream outputData;

Expand Down Expand Up @@ -110,24 +116,19 @@ public void setPrintVMMetrics(boolean printVMMetrics) {

@Override
public void run() {
DatagramSocket socket = null;

try {
socket = this.socketProvider.get();
outputData.reset();
prependNewline = false;
writer = new BufferedWriter(new OutputStreamWriter(this.outputData));
currentSocket = this.socketProvider.get();
resetWriterState();

final long epoch = clock.time() / 1000;
if (this.printVMMetrics) {
if (printVMMetrics) {
printVmMetrics(epoch);
}
printRegularMetrics(epoch);

// Send UDP data
writer.flush();
DatagramPacket packet = this.socketProvider.newPacket(outputData);
packet.setData(outputData.toByteArray());
socket.send(packet);
sendDatagram();
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Error writing to Graphite", e);
Expand All @@ -142,13 +143,28 @@ public void run() {
}
}
} finally {
if (socket != null) {
socket.close();
if (currentSocket != null) {
currentSocket.close();
}
writer = null;
}
}

private void resetWriterState() {
outputData.reset();
prependNewline = false;
writer = new BufferedWriter(new OutputStreamWriter(this.outputData));
}

private void sendDatagram() throws IOException {
writer.flush();
if (outputData.size() > 0) { // Don't send an empty datagram
DatagramPacket packet = this.socketProvider.newPacket(outputData);
packet.setData(outputData.toByteArray());
currentSocket.send(packet);
}
}

protected void printVmMetrics(long epoch) {
// Memory
sendFloat("jvm.memory.totalInit", StatType.GAUGE, vm.totalInit());
Expand Down Expand Up @@ -321,6 +337,12 @@ protected void sendData(String name, String value, StatType statType) {
writer.write(statTypeStr);
prependNewline = true;
writer.flush();

if (outputData.size() > MAX_UDPDATAGRAM_LENGTH) {
// Need to send our UDP packet now before it gets too big.
sendDatagram();
resetWriterState();
}
} catch (IOException e) {
LOG.error("Error sending to Graphite:", e);
}
Expand Down