Skip to content

Commit

Permalink
Add support for CompositeDirectory
Browse files Browse the repository at this point in the history
  • Loading branch information
Sachin Kale committed May 13, 2022
1 parent d13d423 commit 0647e2b
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 34 deletions.
6 changes: 3 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.opensearch.index.shard.ShardNotInPrimaryModeException;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.CompositeDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.breaker.CircuitBreakerService;
Expand Down Expand Up @@ -504,11 +505,11 @@ public synchronized IndexShard createShard(
}
};
Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
Directory remoteDirectory = null;
if (this.indexSettings.isRemoteStoreEnabled()) {
try {
Repository repository = repositoriesService.repository("dragon-stone");
remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path, repository);
Directory remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path, repository);
directory = new CompositeDirectory(directory, remoteDirectory);
} catch (RepositoryMissingException e) {
throw new IllegalArgumentException(
"Repository should be created before creating index with remote_store enabled setting",
Expand All @@ -520,7 +521,6 @@ public synchronized IndexShard createShard(
shardId,
this.indexSettings,
directory,
remoteDirectory,
lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId))
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;

import java.io.IOException;
import java.util.Set;

public class CompositeDirectory extends FilterDirectory {
private final Directory directory1;
private final Directory directory2;

public CompositeDirectory(Directory directory1, Directory directory2) {
super(directory1);
this.directory1 = directory1;
this.directory2 = directory2;
}

@Override
public void deleteFile(String name) throws IOException {
directory1.deleteFile(name);
directory2.deleteFile(name);
}

@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
IndexOutput output1 = directory1.createOutput(name, context);
IndexOutput output2 = directory2.createOutput(name, context);
return new CompositeIndexOutput(name, name, output1, output2, directory1);
}

@Override
public void close() throws IOException {
directory1.close();
directory2.close();
}

@Override
public Set<String> getPendingDeletions() throws IOException {
return directory1.getPendingDeletions();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;

import java.io.IOException;

public class CompositeIndexOutput extends IndexOutput {
private final IndexOutput output1;
private final IndexOutput output2;
private final Directory directory1;

protected CompositeIndexOutput(String resourceDescription, String name, IndexOutput output1, IndexOutput output2, Directory directory1) {
super(resourceDescription, name);
this.output1 = output1;
this.output2 = output2;
this.directory1 = directory1;
}

@Override
public void close() throws IOException {
output1.close();
IndexInput indexInput = directory1.openInput(this.getName(), IOContext.DEFAULT);
output2.copyBytes(indexInput, indexInput.length());
}

@Override
public long getFilePointer() {
return output1.getFilePointer();
}

@Override
public long getChecksum() throws IOException {
return output1.getChecksum();
}

@Override
public void writeByte(byte b) throws IOException {
output1.writeByte(b);
}

@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
output1.writeBytes(b, offset, length);
}
}
33 changes: 2 additions & 31 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref

private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final StoreDirectory directory;
private final StoreDirectory secondaryDirectory;
private final ReentrantReadWriteLock metadataLock = new ReentrantReadWriteLock();
private final ShardLock shardLock;
private final OnClose onClose;
Expand All @@ -188,22 +187,13 @@ protected void closeInternal() {
};

public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock) {
this(shardId, indexSettings, directory, null, shardLock, OnClose.EMPTY);
}

public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, Directory secondaryDirectory, ShardLock shardLock) {
this(shardId, indexSettings, directory, secondaryDirectory, shardLock, OnClose.EMPTY);
}

public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock, OnClose onClose) {
this(shardId, indexSettings, directory, null, shardLock, onClose);
this(shardId, indexSettings, directory, shardLock, OnClose.EMPTY);
}

public Store(
ShardId shardId,
IndexSettings indexSettings,
Directory directory,
Directory secondaryDirectory,
ShardLock shardLock,
OnClose onClose
) {
Expand All @@ -212,12 +202,6 @@ public Store(
logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval);
ByteSizeCachingDirectory sizeCachingDir = new ByteSizeCachingDirectory(directory, refreshInterval);
this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId));
if (secondaryDirectory != null) {
ByteSizeCachingDirectory sizeCachingSecondaryDir = new ByteSizeCachingDirectory(secondaryDirectory, refreshInterval);
this.secondaryDirectory = new StoreDirectory(sizeCachingSecondaryDir, Loggers.getLogger("index.store.deletes", shardId));
} else {
this.secondaryDirectory = null;
}
this.shardLock = shardLock;
this.onClose = onClose;

Expand All @@ -227,20 +211,7 @@ public Store(
}

public Directory directory() {
return directory(true);
}

public Directory secondaryDirectory() {
return directory(false);
}

private Directory directory(boolean primary) {
ensureOpen();
if (primary) {
return directory;
} else {
return secondaryDirectory;
}
return directory;
}

/**
Expand Down

0 comments on commit 0647e2b

Please sign in to comment.