Skip to content

Commit e814315

Browse files
authored
Stream the SymDB serialization and compression (#9642)
Current SymDB serialization use intermediate full large buffer for every intermediate operation: - serializing to string - converting to UTF-8 byte array - compressing with gzip every operation imply potentially large buffer (several 10MBs). On constraint heaps it can lead to OOME. we now stream this process using outputstream, buffered sink for serialization so we have less large buffers in flight.
1 parent c096994 commit e814315

File tree

1 file changed

+47
-46
lines changed
  • dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink

1 file changed

+47
-46
lines changed

dd-java-agent/agent-debugger/src/main/java/com/datadog/debugger/sink/SymbolSink.java

Lines changed: 47 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import datadog.trace.util.TagsHelper;
1414
import java.io.ByteArrayOutputStream;
1515
import java.io.IOException;
16+
import java.io.OutputStream;
1617
import java.nio.charset.StandardCharsets;
1718
import java.util.ArrayList;
1819
import java.util.Arrays;
@@ -22,6 +23,9 @@
2223
import java.util.concurrent.BlockingQueue;
2324
import java.util.zip.GZIPOutputStream;
2425
import okhttp3.HttpUrl;
26+
import okhttp3.MediaType;
27+
import okio.BufferedSink;
28+
import okio.Okio;
2529
import org.slf4j.Logger;
2630
import org.slf4j.LoggerFactory;
2731

@@ -103,41 +107,49 @@ public void flush() {
103107
if (scopesToSerialize.isEmpty()) {
104108
return;
105109
}
106-
String json =
110+
serializeAndUpload(scopesToSerialize);
111+
}
112+
113+
private void serializeAndUpload(List<Scope> scopesToSerialize) {
114+
try {
115+
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(2 * 1024 * 1024);
116+
try (OutputStream outputStream =
117+
isCompressed ? new GZIPOutputStream(byteArrayOutputStream) : byteArrayOutputStream) {
118+
BufferedSink sink = Okio.buffer(Okio.sink(outputStream));
107119
SERVICE_VERSION_ADAPTER.toJson(
108-
new ServiceVersion(serviceName, env, version, "JAVA", scopesToSerialize));
109-
updateStats(scopesToSerialize, json);
110-
doUpload(scopesToSerialize, json);
120+
sink, new ServiceVersion(serviceName, env, version, "JAVA", scopesToSerialize));
121+
sink.flush();
122+
}
123+
doUpload(scopesToSerialize, byteArrayOutputStream.toByteArray(), isCompressed);
124+
} catch (IOException e) {
125+
LOGGER.debug("Error serializing scopes", e);
126+
}
111127
}
112128

113-
private void doUpload(List<Scope> scopesToSerialize, String json) {
114-
byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);
115-
byte[] payload = null;
116-
if (isCompressed) {
117-
payload = compressPayload(jsonBytes);
129+
private void doUpload(List<Scope> scopesToSerialize, byte[] payload, boolean isCompressed) {
130+
if (payload.length > maxPayloadSize) {
131+
LOGGER.warn(
132+
"Payload is too big: {}/{} isCompressed={}",
133+
payload.length,
134+
maxPayloadSize,
135+
isCompressed);
136+
splitAndSend(scopesToSerialize);
137+
return;
118138
}
119-
if (payload == null) {
120-
if (json.length() > maxPayloadSize) {
121-
LOGGER.warn("Payload is too big: {}/{}", json.length(), maxPayloadSize);
122-
splitAndSend(scopesToSerialize);
123-
return;
124-
}
125-
symbolUploader.uploadAsMultipart(
126-
"",
127-
event,
128-
new BatchUploader.MultiPartContent(jsonBytes, "file", "file.json", APPLICATION_JSON));
129-
} else {
130-
if (payload.length > maxPayloadSize) {
131-
LOGGER.warn("Compressed payload is too big: {}/{}", payload.length, maxPayloadSize);
132-
splitAndSend(scopesToSerialize);
133-
return;
134-
}
135-
LOGGER.debug("Sending {} jar scopes size={}", scopesToSerialize.size(), payload.length);
136-
symbolUploader.uploadAsMultipart(
137-
"",
138-
event,
139-
new BatchUploader.MultiPartContent(payload, "file", "file.gz", APPLICATION_GZIP));
139+
updateStats(scopesToSerialize, payload.length);
140+
LOGGER.debug(
141+
"Sending {} jar scopes size={} isCompressed={}",
142+
scopesToSerialize.size(),
143+
payload.length,
144+
isCompressed);
145+
String fileName = "file.json";
146+
MediaType mediaType = APPLICATION_JSON;
147+
if (isCompressed) {
148+
fileName = "file.gz";
149+
mediaType = APPLICATION_GZIP;
140150
}
151+
symbolUploader.uploadAsMultipart(
152+
"", event, new BatchUploader.MultiPartContent(payload, "file", fileName, mediaType));
141153
}
142154

143155
private static byte[] compressPayload(byte[] jsonBytes) {
@@ -164,27 +176,16 @@ private void splitAndSend(List<Scope> scopesToSerialize) {
164176
// try to split by jar scopes: one scope per request
165177
if (scopesToSerialize.size() < BatchUploader.MAX_ENQUEUED_REQUESTS) {
166178
for (Scope scope : scopesToSerialize) {
167-
String json =
168-
SERVICE_VERSION_ADAPTER.toJson(
169-
new ServiceVersion(
170-
serviceName, env, version, "JAVA", Collections.singletonList(scope)));
171-
LOGGER.debug("Sending {} jar scope size={}", scope.getName(), json.length());
172-
doUpload(Collections.singletonList(scope), json);
179+
serializeAndUpload(Collections.singletonList(scope));
173180
}
174181
} else {
175182
// split the list of jar scope in 2 list jar scopes with half of the jar scopes
176183
int half = scopesToSerialize.size() / 2;
177184
List<Scope> firstHalf = scopesToSerialize.subList(0, half);
178185
List<Scope> secondHalf = scopesToSerialize.subList(half, scopesToSerialize.size());
179186
LOGGER.debug("split jar scope list in 2: {} and {}", firstHalf.size(), secondHalf.size());
180-
String jsonFirstHalf =
181-
SERVICE_VERSION_ADAPTER.toJson(
182-
new ServiceVersion(serviceName, env, version, "JAVA", firstHalf));
183-
doUpload(firstHalf, jsonFirstHalf);
184-
String jsonSecondHalf =
185-
SERVICE_VERSION_ADAPTER.toJson(
186-
new ServiceVersion(serviceName, env, version, "JAVA", secondHalf));
187-
doUpload(secondHalf, jsonSecondHalf);
187+
serializeAndUpload(firstHalf);
188+
serializeAndUpload(secondHalf);
188189
}
189190
} else {
190191
Scope jarScope = scopesToSerialize.get(0);
@@ -217,12 +218,12 @@ private static Scope createJarScope(String jarName, List<Scope> classScopes) {
217218
return Scope.builder(ScopeType.JAR, jarName, 0, 0).name(jarName).scopes(classScopes).build();
218219
}
219220

220-
private void updateStats(List<Scope> scopesToSerialize, String json) {
221+
private void updateStats(List<Scope> scopesToSerialize, long size) {
221222
int totalClasses = 0;
222223
for (Scope scope : scopesToSerialize) {
223224
totalClasses += scope.getScopes() != null ? scope.getScopes().size() : 0;
224225
}
225-
stats.updateStats(totalClasses, json.length());
226+
stats.updateStats(totalClasses, size);
226227
LOGGER.debug("SymbolSink stats: {}", stats);
227228
}
228229

0 commit comments

Comments
 (0)