Skip to content

Commit

Permalink
Use RemoteSegmentStoreDirectory instead of RemoteDirectory (opensearc…
Browse files Browse the repository at this point in the history
…h-project#4240)

* Use RemoteSegmentStoreDirectory instead of RemoteDirectory

Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
sachinpkale authored Aug 29, 2022
1 parent 7ea6e88 commit cd961f3
Show file tree
Hide file tree
Showing 18 changed files with 628 additions and 217 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Github workflow for changelog verification ([#4085](https://github.com/opensearch-project/OpenSearch/pull/4085))

### Changed
- Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308))
- Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308))
- Use RemoteSegmentStoreDirectory instead of RemoteDirectory ([#4240](https://github.com/opensearch-project/OpenSearch/pull/4240))

### Deprecated

Expand Down
3 changes: 1 addition & 2 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.RemoteDirectoryFactory;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
Expand Down Expand Up @@ -487,7 +486,7 @@ public IndexService newIndexService(
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry,
RemoteDirectoryFactory remoteDirectoryFactory
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.util.SetOnce;
import org.apache.lucene.util.ThreadInterruptedException;
import org.opensearch.Assertions;
Expand Down Expand Up @@ -3228,8 +3226,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (isRemoteStoreEnabled()) {
Directory remoteDirectory = ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate();
internalRefreshListener.add(new RemoteStoreRefreshListener(store.directory(), remoteDirectory));
internalRefreshListener.add(new RemoteStoreRefreshListener(this));
}
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,54 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;

import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/**
* RefreshListener implementation to upload newly created segment files to the remote store
*
* @opensearch.internal
*/
public class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener {
public final class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener {
// Visible for testing
static final Set<String> EXCLUDE_FILES = Set.of("write.lock");
// Visible for testing
static final int LAST_N_METADATA_FILES_TO_KEEP = 10;

private final IndexShard indexShard;
private final Directory storeDirectory;
private final Directory remoteDirectory;
// ToDo: This can be a map with metadata of the uploaded file as value of the map (GitHub #3398)
private final Set<String> filesUploadedToRemoteStore;
private final RemoteSegmentStoreDirectory remoteDirectory;
private final Map<String, String> localSegmentChecksumMap;
private long primaryTerm;
private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class);

public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDirectory) throws IOException {
this.storeDirectory = storeDirectory;
this.remoteDirectory = remoteDirectory;
// ToDo: Handle failures in reading list of files (GitHub #3397)
this.filesUploadedToRemoteStore = new HashSet<>(Arrays.asList(remoteDirectory.listAll()));
public RemoteStoreRefreshListener(IndexShard indexShard) {
this.indexShard = indexShard;
this.storeDirectory = indexShard.store().directory();
this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
.getDelegate()).getDelegate();
this.primaryTerm = indexShard.getOperationPrimaryTerm();
localSegmentChecksumMap = new HashMap<>();
}

@Override
Expand All @@ -46,42 +68,112 @@ public void beforeRefresh() throws IOException {

/**
* Upload new segment files created as part of the last refresh to the remote segment store.
* The method also deletes segment files from remote store which are not part of local filesystem.
* This method also uploads remote_segments_metadata file which contains metadata of each segment file uploaded.
* @param didRefresh true if the refresh opened a new reference
* @throws IOException in case of I/O error in reading list of local files
*/
@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh) {
Set<String> localFiles = Set.of(storeDirectory.listAll());
localFiles.stream().filter(file -> !filesUploadedToRemoteStore.contains(file)).forEach(file -> {
try {
remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT);
filesUploadedToRemoteStore.add(file);
} catch (NoSuchFileException e) {
logger.info(
() -> new ParameterizedMessage("The file {} does not exist anymore. It can happen in case of temp files", file),
e
);
} catch (IOException e) {
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397)
logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e);
}
});
public void afterRefresh(boolean didRefresh) {
synchronized (this) {
try {
if (indexShard.shardRouting.primary()) {
if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
this.primaryTerm = indexShard.getOperationPrimaryTerm();
this.remoteDirectory.init();
}
try {
String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory);
if (!remoteDirectory.containsFile(
lastCommittedLocalSegmentFileName,
getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)
)) {
deleteStaleCommits();
}
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
Collection<String> refreshedLocalFiles = segmentInfos.files(true);

List<String> segmentInfosFiles = refreshedLocalFiles.stream()
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
.collect(Collectors.toList());
Optional<String> latestSegmentInfos = segmentInfosFiles.stream()
.max(Comparator.comparingLong(IndexFileNames::parseGeneration));

Set<String> remoteFilesToBeDeleted = new HashSet<>();
// ToDo: Instead of deleting files in sync, mark them and delete in async/periodic flow (GitHub #3142)
filesUploadedToRemoteStore.stream().filter(file -> !localFiles.contains(file)).forEach(file -> {
try {
remoteDirectory.deleteFile(file);
remoteFilesToBeDeleted.add(file);
} catch (IOException e) {
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397)
logger.warn(() -> new ParameterizedMessage("Exception while deleting file {} from the remote segment store", file), e);
if (latestSegmentInfos.isPresent()) {
refreshedLocalFiles.addAll(SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true));
segmentInfosFiles.stream()
.filter(file -> !file.equals(latestSegmentInfos.get()))
.forEach(refreshedLocalFiles::remove);

boolean uploadStatus = uploadNewSegments(refreshedLocalFiles);
if (uploadStatus) {
remoteDirectory.uploadMetadata(
refreshedLocalFiles,
storeDirectory,
indexShard.getOperationPrimaryTerm(),
segmentInfos.getGeneration()
);
localSegmentChecksumMap.keySet()
.stream()
.filter(file -> !refreshedLocalFiles.contains(file))
.collect(Collectors.toSet())
.forEach(localSegmentChecksumMap::remove);
}
}
} catch (EngineException e) {
logger.warn("Exception while reading SegmentInfosSnapshot", e);
}
} catch (IOException e) {
// We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
// in the next refresh. This should not affect durability of the indexed data after remote trans-log integration.
logger.warn("Exception while uploading new segments to the remote segment store", e);
}
}
});
} catch (Throwable t) {
logger.error("Exception in RemoteStoreRefreshListener.afterRefresh()", t);
}
}
}

// Visible for testing
boolean uploadNewSegments(Collection<String> localFiles) throws IOException {
AtomicBoolean uploadSuccess = new AtomicBoolean(true);
localFiles.stream().filter(file -> !EXCLUDE_FILES.contains(file)).filter(file -> {
try {
return !remoteDirectory.containsFile(file, getChecksumOfLocalFile(file));
} catch (IOException e) {
logger.info(
"Exception while reading checksum of local segment file: {}, ignoring the exception and re-uploading the file",
file
);
return true;
}
}).forEach(file -> {
try {
remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT);
} catch (IOException e) {
uploadSuccess.set(false);
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397)
logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e);
}
});
return uploadSuccess.get();
}

private String getChecksumOfLocalFile(String file) throws IOException {
if (!localSegmentChecksumMap.containsKey(file)) {
try (IndexInput indexInput = storeDirectory.openInput(file, IOContext.DEFAULT)) {
String checksum = Long.toString(CodecUtil.retrieveChecksum(indexInput));
localSegmentChecksumMap.put(file, checksum);
}
}
return localSegmentChecksumMap.get(file);
}

remoteFilesToBeDeleted.forEach(filesUploadedToRemoteStore::remove);
private void deleteStaleCommits() {
try {
remoteDirectory.deleteStaleSegments(LAST_N_METADATA_FILES_TO_KEEP);
} catch (IOException e) {
logger.info("Exception while deleting stale commits from remote segment store, will retry delete post next commit", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,12 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
}
indexShard.preRecovery();
indexShard.prepareForIndexRecovery();
final Directory remoteDirectory = remoteStore.directory();
assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory();
assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory
: "Store.directory is not enclosing an instance of FilterDirectory";
FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate();
final Store store = indexShard.store();
final Directory storeDirectory = store.directory();
store.incRef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,37 @@ public class RemoteIndexInput extends IndexInput {

private final InputStream inputStream;
private final long size;
private long filePointer;

public RemoteIndexInput(String name, InputStream inputStream, long size) {
super(name);
this.inputStream = inputStream;
this.size = size;
this.filePointer = 0;
}

@Override
public byte readByte() throws IOException {
byte[] buffer = new byte[1];
inputStream.read(buffer);
int numberOfBytesRead = inputStream.read(buffer);
if (numberOfBytesRead != -1) {
filePointer += numberOfBytesRead;
}
return buffer[0];
}

@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
int bytesRead = inputStream.read(b, offset, len);
while (bytesRead > 0 && bytesRead < len) {
len -= bytesRead;
offset += bytesRead;
bytesRead = inputStream.read(b, offset, len);
if (bytesRead == len) {
filePointer += bytesRead;
} else {
while (bytesRead > 0 && bytesRead < len) {
filePointer += bytesRead;
len -= bytesRead;
offset += bytesRead;
bytesRead = inputStream.read(b, offset, len);
}
}
}

Expand All @@ -61,22 +71,25 @@ public long length() {
return size;
}

@Override
public void seek(long pos) throws IOException {
inputStream.skip(pos);
}

/**
* Guaranteed to throw an exception and leave the RemoteIndexInput unmodified.
* This method is not implemented as it is not used for the file transfer to/from the remote store.
*
* @throws UnsupportedOperationException always
*/
@Override
public long getFilePointer() {
public void seek(long pos) throws IOException {
throw new UnsupportedOperationException();
}

/**
* Returns the current position in this file in terms of number of bytes read so far.
*/
@Override
public long getFilePointer() {
return filePointer;
}

/**
* Guaranteed to throw an exception and leave the RemoteIndexInput unmodified.
* This method is not implemented as it is not used for the file transfer to/from the remote store.
Expand Down
Loading

0 comments on commit cd961f3

Please sign in to comment.