Skip to content

Commit 06425eb

Browse files
Fix Netty ByteBuf leaks in StoreResponse and RetryContextOnDiagnosticTest (#47266)
* Initial plan * Improve logging for ByteBufInputStream close failures Change log level from debug to warn and catch Throwable instead of just IOException to make potential ByteBuf leak issues more visible. Co-authored-by: FabianMeiswinkel <19165014+FabianMeiswinkel@users.noreply.github.com> * Fix ByteBuf leak in RetryContextOnDiagnosticTest Changed from Mono.just() to Mono.fromCallable() to defer StoreResponse creation, ensuring ByteBuf lifecycle is properly managed within each subscription rather than eagerly at mock setup time. Co-authored-by: FabianMeiswinkel <19165014+FabianMeiswinkel@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: FabianMeiswinkel <19165014+FabianMeiswinkel@users.noreply.github.com>
1 parent b008d62 commit 06425eb

File tree

2 files changed

+12
-18
lines changed

2 files changed

+12
-18
lines changed

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/RetryContextOnDiagnosticTest.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -105,15 +105,12 @@ public void backoffRetryUtilityExecuteRetry() throws Exception {
105105
addressSelector = Mockito.mock(AddressSelector.class);
106106
CosmosException exception = new CosmosException(410, exceptionText);
107107
String rawJson = "{\"id\":\"" + responseText + "\"}";
108-
ByteBuf buffer = getUTF8BytesOrNull(rawJson);
109108
Mockito.when(callbackMethod.call()).thenThrow(exception, exception, exception, exception, exception)
110109

111-
.thenReturn(Mono.just(new StoreResponse(
112-
null,
113-
200,
114-
new HashMap<>(),
115-
new ByteBufInputStream(buffer, true),
116-
buffer.readableBytes())));
110+
.thenReturn(Mono.fromCallable(() -> StoreResponseBuilder.create()
111+
.withContent(rawJson)
112+
.withStatus(200)
113+
.build()));
117114
Mono<StoreResponse> monoResponse = BackoffRetryUtility.executeRetry(callbackMethod, retryPolicy);
118115
StoreResponse response = validateSuccess(monoResponse);
119116

@@ -156,14 +153,11 @@ public void backoffRetryUtilityExecuteAsync() {
156153
CosmosException exception = new CosmosException(410, exceptionText);
157154
Mono<StoreResponse> exceptionMono = Mono.error(exception);
158155
String rawJson = "{\"id\":\"" + responseText + "\"}";
159-
ByteBuf buffer = getUTF8BytesOrNull(rawJson);
160156
Mockito.when(parameterizedCallbackMethod.apply(ArgumentMatchers.any())).thenReturn(exceptionMono, exceptionMono, exceptionMono, exceptionMono, exceptionMono)
161-
.thenReturn(Mono.just(new StoreResponse(
162-
null,
163-
200,
164-
new HashMap<>(),
165-
new ByteBufInputStream(buffer, true),
166-
buffer.readableBytes())));
157+
.thenReturn(Mono.fromCallable(() -> StoreResponseBuilder.create()
158+
.withContent(rawJson)
159+
.withStatus(200)
160+
.build()));
167161
Mono<StoreResponse> monoResponse = BackoffRetryUtility.executeAsync(
168162
parameterizedCallbackMethod,
169163
retryPolicy,

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,12 @@ public StoreResponse(
7171
if (contentStream != null) {
7272
try {
7373
this.responsePayload = new JsonNodeStorePayload(contentStream, responsePayloadLength, headerMap);
74-
}
75-
finally {
74+
} finally {
7675
try {
7776
contentStream.close();
78-
} catch (IOException e) {
79-
logger.debug("Could not successfully close content stream.", e);
77+
} catch (Throwable e) {
78+
// Log as warning instead of debug to make ByteBuf leak issues more visible
79+
logger.warn("Failed to close content stream. This may cause a Netty ByteBuf leak.", e);
8080
}
8181
}
8282
} else {

0 commit comments

Comments
 (0)