Skip to content

Register CcrRepository based on settings update (#36086) #36277

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Dec 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,17 @@ public interface RepositoryPlugin {
default Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
return Collections.emptyMap();
}

/**
* Returns internal repository types added by this plugin. Internal repositories cannot be registered
* through the external API.
*
* @param env The environment for the local node, which may be used for the local settings and path.repo
*
* The key of the returned {@link Map} is the type name of the repository and
* the value is a factory to construct the {@link Repository} interface.
*/
default Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,24 @@ public RepositoriesModule(Environment env, List<RepositoryPlugin> repoPlugins, T
}
}

Map<String, Repository.Factory> internalFactories = new HashMap<>();
for (RepositoryPlugin repoPlugin : repoPlugins) {
Map<String, Repository.Factory> newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry);
for (Map.Entry<String, Repository.Factory> entry : newRepoTypes.entrySet()) {
if (internalFactories.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered");
}
if (factories.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered as a " +
"non-internal repository");
}
}
}

Map<String, Repository.Factory> repositoryTypes = Collections.unmodifiableMap(factories);
repositoriesService = new RepositoriesService(env.settings(), clusterService, transportService, repositoryTypes, threadPool);
Map<String, Repository.Factory> internalRepositoryTypes = Collections.unmodifiableMap(internalFactories);
repositoriesService = new RepositoriesService(env.settings(), clusterService, transportService, repositoryTypes,
internalRepositoryTypes, threadPool);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -57,19 +58,22 @@ public class RepositoriesService implements ClusterStateApplier {
private static final Logger logger = LogManager.getLogger(RepositoriesService.class);

private final Map<String, Repository.Factory> typesRegistry;
private final Map<String, Repository.Factory> internalTypesRegistry;

private final ClusterService clusterService;

private final ThreadPool threadPool;

private final VerifyNodeRepositoryAction verifyAction;

private final Map<String, Repository> internalRepositories = ConcurrentCollections.newConcurrentMap();
private volatile Map<String, Repository> repositories = Collections.emptyMap();

public RepositoriesService(Settings settings, ClusterService clusterService, TransportService transportService,
Map<String, Repository.Factory> typesRegistry,
Map<String, Repository.Factory> typesRegistry, Map<String, Repository.Factory> internalTypesRegistry,
ThreadPool threadPool) {
this.typesRegistry = typesRegistry;
this.internalTypesRegistry = internalTypesRegistry;
this.clusterService = clusterService;
this.threadPool = threadPool;
// Doesn't make sense to maintain repositories on non-master and non-data nodes
Expand Down Expand Up @@ -101,7 +105,7 @@ public void registerRepository(final RegisterRepositoryRequest request, final Ac

// Trying to create the new repository on master to make sure it works
try {
closeRepository(createRepository(newRepositoryMetaData));
closeRepository(createRepository(newRepositoryMetaData, typesRegistry));
} catch (Exception e) {
registrationListener.onFailure(e);
return;
Expand Down Expand Up @@ -316,7 +320,7 @@ public void applyClusterState(ClusterChangedEvent event) {
closeRepository(repository);
repository = null;
try {
repository = createRepository(repositoryMetaData);
repository = createRepository(repositoryMetaData, typesRegistry);
} catch (RepositoryException ex) {
// TODO: this catch is bogus, it means the old repo is already closed,
// but we have nothing to replace it
Expand All @@ -325,7 +329,7 @@ public void applyClusterState(ClusterChangedEvent event) {
}
} else {
try {
repository = createRepository(repositoryMetaData);
repository = createRepository(repositoryMetaData, typesRegistry);
} catch (RepositoryException ex) {
logger.warn(() -> new ParameterizedMessage("failed to create repository [{}]", repositoryMetaData.name()), ex);
}
Expand Down Expand Up @@ -356,31 +360,59 @@ public Repository repository(String repositoryName) {
if (repository != null) {
return repository;
}
repository = internalRepositories.get(repositoryName);
if (repository != null) {
return repository;
}
throw new RepositoryMissingException(repositoryName);
}

public void registerInternalRepository(String name, String type) {
RepositoryMetaData metaData = new RepositoryMetaData(name, type, Settings.EMPTY);
Repository repository = internalRepositories.computeIfAbsent(name, (n) -> {
logger.debug("put internal repository [{}][{}]", name, type);
return createRepository(metaData, internalTypesRegistry);
});
if (type.equals(repository.getMetadata().type()) == false) {
logger.warn(new ParameterizedMessage("internal repository [{}][{}] already registered. this prevented the registration of " +
"internal repository [{}][{}].", name, repository.getMetadata().type(), name, type));
} else if (repositories.containsKey(name)) {
logger.warn(new ParameterizedMessage("non-internal repository [{}] already registered. this repository will block the " +
"usage of internal repository [{}][{}].", name, metaData.type(), name));
}
}

public void unregisterInternalRepository(String name) {
Repository repository = internalRepositories.remove(name);
if (repository != null) {
RepositoryMetaData metadata = repository.getMetadata();
logger.debug(() -> new ParameterizedMessage("delete internal repository [{}][{}].", metadata.type(), name));
closeRepository(repository);
}
}

/** Closes the given repository. */
private void closeRepository(Repository repository) {
logger.debug("closing repository [{}][{}]", repository.getMetadata().type(), repository.getMetadata().name());
repository.close();
}

/**
* Creates repository holder
* Creates repository holder. This method starts the repository
*/
private Repository createRepository(RepositoryMetaData repositoryMetaData) {
private Repository createRepository(RepositoryMetaData repositoryMetaData, Map<String, Repository.Factory> factories) {
logger.debug("creating repository [{}][{}]", repositoryMetaData.type(), repositoryMetaData.name());
Repository.Factory factory = typesRegistry.get(repositoryMetaData.type());
Repository.Factory factory = factories.get(repositoryMetaData.type());
if (factory == null) {
throw new RepositoryException(repositoryMetaData.name(),
"repository type [" + repositoryMetaData.type() + "] does not exist");
}
try {
Repository repository = factory.create(repositoryMetaData, typesRegistry::get);
Repository repository = factory.create(repositoryMetaData, factories::get);
repository.start();
return repository;
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to create repository [{}][{}]", repositoryMetaData.type(), repositoryMetaData.name()), e);
logger.warn(new ParameterizedMessage("failed to create repository [{}][{}]", repositoryMetaData.type(), repositoryMetaData.name()), e);
throw new RepositoryException(repositoryMetaData.name(), "failed to create repository", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public String getKey(final String key) {
REMOTE_CLUSTERS_SEEDS);

protected final Settings settings;
protected final ClusterNameExpressionResolver clusterNameResolver;
private final ClusterNameExpressionResolver clusterNameResolver;

/**
* Creates a new {@link RemoteClusterAware} instance
Expand Down Expand Up @@ -237,14 +237,15 @@ static DiscoveryNode buildSeedNode(String clusterName, String address, boolean p
* indices per cluster are collected as a list in the returned map keyed by the cluster alias. Local indices are grouped under
* {@link #LOCAL_CLUSTER_GROUP_KEY}. The returned map is mutable.
*
* @param remoteClusterNames the remote cluster names
* @param requestIndices the indices in the search request to filter
* @param indexExists a predicate that can test if a certain index or alias exists in the local cluster
*
* @return a map of grouped remote and local indices
*/
public Map<String, List<String>> groupClusterIndices(String[] requestIndices, Predicate<String> indexExists) {
protected Map<String, List<String>> groupClusterIndices(Set<String> remoteClusterNames, String[] requestIndices,
Predicate<String> indexExists) {
Map<String, List<String>> perClusterIndices = new HashMap<>();
Set<String> remoteClusterNames = getRemoteClusterNames();
for (String index : requestIndices) {
int i = index.indexOf(RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR);
if (i >= 0) {
Expand Down Expand Up @@ -276,9 +277,6 @@ public Map<String, List<String>> groupClusterIndices(String[] requestIndices, Pr
return perClusterIndices;
}

protected abstract Set<String> getRemoteClusterNames();


/**
* Subclasses must implement this to receive information about updated cluster aliases. If the given address list is
* empty the cluster alias is unregistered and should be removed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ boolean isRemoteNodeConnected(final String remoteCluster, final DiscoveryNode no
public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indices, Predicate<String> indexExists) {
Map<String, OriginalIndices> originalIndicesMap = new HashMap<>();
if (isCrossClusterSearchEnabled()) {
final Map<String, List<String>> groupedIndices = groupClusterIndices(indices, indexExists);
final Map<String, List<String>> groupedIndices = groupClusterIndices(getRemoteClusterNames(), indices, indexExists);
if (groupedIndices.isEmpty()) {
//search on _all in the local cluster if neither local indices nor remote indices were specified
originalIndicesMap.put(LOCAL_CLUSTER_GROUP_KEY, new OriginalIndices(Strings.EMPTY_ARRAY, indicesOptions));
Expand Down Expand Up @@ -374,8 +374,7 @@ RemoteClusterConnection getRemoteClusterConnection(String cluster) {
return connection;
}

@Override
protected Set<String> getRemoteClusterNames() {
Set<String> getRemoteClusterNames() {
return this.remoteClusters.keySet();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ private IndicesClusterStateService createIndicesClusterStateService(DiscoveryNod
Collections.emptySet());
final ClusterService clusterService = mock(ClusterService.class);
final RepositoriesService repositoriesService = new RepositoriesService(settings, clusterService,
transportService, null, threadPool);
transportService, Collections.emptyMap(), Collections.emptyMap(), threadPool);
final PeerRecoveryTargetService recoveryTargetService = new PeerRecoveryTargetService(threadPool,
transportService, null, clusterService);
final ShardStateAction shardStateAction = mock(ShardStateAction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.repositories;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class RepositoriesModuleTests extends ESTestCase {

private Environment environment;
private NamedXContentRegistry contentRegistry;
private List<RepositoryPlugin> repoPlugins = new ArrayList<>();
private RepositoryPlugin plugin1;
private RepositoryPlugin plugin2;
private Repository.Factory factory;

@Override
public void setUp() throws Exception {
super.setUp();
environment = mock(Environment.class);
contentRegistry = mock(NamedXContentRegistry.class);
plugin1 = mock(RepositoryPlugin.class);
plugin2 = mock(RepositoryPlugin.class);
factory = mock(Repository.Factory.class);
repoPlugins.add(plugin1);
repoPlugins.add(plugin2);
when(environment.settings()).thenReturn(Settings.EMPTY);
}

public void testCanRegisterTwoRepositoriesWithDifferentTypes() {
when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type2", factory));

// Would throw
new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
mock(ThreadPool.class), contentRegistry);
}

public void testCannotRegisterTwoRepositoriesWithSameTypes() {
when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
mock(ThreadPool.class), contentRegistry));

assertEquals("Repository type [type1] is already registered", ex.getMessage());
}

public void testCannotRegisterTwoInternalRepositoriesWithSameTypes() {
when(plugin1.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
mock(ThreadPool.class), contentRegistry));

assertEquals("Internal repository type [type1] is already registered", ex.getMessage());
}

public void testCannotRegisterNormalAndInternalRepositoriesWithSameTypes() {
when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
mock(ThreadPool.class), contentRegistry));

assertEquals("Internal repository type [type1] is already registered as a non-internal repository", ex.getMessage());
}
}
Loading