-
Notifications
You must be signed in to change notification settings - Fork 25.3k
Use recycled byte arrays for peer recovery #50107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
31c6d64
aa05e0f
0500404
4de1377
899ea11
196ae29
d030a88
36e0445
42a167c
485b40e
df3cc51
5697c48
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ | |
import org.apache.lucene.util.BytesRefIterator; | ||
import org.elasticsearch.common.Strings; | ||
import org.elasticsearch.common.bytes.BytesReference; | ||
import org.elasticsearch.common.bytes.ReleasableBytesReference; | ||
import org.elasticsearch.common.lease.Releasable; | ||
import org.elasticsearch.common.util.concurrent.AbstractRefCounted; | ||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; | ||
|
@@ -67,11 +68,11 @@ public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempF | |
|
||
final Map<String, String> tempFileNames = ConcurrentCollections.newConcurrentMap(); | ||
|
||
public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean lastChunk) | ||
public void writeFileChunk(StoreFileMetaData fileMetaData, long position, ReleasableBytesReference content, boolean lastChunk) | ||
throws IOException { | ||
assert Transports.assertNotTransportThread("multi_file_writer"); | ||
final FileChunkWriter writer = fileChunkWriters.computeIfAbsent(fileMetaData.name(), name -> new FileChunkWriter()); | ||
writer.writeChunk(new FileChunk(fileMetaData, content, position, lastChunk)); | ||
writer.writeChunk(new FileChunk(fileMetaData, content.retain(), position, lastChunk)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why are you calling retain here? |
||
} | ||
|
||
/** Get a temporary name for the provided file name. */ | ||
|
@@ -179,20 +180,25 @@ public void renameAllTempFiles() throws IOException { | |
store.renameTempFilesSafe(tempFileNames); | ||
} | ||
|
||
static final class FileChunk { | ||
static final class FileChunk implements Releasable { | ||
final StoreFileMetaData md; | ||
final BytesReference content; | ||
final ReleasableBytesReference content; | ||
final long position; | ||
final boolean lastChunk; | ||
FileChunk(StoreFileMetaData md, BytesReference content, long position, boolean lastChunk) { | ||
FileChunk(StoreFileMetaData md, ReleasableBytesReference content, long position, boolean lastChunk) { | ||
this.md = md; | ||
this.content = content; | ||
this.position = position; | ||
this.lastChunk = lastChunk; | ||
} | ||
|
||
@Override | ||
public void close() { | ||
content.close(); | ||
} | ||
} | ||
|
||
private final class FileChunkWriter { | ||
private final class FileChunkWriter implements Releasable { | ||
// chunks can be delivered out of order, we need to buffer chunks if there's a gap between them. | ||
final PriorityQueue<FileChunk> pendingChunks = new PriorityQueue<>(Comparator.comparing(fc -> fc.position)); | ||
long lastPosition = 0; | ||
|
@@ -210,17 +216,29 @@ void writeChunk(FileChunk newChunk) throws IOException { | |
} | ||
pendingChunks.remove(); | ||
} | ||
innerWriteFileChunk(chunk.md, chunk.position, chunk.content, chunk.lastChunk); | ||
synchronized (this) { | ||
assert lastPosition == chunk.position : "last_position " + lastPosition + " != chunk_position " + chunk.position; | ||
lastPosition += chunk.content.length(); | ||
if (chunk.lastChunk) { | ||
assert pendingChunks.isEmpty() : "still have pending chunks [" + pendingChunks + "]"; | ||
fileChunkWriters.remove(chunk.md.name()); | ||
assert fileChunkWriters.containsValue(this) == false : "chunk writer [" + newChunk.md + "] was not removed"; | ||
try (chunk) { | ||
innerWriteFileChunk(chunk.md, chunk.position, chunk.content, chunk.lastChunk); | ||
synchronized (this) { | ||
assert lastPosition == chunk.position : "last_position " + lastPosition + " != chunk_position " + chunk.position; | ||
lastPosition += chunk.content.length(); | ||
if (chunk.lastChunk) { | ||
assert pendingChunks.isEmpty() : "still have pending chunks [" + pendingChunks + "]"; | ||
fileChunkWriters.remove(chunk.md.name()); | ||
assert fileChunkWriters.containsValue(this) == false : "chunk writer [" + newChunk.md + "] was not removed"; | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void close() { | ||
synchronized (this) { | ||
FileChunk chunk; | ||
while ((chunk = pendingChunks.poll()) != null) { | ||
chunk.close(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where are we closing the |
||
} | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we need to protect against double-closing as it is super dangerous here (one caller double closes and releases underlying byte array while another still thinks it's safe to operate on it) . A first step would be to have an assertion here checking the
refCount()