Skip to content

Commit fdaa09d

Browse files
committed
feat: implement lazy response body loading for improved memory efficiency
• Add HttpResponseBody class for lazy response body management with supplier-based loading • Add deferred loading mechanism - response body only loaded when WASM modules access it • Fix request body processing logic to properly handle early returns for non-body plugins • Update response header filtering logic to skip unnecessary processing when no body handling needed This implementation extends the existing lazy request body loading concept to response bodies, providing significant memory and performance improvements. The lazy loading mechanism ensures response bodies are only read from the output stream when WASM plugins actually call proxy_get_buffer_bytes to access the data. This prevents unnecessary memory allocation and I/O operations for plugins that process headers but don't need response body content, while maintaining full backward compatibility with existing plugin implementations. Signed-off-by: Hiram Chirino <hiram@hiramchirino.com>
1 parent a2d9e18 commit fdaa09d

File tree

3 files changed

+175
-64
lines changed

3 files changed

+175
-64
lines changed
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package io.roastedroot.proxywasm.internal;
2+
3+
import java.util.function.Supplier;
4+
5+
/**
6+
* Holds the HTTP response body, loading it from a supplier when first accessed.
7+
* Unlike request bodies which come from streams, response bodies can be provided
8+
* directly as byte arrays or from suppliers.
9+
*/
10+
public class HttpResponseBody {
11+
12+
private byte[] body;
13+
private boolean loaded = false;
14+
private final Supplier<byte[]> bodySupplier;
15+
16+
public HttpResponseBody(Supplier<byte[]> bodySupplier) {
17+
this.bodySupplier = bodySupplier;
18+
}
19+
20+
/**
21+
* Creates an HttpResponseBody with a fixed byte array (no lazy loading needed).
22+
*/
23+
public HttpResponseBody(byte[] body) {
24+
this.body = body;
25+
this.loaded = true;
26+
this.bodySupplier = null;
27+
}
28+
29+
public byte[] get() {
30+
if (!loaded) {
31+
if (bodySupplier != null) {
32+
body = bodySupplier.get();
33+
} else {
34+
body = new byte[0];
35+
}
36+
loaded = true;
37+
}
38+
return body;
39+
}
40+
41+
/**
42+
* Returns true if the response body has been loaded.
43+
*/
44+
public boolean isLoaded() {
45+
return loaded;
46+
}
47+
48+
/**
49+
* Sets the response body directly, marking it as loaded.
50+
* This is used when the body is modified by WASM plugins.
51+
*/
52+
public void setBody(byte[] body) {
53+
this.body = body;
54+
this.loaded = true;
55+
}
56+
57+
/**
58+
* Returns the response body if it has been loaded, null otherwise.
59+
* This allows checking if the body was accessed without triggering a load.
60+
*/
61+
public byte[] getBodyIfLoaded() {
62+
return loaded ? body : null;
63+
}
64+
}

proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/internal/PluginHttpContext.java

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ public class PluginHttpContext {
1414

1515
final HashMap<List<String>, byte[]> properties = new HashMap<>();
1616
private HttpRequestBody httpRequestBodyState;
17+
private HttpResponseBody httpResponseBodyState;
1718

1819
// Other body buffers and state fields (not lazy)
1920
private byte[] grpcReceiveBuffer = new byte[0];
2021
private byte[] upstreamData = new byte[0];
2122
private byte[] downStreamData = new byte[0];
22-
private byte[] httpResponseBody = new byte[0];
2323
private SendResponse sendResponse;
2424
private Action action;
2525
private CountDownLatch resumeLatch;
@@ -63,6 +63,39 @@ public void setHttpRequestBody(byte[] body) {
6363
}
6464
}
6565

66+
/**
67+
* Sets the HTTP response body state.
68+
*/
69+
public void setHttpResponseBodyState(HttpResponseBody responseBody) {
70+
this.httpResponseBodyState = responseBody;
71+
}
72+
73+
/**
74+
* Gets the HTTP response body state.
75+
*/
76+
public HttpResponseBody getHttpResponseBodyState() {
77+
return httpResponseBodyState;
78+
}
79+
80+
/**
81+
* Gets the HTTP response body, triggering lazy loading if needed.
82+
*/
83+
public byte[] getHttpResponseBody() {
84+
if (httpResponseBodyState != null) {
85+
return httpResponseBodyState.get();
86+
}
87+
return new byte[0];
88+
}
89+
90+
/**
91+
* Sets the HTTP response body, updating the state if present.
92+
*/
93+
public void setHttpResponseBody(byte[] httpResponseBody) {
94+
if (httpResponseBodyState != null && httpResponseBody != null) {
95+
httpResponseBodyState.setBody(httpResponseBody);
96+
}
97+
}
98+
6699
public Plugin plugin() {
67100
return plugin;
68101
}
@@ -103,14 +136,6 @@ public void maybePause() {
103136
}
104137
}
105138

106-
public byte[] getHttpResponseBody() {
107-
return httpResponseBody;
108-
}
109-
110-
public void setHttpResponseBody(byte[] httpResponseBody) {
111-
this.httpResponseBody = httpResponseBody;
112-
}
113-
114139
public byte[] getGrpcReceiveBuffer() {
115140
return grpcReceiveBuffer;
116141
}
@@ -235,17 +260,18 @@ public WasmResult setDownStreamData(byte[] data) {
235260

236261
@Override
237262
public byte[] getHttpResponseBody() {
238-
return httpResponseBody;
263+
return PluginHttpContext.this.getHttpResponseBody();
239264
}
240265

241266
@Override
242267
public WasmResult setHttpResponseBody(byte[] body) {
243-
httpResponseBody = body;
268+
PluginHttpContext.this.setHttpResponseBody(body);
244269
return WasmResult.OK;
245270
}
246271

247272
public void appendHttpResponseBody(byte[] body) {
248-
httpResponseBody = Helpers.append(httpResponseBody, body);
273+
byte[] currentBody = PluginHttpContext.this.getHttpResponseBody();
274+
PluginHttpContext.this.setHttpResponseBody(Helpers.append(currentBody, body));
249275
}
250276

251277
// //////////////////////////////////////////////////////////////////////

proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/ProxyWasmFilter.java

Lines changed: 73 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.roastedroot.proxywasm.StartException;
66
import io.roastedroot.proxywasm.internal.Action;
77
import io.roastedroot.proxywasm.internal.HttpRequestBody;
8+
import io.roastedroot.proxywasm.internal.HttpResponseBody;
89
import io.roastedroot.proxywasm.internal.Plugin;
910
import io.roastedroot.proxywasm.internal.PluginHttpContext;
1011
import io.roastedroot.proxywasm.internal.Pool;
@@ -170,32 +171,33 @@ private void filter(ContainerRequestContext requestContext, FilterContext filter
170171
}
171172

172173
// the plugin may not be interested in the request body.
173-
if (httpContext.context().hasOnRequestBody()) {
174+
if (!httpContext.context().hasOnRequestBody()) {
175+
return;
176+
}
174177

175-
HttpRequestBody httpRequestBodyState = httpContext.getHttpRequestBodyState();
178+
HttpRequestBody httpRequestBodyState = httpContext.getHttpRequestBodyState();
176179

177-
while (true) {
178-
// if we streamed body updates, then endOfStream would be initially false
179-
var action = httpContext.context().callOnRequestBody(true);
180+
while (true) {
181+
// if we streamed body updates, then endOfStream would be initially false
182+
var action = httpContext.context().callOnRequestBody(true);
180183

181-
// does the plugin want to respond early?
182-
var sendResponse = httpContext.consumeSentHttpResponse();
183-
if (sendResponse != null) {
184-
requestContext.abortWith(toResponse(sendResponse));
185-
return;
186-
}
187-
188-
if (action == Action.CONTINUE) {
189-
break;
190-
}
191-
httpContext.maybePause();
184+
// does the plugin want to respond early?
185+
var sendResponse = httpContext.consumeSentHttpResponse();
186+
if (sendResponse != null) {
187+
requestContext.abortWith(toResponse(sendResponse));
188+
return;
192189
}
193190

194-
// Body was accessed and potentially modified, update the request stream
195-
if (httpRequestBodyState.isLoaded()) {
196-
byte[] bytes = httpRequestBodyState.getBodyIfLoaded();
197-
requestContext.setEntityStream(new java.io.ByteArrayInputStream(bytes));
191+
if (action == Action.CONTINUE) {
192+
break;
198193
}
194+
httpContext.maybePause();
195+
}
196+
197+
// Body was accessed and potentially modified, update the request stream
198+
if (httpRequestBodyState.isLoaded()) {
199+
byte[] bytes = httpRequestBodyState.getBodyIfLoaded();
200+
requestContext.setEntityStream(new java.io.ByteArrayInputStream(bytes));
199201
}
200202

201203
} finally {
@@ -269,19 +271,17 @@ private void filter(
269271
}
270272

271273
// aroundWriteTo won't be called if there is no entity to send.
272-
if (responseContext.getEntity() == null
273-
&& httpContext.context().hasOnResponseBody()) {
274+
if (responseContext.getEntity() != null
275+
|| !httpContext.context().hasOnResponseBody()) {
276+
return;
277+
}
278+
279+
// Set up empty response body for plugins that need it
280+
HttpResponseBody responseBodyState = new HttpResponseBody(new byte[0]);
281+
httpContext.setHttpResponseBodyState(responseBodyState);
274282

275-
byte[] bytes = new byte[0];
276-
httpContext.setHttpResponseBody(bytes);
283+
while (true) {
277284
action = httpContext.context().callOnResponseBody(true);
278-
bytes = httpContext.getHttpResponseBody();
279-
if (action == Action.CONTINUE) {
280-
// continue means plugin is done reading the body.
281-
httpContext.setHttpResponseBody(null);
282-
} else {
283-
httpContext.maybePause();
284-
}
285285

286286
// does the plugin want to respond early?
287287
sendResponse = httpContext.consumeSentHttpResponse();
@@ -292,6 +292,11 @@ private void filter(
292292
responseContext.setEntity(response.getEntity());
293293
return;
294294
}
295+
296+
if (action == Action.CONTINUE) {
297+
break;
298+
}
299+
httpContext.maybePause();
295300
}
296301

297302
} finally {
@@ -332,41 +337,57 @@ public void aroundWriteTo(WriterInterceptorContext ctx)
332337
try {
333338

334339
var original = ctx.getOutputStream();
335-
ByteArrayOutputStream baos = new ByteArrayOutputStream();
336-
ctx.setOutputStream(baos);
337-
ctx.proceed();
338340

339-
byte[] bytes = baos.toByteArray();
341+
HttpResponseBody sharedResponseBody =
342+
new HttpResponseBody(
343+
() -> {
344+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
345+
ctx.setOutputStream(baos);
346+
try {
347+
ctx.proceed();
348+
} catch (Exception e) {
349+
throw new RuntimeException("Failed to read response body", e);
350+
}
351+
return baos.toByteArray();
352+
});
340353

341354
for (var filterContext : List.copyOf(filterContexts)) {
342355
var httpContext = filterContext.httpContext;
343356

344357
httpContext.plugin().lock();
345358

346-
// the plugin may not be interested in the request body.
359+
// the plugin may not be interested in the response body.
347360
if (!httpContext.context().hasOnResponseBody()) {
348-
ctx.proceed();
361+
continue;
349362
}
350363

351-
httpContext.setHttpResponseBody(bytes);
352-
var action = httpContext.context().callOnResponseBody(true);
353-
bytes = httpContext.getHttpResponseBody();
354-
if (action == Action.CONTINUE) {
355-
// continue means plugin is done reading the body.
356-
httpContext.setHttpResponseBody(null);
357-
} else {
358-
httpContext.maybePause();
359-
}
364+
// Set up lazy response body - will only be accessed if plugin needs it
365+
httpContext.setHttpResponseBodyState(sharedResponseBody);
360366

361-
// does the plugin want to respond early?
362-
var sendResponse = httpContext.consumeSentHttpResponse();
363-
if (sendResponse != null) {
364-
throw new WebApplicationException(toResponse(sendResponse));
367+
while (true) {
368+
var action = httpContext.context().callOnResponseBody(true);
369+
370+
// does the plugin want to respond early?
371+
var sendResponse = httpContext.consumeSentHttpResponse();
372+
if (sendResponse != null) {
373+
throw new WebApplicationException(toResponse(sendResponse));
374+
}
375+
376+
if (action == Action.CONTINUE) {
377+
break;
378+
}
379+
httpContext.maybePause();
365380
}
366381
}
367382

368-
// plugin may have modified the body
369-
original.write(bytes);
383+
// Write the response body - if it was accessed and modified, use that,
384+
// otherwise continue with the original stream.
385+
if (sharedResponseBody.isLoaded()) {
386+
original.write(sharedResponseBody.get());
387+
} else {
388+
// Body was never accessed by any plugin, use original
389+
ctx.proceed();
390+
}
370391

371392
} finally {
372393
for (var filterContext : List.copyOf(filterContexts)) {

0 commit comments

Comments
 (0)