Use recycled byte arrays for peer recovery #50107
Pinging @elastic/es-distributed (:Distributed/Network)
Not a proper review, I was just curious about this change, but I left one small comment.
server/src/main/java/org/elasticsearch/transport/InboundMessage.java
I've left some initial comments. I think that the overall approach could work, but also think that getting this right without leaking / incorrectly accessing stuff is going to be tricky. We should add as many safeguards as possible.
}

@Override
public void close() {
I wonder if we need to protect against double-closing, as it is super dangerous here (one caller double closes and releases the underlying byte array while another still thinks it's safe to operate on it). A first step would be to have an assertion here checking the refCount().
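To make the double-close concern concrete, here is a minimal sketch of a ref-counted holder that refuses to go below zero. `PooledBytes` and its `recycler` callback are hypothetical names for illustration, not classes from this PR, and the sketch throws an exception where the comment suggests an assertion as a first step.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a ref-counted, pooled byte array (not the PR's class).
final class PooledBytes implements AutoCloseable {
    private final AtomicInteger refCount = new AtomicInteger(1);
    private final Runnable recycler; // assumed hook that returns the array to its pool

    PooledBytes(Runnable recycler) {
        this.recycler = recycler;
    }

    int refCount() {
        return refCount.get();
    }

    // Takes an extra reference; fails if the bytes were already recycled.
    PooledBytes retain() {
        int current;
        do {
            current = refCount.get();
            if (current <= 0) {
                throw new IllegalStateException("already released");
            }
        } while (!refCount.compareAndSet(current, current + 1));
        return this;
    }

    // Drops one reference; the last drop recycles, any further drop fails loudly.
    @Override
    public void close() {
        int current;
        do {
            current = refCount.get();
            if (current <= 0) {
                throw new IllegalStateException("double close detected");
            }
        } while (!refCount.compareAndSet(current, current - 1));
        if (current == 1) {
            recycler.run();
        }
    }
}
```

With this shape, a caller that closes twice gets an exception instead of silently recycling an array that another holder may still be reading.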
synchronized (this) {
    FileChunk chunk;
    while ((chunk = pendingChunks.poll()) != null) {
        chunk.close();
Where are we closing the FileChunkWriter?
Also, how are we ensuring that nothing is added to the FileChunkWriter anymore after it has been closed?
Otherwise we are at risk of creating leaks here.
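One way to address the question about additions after close, sketched under assumptions: a queue that flips a `closed` flag under the same lock that guards the drain, and rejects later offers. `ReleasableChunk` and `DrainOnCloseQueue` are hypothetical names and do not reflect how `FileChunkWriter` is actually implemented.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical chunk abstraction: anything that must be released exactly once.
interface ReleasableChunk {
    void release();
}

// Illustrative sketch only; not the PR's FileChunkWriter.
final class DrainOnCloseQueue implements AutoCloseable {
    private final Queue<ReleasableChunk> pending = new ArrayDeque<>();
    private boolean closed;

    // Returns false once closed, so the caller knows it still owns the chunk
    // and is responsible for releasing it.
    synchronized boolean offer(ReleasableChunk chunk) {
        if (closed) {
            return false;
        }
        return pending.add(chunk);
    }

    // Marks the queue closed first, then releases everything still queued.
    @Override
    public synchronized void close() {
        closed = true;
        ReleasableChunk chunk;
        while ((chunk = pending.poll()) != null) {
            chunk.release();
        }
    }
}
```

Because `offer` and `close` share one lock, a chunk is either queued before the drain (and released by it) or rejected after it; nothing can slip in between and leak.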
         throws IOException {
     assert Transports.assertNotTransportThread("multi_file_writer");
     final FileChunkWriter writer = fileChunkWriters.computeIfAbsent(fileMetaData.name(), name -> new FileChunkWriter());
-    writer.writeChunk(new FileChunk(fileMetaData, content, position, lastChunk));
+    writer.writeChunk(new FileChunk(fileMetaData, content.retain(), position, lastChunk));
Why are you calling retain here?
Wouldn't it be safer to do that at the point where it's successfully added to the pendingChunks queue?
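A sketch of tying the extra reference to a successful enqueue, under stated assumptions: `CountedChunk` and `RetainOnEnqueue` are hypothetical, and the bounded queue exists only so that an offer can fail. The sketch retains just before the offer and undoes the retain if the offer fails or throws, rather than retaining after the add, so a consumer that polls and releases immediately cannot drop the count to zero before the producer's retain lands.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical ref-counted chunk; starts with one reference owned by the caller.
final class CountedChunk {
    private final AtomicInteger refs = new AtomicInteger(1);

    void retain() {
        refs.incrementAndGet();
    }

    void release() {
        refs.decrementAndGet();
    }

    int refCount() {
        return refs.get();
    }
}

// Illustrative sketch only; not the PR's MultiFileWriter.
final class RetainOnEnqueue {
    private final BlockingQueue<CountedChunk> pending;

    RetainOnEnqueue(int capacity) {
        this.pending = new ArrayBlockingQueue<>(capacity);
    }

    // The queue ends up holding a reference only if the chunk was really queued.
    boolean enqueue(CountedChunk chunk) {
        chunk.retain();
        boolean queued = false;
        try {
            queued = pending.offer(chunk);
            return queued;
        } finally {
            if (queued == false) {
                chunk.release(); // undo the retain on failure or exception
            }
        }
    }
}
```

Either way, the retain and the enqueue live in one method, so no code path can take a reference without the queue owning it.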
@@ -69,6 +75,7 @@ public void sendResponse(Exception exception) throws IOException {
    try {
        outboundHandler.sendErrorResponse(version, channel, requestId, action, exception);
    } finally {
        Releasables.close(toRelease);
I would prefer to put this into the release method, which protects against double-releasing (see breaker).
Also note that the closing logic currently relies on InboundHandler.messageReceived not throwing any exception. Given that the initial lifecycle (until retain() is called) is ultimately controlled through that method, I wonder if we need extra safekeeping.
private static class ReleasableArraysStreamInput extends FilterStreamInput {

    private final BigArrays bigArrays;
    private final List<Releasable> managedResources;
I wonder if, instead of managing the list here, we should just offer a callback to register manageable items. This avoids anyone mistakenly messing with the list, and we can also limit the lifecycle during which items can be registered.
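The callback idea could look roughly like the sketch below. `ResourceRegistry`, `seal`, and `releaseAll` are hypothetical names, and `Runnable` stands in for a releasable resource. The list stays private, and registration is only accepted until the owner seals the registry.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; names are not taken from the PR.
final class ResourceRegistry {
    private final List<Runnable> releasers = new ArrayList<>();
    private boolean sealed;

    // The only way in: callers receive this method as a callback and never see the list.
    synchronized void register(Runnable releaser) {
        if (sealed) {
            throw new IllegalStateException("registration window is closed");
        }
        releasers.add(releaser);
    }

    // Ends the phase during which resources may be registered.
    synchronized void seal() {
        sealed = true;
    }

    // Releases everything once; later calls find an empty list and do nothing.
    synchronized void releaseAll() {
        sealed = true;
        for (Runnable releaser : releasers) {
            releaser.run();
        }
        releasers.clear();
    }
}
```

A stream input would then be handed `registry::register` (for example as a `java.util.function.Consumer<Runnable>`) during deserialization, and the owner would call `seal()` once deserialization finishes.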

public abstract class InboundMessage extends NetworkMessage implements Closeable {

    private final StreamInput streamInput;
    private final List<Releasable> managedResources = new ArrayList<>(4);
this is unused
@@ -98,6 +120,7 @@ InboundMessage deserialize(BytesReference reference) throws IOException {
        return message;
    } finally {
        if (success == false) {
            Releasables.close(managedResources);
Shouldn't the list always be empty here?
This work will probably come back at some point. But this PR is pretty out of date. Closing.
Currently, every peer recovery file chunk request must allocate the
bytes for the file chunk when the request is received. This PR
implements the concept of a releasable bytes reference, which certain
stream types can optimize to be shared or pooled. This commit implements
this mechanism for the file chunk requests. The pooled chunks are
managed at the transport level and are released when the response
is submitted. Additionally, the releasable references are ref counted
so async implementations can retain them.