Skip to content

Pass Directory instead of DirectoryService to Store #33466

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
Expand Down Expand Up @@ -377,7 +378,9 @@ public synchronized IndexShard createShard(ShardRouting routing, Consumer<ShardI
warmer.warm(searcher, shard, IndexService.this.indexSettings);
}
};
store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock,
// TODO we can remove either IndexStore or DirectoryService. All we need is a simple Supplier<Directory>
DirectoryService directoryService = indexStore.newDirectoryService(path);
store = new Store(shardId, this.indexSettings, directoryService.newDirectory(), lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)));
indexShard = new IndexShard(routing, this.indexSettings, path, store, indexSortSupplier,
indexCache, mapperService, similarityService, engineFactory,
Expand Down
13 changes: 5 additions & 8 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.RefCounted;
Expand Down Expand Up @@ -153,18 +152,16 @@ protected void closeInternal() {
}
};

public Store(ShardId shardId, IndexSettings indexSettings, DirectoryService directoryService, ShardLock shardLock) throws IOException {
this(shardId, indexSettings, directoryService, shardLock, OnClose.EMPTY);
public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock) {
this(shardId, indexSettings, directory, shardLock, OnClose.EMPTY);
}

public Store(ShardId shardId, IndexSettings indexSettings, DirectoryService directoryService, ShardLock shardLock,
OnClose onClose) throws IOException {
public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock,
OnClose onClose) {
super(shardId, indexSettings);
final Settings settings = indexSettings.getSettings();
Directory dir = directoryService.newDirectory();
final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING);
logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval);
ByteSizeCachingDirectory sizeCachingDir = new ByteSizeCachingDirectory(dir, refreshInterval);
ByteSizeCachingDirectory sizeCachingDir = new ByteSizeCachingDirectory(directory, refreshInterval);
this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId));
this.shardLock = shardLock;
this.onClose = onClose;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
Expand Down Expand Up @@ -106,13 +105,7 @@ public void setupListeners() throws Exception {
ShardId shardId = new ShardId(new Index("index", "_na_"), 1);
String allocationId = UUIDs.randomBase64UUID(random());
Directory directory = newDirectory();
DirectoryService directoryService = new DirectoryService(shardId, indexSettings) {
@Override
public Directory newDirectory() throws IOException {
return directory;
}
};
store = new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
store = new Store(shardId, indexSettings, directory, new DummyShardLock(shardId));
IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, createTempDir("translog"), indexSettings,
BigArrays.NON_RECYCLING_INSTANCE);
Expand Down
95 changes: 20 additions & 75 deletions server/src/test/java/org/elasticsearch/index/store/StoreTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,10 @@ public class StoreTests extends ESTestCase {
private static final Version MIN_SUPPORTED_LUCENE_VERSION = org.elasticsearch.Version.CURRENT
.minimumIndexCompatibilityVersion().luceneVersion;

public void testRefCount() throws IOException {
public void testRefCount() {
final ShardId shardId = new ShardId("index", "_na_", 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
IndexSettings indexSettings = INDEX_SETTINGS;

Store store = new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
Store store = new Store(shardId, indexSettings, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
int incs = randomIntBetween(1, 100);
for (int i = 0; i < incs; i++) {
if (randomBoolean()) {
Expand Down Expand Up @@ -296,8 +294,7 @@ public void testVerifyingIndexOutputWithBogusInput() throws IOException {

public void testNewChecksums() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId));
Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
// set default codec - all segments need checksums
IndexWriter writer = new IndexWriter(store.directory(), newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(TestUtil.getDefaultCodec()));
int docs = 1 + random().nextInt(100);
Expand Down Expand Up @@ -347,7 +344,7 @@ public void testNewChecksums() throws IOException {
assertConsistent(store, metadata);

TestUtil.checkIndex(store.directory());
assertDeleteContent(store, directoryService);
assertDeleteContent(store, store.directory());
IOUtils.close(store);
}

Expand Down Expand Up @@ -455,32 +452,11 @@ private void corruptFile(Directory dir, String fileIn, String fileOut) throws IO

}

public void assertDeleteContent(Store store, DirectoryService service) throws IOException {
public void assertDeleteContent(Store store, Directory dir) throws IOException {
deleteContent(store.directory());
assertThat(Arrays.toString(store.directory().listAll()), store.directory().listAll().length, equalTo(0));
assertThat(store.stats().sizeInBytes(), equalTo(0L));
assertThat(service.newDirectory().listAll().length, equalTo(0));
}

private static final class LuceneManagedDirectoryService extends DirectoryService {
private final Directory dir;
private final Random random;

LuceneManagedDirectoryService(Random random) {
this(random, true);
}

LuceneManagedDirectoryService(Random random, boolean preventDoubleWrite) {
super(new ShardId(INDEX_SETTINGS.getIndex(), 1), INDEX_SETTINGS);
dir = StoreTests.newDirectory(random);
this.random = random;
}

@Override
public Directory newDirectory() throws IOException {
return dir;
}

assertThat(dir.listAll().length, equalTo(0));
}

public static void assertConsistent(Store store, Store.MetadataSnapshot metadata) throws IOException {
Expand Down Expand Up @@ -511,8 +487,7 @@ public void testRecoveryDiff() throws IOException, InterruptedException {
iwc.setMergePolicy(NoMergePolicy.INSTANCE);
iwc.setUseCompoundFile(random.nextBoolean());
final ShardId shardId = new ShardId("index", "_na_", 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random);
Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId));
Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
IndexWriter writer = new IndexWriter(store.directory(), iwc);
final boolean lotsOfSegments = rarely(random);
for (Document d : docs) {
Expand All @@ -526,7 +501,7 @@ public void testRecoveryDiff() throws IOException, InterruptedException {
writer.commit();
writer.close();
first = store.getMetadata(null);
assertDeleteContent(store, directoryService);
assertDeleteContent(store, store.directory());
store.close();
}
long time = new Date().getTime();
Expand All @@ -541,8 +516,7 @@ public void testRecoveryDiff() throws IOException, InterruptedException {
iwc.setMergePolicy(NoMergePolicy.INSTANCE);
iwc.setUseCompoundFile(random.nextBoolean());
final ShardId shardId = new ShardId("index", "_na_", 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random);
store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId));
store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
IndexWriter writer = new IndexWriter(store.directory(), iwc);
final boolean lotsOfSegments = rarely(random);
for (Document d : docs) {
Expand Down Expand Up @@ -639,8 +613,7 @@ public void testRecoveryDiff() throws IOException, InterruptedException {

public void testCleanupFromSnapshot() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId));
Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
// this time random codec....
IndexWriterConfig indexWriterConfig = newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(TestUtil.getDefaultCodec());
// we keep all commits and that allows us clean based on multiple snapshots
Expand Down Expand Up @@ -727,11 +700,10 @@ public void testCleanupFromSnapshot() throws IOException {

public void testOnCloseCallback() throws IOException {
final ShardId shardId = new ShardId(new Index(randomRealisticUnicodeOfCodepointLengthBetween(1, 10), "_na_"), randomIntBetween(0, 100));
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
final AtomicInteger count = new AtomicInteger(0);
final ShardLock lock = new DummyShardLock(shardId);

Store store = new Store(shardId, INDEX_SETTINGS, directoryService, lock, theLock -> {
Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), lock, theLock -> {
assertEquals(shardId, theLock.getShardId());
assertEquals(lock, theLock);
count.incrementAndGet();
Expand All @@ -748,11 +720,10 @@ public void testOnCloseCallback() throws IOException {

public void testStoreStats() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
Settings settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
.put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMinutes(0)).build();
Store store = new Store(shardId, IndexSettingsModule.newIndexSettings("index", settings), directoryService,
Store store = new Store(shardId, IndexSettingsModule.newIndexSettings("index", settings), StoreTests.newDirectory(random()),
new DummyShardLock(shardId));
long initialStoreSize = 0;
for (String extraFiles : store.directory().listAll()) {
Expand Down Expand Up @@ -843,8 +814,7 @@ protected Store.MetadataSnapshot createMetaDataSnapshot() {

public void testUserDataRead() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId));
Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
IndexWriterConfig config = newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(TestUtil.getDefaultCodec());
SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
config.setIndexDeletionPolicy(deletionPolicy);
Expand All @@ -867,7 +837,7 @@ public void testUserDataRead() throws IOException {
assertThat(metadata.getCommitUserData().get(Engine.SYNC_COMMIT_ID), equalTo(syncId));
assertThat(metadata.getCommitUserData().get(Translog.TRANSLOG_GENERATION_KEY), equalTo(translogId));
TestUtil.checkIndex(store.directory());
assertDeleteContent(store, directoryService);
assertDeleteContent(store, store.directory());
IOUtils.close(store);
}

Expand All @@ -893,8 +863,7 @@ public void testStreamStoreFilesMetaData() throws Exception {
public void testMarkCorruptedOnTruncatedSegmentsFile() throws IOException {
IndexWriterConfig iwc = newIndexWriterConfig();
final ShardId shardId = new ShardId("index", "_na_", 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId));
Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
IndexWriter writer = new IndexWriter(store.directory(), iwc);

int numDocs = 1 + random().nextInt(10);
Expand Down Expand Up @@ -945,15 +914,7 @@ public void testCanOpenIndex() throws IOException {
writer.commit();
writer.close();
assertTrue(Store.canOpenIndex(logger, tempDir, shardId, (id, l) -> new DummyShardLock(id)));

DirectoryService directoryService = new DirectoryService(shardId, INDEX_SETTINGS) {

@Override
public Directory newDirectory() throws IOException {
return dir;
}
};
Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId));
Store store = new Store(shardId, INDEX_SETTINGS, dir, new DummyShardLock(shardId));
store.markStoreCorrupted(new CorruptIndexException("foo", "bar"));
assertFalse(Store.canOpenIndex(logger, tempDir, shardId, (id, l) -> new DummyShardLock(id)));
store.close();
Expand All @@ -962,14 +923,7 @@ public Directory newDirectory() throws IOException {
public void testDeserializeCorruptionException() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
final Directory dir = new RAMDirectory(); // I use ram dir to prevent that virusscanner being a PITA
DirectoryService directoryService = new DirectoryService(shardId, INDEX_SETTINGS) {

@Override
public Directory newDirectory() throws IOException {
return dir;
}
};
Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId));
Store store = new Store(shardId, INDEX_SETTINGS, dir, new DummyShardLock(shardId));
CorruptIndexException ex = new CorruptIndexException("foo", "bar");
store.markStoreCorrupted(ex);
try {
Expand Down Expand Up @@ -998,14 +952,7 @@ public Directory newDirectory() throws IOException {
public void testCanReadOldCorruptionMarker() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
final Directory dir = new RAMDirectory(); // I use ram dir to prevent that virusscanner being a PITA
DirectoryService directoryService = new DirectoryService(shardId, INDEX_SETTINGS) {

@Override
public Directory newDirectory() throws IOException {
return dir;
}
};
Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId));
Store store = new Store(shardId, INDEX_SETTINGS, dir, new DummyShardLock(shardId));

CorruptIndexException exception = new CorruptIndexException("foo", "bar");
String uuid = Store.CORRUPTED + UUIDs.randomBase64UUID();
Expand Down Expand Up @@ -1065,8 +1012,7 @@ public Directory newDirectory() throws IOException {

public void testEnsureIndexHasHistoryUUID() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
try (Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId))) {
try (Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId))) {

store.createEmpty();

Expand Down Expand Up @@ -1098,8 +1044,7 @@ public void testEnsureIndexHasHistoryUUID() throws IOException {

public void testHistoryUUIDCanBeForced() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
try (Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId))) {
try (Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId))) {

store.createEmpty();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import org.elasticsearch.index.shard.IndexShardRelocatedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
Expand Down Expand Up @@ -461,18 +460,11 @@ private Store newStore(Path path) throws IOException {
return newStore(path, true);
}
private Store newStore(Path path, boolean checkIndex) throws IOException {
DirectoryService directoryService = new DirectoryService(shardId, INDEX_SETTINGS) {

@Override
public Directory newDirectory() throws IOException {
BaseDirectoryWrapper baseDirectoryWrapper = RecoverySourceHandlerTests.newFSDirectory(path);
if (checkIndex == false) {
baseDirectoryWrapper.setCheckIndexOnClose(false); // don't run checkindex we might corrupt the index in these tests
}
return baseDirectoryWrapper;
}
};
return new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId));
BaseDirectoryWrapper baseDirectoryWrapper = RecoverySourceHandlerTests.newFSDirectory(path);
if (checkIndex == false) {
baseDirectoryWrapper.setCheckIndexOnClose(false); // don't run checkindex we might corrupt the index in these tests
}
return new Store(shardId, INDEX_SETTINGS, baseDirectoryWrapper, new DummyShardLock(shardId));
}


Expand Down
Loading