Skip to content

File Settings Service #88329

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 39 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
df61848
Implement few operator handlers
Jun 27, 2022
407fd72
Add few ILM operator handler tests
Jun 27, 2022
b78d73f
Merge branch 'master' into operator/state_ilm_handlers
Jun 27, 2022
334f39c
Merge master
Jun 29, 2022
2836ff7
Rename operator to immutablestate
Jun 29, 2022
70f7202
Merge branch 'master' into operator/state_ilm_handlers
Jun 30, 2022
2c11e74
Address PR review feedback
Jun 30, 2022
c636026
Add ImmutableClusterStateController
Jun 30, 2022
da2654c
Merge master
Jul 4, 2022
49a1809
Merge branch 'operator/state_ilm_handlers' into operator/controller
Jul 4, 2022
3a66114
Merge master
Jul 4, 2022
4f54393
Fix test to run the transforms
Jul 5, 2022
fcf8ef7
Merge branch 'master' into operator/controller
Jul 5, 2022
16c3272
Refactor to split out parsing in handlers
Jul 6, 2022
d9273f4
Fix test JSON parsing
Jul 6, 2022
d36a968
Merge branch 'master' into operator/controller
Jul 6, 2022
81703a7
Add file settings service
Jul 6, 2022
dac7a6a
Update docs/changelog/88329.yaml
grcevski Jul 6, 2022
f58b7a8
Fix tests to reflect more debugging info
Jul 6, 2022
30648a2
Merge branch 'operator/file_service' of github.com:grcevski/elasticse…
Jul 6, 2022
1f0ee7a
Merge master
Jul 20, 2022
f09a658
Update to latest version
Jul 20, 2022
9357c28
Remove unused helper
Jul 20, 2022
089fe78
Change package
Jul 20, 2022
ecf8834
Fix MockNode
Jul 20, 2022
7de8408
Fix tests
Jul 20, 2022
31e0df1
Clean-up duplicate error reporting
Jul 21, 2022
7877236
Apply some PR review comments
Jul 21, 2022
a702f9a
Address feedback
Jul 22, 2022
837919a
Update tests to handle more debug info
Jul 22, 2022
e966002
Merge master
Jul 22, 2022
bdee9c9
Add integration test
Jul 22, 2022
2fe7fe5
Remove unused method
Jul 23, 2022
8e6f84e
Spotless
Jul 23, 2022
dce6e17
Fix tests for Windows.
Jul 25, 2022
610b63b
Address further feedback
Jul 25, 2022
6552723
Fix issue with missing config dir in tests
Jul 25, 2022
f70cdfc
Merge main
Jul 25, 2022
50e2fe1
Fix merge issue
Jul 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/88329.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 88329
summary: File Settings Service
area: Infra/Core
type: feature
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.reservedstate.service;

import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata;
import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata;
import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
import org.elasticsearch.test.ESIntegTestCase;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING;
import static org.elasticsearch.test.NodeRoles.dataOnlyNode;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
public class FileSettingsServiceIT extends ESIntegTestCase {

private AtomicLong versionCounter = new AtomicLong(1);

private static String testJSON = """
{
"metadata": {
"version": "%s",
"compatibility": "8.4.0"
},
"state": {
"cluster_settings": {
"indices.recovery.max_bytes_per_sec": "50mb"
}
}
}""";

private static String testErrorJSON = """
{
"metadata": {
"version": "%s",
"compatibility": "8.4.0"
},
"state": {
"not_cluster_settings": {
"search.allow_expensive_queries": "false"
}
}
}""";

private void assertMasterNode(Client client, String node) {
assertThat(
client.admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(),
equalTo(node)
);
}

private void writeJSONFile(String node, String json) throws Exception {
long version = versionCounter.incrementAndGet();

FileSettingsService fileSettingsService = internalCluster().getInstance(FileSettingsService.class, node);

Files.createDirectories(fileSettingsService.operatorSettingsDir());
Files.write(fileSettingsService.operatorSettingsFile(), Strings.format(json, version).getBytes(StandardCharsets.UTF_8));
}

private CountDownLatch setupClusterStateListener(String node) {
ClusterService clusterService = internalCluster().clusterService(node);
CountDownLatch savedClusterState = new CountDownLatch(1);
clusterService.addListener(new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
ReservedStateMetadata reservedState = event.state().metadata().reservedStateMetadata().get(FileSettingsService.NAMESPACE);
if (reservedState != null) {
ReservedStateHandlerMetadata handlerMetadata = reservedState.handlers().get(ReservedClusterSettingsAction.NAME);
if (handlerMetadata == null) {
fail("Should've found cluster settings in this metadata");
}
assertThat(handlerMetadata.keys(), contains("indices.recovery.max_bytes_per_sec"));
clusterService.removeListener(this);
savedClusterState.countDown();
}
}
});

return savedClusterState;
}

private void assertClusterStateSaveOK(CountDownLatch savedClusterState) throws Exception {
boolean awaitSuccessful = savedClusterState.await(20, TimeUnit.SECONDS);
assertTrue(awaitSuccessful);

final ClusterStateResponse clusterStateResponse = client().admin().cluster().state(new ClusterStateRequest()).actionGet();

assertThat(
clusterStateResponse.getState().metadata().persistentSettings().get(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey()),
equalTo("50mb")
);

ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest().persistentSettings(
Settings.builder().put(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "1234kb")
);
assertEquals(
"java.lang.IllegalArgumentException: Failed to process request "
+ "[org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest/unset] "
+ "with errors: [[indices.recovery.max_bytes_per_sec] set as read-only by [file_settings]]",
expectThrows(ExecutionException.class, () -> client().admin().cluster().updateSettings(req).get()).getMessage()
);
}

public void testSettingsApplied() throws Exception {
internalCluster().setBootstrapMasterNodeIndex(0);
logger.info("--> start data node / non master node");
String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s"));
FileSettingsService dataFileSettingsService = internalCluster().getInstance(FileSettingsService.class, dataNode);

assertFalse(dataFileSettingsService.watching());

logger.info("--> start master node");
final String masterNode = internalCluster().startMasterOnlyNode();
assertMasterNode(internalCluster().nonMasterClient(), masterNode);
var savedClusterState = setupClusterStateListener(masterNode);

FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);

assertTrue(masterFileSettingsService.watching());
assertFalse(dataFileSettingsService.watching());

writeJSONFile(masterNode, testJSON);
assertClusterStateSaveOK(savedClusterState);
}

public void testSettingsAppliedOnStart() throws Exception {
internalCluster().setBootstrapMasterNodeIndex(0);
logger.info("--> start data node / non master node");
String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s"));
FileSettingsService dataFileSettingsService = internalCluster().getInstance(FileSettingsService.class, dataNode);

assertFalse(dataFileSettingsService.watching());
var savedClusterState = setupClusterStateListener(dataNode);

// In internal cluster tests, the nodes share the config directory, so when we write with the data node path
// the master will pick it up on start
writeJSONFile(dataNode, testJSON);

logger.info("--> start master node");
final String masterNode = internalCluster().startMasterOnlyNode();
assertMasterNode(internalCluster().nonMasterClient(), masterNode);

FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);

assertTrue(masterFileSettingsService.watching());
assertFalse(dataFileSettingsService.watching());

assertClusterStateSaveOK(savedClusterState);
}

private CountDownLatch setupClusterStateListenerForError(String node) {
ClusterService clusterService = internalCluster().clusterService(node);
CountDownLatch savedClusterState = new CountDownLatch(1);
clusterService.addListener(new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
ReservedStateMetadata reservedState = event.state().metadata().reservedStateMetadata().get(FileSettingsService.NAMESPACE);
if (reservedState != null) {
assertEquals(ReservedStateErrorMetadata.ErrorKind.PARSING, reservedState.errorMetadata().errorKind());
assertThat(reservedState.errorMetadata().errors(), allOf(notNullValue(), hasSize(1)));
assertThat(
reservedState.errorMetadata().errors().get(0),
containsString("Missing handler definition for content key [not_cluster_settings]")
);
clusterService.removeListener(this);
savedClusterState.countDown();
}
}
});

return savedClusterState;
}

private void assertClusterStateNotSaved(CountDownLatch savedClusterState) throws Exception {
boolean awaitSuccessful = savedClusterState.await(20, TimeUnit.SECONDS);
assertTrue(awaitSuccessful);

final ClusterStateResponse clusterStateResponse = client().admin().cluster().state(new ClusterStateRequest()).actionGet();

assertThat(clusterStateResponse.getState().metadata().persistentSettings().get("search.allow_expensive_queries"), nullValue());

ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest().persistentSettings(
Settings.builder().put("search.allow_expensive_queries", "false")
);
// This should succeed, nothing was reserved
client().admin().cluster().updateSettings(req).get();
}

public void testErrorSaved() throws Exception {
internalCluster().setBootstrapMasterNodeIndex(0);
logger.info("--> start data node / non master node");
String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s"));
FileSettingsService dataFileSettingsService = internalCluster().getInstance(FileSettingsService.class, dataNode);

assertFalse(dataFileSettingsService.watching());

logger.info("--> start master node");
final String masterNode = internalCluster().startMasterOnlyNode();
assertMasterNode(internalCluster().nonMasterClient(), masterNode);
var savedClusterState = setupClusterStateListenerForError(masterNode);

FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);

assertTrue(masterFileSettingsService.watching());
assertFalse(dataFileSettingsService.watching());

writeJSONFile(masterNode, testErrorJSON);
assertClusterStateNotSaved(savedClusterState);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ public void testThreadNames() throws Exception {
// or the ones that are occasionally come up from ESSingleNodeTestCase
if (threadName.contains("[node_s_0]") // TODO: this can't possibly be right! single node and integ test are unrelated!
|| threadName.contains("Keep-Alive-Timer")
|| threadName.contains("readiness-service")
|| threadName.contains("JVMCI-native") // GraalVM Compiler Thread
|| threadName.contains("readiness-service")) {
|| threadName.contains("file-settings-watcher")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will tackle cleaning up this manual registration in a separate PR.

|| threadName.contains("FileSystemWatchService")) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have many other threadpools, but we've never needed to modify this test for each of them. Can we utilize the existing ThreadPools class to manage these threads, or do something so that we don't need to modify this for every system thread that is added?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that's this is tedious to modify each time. We need to explicitly add each thread that's not in any threadpool. When I worked on the readiness-service, I was advised not to use the thread pools for threads that will constantly be running, so I followed the same approach here. FileSystemWatchService comes from the JDK, so there's no avoiding adding that one.

Is it OK if I followed up with a new PR to tackle this problem, rather than extend this PR further? I think I should be able to ensure all standalone Elasticsearch threads are registered in a way that this list doesn't have to be manually modified.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A followup is great

continue;
}
String nodePrefix = "("
Expand Down
13 changes: 12 additions & 1 deletion server/src/main/java/org/elasticsearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.NamedRegistry;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.TypeLiteral;
Expand All @@ -274,6 +275,8 @@
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.ActionPlugin.ActionHandler;
import org.elasticsearch.plugins.interceptor.RestInterceptorActionPlugin;
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
import org.elasticsearch.reservedstate.service.ReservedClusterStateService;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.RestHeaderDefinition;
Expand Down Expand Up @@ -449,6 +452,7 @@ public class ActionModule extends AbstractModule {
private final RequestValidators<PutMappingRequest> mappingRequestValidators;
private final RequestValidators<IndicesAliasesRequest> indicesAliasesRequestRequestValidators;
private final ThreadPool threadPool;
private final ReservedClusterStateService reservedClusterStateService;

public ActionModule(
Settings settings,
Expand All @@ -462,7 +466,9 @@ public ActionModule(
CircuitBreakerService circuitBreakerService,
UsageService usageService,
SystemIndices systemIndices,
Tracer tracer
Tracer tracer,
ClusterService clusterService,
List<ReservedClusterStateHandler<?>> reservedStateHandlers
) {
this.settings = settings;
this.indexNameExpressionResolver = indexNameExpressionResolver;
Expand Down Expand Up @@ -513,6 +519,7 @@ public ActionModule(
);

restController = new RestController(headers, restInterceptor, nodeClient, circuitBreakerService, usageService, tracer);
reservedClusterStateService = new ReservedClusterStateService(clusterService, reservedStateHandlers);
}

public Map<String, ActionHandler<?, ?>> getActions() {
Expand Down Expand Up @@ -922,4 +929,8 @@ public ActionFilters getActionFilters() {
public RestController getRestController() {
return restController;
}

public ReservedClusterStateService getReservedClusterStateService() {
return reservedClusterStateService;
}
}
29 changes: 28 additions & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@
import org.elasticsearch.readiness.ReadinessService;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
import org.elasticsearch.reservedstate.ReservedClusterStateHandlerProvider;
import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
import org.elasticsearch.reservedstate.service.FileSettingsService;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptEngine;
Expand Down Expand Up @@ -705,6 +709,17 @@ protected Node(
)
).toList();

List<ReservedClusterStateHandler<?>> reservedStateHandlers = new ArrayList<>();

// add all reserved state handlers from server
reservedStateHandlers.add(new ReservedClusterSettingsAction(settingsModule.getClusterSettings()));

// add all reserved state handlers from plugins
List<? extends ReservedClusterStateHandlerProvider> pluginHandlers = pluginsService.loadServiceProviders(
ReservedClusterStateHandlerProvider.class
);
pluginHandlers.forEach(h -> reservedStateHandlers.addAll(h.handlers()));

ActionModule actionModule = new ActionModule(
settings,
clusterModule.getIndexNameExpressionResolver(),
Expand All @@ -717,7 +732,9 @@ protected Node(
circuitBreakerService,
usageService,
systemIndices,
tracer
tracer,
clusterService,
reservedStateHandlers
);
modules.add(actionModule);

Expand Down Expand Up @@ -929,6 +946,12 @@ protected Node(
? new HealthMetadataService(clusterService, settings)
: null;

FileSettingsService fileSettingsService = new FileSettingsService(
clusterService,
actionModule.getReservedClusterStateService(),
environment
);

modules.add(b -> {
b.bind(Node.class).toInstance(this);
b.bind(NodeService.class).toInstance(nodeService);
Expand Down Expand Up @@ -1017,6 +1040,7 @@ protected Node(
b.bind(HealthMetadataService.class).toInstance(healthMetadataService);
}
b.bind(Tracer.class).toInstance(tracer);
b.bind(FileSettingsService.class).toInstance(fileSettingsService);
});

if (ReadinessService.enabled(environment)) {
Expand Down Expand Up @@ -1297,6 +1321,7 @@ public void onTimeout(TimeValue timeout) {
}
}

injector.getInstance(FileSettingsService.class).start();
injector.getInstance(HttpServerTransport.class).start();

if (WRITE_PORTS_FILE_SETTING.get(settings())) {
Expand Down Expand Up @@ -1334,6 +1359,7 @@ private Node stop() {
if (ReadinessService.enabled(environment)) {
injector.getInstance(ReadinessService.class).stop();
}
injector.getInstance(FileSettingsService.class).stop();
injector.getInstance(ResourceWatcherService.class).close();
injector.getInstance(HttpServerTransport.class).stop();

Expand Down Expand Up @@ -1417,6 +1443,7 @@ public synchronized void close() throws IOException {
if (ReadinessService.enabled(environment)) {
toClose.add(injector.getInstance(ReadinessService.class));
}
toClose.add(injector.getInstance(FileSettingsService.class));

for (LifecycleComponent plugin : pluginLifecycleComponents) {
toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")"));
Expand Down
Loading