Skip to content

Commit 8bde608

Browse files
authored
Register CcrRepository based on settings update (#36086)
This commit adds an empty CcrRepository snapshot/restore repository. When a new cluster is registered in the remote cluster settings, a new CcrRepository is registered for that cluster. This is implemented using a new concept of "internal repositories". RepositoryPlugin now allows implementations to return factories for "internal repositories". The "internal repositories" are different from normal repositories in that they cannot be registered through the external repository api. Additionally, "internal repositories" are local to a node and are not stored in the cluster state. The repository will be unregistered if the remote cluster is removed.
1 parent e1fb150 commit 8bde608

File tree

19 files changed

+1012
-49
lines changed

19 files changed

+1012
-49
lines changed

server/src/main/java/org/elasticsearch/plugins/RepositoryPlugin.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,17 @@ public interface RepositoryPlugin {
4242
default Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
4343
return Collections.emptyMap();
4444
}
45+
46+
/**
47+
* Returns internal repository types added by this plugin. Internal repositories cannot be registered
48+
* through the external API.
49+
*
50+
* @param env The environment for the local node, which may be used for the local settings and path.repo
51+
*
52+
* The key of the returned {@link Map} is the type name of the repository and
53+
* the value is a factory to construct the {@link Repository} interface.
54+
*/
55+
default Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
56+
return Collections.emptyMap();
57+
}
4558
}

server/src/main/java/org/elasticsearch/repositories/RepositoriesModule.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,24 @@ public RepositoriesModule(Environment env, List<RepositoryPlugin> repoPlugins, T
5858
}
5959
}
6060

61+
Map<String, Repository.Factory> internalFactories = new HashMap<>();
62+
for (RepositoryPlugin repoPlugin : repoPlugins) {
63+
Map<String, Repository.Factory> newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry);
64+
for (Map.Entry<String, Repository.Factory> entry : newRepoTypes.entrySet()) {
65+
if (internalFactories.put(entry.getKey(), entry.getValue()) != null) {
66+
throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered");
67+
}
68+
if (factories.put(entry.getKey(), entry.getValue()) != null) {
69+
throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered as a " +
70+
"non-internal repository");
71+
}
72+
}
73+
}
74+
6175
Map<String, Repository.Factory> repositoryTypes = Collections.unmodifiableMap(factories);
62-
repositoriesService = new RepositoriesService(env.settings(), clusterService, transportService, repositoryTypes, threadPool);
76+
Map<String, Repository.Factory> internalRepositoryTypes = Collections.unmodifiableMap(internalFactories);
77+
repositoriesService = new RepositoriesService(env.settings(), clusterService, transportService, repositoryTypes,
78+
internalRepositoryTypes, threadPool);
6379
}
6480

6581
@Override

server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.cluster.service.ClusterService;
3737
import org.elasticsearch.common.regex.Regex;
3838
import org.elasticsearch.common.settings.Settings;
39+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
3940
import org.elasticsearch.snapshots.RestoreService;
4041
import org.elasticsearch.snapshots.SnapshotsService;
4142
import org.elasticsearch.threadpool.ThreadPool;
@@ -57,19 +58,22 @@ public class RepositoriesService implements ClusterStateApplier {
5758
private static final Logger logger = LogManager.getLogger(RepositoriesService.class);
5859

5960
private final Map<String, Repository.Factory> typesRegistry;
61+
private final Map<String, Repository.Factory> internalTypesRegistry;
6062

6163
private final ClusterService clusterService;
6264

6365
private final ThreadPool threadPool;
6466

6567
private final VerifyNodeRepositoryAction verifyAction;
6668

69+
private final Map<String, Repository> internalRepositories = ConcurrentCollections.newConcurrentMap();
6770
private volatile Map<String, Repository> repositories = Collections.emptyMap();
6871

6972
public RepositoriesService(Settings settings, ClusterService clusterService, TransportService transportService,
70-
Map<String, Repository.Factory> typesRegistry,
73+
Map<String, Repository.Factory> typesRegistry, Map<String, Repository.Factory> internalTypesRegistry,
7174
ThreadPool threadPool) {
7275
this.typesRegistry = typesRegistry;
76+
this.internalTypesRegistry = internalTypesRegistry;
7377
this.clusterService = clusterService;
7478
this.threadPool = threadPool;
7579
// Doesn't make sense to maintain repositories on non-master and non-data nodes
@@ -101,7 +105,7 @@ public void registerRepository(final RegisterRepositoryRequest request, final Ac
101105

102106
// Trying to create the new repository on master to make sure it works
103107
try {
104-
closeRepository(createRepository(newRepositoryMetaData));
108+
closeRepository(createRepository(newRepositoryMetaData, typesRegistry));
105109
} catch (Exception e) {
106110
registrationListener.onFailure(e);
107111
return;
@@ -315,7 +319,7 @@ public void applyClusterState(ClusterChangedEvent event) {
315319
closeRepository(repository);
316320
repository = null;
317321
try {
318-
repository = createRepository(repositoryMetaData);
322+
repository = createRepository(repositoryMetaData, typesRegistry);
319323
} catch (RepositoryException ex) {
320324
// TODO: this catch is bogus, it means the old repo is already closed,
321325
// but we have nothing to replace it
@@ -324,7 +328,7 @@ public void applyClusterState(ClusterChangedEvent event) {
324328
}
325329
} else {
326330
try {
327-
repository = createRepository(repositoryMetaData);
331+
repository = createRepository(repositoryMetaData, typesRegistry);
328332
} catch (RepositoryException ex) {
329333
logger.warn(() -> new ParameterizedMessage("failed to create repository [{}]", repositoryMetaData.name()), ex);
330334
}
@@ -355,31 +359,59 @@ public Repository repository(String repositoryName) {
355359
if (repository != null) {
356360
return repository;
357361
}
362+
repository = internalRepositories.get(repositoryName);
363+
if (repository != null) {
364+
return repository;
365+
}
358366
throw new RepositoryMissingException(repositoryName);
359367
}
360368

369+
public void registerInternalRepository(String name, String type) {
370+
RepositoryMetaData metaData = new RepositoryMetaData(name, type, Settings.EMPTY);
371+
Repository repository = internalRepositories.computeIfAbsent(name, (n) -> {
372+
logger.debug("put internal repository [{}][{}]", name, type);
373+
return createRepository(metaData, internalTypesRegistry);
374+
});
375+
if (type.equals(repository.getMetadata().type()) == false) {
376+
logger.warn(new ParameterizedMessage("internal repository [{}][{}] already registered. this prevented the registration of " +
377+
"internal repository [{}][{}].", name, repository.getMetadata().type(), name, type));
378+
} else if (repositories.containsKey(name)) {
379+
logger.warn(new ParameterizedMessage("non-internal repository [{}] already registered. this repository will block the " +
380+
"usage of internal repository [{}][{}].", name, metaData.type(), name));
381+
}
382+
}
383+
384+
public void unregisterInternalRepository(String name) {
385+
Repository repository = internalRepositories.remove(name);
386+
if (repository != null) {
387+
RepositoryMetaData metadata = repository.getMetadata();
388+
logger.debug(() -> new ParameterizedMessage("delete internal repository [{}][{}].", metadata.type(), name));
389+
closeRepository(repository);
390+
}
391+
}
392+
361393
/** Closes the given repository. */
362394
private void closeRepository(Repository repository) {
363395
logger.debug("closing repository [{}][{}]", repository.getMetadata().type(), repository.getMetadata().name());
364396
repository.close();
365397
}
366398

367399
/**
368-
* Creates repository holder
400+
* Creates repository holder. This method starts the repository
369401
*/
370-
private Repository createRepository(RepositoryMetaData repositoryMetaData) {
402+
private Repository createRepository(RepositoryMetaData repositoryMetaData, Map<String, Repository.Factory> factories) {
371403
logger.debug("creating repository [{}][{}]", repositoryMetaData.type(), repositoryMetaData.name());
372-
Repository.Factory factory = typesRegistry.get(repositoryMetaData.type());
404+
Repository.Factory factory = factories.get(repositoryMetaData.type());
373405
if (factory == null) {
374406
throw new RepositoryException(repositoryMetaData.name(),
375407
"repository type [" + repositoryMetaData.type() + "] does not exist");
376408
}
377409
try {
378-
Repository repository = factory.create(repositoryMetaData, typesRegistry::get);
410+
Repository repository = factory.create(repositoryMetaData, factories::get);
379411
repository.start();
380412
return repository;
381413
} catch (Exception e) {
382-
logger.warn(() -> new ParameterizedMessage("failed to create repository [{}][{}]", repositoryMetaData.type(), repositoryMetaData.name()), e);
414+
logger.warn(new ParameterizedMessage("failed to create repository [{}][{}]", repositoryMetaData.type(), repositoryMetaData.name()), e);
383415
throw new RepositoryException(repositoryMetaData.name(), "failed to create repository", e);
384416
}
385417
}

server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ public String getKey(final String key) {
167167
REMOTE_CLUSTERS_SEEDS);
168168

169169
protected final Settings settings;
170-
protected final ClusterNameExpressionResolver clusterNameResolver;
170+
private final ClusterNameExpressionResolver clusterNameResolver;
171171

172172
/**
173173
* Creates a new {@link RemoteClusterAware} instance
@@ -242,14 +242,15 @@ static DiscoveryNode buildSeedNode(String clusterName, String address, boolean p
242242
* indices per cluster are collected as a list in the returned map keyed by the cluster alias. Local indices are grouped under
243243
* {@link #LOCAL_CLUSTER_GROUP_KEY}. The returned map is mutable.
244244
*
245+
* @param remoteClusterNames the remote cluster names
245246
* @param requestIndices the indices in the search request to filter
246247
* @param indexExists a predicate that can test if a certain index or alias exists in the local cluster
247248
*
248249
* @return a map of grouped remote and local indices
249250
*/
250-
public Map<String, List<String>> groupClusterIndices(String[] requestIndices, Predicate<String> indexExists) {
251+
protected Map<String, List<String>> groupClusterIndices(Set<String> remoteClusterNames, String[] requestIndices,
252+
Predicate<String> indexExists) {
251253
Map<String, List<String>> perClusterIndices = new HashMap<>();
252-
Set<String> remoteClusterNames = getRemoteClusterNames();
253254
for (String index : requestIndices) {
254255
int i = index.indexOf(RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR);
255256
if (i >= 0) {
@@ -281,9 +282,6 @@ public Map<String, List<String>> groupClusterIndices(String[] requestIndices, Pr
281282
return perClusterIndices;
282283
}
283284

284-
protected abstract Set<String> getRemoteClusterNames();
285-
286-
287285
/**
288286
* Subclasses must implement this to receive information about updated cluster aliases. If the given address list is
289287
* empty the cluster alias is unregistered and should be removed.

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ boolean isRemoteNodeConnected(final String remoteCluster, final DiscoveryNode no
278278
public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indices, Predicate<String> indexExists) {
279279
Map<String, OriginalIndices> originalIndicesMap = new HashMap<>();
280280
if (isCrossClusterSearchEnabled()) {
281-
final Map<String, List<String>> groupedIndices = groupClusterIndices(indices, indexExists);
281+
final Map<String, List<String>> groupedIndices = groupClusterIndices(getRemoteClusterNames(), indices, indexExists);
282282
if (groupedIndices.isEmpty()) {
283283
//search on _all in the local cluster if neither local indices nor remote indices were specified
284284
originalIndicesMap.put(LOCAL_CLUSTER_GROUP_KEY, new OriginalIndices(Strings.EMPTY_ARRAY, indicesOptions));
@@ -380,8 +380,7 @@ RemoteClusterConnection getRemoteClusterConnection(String cluster) {
380380
return connection;
381381
}
382382

383-
@Override
384-
protected Set<String> getRemoteClusterNames() {
383+
Set<String> getRemoteClusterNames() {
385384
return this.remoteClusters.keySet();
386385
}
387386

server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,7 @@ private IndicesClusterStateService createIndicesClusterStateService(DiscoveryNod
461461
Collections.emptySet());
462462
final ClusterService clusterService = mock(ClusterService.class);
463463
final RepositoriesService repositoriesService = new RepositoriesService(settings, clusterService,
464-
transportService, null, threadPool);
464+
transportService, Collections.emptyMap(), Collections.emptyMap(), threadPool);
465465
final PeerRecoveryTargetService recoveryTargetService = new PeerRecoveryTargetService(threadPool,
466466
transportService, null, clusterService);
467467
final ShardStateAction shardStateAction = mock(ShardStateAction.class);
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.repositories;
21+
22+
import org.elasticsearch.cluster.service.ClusterService;
23+
import org.elasticsearch.common.settings.Settings;
24+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
25+
import org.elasticsearch.env.Environment;
26+
import org.elasticsearch.plugins.RepositoryPlugin;
27+
import org.elasticsearch.test.ESTestCase;
28+
import org.elasticsearch.threadpool.ThreadPool;
29+
import org.elasticsearch.transport.TransportService;
30+
31+
import java.util.ArrayList;
32+
import java.util.Collections;
33+
import java.util.List;
34+
35+
import static org.mockito.Mockito.mock;
36+
import static org.mockito.Mockito.when;
37+
38+
public class RepositoriesModuleTests extends ESTestCase {
39+
40+
private Environment environment;
41+
private NamedXContentRegistry contentRegistry;
42+
private List<RepositoryPlugin> repoPlugins = new ArrayList<>();
43+
private RepositoryPlugin plugin1;
44+
private RepositoryPlugin plugin2;
45+
private Repository.Factory factory;
46+
47+
@Override
48+
public void setUp() throws Exception {
49+
super.setUp();
50+
environment = mock(Environment.class);
51+
contentRegistry = mock(NamedXContentRegistry.class);
52+
plugin1 = mock(RepositoryPlugin.class);
53+
plugin2 = mock(RepositoryPlugin.class);
54+
factory = mock(Repository.Factory.class);
55+
repoPlugins.add(plugin1);
56+
repoPlugins.add(plugin2);
57+
when(environment.settings()).thenReturn(Settings.EMPTY);
58+
}
59+
60+
public void testCanRegisterTwoRepositoriesWithDifferentTypes() {
61+
when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
62+
when(plugin2.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type2", factory));
63+
64+
// Would throw
65+
new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
66+
mock(ThreadPool.class), contentRegistry);
67+
}
68+
69+
public void testCannotRegisterTwoRepositoriesWithSameTypes() {
70+
when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
71+
when(plugin2.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
72+
73+
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
74+
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
75+
mock(ThreadPool.class), contentRegistry));
76+
77+
assertEquals("Repository type [type1] is already registered", ex.getMessage());
78+
}
79+
80+
public void testCannotRegisterTwoInternalRepositoriesWithSameTypes() {
81+
when(plugin1.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
82+
when(plugin2.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
83+
84+
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
85+
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
86+
mock(ThreadPool.class), contentRegistry));
87+
88+
assertEquals("Internal repository type [type1] is already registered", ex.getMessage());
89+
}
90+
91+
public void testCannotRegisterNormalAndInternalRepositoriesWithSameTypes() {
92+
when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
93+
when(plugin2.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
94+
95+
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
96+
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
97+
mock(ThreadPool.class), contentRegistry));
98+
99+
assertEquals("Internal repository type [type1] is already registered as a non-internal repository", ex.getMessage());
100+
}
101+
}

0 commit comments

Comments
 (0)