Skip to content

Commit

Permalink
[ML] Check and install the latest template in the DFA executor (#6158…
Browse files Browse the repository at this point in the history
…9) (#61842)

During a rolling upgrade it is possible that a worker node will be upgraded before
the master in which case the DFA templates will not have been installed.
Before a DFA task starts check that the latest template is installed and install it if necessary.
  • Loading branch information
davidkyle authored Sep 2, 2020
1 parent d59343b commit d268540
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
Expand All @@ -26,6 +27,9 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.core.template.IndexTemplateConfig;

import java.util.Arrays;
import java.util.Comparator;
Expand Down Expand Up @@ -228,4 +232,52 @@ private static void updateWriteAlias(Client client,
listener::onFailure),
client.admin().indices()::aliases);
}

/**
* Installs the index template specified by {@code templateConfig} if it is not in already
* installed in {@code clusterState}.
*
* The check for presence is simple and will return the listener on
* the calling thread if successful. If the template has to be installed
* an async call will be made.
*
* @param clusterState The cluster state
* @param client For putting the template
* @param templateConfig The config
* @param listener Async listener
*/
public static void installIndexTemplateIfRequired(
ClusterState clusterState,
Client client,
IndexTemplateConfig templateConfig,
ActionListener<Boolean> listener
) {
String templateName = templateConfig.getTemplateName();

// The check for existence of the template is against the cluster state, so very cheap
if (hasIndexTemplate(clusterState, templateName)) {
listener.onResponse(true);
return;
}

PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateName)
.source(templateConfig.loadBytes(), XContentType.JSON);
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));

ActionListener<AcknowledgedResponse> innerListener = ActionListener.wrap(
response -> {
if (response.isAcknowledged() == false) {
logger.warn("error adding legacy template [{}], request was not acknowledged", templateName);
}
listener.onResponse(response.isAcknowledged());
},
listener::onFailure);

executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, request, innerListener,
client.admin().indices()::putTemplate);
}

private static boolean hasIndexTemplate(ClusterState state, String templateName) {
return state.getMetadata().getTemplates().containsKey(templateName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand All @@ -37,6 +38,8 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.inference.persistence.InferenceIndexConstants;
import org.elasticsearch.xpack.core.template.IndexTemplateConfig;
import org.junit.After;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -93,6 +96,7 @@ public void setUpMocks() {
doAnswer(withResponse(new CreateIndexResponse(true, true, FIRST_CONCRETE_INDEX))).when(indicesAdminClient).create(any(), any());
when(indicesAdminClient.prepareAliases()).thenReturn(new IndicesAliasesRequestBuilder(client, IndicesAliasesAction.INSTANCE));
doAnswer(withResponse(new AcknowledgedResponse(true))).when(indicesAdminClient).aliases(any(), any());
doAnswer(withResponse(new AcknowledgedResponse(true))).when(indicesAdminClient).putTemplate(any(), any());

clusterAdminClient = mock(ClusterAdminClient.class);
doAnswer(invocationOnMock -> {
Expand Down Expand Up @@ -121,6 +125,34 @@ public void verifyNoMoreInteractionsWithMocks() {
verifyNoMoreInteractions(indicesAdminClient, listener);
}

public void testInstallIndexTemplateIfRequired_GivenTemplateExists() {
ClusterState clusterState = createClusterState(Collections.emptyMap(),
Collections.singletonMap(InferenceIndexConstants.LATEST_INDEX_NAME,
createIndexTemplateMetaData(InferenceIndexConstants.LATEST_INDEX_NAME,
Collections.singletonList(InferenceIndexConstants.LATEST_INDEX_NAME))));

IndexTemplateConfig inferenceTemplate = new IndexTemplateConfig(InferenceIndexConstants.LATEST_INDEX_NAME,
"not_a_real_file.json", Version.CURRENT.id, "xpack.ml.version",
Collections.singletonMap("xpack.ml.version.id", String.valueOf(Version.CURRENT.id)));

MlIndexAndAlias.installIndexTemplateIfRequired(clusterState, client, inferenceTemplate, listener);
verify(listener).onResponse(true);
verifyNoMoreInteractions(client);
}

public void testInstallIndexTemplateIfRequired() {
ClusterState clusterState = createClusterState(Collections.emptyMap());

IndexTemplateConfig inferenceTemplate = new IndexTemplateConfig(InferenceIndexConstants.LATEST_INDEX_NAME,
"/org/elasticsearch/xpack/core/ml/inference_index_template.json", Version.CURRENT.id, "xpack.ml.version",
Collections.singletonMap("xpack.ml.version.id", String.valueOf(Version.CURRENT.id)));

MlIndexAndAlias.installIndexTemplateIfRequired(clusterState, client, inferenceTemplate, listener);
InOrder inOrder = inOrder(indicesAdminClient, listener);
inOrder.verify(indicesAdminClient).putTemplate(any(), any());
inOrder.verify(listener).onResponse(true);
}

public void testCreateStateIndexAndAliasIfNecessary_CleanState() {
ClusterState clusterState = createClusterState(Collections.emptyMap());
createIndexAndAliasIfNecessary(clusterState);
Expand Down Expand Up @@ -266,9 +298,15 @@ private static <Response> Answer<Response> withResponse(Response response) {
}

private static ClusterState createClusterState(Map<String, IndexMetadata> indices) {
return createClusterState(indices, Collections.emptyMap());
}

private static ClusterState createClusterState(Map<String, IndexMetadata> indices, Map<String, IndexTemplateMetadata> templates) {
return ClusterState.builder(ClusterName.DEFAULT)
.metadata(Metadata.builder()
.indices(ImmutableOpenMap.<String, IndexMetadata>builder().putAll(indices).build()).build())
.indices(ImmutableOpenMap.<String, IndexMetadata>builder().putAll(indices).build())
.templates(ImmutableOpenMap.<String, IndexTemplateMetadata>builder().putAll(templates).build())
.build())
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), HIDDEN_INTRODUCED_VERSION)))
.build();
Expand All @@ -282,6 +320,10 @@ private static IndexMetadata createIndexMetadataWithAlias(String indexName) {
return createIndexMetadata(indexName, true);
}

private static IndexTemplateMetadata createIndexTemplateMetaData(String templateName, List<String> patterns) {
return IndexTemplateMetadata.builder(templateName).patterns(patterns).build();
}

private static IndexMetadata createIndexMetadata(String indexName, boolean withAlias) {
Settings settings =
Settings.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,8 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
memoryTracker.get(), client, expressionResolver),
new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor(datafeedManager.get(), expressionResolver),
new TransportStartDataFrameAnalyticsAction.TaskExecutor(settings, client, clusterService, dataFrameAnalyticsManager.get(),
dataFrameAnalyticsAuditor.get(), memoryTracker.get(), expressionResolver)
dataFrameAnalyticsAuditor.get(), memoryTracker.get(), expressionResolver,
MlIndexTemplateRegistry.INFERENCE_TEMPLATE)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry {

private static final IndexTemplateConfig CONFIG_TEMPLATE = configTemplate();

private static final IndexTemplateConfig INFERENCE_TEMPLATE = new IndexTemplateConfig(InferenceIndexConstants.LATEST_INDEX_NAME,
public static final IndexTemplateConfig INFERENCE_TEMPLATE = new IndexTemplateConfig(InferenceIndexConstants.LATEST_INDEX_NAME,
ROOT_RESOURCE_PATH + "inference_index_template.json", Version.CURRENT.id, VERSION_PATTERN,
Collections.singletonMap(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
import org.elasticsearch.xpack.core.template.IndexTemplateConfig;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
Expand Down Expand Up @@ -594,21 +596,24 @@ public static class TaskExecutor extends PersistentTasksExecutor<StartDataFrameA
private final DataFrameAnalyticsAuditor auditor;
private final MlMemoryTracker memoryTracker;
private final IndexNameExpressionResolver resolver;
private final IndexTemplateConfig inferenceIndexTemplate;

private volatile int maxMachineMemoryPercent;
private volatile int maxLazyMLNodes;
private volatile int maxOpenJobs;
private volatile ClusterState clusterState;

public TaskExecutor(Settings settings, Client client, ClusterService clusterService, DataFrameAnalyticsManager manager,
DataFrameAnalyticsAuditor auditor, MlMemoryTracker memoryTracker, IndexNameExpressionResolver resolver) {
DataFrameAnalyticsAuditor auditor, MlMemoryTracker memoryTracker, IndexNameExpressionResolver resolver,
IndexTemplateConfig inferenceIndexTemplate) {
super(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, MachineLearning.UTILITY_THREAD_POOL_NAME);
this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
this.manager = Objects.requireNonNull(manager);
this.auditor = Objects.requireNonNull(auditor);
this.memoryTracker = Objects.requireNonNull(memoryTracker);
this.resolver = Objects.requireNonNull(resolver);
this.inferenceIndexTemplate = Objects.requireNonNull(inferenceIndexTemplate);
this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings);
this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
this.maxOpenJobs = MAX_OPEN_JOBS_PER_NODE.get(settings);
Expand Down Expand Up @@ -693,6 +698,20 @@ protected void nodeOperation(AllocatedPersistentTask task, StartDataFrameAnalyti
return;
}

ActionListener<Boolean> templateCheckListener = ActionListener.wrap(
ok -> executeTask(analyticsTaskState, task),
error -> {
Throwable cause = ExceptionsHelper.unwrapCause(error);
String msg = "Failed to create internal index template [" + inferenceIndexTemplate.getTemplateName() + "]";
logger.error(msg, cause);
task.markAsFailed(error);
}
);

MlIndexAndAlias.installIndexTemplateIfRequired(clusterState, client, inferenceIndexTemplate, templateCheckListener);
}

private void executeTask(DataFrameAnalyticsTaskState analyticsTaskState, AllocatedPersistentTask task) {
if (analyticsTaskState == null) {
DataFrameAnalyticsTaskState startedState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.STARTED,
task.getAllocationId(), null);
Expand Down

0 comments on commit d268540

Please sign in to comment.