
Use recycled byte arrays for peer recovery #50107


Closed · wants to merge 12 commits
@@ -22,6 +22,7 @@
import io.netty.buffer.ByteBuf;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.io.stream.StreamInput;

import java.io.EOFException;
@@ -54,6 +55,15 @@ public BytesReference readBytesReference(int length) throws IOException {
return super.readBytesReference(length);
}

@Override
public ReleasableBytesReference readReleasableBytesReference(int length) throws IOException {
// NOTE: It is unsafe to share a reference to the internal structure, so we
// use the default implementation, which copies the bytes. It is unsafe because
// a Netty ByteBuf might be pooled, and pooled buffers require a manual release
// to prevent memory leaks.
return super.readReleasableBytesReference(length);
}
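
For context, a minimal sketch (using standard Netty APIs, not code from this change) of why a pooled ByteBuf cannot simply be handed out: pooled buffers are reference-counted and leak pool memory if release() is never called.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

class PooledBufferSketch {
    static void demo() {
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(16);
        try {
            buf.writeInt(42); // use the buffer
        } finally {
            buf.release(); // mandatory: returns the buffer to the pool
        }
    }
}
```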

@Override
public BytesRef readBytesRef(int length) throws IOException {
// NOTE: It is unsafe to share a reference to the internal structure, so we
@@ -24,6 +24,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
@@ -33,21 +34,32 @@
* An extension to {@link BytesReference} that requires releasing its content. This
* class exists to make it explicit when a bytes reference needs to be released, and when not.
*/
public final class ReleasableBytesReference implements Releasable, BytesReference {
public final class ReleasableBytesReference extends AbstractRefCounted implements Releasable, BytesReference {

private final BytesReference delegate;
private final Releasable releasable;

public ReleasableBytesReference(BytesReference delegate, Releasable releasable) {
super("bytes-reference");
this.delegate = delegate;
this.releasable = releasable;
}

@Override
public void close() {
protected void closeInternal() {
Releasables.close(releasable);
}

public ReleasableBytesReference retain() {
incRef();
return this;
}

@Override
public void close() {
Contributor: I wonder if we need to protect against double-closing, as it is super dangerous here (one caller double-closes and releases the underlying byte array while another still thinks it is safe to operate on it). A first step would be an assertion here checking the refCount().

decRef();
}
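
A minimal sketch of the assertion the reviewer suggests, assuming the refCount() accessor inherited from AbstractRefCounted (illustrative only, not part of this change):

```java
@Override
public void close() {
    // Hypothetical guard: fail fast in tests if this reference was already
    // fully released, instead of silently double-decrementing.
    assert refCount() > 0 : "ReleasableBytesReference closed more than once";
    decRef();
}
```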

@Override
public byte get(int index) {
return delegate.get(index);
@@ -20,6 +20,7 @@
package org.elasticsearch.common.io.stream;

import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.ReleasableBytesReference;

import java.io.EOFException;
import java.io.IOException;
@@ -46,6 +47,10 @@ public void readBytes(byte[] b, int offset, int len) throws IOException {
}

@Override
public ReleasableBytesReference readReleasableBytesReference(int length) throws IOException {
return delegate.readReleasableBytesReference(length);
}

@Override
public short readShort() throws IOException {
return delegate.readShort();
}
@@ -35,6 +35,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.text.Text;
@@ -149,6 +150,17 @@ public BytesReference readBytesReference() throws IOException {
return readBytesReference(length);
}

/**
* Reads a releasable bytes reference from this stream. Some implementations of this method might optimize
* this operation by returning a zero-copy, zero-allocation reference to the underlying bytes. Consumers
* of this method MUST release the reference when they are finished.
*/
public ReleasableBytesReference readReleasableBytesReference() throws IOException {
int length = readArraySize();
return readReleasableBytesReference(length);
}

/**
* Reads an optional bytes reference from this stream. It might hold an actual reference to the underlying bytes of the stream. Use this
* only if you must differentiate null from empty. Use {@link StreamInput#readBytesReference()} and
@@ -176,6 +188,14 @@ public BytesReference readBytesReference(int length) throws IOException {
return new BytesArray(bytes, 0, length);
}

/**
* Reads a bytes reference from this stream; it might hold an actual reference to the underlying
* bytes of the stream.
*/
public ReleasableBytesReference readReleasableBytesReference(int length) throws IOException {
return new ReleasableBytesReference(readBytesReference(length), () -> {});
}
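
A usage sketch of the ownership contract described above; `process` is a hypothetical consumer, and try-with-resources works because Releasable extends Closeable:

```java
try (ReleasableBytesReference bytes = in.readReleasableBytesReference()) {
    process(bytes); // consume the bytes while the reference is held
} // close() releases (or decrements the ref count of) the underlying buffer
```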

public BytesRef readBytesRef() throws IOException {
int length = readArraySize();
return readBytesRef(length);
@@ -26,6 +26,7 @@
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -67,11 +68,11 @@ public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempF

final Map<String, String> tempFileNames = ConcurrentCollections.newConcurrentMap();

public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean lastChunk)
public void writeFileChunk(StoreFileMetaData fileMetaData, long position, ReleasableBytesReference content, boolean lastChunk)
throws IOException {
assert Transports.assertNotTransportThread("multi_file_writer");
final FileChunkWriter writer = fileChunkWriters.computeIfAbsent(fileMetaData.name(), name -> new FileChunkWriter());
writer.writeChunk(new FileChunk(fileMetaData, content, position, lastChunk));
writer.writeChunk(new FileChunk(fileMetaData, content.retain(), position, lastChunk));
Contributor: Why are you calling retain here? Wouldn't it be safer to do that at the point where the chunk is successfully added to the pendingChunks queue?

}
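
A sketch of the reviewer's alternative: retain only once the chunk is safely owned by the queue, so a failure before enqueueing cannot leak a reference (hypothetical shape; the real writeChunk body appears further down in this diff):

```java
void writeChunk(FileChunk newChunk) throws IOException {
    synchronized (this) {
        // Retain here rather than at the call site: the queue now owns one reference.
        newChunk.content.retain();
        pendingChunks.add(newChunk);
    }
    // ... drain and write contiguous chunks as in the existing implementation ...
}
```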

/** Get a temporary name for the provided file name. */
@@ -179,20 +180,25 @@ public void renameAllTempFiles() throws IOException {
store.renameTempFilesSafe(tempFileNames);
}

static final class FileChunk {
static final class FileChunk implements Releasable {
final StoreFileMetaData md;
final BytesReference content;
final ReleasableBytesReference content;
final long position;
final boolean lastChunk;
FileChunk(StoreFileMetaData md, BytesReference content, long position, boolean lastChunk) {
FileChunk(StoreFileMetaData md, ReleasableBytesReference content, long position, boolean lastChunk) {
this.md = md;
this.content = content;
this.position = position;
this.lastChunk = lastChunk;
}

@Override
public void close() {
content.close();
}
}

private final class FileChunkWriter {
private final class FileChunkWriter implements Releasable {
// chunks can be delivered out of order; we need to buffer chunks when there's a gap between them.
final PriorityQueue<FileChunk> pendingChunks = new PriorityQueue<>(Comparator.comparing(fc -> fc.position));
long lastPosition = 0;
@@ -210,17 +216,29 @@ void writeChunk(FileChunk newChunk) throws IOException {
}
pendingChunks.remove();
}
innerWriteFileChunk(chunk.md, chunk.position, chunk.content, chunk.lastChunk);
synchronized (this) {
assert lastPosition == chunk.position : "last_position " + lastPosition + " != chunk_position " + chunk.position;
lastPosition += chunk.content.length();
if (chunk.lastChunk) {
assert pendingChunks.isEmpty() : "still have pending chunks [" + pendingChunks + "]";
fileChunkWriters.remove(chunk.md.name());
assert fileChunkWriters.containsValue(this) == false : "chunk writer [" + newChunk.md + "] was not removed";
try (chunk) {
innerWriteFileChunk(chunk.md, chunk.position, chunk.content, chunk.lastChunk);
synchronized (this) {
assert lastPosition == chunk.position : "last_position " + lastPosition + " != chunk_position " + chunk.position;
lastPosition += chunk.content.length();
if (chunk.lastChunk) {
assert pendingChunks.isEmpty() : "still have pending chunks [" + pendingChunks + "]";
fileChunkWriters.remove(chunk.md.name());
assert fileChunkWriters.containsValue(this) == false : "chunk writer [" + newChunk.md + "] was not removed";
}
}
}
}
}

@Override
public void close() {
synchronized (this) {
FileChunk chunk;
while ((chunk = pendingChunks.poll()) != null) {
chunk.close();
Contributor: Where are we closing the FileChunkWriter? Also, how are we ensuring that nothing is added to the FileChunkWriter after it has been closed? Otherwise we are at risk of creating leaks here.

}
}
}
}
}
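
One way to address this concern, sketched under the assumption that writeChunk and close() synchronize on the writer (hypothetical, not the PR's code): a closed flag that rejects, and immediately releases, late-arriving chunks.

```java
private final class FileChunkWriter implements Releasable {
    private boolean closed; // guarded by 'this'

    void writeChunk(FileChunk newChunk) throws IOException {
        synchronized (this) {
            if (closed) {
                newChunk.close(); // refuse late chunks but still release their bytes
                return;
            }
            pendingChunks.add(newChunk);
        }
        // ... drain and write contiguous chunks as before ...
    }

    @Override
    public void close() {
        synchronized (this) {
            closed = true;
            FileChunk chunk;
            while ((chunk = pendingChunks.poll()) != null) {
                chunk.close(); // release buffered, unwritten chunks
            }
        }
    }
}
```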
@@ -35,6 +35,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -517,8 +518,9 @@ public void messageReceived(final RecoveryFileChunkRequest request, TransportCha
}

RateLimiter rateLimiter = recoverySettings.rateLimiter();
ReleasableBytesReference content = request.content();
if (rateLimiter != null) {
long bytes = bytesSinceLastPause.addAndGet(request.content().length());
long bytes = bytesSinceLastPause.addAndGet(content.length());
if (bytes > rateLimiter.getMinPauseCheckBytes()) {
// Time to pause
bytesSinceLastPause.addAndGet(-bytes);
@@ -21,6 +21,7 @@

import org.apache.lucene.util.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.Lucene;
@@ -35,7 +36,7 @@ public final class RecoveryFileChunkRequest extends TransportRequest {
private long recoveryId;
private ShardId shardId;
private long position;
private BytesReference content;
private ReleasableBytesReference content;
private StoreFileMetaData metaData;
private long sourceThrottleTimeInNanos;

@@ -49,7 +50,7 @@ public RecoveryFileChunkRequest(StreamInput in) throws IOException {
position = in.readVLong();
long length = in.readVLong();
String checksum = in.readString();
content = in.readBytesReference();
content = in.readReleasableBytesReference();
Version writtenBy = Lucene.parseVersionLenient(in.readString(), null);
assert writtenBy != null;
metaData = new StoreFileMetaData(name, length, checksum, writtenBy);
@@ -64,7 +65,8 @@ public RecoveryFileChunkRequest(long recoveryId, ShardId shardId, StoreFileMetaD
this.shardId = shardId;
this.metaData = metaData;
this.position = position;
this.content = content;
// TODO: Ugly
this.content = new ReleasableBytesReference(content, () -> {});
this.lastChunk = lastChunk;
this.totalTranslogOps = totalTranslogOps;
this.sourceThrottleTimeInNanos = sourceThrottleTimeInNanos;
@@ -90,7 +92,7 @@ public long length() {
return metaData.length();
}

public BytesReference content() {
public ReleasableBytesReference content() {
return content;
}

@@ -43,7 +43,7 @@
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.Loggers;
@@ -823,11 +823,11 @@ public String toString() {

private static class FileChunk implements MultiFileTransfer.ChunkRequest {
final StoreFileMetaData md;
final BytesReference content;
final ReleasableBytesReference content;
final long position;
final boolean lastChunk;

FileChunk(StoreFileMetaData md, BytesReference content, long position, boolean lastChunk) {
FileChunk(StoreFileMetaData md, ReleasableBytesReference content, long position, boolean lastChunk) {
this.md = md;
this.content = content;
this.position = position;
@@ -872,7 +872,8 @@ protected FileChunk nextChunkRequest(StoreFileMetaData md) throws IOException {
throw new CorruptIndexException("file truncated; length=" + md.length() + " offset=" + offset, md.name());
}
final boolean lastChunk = offset + bytesRead == md.length();
final FileChunk chunk = new FileChunk(md, new BytesArray(buffer, 0, bytesRead), offset, lastChunk);
ReleasableBytesReference content = new ReleasableBytesReference(new BytesArray(buffer, 0, bytesRead), () -> {});
final FileChunk chunk = new FileChunk(md, content, offset, lastChunk);
offset += bytesRead;
return chunk;
}
@@ -30,7 +30,7 @@
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.CancellableThreads;
@@ -461,7 +461,7 @@ public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.Metada
}

@Override
public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
public void writeFileChunk(StoreFileMetaData fileMetaData, long position, ReleasableBytesReference content,
boolean lastChunk, int totalTranslogOps, ActionListener<Void> listener) {
try {
state().getTranslog().totalOperations(totalTranslogOps);
@@ -19,7 +19,7 @@
package org.elasticsearch.indices.recovery;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.store.Store;
@@ -103,7 +103,7 @@ void receiveFileInfo(List<String> phase1FileNames,
void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData, ActionListener<Void> listener);

/** writes a partial file chunk to the target store */
void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
void writeFileChunk(StoreFileMetaData fileMetaData, long position, ReleasableBytesReference content,
boolean lastChunk, int totalTranslogOps, ActionListener<Void> listener);

}
@@ -24,7 +24,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.shard.ShardId;
@@ -150,7 +150,7 @@ public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.Metada
}

@Override
public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
public void writeFileChunk(StoreFileMetaData fileMetaData, long position, ReleasableBytesReference content,
boolean lastChunk, int totalTranslogOps, ActionListener<Void> listener) {
// Pause using the rate limiter, if desired, to throttle the recovery
final long throttleTimeInNanos;
@@ -171,7 +171,7 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Inbo
breaker.addWithoutBreaking(messageLengthBytes);
}
transportChannel = new TcpTransportChannel(outboundHandler, channel, action, requestId, version,
circuitBreakerService, messageLengthBytes, message.isCompress());
circuitBreakerService, messageLengthBytes, message.isCompress(), message.getManagedResources());
final T request = reg.newRequest(stream);
request.remoteAddress(new TransportAddress(channel.getRemoteAddress()));
// in case we throw an exception, i.e. when the limit is hit, we don't want to verify
@@ -187,7 +187,7 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Inbo
// the circuit breaker tripped
if (transportChannel == null) {
transportChannel = new TcpTransportChannel(outboundHandler, channel, action, requestId, version,
circuitBreakerService, 0, message.isCompress());
circuitBreakerService, 0, message.isCompress(), message.getManagedResources());
}
try {
transportChannel.sendResponse(e);