Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Translog] Introduce remote translog transfer support #4480

Merged
merged 15 commits into from
Nov 22, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Relax visibility of the HTTP_CHANNEL_KEY and HTTP_SERVER_CHANNEL_KEY to make it possible for the plugins to access associated Netty4HttpChannel / Netty4HttpServerChannel instance ([#4638](https://github.com/opensearch-project/OpenSearch/pull/4638))
- Use ReplicationFailedException instead of OpensearchException in ReplicationTarget ([#4725](https://github.com/opensearch-project/OpenSearch/pull/4725))
- Migrate client transports to Apache HttpClient / Core 5.x ([#4459](https://github.com/opensearch-project/OpenSearch/pull/4459))
- Introduce remote translog transfer support([#4480](https://github.com/opensearch-project/OpenSearch/pull/4480))
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.common.io.stream.FilterStreamInput;
import org.opensearch.common.io.stream.StreamInput;

import java.io.EOFException;
import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
Expand Down Expand Up @@ -117,7 +118,11 @@ public void reset() throws IOException {

@Override
public int read() throws IOException {
return readByte() & 0xFF;
try {
return readByte() & 0xFF;
} catch (EOFException e) {
return -1;
}
Comment on lines +121 to +125
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this change ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is due to an interface contract that OpenSearch was breaking. This is unrelated to the change

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we have a base contract test for a stream input. Can you create an issue to create a contract test for this class since you've got the full context for this specific miss?

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
*
* @opensearch.internal
*/
final class Checkpoint {
final public class Checkpoint {

final long offset;
final int numOps;
Expand Down Expand Up @@ -262,6 +262,14 @@ public synchronized byte[] toByteArray() {
return byteOutputStream.toByteArray();
}

public long getMinTranslogGeneration() {
return minTranslogGeneration;
}

public long getGeneration() {
return generation;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public int totalOperations() {
}

@Override
final Checkpoint getCheckpoint() {
final public Checkpoint getCheckpoint() {
return checkpoint;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.translog.transfer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;

/**
* Service that handles remote transfer of translog and checkpoint files
*
* @opensearch.internal
*/
public class BlobStoreTransferService implements TransferService {

private final BlobStore blobStore;
private final ExecutorService executorService;

private static final Logger logger = LogManager.getLogger(BlobStoreTransferService.class);

public BlobStoreTransferService(BlobStore blobStore, ExecutorService executorService) {
this.blobStore = blobStore;
this.executorService = executorService;
}

@Override
public void uploadBlobAsync(
final TransferFileSnapshot fileSnapshot,
Iterable<String> remoteTransferPath,
ActionListener<TransferFileSnapshot> listener
) {
assert remoteTransferPath instanceof BlobPath;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not make the parameter type a BlobPath to let the compiler do this check for you?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the interface is generic and not tied to a store, could be extended to a streaming store as well

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you currently only have one instance of a TransferService, I would recommend not introducing an interface and do things like this. It's easy enough to refactor out an interface if/when you need it, but until you really need it you can only make educated guesses about what the generic interface should look like. Not a huge deal, I'll defer to your preference.

BlobPath blobPath = (BlobPath) remoteTransferPath;
executorService.execute(ActionRunnable.wrap(listener, l -> {
try (InputStream inputStream = fileSnapshot.inputStream()) {
blobStore.blobContainer(blobPath)
.writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true);
l.onResponse(fileSnapshot);
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e);
l.onFailure(new FileTransferException(fileSnapshot, e));
}
}));
}

@Override
public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable<String> remoteTransferPath) throws IOException {
assert remoteTransferPath instanceof BlobPath;
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved
BlobPath blobPath = (BlobPath) remoteTransferPath;
try (InputStream inputStream = fileSnapshot.inputStream()) {
blobStore.blobContainer(blobPath).writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true);
} catch (Exception ex) {
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex);
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved
throw ex;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.translog.transfer;

import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.BytesStreamInput;
import org.opensearch.common.io.stream.InputStreamStreamInput;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.translog.BufferedChecksumStreamInput;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Objects;

/**
* Snapshot of a single file that gets transferred
*
* @opensearch.internal
*/
public class FileSnapshot implements Closeable {

private final String name;
@Nullable
private final FileChannel fileChannel;
@Nullable
private Path path;
@Nullable
private byte[] content;

public FileSnapshot(Path path) throws IOException {
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved
Objects.requireNonNull(path);
this.name = path.getFileName().toString();
this.path = path;
this.fileChannel = FileChannel.open(path, StandardOpenOption.READ);
}

public FileSnapshot(String name, byte[] content) throws IOException {
Objects.requireNonNull(name);
this.name = name;
this.content = content;
this.fileChannel = null;
}

public Path getPath() {
return path;
}

public String getName() {
return name;
}

public long getContentLength() throws IOException {
return fileChannel == null ? content.length : fileChannel.size();
}

public InputStream inputStream() throws IOException {
return fileChannel != null
? new BufferedChecksumStreamInput(
new InputStreamStreamInput(Channels.newInputStream(fileChannel), fileChannel.size()),
path.toString()
)
: new BufferedChecksumStreamInput(new BytesStreamInput(content), name);
}

@Override
public int hashCode() {
return Objects.hash(name, content, path);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FileSnapshot other = (FileSnapshot) o;
return Objects.equals(this.name, other.name)
&& Objects.equals(this.content, other.content)
&& Objects.equals(this.path, other.path);
}

@Override
public String toString() {
return new StringBuilder("FileInfo [").append(" name = ")
.append(name)
.append(", path = ")
.append(path.toUri())
.append("]")
.toString();
}

@Override
public void close() throws IOException {
IOUtils.close(fileChannel);
}

/**
* Snapshot of a single file with primary term that gets transferred
*
* @opensearch.internal
*/
public static class TransferFileSnapshot extends FileSnapshot {
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved

private final long primaryTerm;

public TransferFileSnapshot(Path path, long primaryTerm) throws IOException {
super(path);
this.primaryTerm = primaryTerm;
}

public TransferFileSnapshot(String name, byte[] content, long primaryTerm) throws IOException {
super(name, content);
this.primaryTerm = primaryTerm;
}

public long getPrimaryTerm() {
return primaryTerm;
}

@Override
public int hashCode() {
return Objects.hash(primaryTerm, super.hashCode());
}

@Override
public boolean equals(Object o) {
if (super.equals(o)) {
if (this == o) return true;
if (getClass() != o.getClass()) return false;
TransferFileSnapshot other = (TransferFileSnapshot) o;
return Objects.equals(this.primaryTerm, other.primaryTerm);
}
return false;
}
}

/**
* Snapshot of a single .tlg file that gets transferred
*
* @opensearch.internal
*/
public static class TranslogFileSnapshot extends TransferFileSnapshot {

private final long generation;

public TranslogFileSnapshot(long primaryTerm, long generation, Path path) throws IOException {
super(path, primaryTerm);
this.generation = generation;
}

public long getGeneration() {
return generation;
}

@Override
public int hashCode() {
return Objects.hash(generation, super.hashCode());
}

@Override
public boolean equals(Object o) {
if (super.equals(o)) {
if (this == o) return true;
if (getClass() != o.getClass()) return false;
TranslogFileSnapshot other = (TranslogFileSnapshot) o;
return Objects.equals(this.generation, other.generation);
}
return false;
}
}

/**
* Snapshot of a single .ckp file that gets transferred
*
* @opensearch.internal
*/
public static class CheckpointFileSnapshot extends TransferFileSnapshot {

private final long generation;

private final long minTranslogGeneration;

public CheckpointFileSnapshot(long primaryTerm, long generation, long minTranslogGeneration, Path path) throws IOException {
super(path, primaryTerm);
this.minTranslogGeneration = minTranslogGeneration;
this.generation = generation;
}

public long getGeneration() {
return generation;
}

public long getMinTranslogGeneration() {
return minTranslogGeneration;
}

@Override
public int hashCode() {
return Objects.hash(generation, minTranslogGeneration, super.hashCode());
}

@Override
public boolean equals(Object o) {
if (super.equals(o)) {
if (this == o) return true;
if (getClass() != o.getClass()) return false;
CheckpointFileSnapshot other = (CheckpointFileSnapshot) o;
return Objects.equals(this.minTranslogGeneration, other.minTranslogGeneration)
&& Objects.equals(this.generation, other.generation);
}
return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.translog.transfer;

import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;

/**
* Exception when a single file transfer encounters a failure
*
* @opensearch.internal
*/
public class FileTransferException extends RuntimeException {

private final TransferFileSnapshot fileSnapshot;

public FileTransferException(TransferFileSnapshot fileSnapshot, Throwable cause) {
super(cause);
this.fileSnapshot = fileSnapshot;
}

public FileTransferException(TransferFileSnapshot fileSnapshot, String message, Throwable cause) {
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved
super(message, cause);
this.fileSnapshot = fileSnapshot;
}

public TransferFileSnapshot getFileSnapshot() {
return fileSnapshot;
}
}
Loading