File Settings Service #88329
@@ -0,0 +1,5 @@
pr: 88329
summary: File Settings Service
area: Infra/Core
type: feature
issues: []
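For orientation before the diff itself: the integration test added below drives the new service with a JSON settings file of roughly this shape. This is a minimal sketch lifted from the test fixtures in this PR, not authoritative documentation of the final format; the test bumps the metadata version on every write, and the file lands wherever FileSettingsService.operatorSettingsFile() resolves to.

    {
        "metadata": {
            "version": "2",
            "compatibility": "8.4.0"
        },
        "state": {
            "cluster_settings": {
                "indices.recovery.max_bytes_per_sec": "50mb"
            }
        }
    }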
@@ -0,0 +1,240 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.reservedstate.service;

import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata;
import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata;
import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
import org.elasticsearch.test.ESIntegTestCase;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING;
import static org.elasticsearch.test.NodeRoles.dataOnlyNode;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
public class FileSettingsServiceIT extends ESIntegTestCase {

    private AtomicLong versionCounter = new AtomicLong(1);

    private static String testJSON = """
        {
             "metadata": {
                 "version": "%s",
                 "compatibility": "8.4.0"
             },
             "state": {
                 "cluster_settings": {
                     "indices.recovery.max_bytes_per_sec": "50mb"
                 }
             }
        }""";

    private static String testErrorJSON = """
        {
             "metadata": {
                 "version": "%s",
                 "compatibility": "8.4.0"
             },
             "state": {
                 "not_cluster_settings": {
                     "search.allow_expensive_queries": "false"
                 }
             }
        }""";

    private void assertMasterNode(Client client, String node) {
        assertThat(
            client.admin().cluster().prepareState().execute().actionGet().getState().nodes().getMasterNode().getName(),
            equalTo(node)
        );
    }

    private void writeJSONFile(String node, String json) throws Exception {
        long version = versionCounter.incrementAndGet();

        FileSettingsService fileSettingsService = internalCluster().getInstance(FileSettingsService.class, node);

        Files.createDirectories(fileSettingsService.operatorSettingsDir());
        Files.write(fileSettingsService.operatorSettingsFile(), Strings.format(json, version).getBytes(StandardCharsets.UTF_8));
    }

    private CountDownLatch setupClusterStateListener(String node) {
        ClusterService clusterService = internalCluster().clusterService(node);
        CountDownLatch savedClusterState = new CountDownLatch(1);
        clusterService.addListener(new ClusterStateListener() {
            @Override
            public void clusterChanged(ClusterChangedEvent event) {
                ReservedStateMetadata reservedState = event.state().metadata().reservedStateMetadata().get(FileSettingsService.NAMESPACE);
                if (reservedState != null) {
                    ReservedStateHandlerMetadata handlerMetadata = reservedState.handlers().get(ReservedClusterSettingsAction.NAME);
                    if (handlerMetadata == null) {
                        fail("Should've found cluster settings in this metadata");
                    }
                    assertThat(handlerMetadata.keys(), contains("indices.recovery.max_bytes_per_sec"));
                    clusterService.removeListener(this);
                    savedClusterState.countDown();
                }
            }
        });

        return savedClusterState;
    }

    private void assertClusterStateSaveOK(CountDownLatch savedClusterState) throws Exception {
        boolean awaitSuccessful = savedClusterState.await(20, TimeUnit.SECONDS);
        assertTrue(awaitSuccessful);

        final ClusterStateResponse clusterStateResponse = client().admin().cluster().state(new ClusterStateRequest()).actionGet();

        assertThat(
            clusterStateResponse.getState().metadata().persistentSettings().get(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey()),
            equalTo("50mb")
        );

        ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest().persistentSettings(
            Settings.builder().put(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "1234kb")
        );
        assertEquals(
            "java.lang.IllegalArgumentException: Failed to process request "
                + "[org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest/unset] "
                + "with errors: [[indices.recovery.max_bytes_per_sec] set as read-only by [file_settings]]",
            expectThrows(ExecutionException.class, () -> client().admin().cluster().updateSettings(req).get()).getMessage()
        );
    }

    public void testSettingsApplied() throws Exception {
        internalCluster().setBootstrapMasterNodeIndex(0);
        logger.info("--> start data node / non master node");
        String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s"));
        FileSettingsService dataFileSettingsService = internalCluster().getInstance(FileSettingsService.class, dataNode);

        assertFalse(dataFileSettingsService.watching());

        logger.info("--> start master node");
        final String masterNode = internalCluster().startMasterOnlyNode();
        assertMasterNode(internalCluster().nonMasterClient(), masterNode);
        var savedClusterState = setupClusterStateListener(masterNode);

        FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);

        assertTrue(masterFileSettingsService.watching());
        assertFalse(dataFileSettingsService.watching());

        writeJSONFile(masterNode, testJSON);
        assertClusterStateSaveOK(savedClusterState);
    }

    public void testSettingsAppliedOnStart() throws Exception {
        internalCluster().setBootstrapMasterNodeIndex(0);
        logger.info("--> start data node / non master node");
        String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s"));
        FileSettingsService dataFileSettingsService = internalCluster().getInstance(FileSettingsService.class, dataNode);

        assertFalse(dataFileSettingsService.watching());
        var savedClusterState = setupClusterStateListener(dataNode);

        // In internal cluster tests, the nodes share the config directory, so when we write with the data node path
        // the master will pick it up on start
        writeJSONFile(dataNode, testJSON);

        logger.info("--> start master node");
        final String masterNode = internalCluster().startMasterOnlyNode();
        assertMasterNode(internalCluster().nonMasterClient(), masterNode);

        FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);

        assertTrue(masterFileSettingsService.watching());
        assertFalse(dataFileSettingsService.watching());

        assertClusterStateSaveOK(savedClusterState);
    }

    private CountDownLatch setupClusterStateListenerForError(String node) {
        ClusterService clusterService = internalCluster().clusterService(node);
        CountDownLatch savedClusterState = new CountDownLatch(1);
        clusterService.addListener(new ClusterStateListener() {
            @Override
            public void clusterChanged(ClusterChangedEvent event) {
                ReservedStateMetadata reservedState = event.state().metadata().reservedStateMetadata().get(FileSettingsService.NAMESPACE);
                if (reservedState != null) {
                    assertEquals(ReservedStateErrorMetadata.ErrorKind.PARSING, reservedState.errorMetadata().errorKind());
                    assertThat(reservedState.errorMetadata().errors(), allOf(notNullValue(), hasSize(1)));
                    assertThat(
                        reservedState.errorMetadata().errors().get(0),
                        containsString("Missing handler definition for content key [not_cluster_settings]")
                    );
                    clusterService.removeListener(this);
                    savedClusterState.countDown();
                }
            }
        });

        return savedClusterState;
    }

    private void assertClusterStateNotSaved(CountDownLatch savedClusterState) throws Exception {
        boolean awaitSuccessful = savedClusterState.await(20, TimeUnit.SECONDS);
        assertTrue(awaitSuccessful);

        final ClusterStateResponse clusterStateResponse = client().admin().cluster().state(new ClusterStateRequest()).actionGet();

        assertThat(clusterStateResponse.getState().metadata().persistentSettings().get("search.allow_expensive_queries"), nullValue());

        ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest().persistentSettings(
            Settings.builder().put("search.allow_expensive_queries", "false")
        );
        // This should succeed, nothing was reserved
        client().admin().cluster().updateSettings(req).get();
    }

    public void testErrorSaved() throws Exception {
        internalCluster().setBootstrapMasterNodeIndex(0);
        logger.info("--> start data node / non master node");
        String dataNode = internalCluster().startNode(Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s"));
        FileSettingsService dataFileSettingsService = internalCluster().getInstance(FileSettingsService.class, dataNode);

        assertFalse(dataFileSettingsService.watching());

        logger.info("--> start master node");
        final String masterNode = internalCluster().startMasterOnlyNode();
        assertMasterNode(internalCluster().nonMasterClient(), masterNode);
        var savedClusterState = setupClusterStateListenerForError(masterNode);

        FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);

        assertTrue(masterFileSettingsService.watching());
        assertFalse(dataFileSettingsService.watching());

        writeJSONFile(masterNode, testErrorJSON);
        assertClusterStateNotSaved(savedClusterState);
    }
}
@@ -81,8 +81,10 @@ public void testThreadNames() throws Exception {
            // or the ones that are occasionally come up from ESSingleNodeTestCase
            if (threadName.contains("[node_s_0]") // TODO: this can't possibly be right! single node and integ test are unrelated!
                || threadName.contains("Keep-Alive-Timer")
                || threadName.contains("readiness-service")
                || threadName.contains("JVMCI-native") // GraalVM Compiler Thread
                || threadName.contains("file-settings-watcher")
                || threadName.contains("FileSystemWatchService")) {
We have many other threadpools, but we've never needed to modify this test for each of them. Can we utilize the existing ThreadPools class to manage these threads, or do something so that we don't need to modify this for every system thread that is added?

I agree that this is tedious to modify each time. We need to explicitly add each thread that's not in any threadpool. When I worked on the readiness-service, I was advised not to use the thread pools for threads that will constantly be running, so I followed the same approach here. Is it OK if I follow up with a new PR to tackle this problem, rather than extend this PR further? I think I should be able to ensure all standalone Elasticsearch threads are registered in a way that this list doesn't have to be manually modified.

A followup is great
                continue;
            }
            String nodePrefix = "("
I will tackle cleaning up this manual registration in a separate PR.
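As a rough illustration of what that follow-up could look like (the class, field, and method names below are hypothetical and not part of this PR or of the Elasticsearch codebase), one option is to keep the known standalone thread-name markers in a single shared registry that both the services and the test consult, so registering a new system thread is a one-line change in one place:

    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.CopyOnWriteArraySet;

    /**
     * Hypothetical central registry for long-running threads that live outside any
     * ThreadPool (readiness service, file settings watcher, ...). Services register
     * the name marker they use for their standalone threads; tests such as
     * testThreadNames() can then skip them without hard-coding each marker.
     */
    public final class StandaloneThreadRegistry {

        // Markers of known standalone system threads; CopyOnWriteArraySet keeps reads cheap.
        private static final Set<String> MARKERS = new CopyOnWriteArraySet<>(
            List.of("readiness-service", "file-settings-watcher", "FileSystemWatchService")
        );

        private StandaloneThreadRegistry() {}

        /** Called by a service when it creates a long-running thread outside a ThreadPool. */
        public static void register(String nameMarker) {
            MARKERS.add(nameMarker);
        }

        /** Used by tests to decide whether a thread name belongs to a known standalone thread. */
        public static boolean isStandaloneThread(String threadName) {
            return MARKERS.stream().anyMatch(threadName::contains);
        }
    }

With something along these lines, the chain of per-thread threadName.contains(...) clauses in testThreadNames() could collapse into a single StandaloneThreadRegistry.isStandaloneThread(threadName) check.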