Skip to content

Commit 4bb0309

Browse files
committed
refactor: implement lazy request body loading with dedicated HttpRequestBody class
• Created new HttpRequestBody class to encapsulate lazy loading logic and state • Refactored PluginHttpContext to use HttpRequestBody instead of managing state directly • Updated ProxyWasmFilter to use shared HttpRequestBody instance across plugins • Improved request body processing loop with proper action handling • Enhanced type safety by removing Object-based field types • Moved body state management from context to dedicated supplier class This refactoring addresses memory efficiency concerns by only loading request bodies when WASM modules actually access them via proxy_get_buffer_bytes. Signed-off-by: Hiram Chirino <hiram@hiramchirino.com>
1 parent ccf6e40 commit 4bb0309

File tree

3 files changed

+170
-49
lines changed

3 files changed

+170
-49
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package io.roastedroot.proxywasm.internal;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.util.function.Supplier;
6+
7+
/**
8+
* Holds the HTTP request body, loading it from a stream when first accessed.
9+
*/
10+
public class HttpRequestBody {
11+
12+
private byte[] body;
13+
private boolean loaded = false;
14+
private final Supplier<InputStream> streamSupplier;
15+
16+
public HttpRequestBody(Supplier<InputStream> streamSupplier) {
17+
this.streamSupplier = streamSupplier;
18+
}
19+
20+
public byte[] get() {
21+
if (!loaded) {
22+
try {
23+
body = streamSupplier.get().readAllBytes();
24+
} catch (IOException e) {
25+
throw new RuntimeException("Failed to read request body", e);
26+
}
27+
loaded = true;
28+
}
29+
return body;
30+
}
31+
32+
/**
33+
* Returns true if the request body has been loaded from the stream.
34+
*/
35+
public boolean isLoaded() {
36+
return loaded;
37+
}
38+
39+
/**
40+
* Sets the request body directly, marking it as loaded.
41+
* This is used when the body is modified by WASM plugins.
42+
*/
43+
public void setBody(byte[] body) {
44+
this.body = body;
45+
this.loaded = true;
46+
}
47+
48+
/**
49+
* Returns the request body if it has been loaded, null otherwise.
50+
* This allows checking if the body was accessed without triggering a load.
51+
*/
52+
public byte[] getBodyIfLoaded() {
53+
return loaded ? body : null;
54+
}
55+
}

proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/internal/PluginHttpContext.java

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ public class PluginHttpContext {
1313
private final long startedAt = System.currentTimeMillis();
1414

1515
final HashMap<List<String>, byte[]> properties = new HashMap<>();
16-
private byte[] httpRequestBody = new byte[0];
16+
private HttpRequestBody httpRequestBodyState;
17+
18+
// Other body buffers and state fields (not lazy)
1719
private byte[] grpcReceiveBuffer = new byte[0];
1820
private byte[] upstreamData = new byte[0];
1921
private byte[] downStreamData = new byte[0];
@@ -22,12 +24,45 @@ public class PluginHttpContext {
2224
private Action action;
2325
private CountDownLatch resumeLatch;
2426

25-
PluginHttpContext(Plugin plugin, HttpRequestAdaptor requestAdaptor) {
27+
public PluginHttpContext(Plugin plugin, HttpRequestAdaptor requestAdaptor) {
2628
this.plugin = plugin;
2729
this.requestAdaptor = requestAdaptor;
2830
this.context = plugin.wasm.createHttpContext(new HandlerImpl());
2931
}
3032

33+
/**
34+
* Sets the lazy request body supplier.
35+
*/
36+
public void setHttpRequestBodyState(HttpRequestBody supplier) {
37+
this.httpRequestBodyState = supplier;
38+
}
39+
40+
/**
41+
* Gets the lazy request body supplier.
42+
*/
43+
public HttpRequestBody getHttpRequestBodyState() {
44+
return httpRequestBodyState;
45+
}
46+
47+
/**
48+
* Gets the HTTP request body, triggering lazy loading if needed.
49+
*/
50+
public byte[] getHttpRequestBody() {
51+
if (httpRequestBodyState != null) {
52+
return httpRequestBodyState.get();
53+
}
54+
return new byte[0];
55+
}
56+
57+
/**
58+
* Sets the HTTP request body, updating the supplier if present.
59+
*/
60+
public void setHttpRequestBody(byte[] body) {
61+
if (httpRequestBodyState != null && body != null) {
62+
httpRequestBodyState.setBody(body);
63+
}
64+
}
65+
3166
public Plugin plugin() {
3267
return plugin;
3368
}
@@ -68,14 +103,6 @@ public void maybePause() {
68103
}
69104
}
70105

71-
public byte[] getHttpRequestBody() {
72-
return httpRequestBody;
73-
}
74-
75-
public void setHttpRequestBody(byte[] httpRequestBody) {
76-
this.httpRequestBody = httpRequestBody;
77-
}
78-
79106
public byte[] getHttpResponseBody() {
80107
return httpResponseBody;
81108
}
@@ -156,17 +183,21 @@ public ProxyMap getGrpcReceiveTrailerMetaData() {
156183

157184
@Override
158185
public byte[] getHttpRequestBody() {
159-
return httpRequestBody;
186+
return PluginHttpContext.this
187+
.getHttpRequestBody(); // Delegate to outer class for lazy loading
160188
}
161189

162190
@Override
163191
public WasmResult setHttpRequestBody(byte[] body) {
164-
httpRequestBody = body;
192+
PluginHttpContext.this.setHttpRequestBody(body); // Delegate to outer class
165193
return WasmResult.OK;
166194
}
167195

168196
public void appendHttpRequestBody(byte[] body) {
169-
httpRequestBody = Helpers.append(httpRequestBody, body);
197+
byte[] currentBody =
198+
PluginHttpContext.this
199+
.getHttpRequestBody(); // This will trigger lazy loading if needed
200+
PluginHttpContext.this.setHttpRequestBody(Helpers.append(currentBody, body));
170201
}
171202

172203
@Override

proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/ProxyWasmFilter.java

Lines changed: 71 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import io.roastedroot.proxywasm.StartException;
66
import io.roastedroot.proxywasm.internal.Action;
7+
import io.roastedroot.proxywasm.internal.HttpRequestBody;
78
import io.roastedroot.proxywasm.internal.Plugin;
89
import io.roastedroot.proxywasm.internal.PluginHttpContext;
910
import io.roastedroot.proxywasm.internal.Pool;
@@ -25,14 +26,19 @@
2526
import java.util.logging.Logger;
2627

2728
/**
28-
* Implements the JAX-RS {@link ContainerRequestFilter}, {@link ContainerResponseFilter},
29-
* and {@link WriterInterceptor} interfaces to intercept HTTP requests and responses,
29+
* Implements the JAX-RS {@link ContainerRequestFilter},
30+
* {@link ContainerResponseFilter},
31+
* and {@link WriterInterceptor} interfaces to intercept HTTP requests and
32+
* responses,
3033
* allowing Proxy-Wasm plugins to process them.
3134
*
32-
* <p>This filter is registered by the {@link ProxyWasmFeature}. It interacts with
33-
* {@link Plugin} instances obtained from configured {@link Pool}s to execute the
35+
* <p>
36+
* This filter is registered by the {@link ProxyWasmFeature}. It interacts with
37+
* {@link Plugin} instances obtained from configured {@link Pool}s to execute
38+
* the
3439
* appropriate Proxy-Wasm ABI functions (e.g., {@code on_http_request_headers},
35-
* {@code on_http_response_body}) at different stages of the JAX-RS request/response lifecycle.
40+
* {@code on_http_response_body}) at different stages of the JAX-RS
41+
* request/response lifecycle.
3642
*
3743
* @see ProxyWasmFeature
3844
* @see ProxyWasm
@@ -50,7 +56,8 @@ public class ProxyWasmFilter
5056
/**
5157
* Constructs a ProxyWasmFilter.
5258
*
53-
* @param pluginPools A list of {@link Pool} instances, each managing a pool of {@link Plugin}
59+
* @param pluginPools A list of {@link Pool} instances, each managing a pool of
60+
* {@link Plugin}
5461
* instances for a specific Wasm module.
5562
*/
5663
public ProxyWasmFilter(List<Pool> pluginPools) {
@@ -76,10 +83,15 @@ public void release() {
7683
/**
7784
* Intercepts incoming JAX-RS requests before they reach the resource method.
7885
*
79-
* <p>This method iterates through the configured plugin pools, borrows a {@link Plugin}
80-
* instance from each, creates a {@link PluginHttpContext}, and calls the plugin's
81-
* {@code on_http_request_headers} and potentially {@code on_http_request_body} functions.
82-
* It handles potential early responses or modifications dictated by the plugins.
86+
* <p>
87+
* This method iterates through the configured plugin pools, borrows a
88+
* {@link Plugin}
89+
* instance from each, creates a {@link PluginHttpContext}, and calls the
90+
* plugin's
91+
* {@code on_http_request_headers} and potentially {@code on_http_request_body}
92+
* functions.
93+
* It handles potential early responses or modifications dictated by the
94+
* plugins.
8395
*
8496
* @param requestContext The JAX-RS request context.
8597
* @throws IOException If an I/O error occurs, typically during body processing.
@@ -89,6 +101,7 @@ public void filter(ContainerRequestContext requestContext) throws IOException {
89101

90102
ArrayList<FilterContext> filterContexts = new ArrayList<>();
91103
requestContext.setProperty(FILTER_CONTEXT, filterContexts);
104+
92105
for (var pluginPool : pluginPools) {
93106
try {
94107
Plugin plugin = pluginPool.borrow();
@@ -100,6 +113,7 @@ public void filter(ContainerRequestContext requestContext) throws IOException {
100113
serverAdaptor.httpRequestAdaptor(requestContext);
101114
requestAdaptor.setRequestContext(requestContext);
102115
var httpContext = plugin.createHttpContext(requestAdaptor);
116+
103117
filterContexts.add(new FilterContext(pluginPool, plugin, httpContext));
104118
} finally {
105119
plugin.unlock();
@@ -116,6 +130,17 @@ public void filter(ContainerRequestContext requestContext) throws IOException {
116130
return;
117131
}
118132
}
133+
134+
// Create a shared lazy body supplier for all plugins
135+
HttpRequestBody bodySupplier = new HttpRequestBody(() -> requestContext.getEntityStream());
136+
137+
// Set up lazy providers for all plugins that need the body
138+
for (var filterContext : filterContexts) {
139+
if (filterContext.httpContext.context().hasOnRequestBody()) {
140+
filterContext.httpContext.setHttpRequestBodyState(bodySupplier);
141+
}
142+
}
143+
119144
for (var filterContext : filterContexts) {
120145
filter(requestContext, filterContext);
121146
}
@@ -147,29 +172,30 @@ private void filter(ContainerRequestContext requestContext, FilterContext filter
147172
// the plugin may not be interested in the request body.
148173
if (httpContext.context().hasOnRequestBody()) {
149174

150-
// TODO: find out if it's more efficient to read the body in chunks and do multiple
151-
// callOnRequestBody calls.
152-
byte[] bytes = requestContext.getEntityStream().readAllBytes();
175+
HttpRequestBody httpRequestBodyState = httpContext.getHttpRequestBodyState();
153176

154-
httpContext.setHttpRequestBody(bytes);
155-
var action = httpContext.context().callOnRequestBody(true);
156-
bytes = httpContext.getHttpRequestBody();
157-
if (action == Action.CONTINUE) {
158-
// continue means plugin is done reading the body.
159-
httpContext.setHttpRequestBody(null);
160-
} else {
177+
while (true) {
178+
// if we streamed body updates, then endOfStream would be initially false
179+
var action = httpContext.context().callOnRequestBody(true);
180+
181+
// does the plugin want to respond early?
182+
var sendResponse = httpContext.consumeSentHttpResponse();
183+
if (sendResponse != null) {
184+
requestContext.abortWith(toResponse(sendResponse));
185+
return;
186+
}
187+
188+
if (action == Action.CONTINUE) {
189+
break;
190+
}
161191
httpContext.maybePause();
162192
}
163193

164-
// does the plugin want to respond early?
165-
var sendResponse = httpContext.consumeSentHttpResponse();
166-
if (sendResponse != null) {
167-
requestContext.abortWith(toResponse(sendResponse));
168-
return;
194+
// Body was accessed and potentially modified, update the request stream
195+
if (httpRequestBodyState.isLoaded()) {
196+
byte[] bytes = httpRequestBodyState.getBodyIfLoaded();
197+
requestContext.setEntityStream(new java.io.ByteArrayInputStream(bytes));
169198
}
170-
171-
// plugin may have modified the body
172-
requestContext.setEntityStream(new java.io.ByteArrayInputStream(bytes));
173199
}
174200

175201
} finally {
@@ -184,10 +210,14 @@ private static Response internalServerError() {
184210
/**
185211
* Intercepts outgoing JAX-RS responses before the entity is written.
186212
*
187-
* <p>This method iterates through the configured plugin pools, retrieves the
188-
* {@link PluginHttpContext} created during the request phase, and calls the plugin's
189-
* {@code on_http_response_headers} function. It handles potential modifications to the
190-
* response headers dictated by the plugins. If the response has no entity but the plugin
213+
* <p>
214+
* This method iterates through the configured plugin pools, retrieves the
215+
* {@link PluginHttpContext} created during the request phase, and calls the
216+
* plugin's
217+
* {@code on_http_response_headers} function. It handles potential modifications
218+
* to the
219+
* response headers dictated by the plugins. If the response has no entity but
220+
* the plugin
191221
* implements {@code on_http_response_body}, it invokes that callback as well.
192222
*
193223
* @param requestContext The JAX-RS request context.
@@ -274,15 +304,20 @@ private void filter(
274304
/**
275305
* Intercepts the response body writing process.
276306
*
277-
* <p>This method is called when the JAX-RS framework is about to serialize and write
307+
* <p>
308+
* This method is called when the JAX-RS framework is about to serialize and
309+
* write
278310
* the response entity. It captures the original response body, allows plugins
279-
* (via {@code on_http_response_body}) to inspect or modify it, and then writes the
311+
* (via {@code on_http_response_body}) to inspect or modify it, and then writes
312+
* the
280313
* potentially modified body to the original output stream. It handles potential
281314
* early responses dictated by the plugins during body processing.
282315
*
283316
* @param ctx The JAX-RS writer interceptor context.
284-
* @throws IOException If an I/O error occurs during stream processing.
285-
* @throws WebApplicationException If a plugin decides to abort processing and send an
317+
* @throws IOException If an I/O error occurs during stream
318+
* processing.
319+
* @throws WebApplicationException If a plugin decides to abort processing and
320+
* send an
286321
* alternative response during body filtering.
287322
*/
288323
@Override

0 commit comments

Comments
 (0)