Skip to content

Add searchable snapshots cache directory #50693

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
cc58922
Add basis for CacheService & CacheDirectory
tlrx Jan 2, 2020
a3bd088
Add SparseFileTracker
tlrx Jan 2, 2020
bfbddbf
Add length to SparseFileTracker
tlrx Jan 3, 2020
95ca3dc
Tests passed
tlrx Jan 3, 2020
99194dc
Add small note
tlrx Jan 6, 2020
bf277a3
Remove invalidate_on_shutdown
tlrx Jan 8, 2020
e0a6d01
Estimate weight using file length
tlrx Jan 8, 2020
e24a74d
whitespaces
tlrx Jan 8, 2020
f3a958b
Handle lifecycle
tlrx Jan 8, 2020
43b59b4
Add small cache size + concurrent searches
tlrx Jan 14, 2020
288fa21
apply feedback
tlrx Jan 14, 2020
b8597b8
Update x-pack/plugin/searchable-snapshots/src/main/java/org/elasticse…
tlrx Jan 14, 2020
bc783a4
Restore sparse flag
tlrx Jan 14, 2020
51852da
Apply David's feedback
tlrx Jan 20, 2020
178b6c4
Merge branch 'feature/searchable-snapshots' into add-searchable-snaps…
elasticmachine Jan 20, 2020
8645db3
Merge branch 'feature/searchable-snapshots' into add-searchable-snaps…
elasticmachine Jan 20, 2020
93d4064
Fix bug in position + potential deadlock in getOrAcquire() + NPE in c…
tlrx Jan 20, 2020
7211c22
Apply David's early feedback
tlrx Jan 21, 2020
e1f5450
Really?!
tlrx Jan 21, 2020
f0bf0e6
Fix setting name
tlrx Jan 22, 2020
15a8698
Remove FileChannelRefCount and manage FileChannel within CacheFile
tlrx Jan 22, 2020
a1b4277
Add CacheBufferedIndexInputTests
tlrx Jan 22, 2020
cea36b2
Merge branch 'feature/searchable-snapshots' into add-searchable-snaps…
elasticmachine Jan 22, 2020
67f7b9a
Prevent cache invalidation to trigger more gets on shutdown
tlrx Jan 22, 2020
5efa91f
Also on removeFromCache
tlrx Jan 22, 2020
eb514d7
Don't return int in writeCacheFile()
tlrx Jan 23, 2020
9449563
Math.toIntExact(remaining)
tlrx Jan 23, 2020
573f7ab
Trust assertion
tlrx Jan 23, 2020
02480db
Rename close/closeInternal
tlrx Jan 23, 2020
628e487
assert channel != null
tlrx Jan 23, 2020
4668456
Release for clones
tlrx Jan 23, 2020
cdc99d8
Remove unused start
tlrx Jan 23, 2020
50f36bb
remove unused
tlrx Jan 23, 2020
610affd
More assertion with underlying index input
tlrx Jan 23, 2020
262d7e1
Clones should acquire from root IndexInput + prevent file closing
tlrx Jan 24, 2020
757dc68
Merge branch 'feature/searchable-snapshots' into add-searchable-snaps…
elasticmachine Jan 25, 2020
0aece73
ReleasableLock -> synchronized
tlrx Jan 27, 2020
48fa923
closed -> evicted
tlrx Jan 27, 2020
391633f
readDirectly
tlrx Jan 27, 2020
99a9968
COPY_BUFFER_SIZE
tlrx Jan 27, 2020
348fac8
Remove IOE
tlrx Jan 27, 2020
f01d632
Remove locking in CacheService
tlrx Jan 27, 2020
f7cc415
Add CacheFileReference
tlrx Jan 27, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheDirectory;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;

import java.io.IOException;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
Expand All @@ -53,6 +56,8 @@ public class SearchableSnapshotRepository extends FilterRepository {
Setting.simpleString("index.store.snapshot.snapshot_uuid", Setting.Property.IndexScope, Setting.Property.PrivateIndex);
public static final Setting<String> SNAPSHOT_INDEX_ID_SETTING =
Setting.simpleString("index.store.snapshot.index_uuid", Setting.Property.IndexScope, Setting.Property.PrivateIndex);
public static final Setting<Boolean> SNAPSHOT_CACHE_ENABLED_SETTING =
Setting.boolSetting("index.store.snapshot.cache.enabled", true, Setting.Property.IndexScope);

public static final String SNAPSHOT_DIRECTORY_FACTORY_KEY = "snapshot";

Expand All @@ -64,12 +69,12 @@ public class SearchableSnapshotRepository extends FilterRepository {
public SearchableSnapshotRepository(Repository in) {
super(in);
if (in instanceof BlobStoreRepository == false) {
throw new IllegalArgumentException("Repository [" + in + "] does not support searchable snapshots" );
throw new IllegalArgumentException("Repository [" + in + "] does not support searchable snapshots");
}
blobStoreRepository = (BlobStoreRepository) in;
}

private Directory makeDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
private Directory makeDirectory(IndexSettings indexSettings, ShardPath shardPath, CacheService cacheService) throws IOException {

IndexId indexId = new IndexId(indexSettings.getIndex().getName(), SNAPSHOT_INDEX_ID_SETTING.get(indexSettings.getSettings()));
BlobContainer blobContainer = blobStoreRepository.shardContainer(indexId, shardPath.getShardId().id());
Expand All @@ -78,10 +83,14 @@ private Directory makeDirectory(IndexSettings indexSettings, ShardPath shardPath
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()));
BlobStoreIndexShardSnapshot snapshot = blobStoreRepository.loadShardSnapshot(blobContainer, snapshotId);

final SearchableSnapshotDirectory searchableSnapshotDirectory = new SearchableSnapshotDirectory(snapshot, blobContainer);
final InMemoryNoOpCommitDirectory inMemoryNoOpCommitDirectory = new InMemoryNoOpCommitDirectory(searchableSnapshotDirectory);
Directory directory = new SearchableSnapshotDirectory(snapshot, blobContainer);
if (SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings.getSettings())) {
final Path cacheDir = shardPath.getDataPath().resolve("snapshots").resolve(snapshotId.getUUID());
directory = new CacheDirectory(directory, cacheService, cacheDir);
}
directory = new InMemoryNoOpCommitDirectory(directory);

try (IndexWriter indexWriter = new IndexWriter(inMemoryNoOpCommitDirectory, new IndexWriterConfig())) {
try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) {
final Map<String, String> userData = new HashMap<>();
indexWriter.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue()));

Expand All @@ -94,7 +103,7 @@ private Directory makeDirectory(IndexSettings indexSettings, ShardPath shardPath
indexWriter.commit();
}

return inMemoryNoOpCommitDirectory;
return directory;
}

@Override
Expand Down Expand Up @@ -136,17 +145,21 @@ public Repository create(RepositoryMetaData metaData, Function<String, Factory>
};
}

public static IndexStorePlugin.DirectoryFactory newDirectoryFactory(final Supplier<RepositoriesService> repositoriesService) {
public static IndexStorePlugin.DirectoryFactory newDirectoryFactory(final Supplier<RepositoriesService> repositoriesService,
final Supplier<CacheService> cacheService) {
return (indexSettings, shardPath) -> {
final RepositoriesService repositories = repositoriesService.get();
assert repositories != null;

final Repository repository = repositories.repository(SNAPSHOT_REPOSITORY_SETTING.get(indexSettings.getSettings()));
if (repository instanceof SearchableSnapshotRepository == false) {
throw new IllegalArgumentException("Repository [" + repository + "] is not searchable" );
throw new IllegalArgumentException("Repository [" + repository + "] is not searchable");
}

return ((SearchableSnapshotRepository)repository).makeDirectory(indexSettings, shardPath);
final CacheService cache = cacheService.get();
assert cache != null;

return ((SearchableSnapshotRepository) repository).makeDirectory(indexSettings, shardPath, cache);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
package org.elasticsearch.xpack.searchablesnapshots;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.ReadOnlyEngine;
Expand All @@ -21,7 +25,12 @@
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -36,17 +45,41 @@
public class SearchableSnapshots extends Plugin implements IndexStorePlugin, RepositoryPlugin, EnginePlugin {

private final SetOnce<RepositoriesService> repositoriesService;
private final SetOnce<CacheService> cacheService;
private final Settings settings;

public SearchableSnapshots() {
public SearchableSnapshots(final Settings settings) {
this.repositoriesService = new SetOnce<>();
this.cacheService = new SetOnce<>();
this.settings = settings;
}

@Override
public List<Setting<?>> getSettings() {
return List.of(SearchableSnapshotRepository.SNAPSHOT_REPOSITORY_SETTING,
SearchableSnapshotRepository.SNAPSHOT_SNAPSHOT_NAME_SETTING,
SearchableSnapshotRepository.SNAPSHOT_SNAPSHOT_ID_SETTING,
SearchableSnapshotRepository.SNAPSHOT_INDEX_ID_SETTING);
SearchableSnapshotRepository.SNAPSHOT_INDEX_ID_SETTING,
SearchableSnapshotRepository.SNAPSHOT_CACHE_ENABLED_SETTING,
CacheService.SNAPSHOT_CACHE_SIZE_SETTING
);
}

@Override
public Collection<Object> createComponents(
final Client client,
final ClusterService clusterService,
final ThreadPool threadPool,
final ResourceWatcherService resourceWatcherService,
final ScriptService scriptService,
final NamedXContentRegistry xContentRegistry,
final Environment environment,
final NodeEnvironment nodeEnvironment,
final NamedWriteableRegistry namedWriteableRegistry) {

final CacheService cacheService = new CacheService(settings);
this.cacheService.set(cacheService);
return List.of(cacheService);
}

@Override
Expand All @@ -57,7 +90,7 @@ public void onRepositoriesModule(RepositoriesModule repositoriesModule) {
@Override
public Map<String, DirectoryFactory> getDirectoryFactories() {
return Map.of(SearchableSnapshotRepository.SNAPSHOT_DIRECTORY_FACTORY_KEY,
SearchableSnapshotRepository.newDirectoryFactory(repositoriesService::get));
SearchableSnapshotRepository.newDirectoryFactory(repositoriesService::get, cacheService::get));
}

@Override
Expand Down
Loading