-
Notifications
You must be signed in to change notification settings - Fork 25.3k
Implement few operator handlers #88097
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
df61848
407fd72
b78d73f
334f39c
2836ff7
70f7202
2c11e74
da2654c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
* in compliance with, at your election, the Elastic License 2.0 or the Server | ||
* Side Public License, v 1. | ||
*/ | ||
|
||
package org.elasticsearch.immutablestate.action; | ||
|
||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; | ||
import org.elasticsearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction; | ||
import org.elasticsearch.client.internal.Requests; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.common.settings.ClusterSettings; | ||
import org.elasticsearch.immutablestate.ImmutableClusterStateHandler; | ||
import org.elasticsearch.immutablestate.TransformState; | ||
|
||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
|
||
import static org.elasticsearch.common.util.Maps.asMap; | ||
|
||
/** | ||
* This Action is the immutable state save version of RestClusterUpdateSettingsAction | ||
* <p> | ||
* It is used by the ImmutableClusterStateController to update the persistent cluster settings. | ||
* Since transient cluster settings are deprecated, this action doesn't support updating transient cluster settings. | ||
*/ | ||
public class ImmutableClusterSettingsAction implements ImmutableClusterStateHandler<ClusterUpdateSettingsRequest> { | ||
|
||
public static final String NAME = "cluster_settings"; | ||
|
||
private final ClusterSettings clusterSettings; | ||
|
||
public ImmutableClusterSettingsAction(ClusterSettings clusterSettings) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this also get wired up like was done here: https://github.com/elastic/elasticsearch/pull/86224/files#diff-b56bed42f5b0025886eb243ecb48678ac048bfba0ef047545dbb6504e357edf2R901 ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, 100%. I left that part to happen in my follow-up draft PR I have now based on this one here: https://github.com/elastic/elasticsearch/pull/88224/files#diff-b56bed42f5b0025886eb243ecb48678ac048bfba0ef047545dbb6504e357edf2R905 After I wire them up, I call the initialize on the controller, which I haven't brought in yet from the initial WIP PR. Once we merge this in, that's the next one. |
||
this.clusterSettings = clusterSettings; | ||
} | ||
|
||
@Override | ||
public String name() { | ||
return NAME; | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
private ClusterUpdateSettingsRequest prepare(Object input, Set<String> previouslySet) { | ||
final ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = Requests.clusterUpdateSettingsRequest(); | ||
|
||
Map<String, ?> source = asMap(input); | ||
Map<String, Object> persistentSettings = new HashMap<>(); | ||
Set<String> toDelete = new HashSet<>(previouslySet); | ||
|
||
source.forEach((k, v) -> { | ||
persistentSettings.put(k, v); | ||
toDelete.remove(k); | ||
}); | ||
|
||
toDelete.forEach(k -> persistentSettings.put(k, null)); | ||
|
||
clusterUpdateSettingsRequest.persistentSettings(persistentSettings); | ||
return clusterUpdateSettingsRequest; | ||
} | ||
|
||
@Override | ||
public TransformState transform(Object input, TransformState prevState) { | ||
ClusterUpdateSettingsRequest request = prepare(input, prevState.keys()); | ||
|
||
// allow empty requests, this is how we clean up settings | ||
if (request.persistentSettings().isEmpty() == false) { | ||
validate(request); | ||
} | ||
|
||
ClusterState state = prevState.state(); | ||
|
||
TransportClusterUpdateSettingsAction.ClusterUpdateSettingsTask updateSettingsTask = | ||
new TransportClusterUpdateSettingsAction.ClusterUpdateSettingsTask(clusterSettings, request); | ||
|
||
state = updateSettingsTask.execute(state); | ||
Set<String> currentKeys = request.persistentSettings() | ||
.keySet() | ||
.stream() | ||
.filter(k -> request.persistentSettings().hasValue(k)) | ||
.collect(Collectors.toSet()); | ||
|
||
return new TransformState(state, currentKeys); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
* in compliance with, at your election, the Elastic License 2.0 or the Server | ||
* Side Public License, v 1. | ||
*/ | ||
|
||
package org.elasticsearch.immutablestate.action; | ||
|
||
import org.elasticsearch.cluster.ClusterName; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.common.settings.ClusterSettings; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.immutablestate.TransformState; | ||
import org.elasticsearch.test.ESTestCase; | ||
import org.elasticsearch.xcontent.XContentParser; | ||
import org.elasticsearch.xcontent.XContentParserConfiguration; | ||
import org.elasticsearch.xcontent.XContentType; | ||
|
||
import java.util.Collections; | ||
|
||
import static org.hamcrest.Matchers.containsInAnyOrder; | ||
|
||
public class ImmutableClusterSettingsActionTests extends ESTestCase { | ||
|
||
private TransformState processJSON(ImmutableClusterSettingsAction action, TransformState prevState, String json) throws Exception { | ||
try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, json)) { | ||
return action.transform(parser.map(), prevState); | ||
} | ||
} | ||
|
||
public void testValidation() throws Exception { | ||
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); | ||
|
||
ClusterState state = ClusterState.builder(new ClusterName("elasticsearch")).build(); | ||
TransformState prevState = new TransformState(state, Collections.emptySet()); | ||
ImmutableClusterSettingsAction action = new ImmutableClusterSettingsAction(clusterSettings); | ||
|
||
String badPolicyJSON = """ | ||
{ | ||
"indices.recovery.min_bytes_per_sec": "50mb" | ||
}"""; | ||
|
||
assertEquals( | ||
"persistent setting [indices.recovery.min_bytes_per_sec], not recognized", | ||
expectThrows(IllegalArgumentException.class, () -> processJSON(action, prevState, badPolicyJSON)).getMessage() | ||
); | ||
} | ||
|
||
public void testSetUnsetSettings() throws Exception { | ||
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); | ||
|
||
ClusterState state = ClusterState.builder(new ClusterName("elasticsearch")).build(); | ||
TransformState prevState = new TransformState(state, Collections.emptySet()); | ||
ImmutableClusterSettingsAction action = new ImmutableClusterSettingsAction(clusterSettings); | ||
|
||
String emptyJSON = ""; | ||
|
||
TransformState updatedState = processJSON(action, prevState, emptyJSON); | ||
assertEquals(0, updatedState.keys().size()); | ||
assertEquals(prevState.state(), updatedState.state()); | ||
|
||
String settingsJSON = """ | ||
{ | ||
"indices.recovery.max_bytes_per_sec": "50mb", | ||
"cluster": { | ||
"remote": { | ||
"cluster_one": { | ||
"seeds": [ | ||
"127.0.0.1:9300" | ||
] | ||
} | ||
} | ||
} | ||
}"""; | ||
|
||
prevState = updatedState; | ||
updatedState = processJSON(action, prevState, settingsJSON); | ||
assertThat(updatedState.keys(), containsInAnyOrder("indices.recovery.max_bytes_per_sec", "cluster.remote.cluster_one.seeds")); | ||
assertEquals("50mb", updatedState.state().metadata().persistentSettings().get("indices.recovery.max_bytes_per_sec")); | ||
assertEquals("[127.0.0.1:9300]", updatedState.state().metadata().persistentSettings().get("cluster.remote.cluster_one.seeds")); | ||
|
||
String oneSettingJSON = """ | ||
{ | ||
"indices.recovery.max_bytes_per_sec": "25mb" | ||
}"""; | ||
|
||
prevState = updatedState; | ||
updatedState = processJSON(action, prevState, oneSettingJSON); | ||
assertThat(updatedState.keys(), containsInAnyOrder("indices.recovery.max_bytes_per_sec")); | ||
assertEquals("25mb", updatedState.state().metadata().persistentSettings().get("indices.recovery.max_bytes_per_sec")); | ||
assertNull(updatedState.state().metadata().persistentSettings().get("cluster.remote.cluster_one.seeds")); | ||
|
||
prevState = updatedState; | ||
updatedState = processJSON(action, prevState, emptyJSON); | ||
assertEquals(0, updatedState.keys().size()); | ||
assertNull(updatedState.state().metadata().persistentSettings().get("indices.recovery.max_bytes_per_sec")); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.ilm; | ||
|
||
import org.elasticsearch.immutablestate.ImmutableClusterStateHandler; | ||
import org.elasticsearch.immutablestate.ImmutableClusterStateHandlerProvider; | ||
|
||
import java.util.Arrays; | ||
import java.util.Collection; | ||
import java.util.Set; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
|
||
/** | ||
* ILM Provider implementation for the {@link ImmutableClusterStateHandlerProvider} service interface | ||
*/ | ||
public class ILMImmutableStateHandlerProvider implements ImmutableClusterStateHandlerProvider { | ||
private static final Set<ImmutableClusterStateHandler<?>> handlers = ConcurrentHashMap.newKeySet(); | ||
|
||
@Override | ||
public Collection<ImmutableClusterStateHandler<?>> handlers() { | ||
return handlers; | ||
} | ||
|
||
public static void registerHandlers(ImmutableClusterStateHandler<?>... stateHandlers) { | ||
handlers.addAll(Arrays.asList(stateHandlers)); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's an example of how these two methods
operatorHandlerName
andmodifiedKeys
could be used indoExecute
below to verify that an operation is allowed to execute:https://github.com/elastic/elasticsearch/pull/86224/files#diff-e7c445981d60f65376f04699ec430437d9b1c78fac6b5ef582f39f84e43de9ceR175
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding the
validateForOperatorState(...)
validation method in the referenced link,I do think the logic looks good, but I do think this also needs to be validated as part
of a cluster state update. Right now the validation there only occurs on the action/transport
level and there is no guarantee that what is validated at that point in time is also
true when the update if really performed. I think it is a good pre-check, that is likely
to catch almost all cases when resources are updates that are being managed operator state.
For example in order to guarantee that no operator state managed ilm policies are modified
via API. The UpdateLifecyclePolicyTask/DeleteLifecyclePolicyTask should do this validation
check as well as part of their respective
execute(...)
method.