Skip to content

Commit

Permalink
feat: Handle gracefully samples overflow
Browse files Browse the repository at this point in the history
When we send multiple samples at once, handle gracefully the case where the
output payload is bigger than the max packet size by splitting the samples into
several payloads.
  • Loading branch information
blemale committed Feb 7, 2024
1 parent bd9d85f commit 55ebdc5
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 64 deletions.
9 changes: 6 additions & 3 deletions src/main/java/com/timgroup/statsd/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,13 @@ protected Message(String aspect, Message.Type type, String[] tags) {
* Write this message to the provided {@link StringBuilder}. Will
* be called from the sender threads.
*
* @param builder
* StringBuilder the text representation will be written to.
* @param builder StringBuilder the text representation will be written to.
* @param capacity The capacity of the send buffer.
* @param containerID The container ID to be appended to the message.
* @return boolean indicating whether the message was partially written to the builder.
* If true, the method will be called again with the same arguments to continue writing.
*/
abstract void writeTo(StringBuilder builder, String containerID);
abstract boolean writeTo(StringBuilder builder, int capacity, String containerID);

/**
* Aggregate message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,30 @@ public NonBlockingDirectStatsDClient(final NonBlockingStatsDClientBuilder builde
@Override
public void recordDistributionValues(String aspect, double[] values, double sampleRate, String... tags) {
if ((Double.isNaN(sampleRate) || !isInvalidSample(sampleRate)) && values != null && values.length > 0) {
sendMetric(new DoublesStatsDMessage(aspect, Message.Type.DISTRIBUTION, values, sampleRate, 0, tags));
if (values.length == 1) {
recordDistributionValue(aspect, values[0], sampleRate, tags);
} else {
sendMetric(new DoublesStatsDMessage(aspect, Message.Type.DISTRIBUTION, values, sampleRate, 0, tags));
}
}
}

@Override
public void recordDistributionValues(String aspect, long[] values, double sampleRate, String... tags) {
if ((Double.isNaN(sampleRate) || !isInvalidSample(sampleRate)) && values != null && values.length > 0) {
sendMetric(new LongsStatsDMessage(aspect, Message.Type.DISTRIBUTION, values, sampleRate, 0, tags));
if (values.length == 1) {
recordDistributionValue(aspect, values[0], sampleRate, tags);
} else {
sendMetric(new LongsStatsDMessage(aspect, Message.Type.DISTRIBUTION, values, sampleRate, 0, tags));
}
}
}

abstract class MultiValuedStatsDMessage extends Message {
private final double sampleRate; // NaN for none
private final long timestamp; // zero for none
private int metadataSize = -1; // Cache the size of the metadata, -1 means not calculated yet
private int offset = 0; // The index of the first value that has not been written

MultiValuedStatsDMessage(String aspect, Message.Type type, String[] tags, double sampleRate, long timestamp) {
super(aspect, type, tags);
Expand All @@ -40,9 +50,31 @@ public final void aggregate(Message message) {
}

@Override
public final void writeTo(StringBuilder builder, String containerID) {
public final boolean writeTo(StringBuilder builder, int capacity, String containerID) {
int metadataSize = metadataSize(builder, containerID);
writeHeadMetadata(builder);
boolean partialWrite = writeValuesTo(builder, capacity - metadataSize);
writeTailMetadata(builder, containerID);
return partialWrite;

}

private int metadataSize(StringBuilder builder, String containerID) {
if (metadataSize == -1) {
int previousLength = builder.length();
writeHeadMetadata(builder);
writeTailMetadata(builder, containerID);
metadataSize = builder.length() - previousLength;
builder.setLength(previousLength);
}
return metadataSize;
}

private void writeHeadMetadata(StringBuilder builder) {
builder.append(prefix).append(aspect);
writeValuesTo(builder);
}

private void writeTailMetadata(StringBuilder builder, String containerID) {
builder.append('|').append(type);
if (!Double.isNaN(sampleRate)) {
builder.append('|').append('@').append(format(SAMPLE_RATE_FORMATTER, sampleRate));
Expand All @@ -58,7 +90,32 @@ public final void writeTo(StringBuilder builder, String containerID) {
builder.append('\n');
}

protected abstract void writeValuesTo(StringBuilder builder);
private boolean writeValuesTo(StringBuilder builder, int remainingCapacity) {
int maxLength = builder.length() + remainingCapacity;

// Add at least one value
builder.append(':');
writeValueTo(builder, offset);
int previousLength = builder.length();

// Add remaining values up to the max length
for (int i = offset + 1; i < lengthOfValues(); i++) {
builder.append(':');
writeValueTo(builder, i);
if (builder.length() > maxLength) {
builder.setLength(previousLength);
offset += i;
return true;
}
previousLength = builder.length();
}
offset = lengthOfValues();
return false;
}

protected abstract int lengthOfValues();

protected abstract void writeValueTo(StringBuilder buffer, int index);
}

final class LongsStatsDMessage extends MultiValuedStatsDMessage {
Expand All @@ -70,10 +127,13 @@ final class LongsStatsDMessage extends MultiValuedStatsDMessage {
}

@Override
protected void writeValuesTo(StringBuilder builder) {
for (long value : values) {
builder.append(':').append(value);
}
protected int lengthOfValues() {
return values.length;
}

@Override
protected void writeValueTo(StringBuilder buffer, int index) {
buffer.append(values[index]);
}
}

Expand All @@ -87,10 +147,13 @@ final class DoublesStatsDMessage extends MultiValuedStatsDMessage {
}

@Override
protected void writeValuesTo(StringBuilder builder) {
for (double value : values) {
builder.append(':').append(value);
}
protected int lengthOfValues() {
return values.length;
}

@Override
protected void writeValueTo(StringBuilder buffer, int index) {
buffer.append(values[index]);
}
}
}
22 changes: 14 additions & 8 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ protected StatsDMessage(String aspect, Message.Type type, T value, double sample
}

@Override
public final void writeTo(StringBuilder builder, String containerID) {
public final boolean writeTo(StringBuilder builder, int capacity, String containerID) {
builder.append(prefix).append(aspect).append(':');
writeValue(builder);
builder.append('|').append(type);
Expand All @@ -542,6 +542,7 @@ public final void writeTo(StringBuilder builder, String containerID) {
}

builder.append('\n');
return false;
}

@Override
Expand Down Expand Up @@ -1145,7 +1146,7 @@ private StringBuilder eventMap(final Event event, StringBuilder res) {
@Override
public void recordEvent(final Event event, final String... eventTags) {
statsDProcessor.send(new AlphaNumericMessage(Message.Type.EVENT, "") {
@Override public void writeTo(StringBuilder builder, String containerID) {
@Override public boolean writeTo(StringBuilder builder, int capacity, String containerID) {
final String title = escapeEventString(prefix + event.getTitle());
final String text = escapeEventString(event.getText());
builder.append(Message.Type.EVENT.toString())
Expand All @@ -1168,6 +1169,7 @@ public void recordEvent(final Event event, final String... eventTags) {
}

builder.append('\n');
return false;
}
});
this.telemetry.incrEventsSent(1);
Expand Down Expand Up @@ -1200,13 +1202,14 @@ private int getUtf8Length(final String text) {
@Override
public void recordServiceCheckRun(final ServiceCheck sc) {
statsDProcessor.send(new AlphaNumericMessage(Message.Type.SERVICE_CHECK, "") {
@Override public void writeTo(StringBuilder sb, String containerID) {
@Override
public boolean writeTo(StringBuilder sb, int capacity, String containerID) {
// see http://docs.datadoghq.com/guides/dogstatsd/#service-checks
sb.append(Message.Type.SERVICE_CHECK.toString())
.append("|")
.append(sc.getName())
.append("|")
.append(sc.getStatus());
.append("|")
.append(sc.getName())
.append("|")
.append(sc.getStatus());
if (sc.getTimestamp() > 0) {
sb.append("|d:").append(sc.getTimestamp());
}
Expand All @@ -1222,6 +1225,7 @@ public void recordServiceCheckRun(final ServiceCheck sc) {
}

sb.append('\n');
return false;
}
});
this.telemetry.incrServiceChecksSent(1);
Expand Down Expand Up @@ -1286,7 +1290,8 @@ protected void writeValue(StringBuilder builder) {
builder.append(getValue());
}

@Override protected final void writeTo(StringBuilder builder, String containerID) {
@Override
protected final boolean writeTo(StringBuilder builder, int capacity, String containerID) {
builder.append(prefix).append(aspect).append(':');
writeValue(builder);
builder.append('|').append(type);
Expand All @@ -1296,6 +1301,7 @@ protected void writeValue(StringBuilder builder) {
}

builder.append('\n');
return false;
}
});
}
Expand Down
38 changes: 21 additions & 17 deletions src/main/java/com/timgroup/statsd/StatsDProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,26 +103,30 @@ protected void processLoop() {
continue;
}

builder.setLength(0);
message.writeTo(builder, containerID);
int lowerBoundSize = builder.length();

if (sendBuffer.capacity() < lowerBoundSize) {
throw new InvalidMessageException(MESSAGE_TOO_LONG, builder.toString());
}
boolean partialWrite;
do {
builder.setLength(0);
partialWrite = message.writeTo(builder, sendBuffer.capacity(), containerID);
int lowerBoundSize = builder.length();

if (sendBuffer.capacity() < lowerBoundSize) {
throw new InvalidMessageException(MESSAGE_TOO_LONG, builder.toString());
}

if (sendBuffer.remaining() < (lowerBoundSize + 1)) {
outboundQueue.put(sendBuffer);
sendBuffer = bufferPool.borrow();
}
if (sendBuffer.remaining() < (lowerBoundSize + 1)) {
outboundQueue.put(sendBuffer);
sendBuffer = bufferPool.borrow();
}

try {
writeBuilderToSendBuffer(sendBuffer);
} catch (BufferOverflowException boe) {
outboundQueue.put(sendBuffer);
sendBuffer = bufferPool.borrow();
writeBuilderToSendBuffer(sendBuffer);
try {
writeBuilderToSendBuffer(sendBuffer);
} catch (BufferOverflowException boe) {
outboundQueue.put(sendBuffer);
sendBuffer = bufferPool.borrow();
writeBuilderToSendBuffer(sendBuffer);
}
}
while (partialWrite);

if (!haveMessages()) {
outboundQueue.put(sendBuffer);
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/timgroup/statsd/Telemetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ protected TelemetryMessage(String metric, Integer value, String tags) {
}

@Override
public final void writeTo(StringBuilder builder, String containerID) {
public final boolean writeTo(StringBuilder builder, int capacity, String containerID) {
builder.append(aspect)
.append(':')
.append(this.value)
Expand All @@ -82,6 +82,7 @@ public final void writeTo(StringBuilder builder, String containerID) {
}

builder.append('\n'); // already has the statsd separator baked-in
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
public class NonBlockingDirectStatsDClientTest {

private static final int STATSD_SERVER_PORT = 17254;
private static final int MAX_PACKET_SIZE = 64;
private static DirectStatsDClient client;
private static DummyStatsDServer server;

Expand All @@ -34,6 +35,7 @@ public static void start() throws IOException {
.port(STATSD_SERVER_PORT)
.enableTelemetry(false)
.originDetectionEnabled(false)
.maxPacketSizeBytes(MAX_PACKET_SIZE)
.buildDirectStatsDClient();
}

Expand All @@ -53,52 +55,55 @@ public void clear() {


@Test(timeout = 5000L)
public void sends_multivalued_distribution_to_statsd() throws Exception {

public void sends_multivalued_distribution_to_statsd() {
client.recordDistributionValues("mydistribution", new long[] { 423L, 234L }, Double.NaN);
server.waitForMessage("my.prefix");

assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mydistribution:423:234|d")));
}

@Test(timeout = 5000L)
public void sends_double_multivalued_distribution_to_statsd() throws Exception {


public void sends_double_multivalued_distribution_to_statsd() {
client.recordDistributionValues("mydistribution", new double[] { 0.423D, 0.234D }, Double.NaN);
server.waitForMessage("my.prefix");

assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mydistribution:0.423:0.234|d")));
}

@Test(timeout = 5000L)
public void sends_multivalued_distribution_to_statsd_with_tags() throws Exception {


public void sends_multivalued_distribution_to_statsd_with_tags() {
client.recordDistributionValues("mydistribution", new long[] { 423L, 234L }, Double.NaN, "foo:bar", "baz");
server.waitForMessage("my.prefix");

assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mydistribution:423:234|d|#baz,foo:bar")));
}

@Test(timeout = 5000L)
public void sends_multivalued_distribution_to_statsd_with_sampling_rate() throws Exception {


public void sends_multivalued_distribution_to_statsd_with_sampling_rate() {
client.recordDistributionValues("mydistribution", new long[] { 423L, 234L }, 1);
server.waitForMessage("my.prefix");

assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mydistribution:423:234|d|@1.000000")));
}

@Test(timeout = 5000L)
public void sends_multivalued_distribution_to_statsd_with_tags_and_sampling_rate() throws Exception {


public void sends_multivalued_distribution_to_statsd_with_tags_and_sampling_rate() {
client.recordDistributionValues("mydistribution", new long[] { 423L, 234L }, 1, "foo:bar", "baz");
server.waitForMessage("my.prefix");

assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mydistribution:423:234|d|@1.000000|#baz,foo:bar")));
}

@Test(timeout = 5000L)
public void sends_too_long_multivalued_distribution_to_statsd() {
long[] values = {423L, 234L, 456L, 512L};
client.recordDistributionValues("mydistribution", values, 1, "foo:bar", "baz");

server.waitForMessage("my.prefix");
assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mydistribution:423:234:456|d|@1.000000|#baz,foo:bar")));

server.waitForMessage("my.prefix");
assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mydistribution:512|d|@1.000000|#baz,foo:bar")));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1556,8 +1556,8 @@ public TestAlphaNumericMessage(String aspect, Type type, String value, String[]
}

@Override
void writeTo(StringBuilder builder, String containerID) {

boolean writeTo(StringBuilder builder, int capacity, String containerID) {
return false;
}
}
AlphaNumericMessage alphaNum1 = new TestAlphaNumericMessage("my.count", Message.Type.COUNT, "value", new String[] {"tag"});
Expand Down
Loading

0 comments on commit 55ebdc5

Please sign in to comment.