Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

performance improvements: squash messages, and NIO #7

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
performance improvements:
     - fit as many messages (that are in the queue) into the MTU as possible
     - convert to NIO

In addition to any NIO improvements,
     this will offer a performance improvement when the incoming message rate per the time it takes to call blockingSend() goes above one.

Add NonBlockingStatsDClientPerfTest to ensure no messages are lost in a concurrent environment.
  • Loading branch information
michaelsembwever committed Mar 11, 2014
commit 5d34f1591b0c8cf7729632586323001ab7962110
138 changes: 90 additions & 48 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
package com.timgroup.statsd;

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.text.NumberFormat;
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
* A simple StatsD client implementation facilitating metrics recording.
*
*
* <p>Upon instantiation, this client will establish a socket connection to a StatsD instance
* running on the specified host and port. Metrics are then sent over this connection as they are
* received by the client.
* </p>
*
*
* <p>Three key methods are provided for the submission of data-points for the application under
* scrutiny:
* <ul>
Expand All @@ -30,15 +33,17 @@
* IO operations being carried out in a separate thread. Furthermore, these methods are guaranteed
* not to throw an exception which may disrupt application execution.
* </p>
*
*
* <p>As part of a clean system shutdown, the {@link #stop()} method should be invoked
* on any StatsD clients.</p>
*
*
* @author Tom Denley
*
*/
public final class NonBlockingStatsDClient implements StatsDClient {

private static final int PACKET_SIZE_BYTES = 1500;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the packetSize be configurable?

Or can we presume that the local machine's MTU is what to use?
eg NetworkInterface.getByInetAddress(InetAddress.getLocalHost()).getMTU()
(we would need to also check that getLocalHost() isn't falling back onto the loopback).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/etsy/statsd/blob/master/docs/metric_types.md#multi-metric-packets says Fast Ethernet is on the order of 1432 so this seems like a reasonable size.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with the setting hardcoded for now.

But note that if a datacenter has increased their subnet's MTU (eg jumbo frames) then statsd will perform better if were also configurable here.


private static final StatsDClientErrorHandler NO_OP_HANDLER = new StatsDClientErrorHandler() {
@Override public void handle(Exception e) { /* No-op */ }
};
Expand All @@ -63,7 +68,8 @@ protected NumberFormat initialValue() {
};

private final String prefix;
private final DatagramSocket clientSocket;
private final DatagramChannel clientChannel;
private final InetSocketAddress address;
private final StatsDClientErrorHandler handler;
private final String constantTagsRendered;

Expand All @@ -77,6 +83,8 @@ protected NumberFormat initialValue() {
}
});

private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

/**
* Create a new StatsD client communicating with a StatsD instance on the
* specified host and port. All messages send via this client will have
Expand Down Expand Up @@ -109,7 +117,7 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port) throws
* be established. Once a client has been instantiated in this way, all
* exceptions thrown during subsequent usage are consumed, guaranteeing
* that failures in metrics will not affect normal code execution.
*
*
* @param prefix
* the prefix to apply to keys sent via this client
* @param hostname
Expand All @@ -135,7 +143,7 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port, String[
* exceptions thrown during subsequent usage are passed to the specified
* handler and then consumed, guaranteeing that failures in metrics will
* not affect normal code execution.
*
*
* @param prefix
* the prefix to apply to keys sent via this client
* @param hostname
Expand Down Expand Up @@ -169,11 +177,12 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port, String[
}

try {
this.clientSocket = new DatagramSocket();
this.clientSocket.connect(new InetSocketAddress(hostname, port));
this.clientChannel = DatagramChannel.open();
this.address = new InetSocketAddress(hostname, port);
} catch (Exception e) {
throw new StatsDClientException("Failed to start StatsD client", e);
}
this.executor.submit(new QueueConsumer());
}

/**
Expand All @@ -190,8 +199,13 @@ public void stop() {
handler.handle(e);
}
finally {
if (clientSocket != null) {
clientSocket.close();
if (clientChannel != null) {
try {
clientChannel.close();
}
catch (IOException e) {
handler.handle(e);
}
}
}
}
Expand Down Expand Up @@ -232,9 +246,9 @@ String tagString(final String[] tags) {

/**
* Adjusts the specified counter by a given delta.
*
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
*
* @param aspect
* the name of the counter to adjust
* @param delta
Expand All @@ -249,9 +263,9 @@ public void count(String aspect, int delta, String... tags) {

/**
* Increments the specified counter by one.
*
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
*
* @param aspect
* the name of the counter to increment
* @param tags
Expand All @@ -263,7 +277,7 @@ public void incrementCounter(String aspect, String... tags) {
}

/**
* Convenience method equivalent to {@link #incrementCounter(String, String[])}.
* Convenience method equivalent to {@link #incrementCounter(String, String[])}.
*/
@Override
public void increment(String aspect, String... tags) {
Expand All @@ -272,9 +286,9 @@ public void increment(String aspect, String... tags) {

/**
* Decrements the specified counter by one.
*
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
*
* @param aspect
* the name of the counter to decrement
* @param tags
Expand All @@ -286,7 +300,7 @@ public void decrementCounter(String aspect, String... tags) {
}

/**
* Convenience method equivalent to {@link #decrementCounter(String, String[])}.
* Convenience method equivalent to {@link #decrementCounter(String, String[])}.
*/
@Override
public void decrement(String aspect, String... tags) {
Expand All @@ -295,9 +309,9 @@ public void decrement(String aspect, String... tags) {

/**
* Records the latest fixed value for the specified named gauge.
*
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
*
* @param aspect
* the name of the gauge
* @param value
Expand All @@ -323,9 +337,9 @@ public void gauge(String aspect, double value, String... tags) {

/**
* Records the latest fixed value for the specified named gauge.
*
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
*
* @param aspect
* the name of the gauge
* @param value
Expand All @@ -339,7 +353,7 @@ public void recordGaugeValue(String aspect, int value, String... tags) {
}

/**
* Convenience method equivalent to {@link #recordGaugeValue(String, int, String[])}.
* Convenience method equivalent to {@link #recordGaugeValue(String, int, String[])}.
*/
@Override
public void gauge(String aspect, int value, String... tags) {
Expand All @@ -348,9 +362,9 @@ public void gauge(String aspect, int value, String... tags) {

/**
* Records an execution time in milliseconds for the specified named operation.
*
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
*
* @param aspect
* the name of the timed operation
* @param timeInMs
Expand Down Expand Up @@ -400,9 +414,9 @@ public void histogram(String aspect, double value, String... tags) {

/**
* Records a value for the specified named histogram.
*
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
*
* @param aspect
* the name of the histogram
* @param value
Expand All @@ -416,33 +430,61 @@ public void recordHistogramValue(String aspect, int value, String... tags) {
}

/**
* Convenience method equivalent to {@link #recordHistogramValue(String, int, String[])}.
* Convenience method equivalent to {@link #recordHistogramValue(String, int, String[])}.
*/
@Override
public void histogram(String aspect, int value, String... tags) {
recordHistogramValue(aspect, value, tags);
}

private void send(final String message) {
try {
executor.execute(new Runnable() {
@Override public void run() {
blockingSend(message);
private void send(String message) {
queue.offer(message);
}

private class QueueConsumer implements Runnable {
private final ByteBuffer sendBuffer = ByteBuffer.allocate(PACKET_SIZE_BYTES);

@Override public void run() {
while(!executor.isShutdown()) {
try {
String message = queue.poll(1, TimeUnit.SECONDS);
if(null != message) {
byte[] data = message.getBytes();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should always specify the encoding. I think that it would be best to assume that this encoding is UTF-8

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can do that. Although it's strictly not part of this pull request (the original code also uses getBytes()), and i'd rather keep it as a separate commit and pull request (for the sake of bisecting).

Verdict?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with it not being part of this.

if(sendBuffer.remaining() < (data.length + 1)) {
blockingSend();
}
if(sendBuffer.position() > 0) {
sendBuffer.put( (byte) '\n');

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for future reference https://github.com/DataDog/dd-agent/blob/de14dac5f1af5a444cec46cff047ca70fa6e294f/aggregator.py#L398 shows that dogstatsd supports '\n' separated metrics as well.

}
sendBuffer.put(data);
if(null == queue.peek()) {
blockingSend();
}
}
} catch (Exception e) {
handler.handle(e);
}
});
}
catch (Exception e) {
handler.handle(e);
}
}
}

private void blockingSend(String message) {
try {
final byte[] sendData = message.getBytes();
final DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length);
clientSocket.send(sendPacket);
} catch (Exception e) {
handler.handle(e);
private void blockingSend() throws IOException {
int sizeOfBuffer = sendBuffer.position();
sendBuffer.flip();
int sentBytes = clientChannel.send(sendBuffer, address);
sendBuffer.limit(sendBuffer.capacity());
sendBuffer.rewind();

if (sizeOfBuffer != sentBytes) {
handler.handle(
new IOException(
String.format(
"Could not send entirely stat %s to host %s:%d. Only sent %d bytes out of %d bytes",
sendBuffer.toString(),
address.getHostName(),
address.getPort(),
sentBytes,
sizeOfBuffer)));
}
}
}
}
54 changes: 54 additions & 0 deletions src/test/java/com/timgroup/statsd/DummyStatsDServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@

package com.timgroup.statsd;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.List;


final class DummyStatsDServer {
private final List<String> messagesReceived = new ArrayList<String>();
private final DatagramSocket server;

public DummyStatsDServer(int port) throws SocketException {
server = new DatagramSocket(port);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
while(!server.isClosed()) {
try {
final DatagramPacket packet = new DatagramPacket(new byte[1500], 1500);
server.receive(packet);
for(String msg : new String(packet.getData()).split("\n")) {
messagesReceived.add(msg.trim());
}
} catch (IOException e) {
}
}
}
});
thread.setDaemon(true);
thread.start();
}

public void waitForMessage() {
while (messagesReceived.isEmpty()) {
try {
Thread.sleep(50L);
} catch (InterruptedException e) {
}
}
}

public List<String> messagesReceived() {
return new ArrayList<String>(messagesReceived);
}

public void close() {
server.close();
}

}
Loading