-
Notifications
You must be signed in to change notification settings - Fork 25.3k
Refcount responses in TransportNodesAction
#103254
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
73375cc
d54a2e7
ea95245
feb8674
e3cac3b
de540fc
b03854c
ac3a654
7a90554
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,8 @@ | |
import org.elasticsearch.common.io.stream.Writeable; | ||
import org.elasticsearch.common.util.concurrent.EsExecutors; | ||
import org.elasticsearch.core.CheckedConsumer; | ||
import org.elasticsearch.core.Releasables; | ||
import org.elasticsearch.tasks.CancellableTask; | ||
import org.elasticsearch.tasks.Task; | ||
import org.elasticsearch.transport.TransportChannel; | ||
import org.elasticsearch.transport.TransportRequest; | ||
|
@@ -96,6 +98,23 @@ protected void doExecute(Task task, NodesRequest request, ActionListener<NodesRe | |
|
||
final TransportRequestOptions transportRequestOptions = TransportRequestOptions.timeout(request.timeout()); | ||
|
||
{ | ||
addReleaseOnCancellationListener(); | ||
} | ||
|
||
private void addReleaseOnCancellationListener() { | ||
if (task instanceof CancellableTask cancellableTask) { | ||
cancellableTask.addListener(() -> { | ||
final List<NodeResponse> drainedResponses; | ||
synchronized (responses) { | ||
drainedResponses = List.copyOf(responses); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was wondering if could add copy and remove elements in one iteration:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can, but adding elements one-by-one to a list potentially involves more allocations to grow the list incrementally, and removing items from a list iterator one-by-one is an O(N²) operation. |
||
responses.clear(); | ||
} | ||
Releasables.wrap(Iterators.map(drainedResponses.iterator(), r -> r::decRef)).close(); | ||
}); | ||
} | ||
} | ||
|
||
@Override | ||
protected void sendItemRequest(DiscoveryNode discoveryNode, ActionListener<NodeResponse> listener) { | ||
final var nodeRequest = newNodeRequest(request); | ||
|
@@ -118,9 +137,14 @@ protected void sendItemRequest(DiscoveryNode discoveryNode, ActionListener<NodeR | |
|
||
@Override | ||
protected void onItemResponse(DiscoveryNode discoveryNode, NodeResponse nodeResponse) { | ||
nodeResponse.mustIncRef(); | ||
synchronized (responses) { | ||
responses.add(nodeResponse); | ||
if ((task instanceof CancellableTask cancellableTask && cancellableTask.isCancelled()) == false) { | ||
responses.add(nodeResponse); | ||
return; | ||
} | ||
} | ||
nodeResponse.decRef(); | ||
} | ||
|
||
@Override | ||
|
@@ -134,7 +158,11 @@ protected void onItemFailure(DiscoveryNode discoveryNode, Exception e) { | |
@Override | ||
protected CheckedConsumer<ActionListener<NodesResponse>, Exception> onCompletion() { | ||
// ref releases all happen-before here so no need to be synchronized | ||
return l -> newResponseAsync(task, request, responses, exceptions, l); | ||
return l -> { | ||
try (var ignored = Releasables.wrap(Iterators.map(responses.iterator(), r -> r::decRef))) { | ||
newResponseAsync(task, request, responses, exceptions, l); | ||
} | ||
}; | ||
} | ||
|
||
@Override | ||
|
@@ -154,9 +182,11 @@ private Writeable.Reader<NodeResponse> nodeResponseReader(DiscoveryNode discover | |
} | ||
|
||
/** | ||
* Create a new {@link NodesResponse} (multi-node response). | ||
* Create a new {@link NodesResponse}. This method is executed on {@link #finalExecutor}. | ||
* | ||
* @param request The associated request. | ||
* @param request The request whose response we are constructing. {@link TransportNodesAction} may have already released all its | ||
* references to this object before calling this method, so it's up to individual implementations to retain their own | ||
* reference to the request if still needed here. | ||
* @param responses All successful node-level responses. | ||
* @param failures All node-level failures. | ||
* @return Never {@code null}. | ||
|
@@ -166,7 +196,11 @@ private Writeable.Reader<NodeResponse> nodeResponseReader(DiscoveryNode discover | |
|
||
/** | ||
* Create a new {@link NodesResponse}, possibly asynchronously. The default implementation is synchronous and calls | ||
* {@link #newResponse(BaseNodesRequest, List, List)} | ||
* {@link #newResponse(BaseNodesRequest, List, List)}. This method is executed on {@link #finalExecutor}. | ||
* | ||
* @param request The request whose response we are constructing. {@link TransportNodesAction} may have already released all its | ||
* references to this object before calling this method, so it's up to individual implementations to retain their own | ||
* reference to the request if still needed here. | ||
*/ | ||
protected void newResponseAsync( | ||
Task task, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incredibly if you write this inline in the anonymous constructor then the compiler blows up with a NPE: https://gradle-enterprise.elastic.co/s/pp3aym5qktpa2