Skip to content

Commit

Permalink
Add RemoteDirectoryFactory and use RemoteDirectory instance in Refres…
Browse files Browse the repository at this point in the history
…hListener (#3285)

Signed-off-by: Sachin Kale <kalsac@amazon.com>

Co-authored-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
sachinpkale and Sachin Kale authored May 16, 2022
1 parent bcfd328 commit 4408e6b
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,8 @@ public static final IndexShard newIndexShard(
Arrays.asList(listeners),
() -> {},
RetentionLeaseSyncer.EMPTY,
cbs
cbs,
null
);
}

Expand Down
4 changes: 4 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.RemoteDirectoryFactory;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
Expand Down Expand Up @@ -118,6 +119,8 @@ public final class IndexModule {

private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory();

private static final RemoteDirectoryFactory REMOTE_DIRECTORY_FACTORY = new RemoteDirectoryFactory();

private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new;

public static final Setting<String> INDEX_STORE_TYPE_SETTING = new Setting<>(
Expand Down Expand Up @@ -511,6 +514,7 @@ public IndexService newIndexService(
client,
queryCache,
directoryFactory,
REMOTE_DIRECTORY_FACTORY,
eventListener,
readerWrapperFactory,
mapperRegistry,
Expand Down
28 changes: 26 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.RemoteStoreRefreshListener;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardNotFoundException;
Expand All @@ -95,6 +96,9 @@
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -135,6 +139,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final NodeEnvironment nodeEnv;
private final ShardStoreDeleter shardStoreDeleter;
private final IndexStorePlugin.DirectoryFactory directoryFactory;
private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory;
private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;
private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
private final IndexCache indexCache;
Expand Down Expand Up @@ -189,6 +194,7 @@ public IndexService(
Client client,
QueryCache queryCache,
IndexStorePlugin.DirectoryFactory directoryFactory,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
IndexEventListener eventListener,
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> wrapperFactory,
MapperRegistry mapperRegistry,
Expand Down Expand Up @@ -259,6 +265,7 @@ public IndexService(
this.eventListener = eventListener;
this.nodeEnv = nodeEnv;
this.directoryFactory = directoryFactory;
this.remoteDirectoryFactory = remoteDirectoryFactory;
this.recoveryStateFactory = recoveryStateFactory;
this.engineFactory = Objects.requireNonNull(engineFactory);
this.engineConfigFactory = Objects.requireNonNull(engineConfigFactory);
Expand Down Expand Up @@ -423,7 +430,8 @@ private long getAvgShardSizeInBytes() throws IOException {
public synchronized IndexShard createShard(
final ShardRouting routing,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer
final RetentionLeaseSyncer retentionLeaseSyncer,
final RepositoriesService repositoriesService
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
Expand Down Expand Up @@ -497,6 +505,21 @@ public synchronized IndexShard createShard(
}
};
Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
Directory remoteDirectory = null;
RemoteStoreRefreshListener remoteStoreRefreshListener = null;
if (this.indexSettings.isRemoteStoreEnabled()) {
try {
Repository repository = repositoriesService.repository("dragon-stone");
remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path, repository);
remoteStoreRefreshListener = new RemoteStoreRefreshListener(directory, remoteDirectory);
} catch (RepositoryMissingException e) {
throw new IllegalArgumentException(
"Repository should be created before creating index with remote_store enabled setting",
e
);
}
}

store = new Store(
shardId,
this.indexSettings,
Expand Down Expand Up @@ -525,7 +548,8 @@ public synchronized IndexShard createShard(
indexingOperationListeners,
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService
circuitBreakerService,
remoteStoreRefreshListener
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
14 changes: 12 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ Runnable getGlobalCheckpointSyncer() {
private final RefreshPendingLocationListener refreshPendingLocationListener;
private volatile boolean useRetentionLeasesInPeerRecovery;

private final RemoteStoreRefreshListener remoteStoreRefreshListener;

public IndexShard(
final ShardRouting shardRouting,
final IndexSettings indexSettings,
Expand All @@ -319,7 +321,8 @@ public IndexShard(
final List<IndexingOperationListener> listeners,
final Runnable globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final CircuitBreakerService circuitBreakerService
final CircuitBreakerService circuitBreakerService,
final RemoteStoreRefreshListener remoteStoreRefreshListener
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -402,6 +405,7 @@ public boolean shouldCache(Query query) {
persistMetadata(path, indexSettings, shardRouting, null, logger);
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
this.remoteStoreRefreshListener = remoteStoreRefreshListener;
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -3100,6 +3104,12 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
}
};

final List<ReferenceManager.RefreshListener> internalRefreshListener;
if (remoteStoreRefreshListener != null && shardRouting.primary()) {
internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), remoteStoreRefreshListener);
} else {
internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric));
}
return this.engineConfigFactory.newEngineConfig(
shardId,
threadPool,
Expand All @@ -3116,7 +3126,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Arrays.asList(refreshListeners, refreshPendingLocationListener),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
internalRefreshListener,
indexSort,
circuitBreakerService,
globalCheckpointSupplier,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.shard;

import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.store.Directory;

import java.io.IOException;

/**
* RefreshListener implementation to upload newly created segment files to the remote store
*/
public class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener {

private final Directory storeDirectory;
private final Directory remoteDirectory;

public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDirectory) {
this.storeDirectory = storeDirectory;
this.remoteDirectory = remoteDirectory;
}

@Override
public void beforeRefresh() throws IOException {
// ToDo Add implementation
}

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
// ToDo Add implementation
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import org.apache.lucene.store.Directory;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.IOException;

public class RemoteDirectoryFactory implements IndexStorePlugin.RemoteDirectoryFactory {

@Override
public Directory newDirectory(IndexSettings indexSettings, ShardPath path, Repository repository) throws IOException {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobPath blobPath = new BlobPath();
blobPath = blobPath.add(indexSettings.getIndex().getName()).add(String.valueOf(path.getShardId().getId()));
BlobContainer blobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(blobPath);
return new RemoteDirectory(blobContainer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,7 @@ public IndexShard createShard(
IndexService indexService = indexService(shardRouting.index());
assert indexService != null;
RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode);
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer);
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, repositoriesService);
indexShard.addShardFailureCallback(onShardFailure);
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS
Expand Down
17 changes: 17 additions & 0 deletions server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.repositories.Repository;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -66,6 +67,22 @@ interface DirectoryFactory {
Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException;
}

/**
* An interface that describes how to create a new remote directory instance per shard.
*/
@FunctionalInterface
interface RemoteDirectoryFactory {
/**
* Creates a new remote directory per shard. This method is called once per shard on shard creation.
* @param indexSettings the shards index settings
* @param shardPath the path the shard is using
* @param repository to get the BlobContainer details
* @return a new RemoteDirectory instance
* @throws IOException if an IOException occurs while opening the directory
*/
Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath, Repository repository) throws IOException;
}

/**
* The {@link DirectoryFactory} mappings for this plugin. When an index is created the store type setting
* {@link org.opensearch.index.IndexModule#INDEX_STORE_TYPE_SETTING} on the index will be examined and either use the default or a
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import org.apache.lucene.store.Directory;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.IndexSettingsModule;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;

public class RemoteDirectoryFactoryTests extends OpenSearchTestCase {

private RemoteDirectoryFactory remoteDirectoryFactory;

@Before
public void setup() {
remoteDirectoryFactory = new RemoteDirectoryFactory();
}

public void testNewDirectory() throws IOException {
Settings settings = Settings.builder().build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
Path tempDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0");
ShardPath shardPath = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0));
BlobStoreRepository repository = mock(BlobStoreRepository.class);
BlobStore blobStore = mock(BlobStore.class);
BlobContainer blobContainer = mock(BlobContainer.class);
when(repository.blobStore()).thenReturn(blobStore);
when(blobStore.blobContainer(any())).thenReturn(blobContainer);
when(blobContainer.listBlobs()).thenReturn(Collections.emptyMap());

Directory directory = remoteDirectoryFactory.newDirectory(indexSettings, shardPath, repository);
assertTrue(directory instanceof RemoteDirectory);
ArgumentCaptor<BlobPath> blobPathCaptor = ArgumentCaptor.forClass(BlobPath.class);
verify(blobStore).blobContainer(blobPathCaptor.capture());
BlobPath blobPath = blobPathCaptor.getValue();
assertEquals("foo/0/", blobPath.buildAsString());

directory.listAll();
verify(blobContainer).listBlobs();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem
newRouting = newRouting.moveToUnassigned(unassignedInfo)
.updateUnassigned(unassignedInfo, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
newRouting = ShardRoutingHelper.initialize(newRouting, nodeId);
IndexShard shard = index.createShard(newRouting, s -> {}, RetentionLeaseSyncer.EMPTY);
IndexShard shard = index.createShard(newRouting, s -> {}, RetentionLeaseSyncer.EMPTY, null);
IndexShardTestCase.updateRoutingEntry(shard, newRouting);
assertEquals(5, counter.get());
final DiscoveryNode localNode = new DiscoveryNode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,8 @@ protected IndexShard newShard(
Arrays.asList(listeners),
globalCheckpointSyncer,
retentionLeaseSyncer,
breakerService
breakerService,
null
);
indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
success = true;
Expand Down

0 comments on commit 4408e6b

Please sign in to comment.