Skip to content

Commit 15f60b4

Browse files
committed
feat: Handle gracefully samples overflow
When we send multiple samples at once, handle gracefully the case where the output payload is bigger than the max packet size by splitting the samples into several payloads.
1 parent bd9d85f commit 15f60b4

10 files changed

+151
-64
lines changed

src/main/java/com/timgroup/statsd/Message.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,13 @@ protected Message(String aspect, Message.Type type, String[] tags) {
5454
* Write this message to the provided {@link StringBuilder}. Will
5555
* be called from the sender threads.
5656
*
57-
* @param builder
58-
* StringBuilder the text representation will be written to.
57+
* @param builder StringBuilder the text representation will be written to.
58+
* @param capacity The capacity of the send buffer.
59+
* @param containerID The container ID to be appended to the message.
60+
* @return boolean indicating whether the message was partially written to the builder.
61+
* If true, the method will be called again with the same arguments to continue writing.
5962
*/
60-
abstract void writeTo(StringBuilder builder, String containerID);
63+
abstract boolean writeTo(StringBuilder builder, int capacity, String containerID);
6164

6265
/**
6366
* Aggregate message.

src/main/java/com/timgroup/statsd/NonBlockingDirectStatsDClient.java

Lines changed: 76 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,30 @@ public NonBlockingDirectStatsDClient(final NonBlockingStatsDClientBuilder builde
99
@Override
1010
public void recordDistributionValues(String aspect, double[] values, double sampleRate, String... tags) {
1111
if ((Double.isNaN(sampleRate) || !isInvalidSample(sampleRate)) && values != null && values.length > 0) {
12-
sendMetric(new DoublesStatsDMessage(aspect, Message.Type.DISTRIBUTION, values, sampleRate, 0, tags));
12+
if (values.length == 1) {
13+
recordDistributionValue(aspect, values[0], sampleRate, tags);
14+
} else {
15+
sendMetric(new DoublesStatsDMessage(aspect, Message.Type.DISTRIBUTION, values, sampleRate, 0, tags));
16+
}
1317
}
1418
}
1519

1620
@Override
1721
public void recordDistributionValues(String aspect, long[] values, double sampleRate, String... tags) {
1822
if ((Double.isNaN(sampleRate) || !isInvalidSample(sampleRate)) && values != null && values.length > 0) {
19-
sendMetric(new LongsStatsDMessage(aspect, Message.Type.DISTRIBUTION, values, sampleRate, 0, tags));
23+
if (values.length == 1) {
24+
recordDistributionValue(aspect, values[0], sampleRate, tags);
25+
} else {
26+
sendMetric(new LongsStatsDMessage(aspect, Message.Type.DISTRIBUTION, values, sampleRate, 0, tags));
27+
}
2028
}
2129
}
2230

2331
abstract class MultiValuedStatsDMessage extends Message {
2432
private final double sampleRate; // NaN for none
2533
private final long timestamp; // zero for none
34+
private int metadataSize = -1; // Cache the size of the metadata, -1 means not calculated yet
35+
private int offset = 0; // The index of the first value that has not been written
2636

2737
MultiValuedStatsDMessage(String aspect, Message.Type type, String[] tags, double sampleRate, long timestamp) {
2838
super(aspect, type, tags);
@@ -40,9 +50,31 @@ public final void aggregate(Message message) {
4050
}
4151

4252
@Override
43-
public final void writeTo(StringBuilder builder, String containerID) {
53+
public final boolean writeTo(StringBuilder builder, int capacity, String containerID) {
54+
int metadataSize = metadataSize(builder, containerID);
55+
writeHeadMetadata(builder);
56+
boolean partialWrite = writeValuesTo(builder, capacity - metadataSize);
57+
writeTailMetadata(builder, containerID);
58+
return partialWrite;
59+
60+
}
61+
62+
private int metadataSize(StringBuilder builder, String containerID) {
63+
if (metadataSize == -1) {
64+
int previousLength = builder.length();
65+
writeHeadMetadata(builder);
66+
writeTailMetadata(builder, containerID);
67+
metadataSize = builder.length() - previousLength;
68+
builder.setLength(previousLength);
69+
}
70+
return metadataSize;
71+
}
72+
73+
private void writeHeadMetadata(StringBuilder builder) {
4474
builder.append(prefix).append(aspect);
45-
writeValuesTo(builder);
75+
}
76+
77+
private void writeTailMetadata(StringBuilder builder, String containerID) {
4678
builder.append('|').append(type);
4779
if (!Double.isNaN(sampleRate)) {
4880
builder.append('|').append('@').append(format(SAMPLE_RATE_FORMATTER, sampleRate));
@@ -58,7 +90,32 @@ public final void writeTo(StringBuilder builder, String containerID) {
5890
builder.append('\n');
5991
}
6092

61-
protected abstract void writeValuesTo(StringBuilder builder);
93+
private boolean writeValuesTo(StringBuilder builder, int remainingCapacity) {
94+
int maxLength = builder.length() + remainingCapacity;
95+
96+
// Add at least one value
97+
builder.append(':');
98+
writeValueTo(builder, offset);
99+
int previousLength = builder.length();
100+
101+
// Add remaining values up to the max length
102+
for (int i = offset + 1; i < lengthOfValues(); i++) {
103+
builder.append(':');
104+
writeValueTo(builder, i);
105+
if (builder.length() > maxLength) {
106+
builder.setLength(previousLength);
107+
offset += i;
108+
return true;
109+
}
110+
previousLength = builder.length();
111+
}
112+
offset = lengthOfValues();
113+
return false;
114+
}
115+
116+
protected abstract int lengthOfValues();
117+
118+
protected abstract void writeValueTo(StringBuilder buffer, int index);
62119
}
63120

64121
final class LongsStatsDMessage extends MultiValuedStatsDMessage {
@@ -70,10 +127,13 @@ final class LongsStatsDMessage extends MultiValuedStatsDMessage {
70127
}
71128

72129
@Override
73-
protected void writeValuesTo(StringBuilder builder) {
74-
for (long value : values) {
75-
builder.append(':').append(value);
76-
}
130+
protected int lengthOfValues() {
131+
return values.length;
132+
}
133+
134+
@Override
135+
protected void writeValueTo(StringBuilder buffer, int index) {
136+
buffer.append(values[index]);
77137
}
78138
}
79139

@@ -87,10 +147,13 @@ final class DoublesStatsDMessage extends MultiValuedStatsDMessage {
87147
}
88148

89149
@Override
90-
protected void writeValuesTo(StringBuilder builder) {
91-
for (double value : values) {
92-
builder.append(':').append(value);
93-
}
150+
protected int lengthOfValues() {
151+
return values.length;
152+
}
153+
154+
@Override
155+
protected void writeValueTo(StringBuilder buffer, int index) {
156+
buffer.append(values[index]);
94157
}
95158
}
96159
}

src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,7 @@ protected StatsDMessage(String aspect, Message.Type type, T value, double sample
526526
}
527527

528528
@Override
529-
public final void writeTo(StringBuilder builder, String containerID) {
529+
public final boolean writeTo(StringBuilder builder, int capacity, String containerID) {
530530
builder.append(prefix).append(aspect).append(':');
531531
writeValue(builder);
532532
builder.append('|').append(type);
@@ -542,6 +542,7 @@ public final void writeTo(StringBuilder builder, String containerID) {
542542
}
543543

544544
builder.append('\n');
545+
return false;
545546
}
546547

547548
@Override
@@ -1145,7 +1146,7 @@ private StringBuilder eventMap(final Event event, StringBuilder res) {
11451146
@Override
11461147
public void recordEvent(final Event event, final String... eventTags) {
11471148
statsDProcessor.send(new AlphaNumericMessage(Message.Type.EVENT, "") {
1148-
@Override public void writeTo(StringBuilder builder, String containerID) {
1149+
@Override public boolean writeTo(StringBuilder builder, int capacity, String containerID) {
11491150
final String title = escapeEventString(prefix + event.getTitle());
11501151
final String text = escapeEventString(event.getText());
11511152
builder.append(Message.Type.EVENT.toString())
@@ -1168,6 +1169,7 @@ public void recordEvent(final Event event, final String... eventTags) {
11681169
}
11691170

11701171
builder.append('\n');
1172+
return false;
11711173
}
11721174
});
11731175
this.telemetry.incrEventsSent(1);
@@ -1200,13 +1202,14 @@ private int getUtf8Length(final String text) {
12001202
@Override
12011203
public void recordServiceCheckRun(final ServiceCheck sc) {
12021204
statsDProcessor.send(new AlphaNumericMessage(Message.Type.SERVICE_CHECK, "") {
1203-
@Override public void writeTo(StringBuilder sb, String containerID) {
1205+
@Override
1206+
public boolean writeTo(StringBuilder sb, int capacity, String containerID) {
12041207
// see http://docs.datadoghq.com/guides/dogstatsd/#service-checks
12051208
sb.append(Message.Type.SERVICE_CHECK.toString())
1206-
.append("|")
1207-
.append(sc.getName())
1208-
.append("|")
1209-
.append(sc.getStatus());
1209+
.append("|")
1210+
.append(sc.getName())
1211+
.append("|")
1212+
.append(sc.getStatus());
12101213
if (sc.getTimestamp() > 0) {
12111214
sb.append("|d:").append(sc.getTimestamp());
12121215
}
@@ -1222,6 +1225,7 @@ public void recordServiceCheckRun(final ServiceCheck sc) {
12221225
}
12231226

12241227
sb.append('\n');
1228+
return false;
12251229
}
12261230
});
12271231
this.telemetry.incrServiceChecksSent(1);
@@ -1286,7 +1290,8 @@ protected void writeValue(StringBuilder builder) {
12861290
builder.append(getValue());
12871291
}
12881292

1289-
@Override protected final void writeTo(StringBuilder builder, String containerID) {
1293+
@Override
1294+
protected final boolean writeTo(StringBuilder builder, int capacity, String containerID) {
12901295
builder.append(prefix).append(aspect).append(':');
12911296
writeValue(builder);
12921297
builder.append('|').append(type);
@@ -1296,6 +1301,7 @@ protected void writeValue(StringBuilder builder) {
12961301
}
12971302

12981303
builder.append('\n');
1304+
return false;
12991305
}
13001306
});
13011307
}

src/main/java/com/timgroup/statsd/StatsDProcessor.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -103,26 +103,30 @@ protected void processLoop() {
103103
continue;
104104
}
105105

106-
builder.setLength(0);
107-
message.writeTo(builder, containerID);
108-
int lowerBoundSize = builder.length();
109-
110-
if (sendBuffer.capacity() < lowerBoundSize) {
111-
throw new InvalidMessageException(MESSAGE_TOO_LONG, builder.toString());
112-
}
106+
boolean partialWrite;
107+
do {
108+
builder.setLength(0);
109+
partialWrite = message.writeTo(builder, sendBuffer.capacity(), containerID);
110+
int lowerBoundSize = builder.length();
111+
112+
if (sendBuffer.capacity() < lowerBoundSize) {
113+
throw new InvalidMessageException(MESSAGE_TOO_LONG, builder.toString());
114+
}
113115

114-
if (sendBuffer.remaining() < (lowerBoundSize + 1)) {
115-
outboundQueue.put(sendBuffer);
116-
sendBuffer = bufferPool.borrow();
117-
}
116+
if (sendBuffer.remaining() < (lowerBoundSize + 1)) {
117+
outboundQueue.put(sendBuffer);
118+
sendBuffer = bufferPool.borrow();
119+
}
118120

119-
try {
120-
writeBuilderToSendBuffer(sendBuffer);
121-
} catch (BufferOverflowException boe) {
122-
outboundQueue.put(sendBuffer);
123-
sendBuffer = bufferPool.borrow();
124-
writeBuilderToSendBuffer(sendBuffer);
121+
try {
122+
writeBuilderToSendBuffer(sendBuffer);
123+
} catch (BufferOverflowException boe) {
124+
outboundQueue.put(sendBuffer);
125+
sendBuffer = bufferPool.borrow();
126+
writeBuilderToSendBuffer(sendBuffer);
127+
}
125128
}
129+
while (partialWrite);
126130

127131
if (!haveMessages()) {
128132
outboundQueue.put(sendBuffer);

src/main/java/com/timgroup/statsd/Telemetry.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ protected TelemetryMessage(String metric, Integer value, String tags) {
6969
}
7070

7171
@Override
72-
public final void writeTo(StringBuilder builder, String containerID) {
72+
public final boolean writeTo(StringBuilder builder, int capacity, String containerID) {
7373
builder.append(aspect)
7474
.append(':')
7575
.append(this.value)
@@ -82,6 +82,7 @@ public final void writeTo(StringBuilder builder, String containerID) {
8282
}
8383

8484
builder.append('\n'); // already has the statsd separator baked-in
85+
return false;
8586
}
8687
}
8788

src/test/java/com/timgroup/statsd/NonBlockingDirectStatsDClientTest.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
public class NonBlockingDirectStatsDClientTest {
2020

2121
private static final int STATSD_SERVER_PORT = 17254;
22+
private static final int MAX_PACKET_SIZE = 64;
2223
private static DirectStatsDClient client;
2324
private static DummyStatsDServer server;
2425

@@ -34,6 +35,7 @@ public static void start() throws IOException {
3435
.port(STATSD_SERVER_PORT)
3536
.enableTelemetry(false)
3637
.originDetectionEnabled(false)
38+
.maxPacketSizeBytes(MAX_PACKET_SIZE)
3739
.buildDirectStatsDClient();
3840
}
3941

@@ -53,52 +55,55 @@ public void clear() {
5355

5456

5557
@Test(timeout = 5000L)
56-
public void sends_multivalued_distribution_to_statsd() throws Exception {
57-
58+
public void sends_multivalued_distribution_to_statsd() {
5859
client.recordDistributionValues("mydistribution", new long[] { 423L, 234L }, Double.NaN);
5960
server.waitForMessage("my.prefix");
6061

6162
assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mydistribution:423:234|d")));
6263
}
6364

6465
@Test(timeout = 5000L)
65-
public void sends_double_multivalued_distribution_to_statsd() throws Exception {
66-
67-
66+
public void sends_double_multivalued_distribution_to_statsd() {
6867
client.recordDistributionValues("mydistribution", new double[] { 0.423D, 0.234D }, Double.NaN);
6968
server.waitForMessage("my.prefix");
7069

7170
assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mydistribution:0.423:0.234|d")));
7271
}
7372

7473
@Test(timeout = 5000L)
75-
public void sends_multivalued_distribution_to_statsd_with_tags() throws Exception {
76-
77-
74+
public void sends_multivalued_distribution_to_statsd_with_tags() {
7875
client.recordDistributionValues("mydistribution", new long[] { 423L, 234L }, Double.NaN, "foo:bar", "baz");
7976
server.waitForMessage("my.prefix");
8077

8178
assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mydistribution:423:234|d|#baz,foo:bar")));
8279
}
8380

8481
@Test(timeout = 5000L)
85-
public void sends_multivalued_distribution_to_statsd_with_sampling_rate() throws Exception {
86-
87-
82+
public void sends_multivalued_distribution_to_statsd_with_sampling_rate() {
8883
client.recordDistributionValues("mydistribution", new long[] { 423L, 234L }, 1);
8984
server.waitForMessage("my.prefix");
9085

9186
assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mydistribution:423:234|d|@1.000000")));
9287
}
9388

9489
@Test(timeout = 5000L)
95-
public void sends_multivalued_distribution_to_statsd_with_tags_and_sampling_rate() throws Exception {
96-
97-
90+
public void sends_multivalued_distribution_to_statsd_with_tags_and_sampling_rate() {
9891
client.recordDistributionValues("mydistribution", new long[] { 423L, 234L }, 1, "foo:bar", "baz");
9992
server.waitForMessage("my.prefix");
10093

10194
assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mydistribution:423:234|d|@1.000000|#baz,foo:bar")));
10295
}
10396

97+
@Test(timeout = 5000L)
98+
public void sends_too_long_multivalued_distribution_to_statsd() {
99+
long[] values = {423L, 234L, 456L, 512L};
100+
client.recordDistributionValues("mydistribution", values, 1, "foo:bar", "baz");
101+
102+
server.waitForMessage("my.prefix");
103+
assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mydistribution:423:234:456|d|@1.000000|#baz,foo:bar")));
104+
105+
server.waitForMessage("my.prefix");
106+
assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mydistribution:512|d|@1.000000|#baz,foo:bar")));
107+
}
108+
104109
}

src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1556,8 +1556,8 @@ public TestAlphaNumericMessage(String aspect, Type type, String value, String[]
15561556
}
15571557

15581558
@Override
1559-
void writeTo(StringBuilder builder, String containerID) {
1560-
1559+
boolean writeTo(StringBuilder builder, int capacity, String containerID) {
1560+
return false;
15611561
}
15621562
}
15631563
AlphaNumericMessage alphaNum1 = new TestAlphaNumericMessage("my.count", Message.Type.COUNT, "value", new String[] {"tag"});

0 commit comments

Comments
 (0)