Skip to content

Reserved cluster state service #88527

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ public interface ReservedClusterStateHandler<T> {
* <p>
* Sometimes certain parts of the cluster state cannot be created/updated without previously
* setting other cluster state components, e.g. composable templates. Since the reserved cluster state handlers
* are processed in random order by the ReservedClusterStateController, this method gives an opportunity
* are processed in random order by the ReservedClusterStateService, this method gives an opportunity
* to any reserved handler to declare other state handlers it depends on. Given dependencies exist,
* the ReservedClusterStateController will order those handlers such that the handlers that are dependent
* the ReservedClusterStateService will order those handlers such that the handlers that are dependent
* on are processed first.
*
* @return a collection of reserved state handler names
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import java.util.Set;

/**
* A {@link ClusterState} wrapper used by the ReservedClusterStateController to pass the
* A {@link ClusterState} wrapper used by the ReservedClusterStateService to pass the
* current state as well as previous keys set by an {@link ReservedClusterStateHandler} to each transform
* step of the cluster state update.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
/**
* This Action is the reserved state save version of RestClusterUpdateSettingsAction
* <p>
* It is used by the ReservedClusterStateController to update the persistent cluster settings.
* It is used by the ReservedClusterStateService to update the persistent cluster settings.
* Since transient cluster settings are deprecated, this action doesn't support updating transient cluster settings.
*/
public class ReservedClusterSettingsAction implements ReservedClusterStateHandler<Map<String, Object>> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.reservedstate.service;

import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata;

import java.util.List;

import static org.elasticsearch.ExceptionsHelper.stackTrace;

record ErrorState(String namespace, Long version, List<String> errors, ReservedStateErrorMetadata.ErrorKind errorKind) {
ErrorState(String namespace, Long version, Exception e, ReservedStateErrorMetadata.ErrorKind errorKind) {
this(namespace, version, List.of(stackTrace(e)), errorKind);
}

public String toString() {
return String.join(", ", errors());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.reservedstate.service;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata;
import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentParser;

import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.elasticsearch.core.Strings.format;

/**
* Controller class for storing and reserving a portion of the {@link ClusterState}
* <p>
* This class contains the logic about validation, ordering and applying of
* the cluster state specified in a file or through plugins/modules. Reserved cluster state
* cannot be modified through the REST APIs, only through this controller class.
*/
public class ReservedClusterStateService {
private static final Logger logger = LogManager.getLogger(ReservedClusterStateService.class);

public static final ParseField STATE_FIELD = new ParseField("state");
public static final ParseField METADATA_FIELD = new ParseField("metadata");

final Map<String, ReservedClusterStateHandler<?>> handlers;
final ClusterService clusterService;
private final ReservedStateUpdateTaskExecutor updateStateTaskExecutor;
private final ReservedStateErrorTaskExecutor errorStateTaskExecutor;

@SuppressWarnings("unchecked")
private final ConstructingObjectParser<ReservedStateChunk, Void> stateChunkParser = new ConstructingObjectParser<>(
"reserved_state_chunk",
a -> {
List<Tuple<String, Object>> tuples = (List<Tuple<String, Object>>) a[0];
Map<String, Object> stateMap = new HashMap<>();
for (var tuple : tuples) {
stateMap.put(tuple.v1(), tuple.v2());
}

return new ReservedStateChunk(stateMap, (ReservedStateVersion) a[1]);
}
);

/**
* Controller class for saving and reserving {@link ClusterState}.
* @param clusterService for fetching and saving the modified state
* @param handlerList a list of reserved state handlers, which we use to transform the state
*/
public ReservedClusterStateService(ClusterService clusterService, List<ReservedClusterStateHandler<?>> handlerList) {
this.clusterService = clusterService;
this.updateStateTaskExecutor = new ReservedStateUpdateTaskExecutor(clusterService.getRerouteService());
this.errorStateTaskExecutor = new ReservedStateErrorTaskExecutor();
this.handlers = handlerList.stream().collect(Collectors.toMap(ReservedClusterStateHandler::name, Function.identity()));
stateChunkParser.declareNamedObjects(ConstructingObjectParser.constructorArg(), (p, c, name) -> {
if (handlers.containsKey(name) == false) {
throw new IllegalStateException("Missing handler definition for content key [" + name + "]");
}
p.nextToken();
return new Tuple<>(name, handlers.get(name).fromXContent(p));
}, STATE_FIELD);
stateChunkParser.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> ReservedStateVersion.parse(p), METADATA_FIELD);
}

/**
* Saves and reserves a chunk of the cluster state under a given 'namespace' from {@link XContentParser}
*
* @param namespace the namespace under which we'll store the reserved keys in the cluster state metadata
* @param parser the XContentParser to process
* @param errorListener a consumer called with {@link IllegalStateException} if the content has errors and the
* cluster state cannot be correctly applied, null if successful or state couldn't be applied because of incompatible version.
*/
public void process(String namespace, XContentParser parser, Consumer<Exception> errorListener) {
ReservedStateChunk stateChunk;

try {
stateChunk = stateChunkParser.apply(parser, null);
} catch (Exception e) {
ErrorState errorState = new ErrorState(namespace, -1L, e, ReservedStateErrorMetadata.ErrorKind.PARSING);
saveErrorState(errorState);
logger.error("error processing state change request for [{}] with the following errors [{}]", namespace, errorState);

errorListener.accept(new IllegalStateException("Error processing state change request for " + namespace, e));
return;
}

process(namespace, stateChunk, errorListener);
}

/**
* Saves and reserves a chunk of the cluster state under a given 'namespace' from {@link XContentParser}
*
* @param namespace the namespace under which we'll store the reserved keys in the cluster state metadata
* @param reservedStateChunk a {@link ReservedStateChunk} composite state object to process
* @param errorListener a consumer called with {@link IllegalStateException} if the content has errors and the
* cluster state cannot be correctly applied, null if successful or the state failed to apply because of incompatible version.
*/
public void process(String namespace, ReservedStateChunk reservedStateChunk, Consumer<Exception> errorListener) {
Map<String, Object> reservedState = reservedStateChunk.state();
ReservedStateVersion reservedStateVersion = reservedStateChunk.metadata();

LinkedHashSet<String> orderedHandlers;
try {
orderedHandlers = orderedStateHandlers(reservedState.keySet());
} catch (Exception e) {
ErrorState errorState = new ErrorState(
namespace,
reservedStateVersion.version(),
e,
ReservedStateErrorMetadata.ErrorKind.PARSING
);

saveErrorState(errorState);
logger.error("error processing state change request for [{}] with the following errors [{}]", namespace, errorState);

errorListener.accept(new IllegalStateException("Error processing state change request for " + namespace, e));
return;
}

ClusterState state = clusterService.state();
ReservedStateMetadata existingMetadata = state.metadata().reservedStateMetadata().get(namespace);
if (checkMetadataVersion(namespace, existingMetadata, reservedStateVersion) == false) {
return;
}

clusterService.submitStateUpdateTask(
"reserved cluster state [" + namespace + "]",
new ReservedStateUpdateTask(
namespace,
reservedStateChunk,
handlers,
orderedHandlers,
(errorState) -> saveErrorState(errorState),
new ActionListener<>() {
@Override
public void onResponse(ActionResponse.Empty empty) {
logger.info("Successfully applied new reserved cluster state for namespace [{}]", namespace);
errorListener.accept(null);
}

@Override
public void onFailure(Exception e) {
logger.error("Failed to apply reserved cluster state", e);
errorListener.accept(e);
}
}
),
ClusterStateTaskConfig.build(Priority.URGENT),
updateStateTaskExecutor
);
}

// package private for testing
static boolean checkMetadataVersion(
String namespace,
ReservedStateMetadata existingMetadata,
ReservedStateVersion reservedStateVersion
) {
if (Version.CURRENT.before(reservedStateVersion.minCompatibleVersion())) {
logger.warn(
() -> format(
"Reserved cluster state version [%s] for namespace [%s] is not compatible with this Elasticsearch node",
reservedStateVersion.minCompatibleVersion(),
namespace
)
);
return false;
}

if (existingMetadata != null && existingMetadata.version() >= reservedStateVersion.version()) {
logger.warn(
() -> format(
"Not updating reserved cluster state for namespace [%s], because version [%s] is less or equal"
+ " to the current metadata version [%s]",
namespace,
reservedStateVersion.version(),
existingMetadata.version()
)
);
return false;
}

return true;
}

private void saveErrorState(ErrorState state) {
clusterService.submitStateUpdateTask(
"reserved cluster state update error for [ " + state.namespace() + "]",
new ReservedStateErrorTask(state, new ActionListener<>() {
@Override
public void onResponse(ActionResponse.Empty empty) {
logger.info("Successfully applied new reserved error state for namespace [{}]", state.namespace());
}

@Override
public void onFailure(Exception e) {
logger.error("Failed to apply reserved error cluster state", e);
}
}),
ClusterStateTaskConfig.build(Priority.URGENT),
errorStateTaskExecutor
);
}

/**
* Returns an ordered set ({@link LinkedHashSet}) of the cluster state handlers that need to
* execute for a given list of handler names supplied through the {@link ReservedStateChunk}.
* @param handlerNames Names of handlers found in the {@link ReservedStateChunk}
* @return
*/
LinkedHashSet<String> orderedStateHandlers(Set<String> handlerNames) {
LinkedHashSet<String> orderedHandlers = new LinkedHashSet<>();
LinkedHashSet<String> dependencyStack = new LinkedHashSet<>();

for (String key : handlerNames) {
addStateHandler(key, handlerNames, orderedHandlers, dependencyStack);
}

return orderedHandlers;
}

private void addStateHandler(String key, Set<String> keys, LinkedHashSet<String> ordered, LinkedHashSet<String> visited) {
if (visited.contains(key)) {
StringBuilder msg = new StringBuilder("Cycle found in settings dependencies: ");
visited.forEach(s -> {
msg.append(s);
msg.append(" -> ");
});
msg.append(key);
throw new IllegalStateException(msg.toString());
}

if (ordered.contains(key)) {
// already added by another dependent handler
return;
}

visited.add(key);
ReservedClusterStateHandler<?> handler = handlers.get(key);

if (handler == null) {
throw new IllegalStateException("Unknown handler type: " + key);
}

for (String dependency : handler.dependencies()) {
if (keys.contains(dependency) == false) {
throw new IllegalStateException("Missing handler dependency definition: " + key + " -> " + dependency);
}
addStateHandler(dependency, keys, ordered, visited);
}

visited.remove(key);
ordered.add(key);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.reservedstate.service;

import java.util.Map;

/**
* A holder for the cluster state to be saved and reserved and the version info
* <p>
* Apart from the cluster state we want to store and reserve, the chunk requires that
* you supply the version metadata. This version metadata (see {@link ReservedStateVersion}) is checked to ensure
* that the update is safe, and it's not unnecessarily repeated.
*/
public record ReservedStateChunk(Map<String, Object> state, ReservedStateVersion metadata) {}
Loading