Skip to content

Commit a9010f9

Browse files
authored
Release top-level response in TransportNodesAction (#103845)
Similarly to #103254, we should `decRef` the top-level response after completing the listener too.
1 parent f49113f commit a9010f9

File tree

2 files changed

+33
-2
lines changed

2 files changed

+33
-2
lines changed

server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ protected void newResponseAsync(
209209
List<FailedNodeException> failures,
210210
ActionListener<NodesResponse> listener
211211
) {
212-
ActionListener.completeWith(listener, () -> newResponse(request, responses, failures));
212+
ActionListener.run(listener, l -> ActionListener.respondAndRelease(l, newResponse(request, responses, failures)));
213213
}
214214

215215
protected abstract NodeRequest newNodeRequest(NodesRequest request);

server/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.test.transport.CapturingTransport;
3838
import org.elasticsearch.threadpool.TestThreadPool;
3939
import org.elasticsearch.threadpool.ThreadPool;
40+
import org.elasticsearch.transport.LeakTracker;
4041
import org.elasticsearch.transport.TransportRequest;
4142
import org.elasticsearch.transport.TransportService;
4243
import org.hamcrest.Matchers;
@@ -125,6 +126,7 @@ public void testResponseAggregation() {
125126
final PlainActionFuture<TestNodesResponse> listener = new PlainActionFuture<>();
126127
action.execute(null, new TestNodesRequest(), listener.delegateFailure((l, response) -> {
127128
assertTrue(response.getNodes().stream().allMatch(TestNodeResponse::hasReferences));
129+
assertTrue(response.hasReferences());
128130
l.onResponse(response);
129131
}));
130132
assertFalse(listener.isDone());
@@ -152,12 +154,14 @@ public void testResponseAggregation() {
152154

153155
final var allResponsesReleasedListener = new SubscribableListener<Void>();
154156
try (var listeners = new RefCountingListener(allResponsesReleasedListener)) {
157+
response.addCloseListener(listeners.acquire());
155158
for (final var nodeResponse : response.getNodes()) {
156159
nodeResponse.addCloseListener(listeners.acquire());
157160
}
158161
}
159162
safeAwait(allResponsesReleasedListener);
160163
assertTrue(response.getNodes().stream().noneMatch(TestNodeResponse::hasReferences));
164+
assertFalse(response.hasReferences());
161165

162166
for (TestNodeResponse nodeResponse : response.getNodes()) {
163167
assertThat(successfulNodes, Matchers.hasItem(nodeResponse.getNode()));
@@ -407,6 +411,9 @@ private static class TestNodesRequest extends BaseNodesRequest<TestNodesRequest>
407411

408412
private static class TestNodesResponse extends BaseNodesResponse<TestNodeResponse> {
409413

414+
private final SubscribableListener<Void> onClose = new SubscribableListener<>();
415+
private final RefCounted refCounted = LeakTracker.wrap(AbstractRefCounted.of(() -> onClose.onResponse(null)));
416+
410417
TestNodesResponse(ClusterName clusterName, List<TestNodeResponse> nodeResponses, List<FailedNodeException> failures) {
411418
super(clusterName, nodeResponses, failures);
412419
}
@@ -420,6 +427,30 @@ protected List<TestNodeResponse> readNodesFrom(StreamInput in) throws IOExceptio
420427
protected void writeNodesTo(StreamOutput out, List<TestNodeResponse> nodes) throws IOException {
421428
out.writeCollection(nodes);
422429
}
430+
431+
@Override
432+
public void incRef() {
433+
refCounted.incRef();
434+
}
435+
436+
@Override
437+
public boolean tryIncRef() {
438+
return refCounted.tryIncRef();
439+
}
440+
441+
@Override
442+
public boolean decRef() {
443+
return refCounted.decRef();
444+
}
445+
446+
@Override
447+
public boolean hasReferences() {
448+
return refCounted.hasReferences();
449+
}
450+
451+
void addCloseListener(ActionListener<Void> listener) {
452+
onClose.addListener(listener);
453+
}
423454
}
424455

425456
private static class TestNodeRequest extends TransportRequest {
@@ -455,7 +486,7 @@ public boolean hasReferences() {
455486
private static class TestNodeResponse extends BaseNodeResponse {
456487

457488
private final SubscribableListener<Void> onClose = new SubscribableListener<>();
458-
private final RefCounted refCounted = AbstractRefCounted.of(() -> onClose.onResponse(null));
489+
private final RefCounted refCounted = LeakTracker.wrap(AbstractRefCounted.of(() -> onClose.onResponse(null)));
459490

460491
TestNodeResponse() {
461492
this(mock(DiscoveryNode.class));

0 commit comments

Comments
 (0)