Skip to content

[ML][Transforms] protecting doSaveState with optimistic concurrency #46156

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex;
import org.elasticsearch.xpack.dataframe.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.dataframe.transforms.SourceDestValidator;
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;

Expand Down Expand Up @@ -138,7 +139,7 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
private void handlePrivsResponse(String username,
Request request,
DataFrameTransformConfig config,
DataFrameTransformsConfigManager.SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
ClusterState clusterState,
HasPrivilegesResponse privilegesResponse,
ActionListener<Response> listener) {
Expand All @@ -161,7 +162,7 @@ private void handlePrivsResponse(String username,
private void validateAndUpdateDataFrame(Request request,
ClusterState clusterState,
DataFrameTransformConfig config,
DataFrameTransformsConfigManager.SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
ActionListener<Response> listener) {
try {
SourceDestValidator.validate(config, clusterState, indexNameExpressionResolver, request.isDeferValidation());
Expand All @@ -186,7 +187,7 @@ private void validateAndUpdateDataFrame(Request request,
}
private void updateDataFrame(Request request,
DataFrameTransformConfig config,
DataFrameTransformsConfigManager.SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
ClusterState clusterState,
ActionListener<Response> listener) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ private void putTransformConfiguration(DataFrameTransformConfig transformConfig,
.id(DataFrameTransformConfig.documentId(transformConfig.getId()))
.source(source);
if (seqNoPrimaryTermAndIndex != null) {
indexRequest.setIfSeqNo(seqNoPrimaryTermAndIndex.seqNo).setIfPrimaryTerm(seqNoPrimaryTermAndIndex.primaryTerm);
indexRequest.setIfSeqNo(seqNoPrimaryTermAndIndex.getSeqNo())
.setIfPrimaryTerm(seqNoPrimaryTermAndIndex.getPrimaryTerm());
}
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(r -> {
listener.onResponse(true);
Expand Down Expand Up @@ -433,21 +434,31 @@ public void deleteTransform(String transformId, ActionListener<Boolean> listener
}));
}

public void putOrUpdateTransformStoredDoc(DataFrameTransformStoredDoc stats, ActionListener<Boolean> listener) {
public void putOrUpdateTransformStoredDoc(DataFrameTransformStoredDoc stats,
SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
ActionListener<SeqNoPrimaryTermAndIndex> listener) {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder source = stats.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));

IndexRequest indexRequest = new IndexRequest(DataFrameInternalIndex.LATEST_INDEX_NAME)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.id(DataFrameTransformStoredDoc.documentId(stats.getId()))
.opType(DocWriteRequest.OpType.INDEX)
.source(source);

if (seqNoPrimaryTermAndIndex != null &&
seqNoPrimaryTermAndIndex.getIndex().equals(DataFrameInternalIndex.LATEST_INDEX_NAME)) {
indexRequest.opType(DocWriteRequest.OpType.INDEX)
.setIfSeqNo(seqNoPrimaryTermAndIndex.getSeqNo())
.setIfPrimaryTerm(seqNoPrimaryTermAndIndex.getPrimaryTerm());
} else {
// If the index is NOT the latest or we are null, that means we have not created this doc before
// so, it should be a create option without the seqNo and primaryTerm set
indexRequest.opType(DocWriteRequest.OpType.CREATE);
}
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(
r -> listener.onResponse(true),
r -> listener.onResponse(SeqNoPrimaryTermAndIndex.fromIndexResponse(r)),
e -> listener.onFailure(new RuntimeException(
DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_FAILED_TO_PERSIST_STATS, stats.getId()),
e))
DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_FAILED_TO_PERSIST_STATS, stats.getId()),
e))
));
} catch (IOException e) {
// not expected to happen but for the sake of completeness
Expand All @@ -457,13 +468,15 @@ public void putOrUpdateTransformStoredDoc(DataFrameTransformStoredDoc stats, Act
}
}

public void getTransformStoredDoc(String transformId, ActionListener<DataFrameTransformStoredDoc> resultListener) {
public void getTransformStoredDoc(String transformId,
ActionListener<Tuple<DataFrameTransformStoredDoc, SeqNoPrimaryTermAndIndex>> resultListener) {
QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", DataFrameTransformStoredDoc.documentId(transformId));
SearchRequest searchRequest = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME_PATTERN)
.setQuery(queryBuilder)
// use sort to get the last
.addSort("_index", SortOrder.DESC)
.setSize(1)
.seqNoAndPrimaryTerm(true)
.request();

executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, SearchAction.INSTANCE, searchRequest, ActionListener.<SearchResponse>wrap(
Expand All @@ -473,11 +486,14 @@ public void getTransformStoredDoc(String transformId, ActionListener<DataFrameTr
DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNKNOWN_TRANSFORM_STATS, transformId)));
return;
}
BytesReference source = searchResponse.getHits().getHits()[0].getSourceRef();
SearchHit searchHit = searchResponse.getHits().getHits()[0];
BytesReference source = searchHit.getSourceRef();
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) {
resultListener.onResponse(DataFrameTransformStoredDoc.fromXContent(parser));
resultListener.onResponse(
Tuple.tuple(DataFrameTransformStoredDoc.fromXContent(parser),
SeqNoPrimaryTermAndIndex.fromSearchHit(searchHit)));
} catch (Exception e) {
logger.error(DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_PARSE_TRANSFORM_STATISTICS_CONFIGURATION,
transformId), e);
Expand Down Expand Up @@ -595,28 +611,4 @@ private static Tuple<RestStatus, Throwable> getStatusAndReason(final BulkByScrol
}
return new Tuple<>(status, reason);
}

public static class SeqNoPrimaryTermAndIndex {
// Sequence number of the document as returned by the last read/write.
private final long seqNo;
// Primary term of the shard that produced the sequence number.
private final long primaryTerm;
// Concrete index name the document currently lives in (the internal
// index is versioned, so this may differ from the latest index name).
private final String index;

/**
 * Bundles the three values needed for optimistic-concurrency-controlled
 * writes (if_seq_no / if_primary_term) against the data frame internal index.
 *
 * @param seqNo       sequence number observed for the document
 * @param primaryTerm primary term observed for the document
 * @param index       concrete index containing the document
 */
public SeqNoPrimaryTermAndIndex(long seqNo, long primaryTerm, String index) {
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.index = index;
}

/** @return the observed sequence number */
public long getSeqNo() {
return seqNo;
}

/** @return the observed primary term */
public long getPrimaryTerm() {
return primaryTerm;
}

/** @return the concrete index name the document was read from */
public String getIndex() {
return index;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.dataframe.persistence;

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.search.SearchHit;

import java.util.Objects;

/**
 * Simple immutable holder for the information needed for optimistic concurrency:
 * the sequence number and primary term to pass as {@code if_seq_no}/{@code if_primary_term}
 * on a subsequent write, plus the concrete index the document was read from (the
 * internal index is versioned, so this may differ from the latest index name).
 */
public class SeqNoPrimaryTermAndIndex {
    private final long seqNo;
    private final long primaryTerm;
    private final String index;

    /**
     * Builds an instance from a search hit; requires the search request to have
     * set {@code seqNoAndPrimaryTerm(true)} so the hit carries real values.
     *
     * @param hit the search hit the document was read from
     * @return the seqNo/primaryTerm/index triple for that hit
     */
    public static SeqNoPrimaryTermAndIndex fromSearchHit(SearchHit hit) {
        return new SeqNoPrimaryTermAndIndex(hit.getSeqNo(), hit.getPrimaryTerm(), hit.getIndex());
    }

    /**
     * Builds an instance from the response of an index (write) operation.
     *
     * @param response the index response for the document just written
     * @return the seqNo/primaryTerm/index triple after the write
     */
    public static SeqNoPrimaryTermAndIndex fromIndexResponse(IndexResponse response) {
        return new SeqNoPrimaryTermAndIndex(response.getSeqNo(), response.getPrimaryTerm(), response.getIndex());
    }

    // Package-private: callers should go through the static factories above.
    SeqNoPrimaryTermAndIndex(long seqNo, long primaryTerm, String index) {
        this.seqNo = seqNo;
        this.primaryTerm = primaryTerm;
        this.index = index;
    }

    /** @return the observed sequence number */
    public long getSeqNo() {
        return seqNo;
    }

    /** @return the observed primary term */
    public long getPrimaryTerm() {
        return primaryTerm;
    }

    /** @return the concrete index name the document was read from or written to */
    public String getIndex() {
        return index;
    }

    @Override
    public int hashCode() {
        return Objects.hash(seqNo, primaryTerm, index);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }

        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }

        SeqNoPrimaryTermAndIndex other = (SeqNoPrimaryTermAndIndex) obj;
        // Compare the primitive longs directly; Objects.equals(long, long)
        // would autobox both operands on every call for no benefit.
        return seqNo == other.seqNo
            && primaryTerm == other.primaryTerm
            && Objects.equals(index, other.index);
    }

    @Override
    public String toString() {
        return "{seqNo=" + seqNo + ",primaryTerm=" + primaryTerm + ",index=" + index + "}";
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTaskState;
Expand All @@ -42,6 +43,7 @@
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil;

import java.util.ArrayList;
Expand Down Expand Up @@ -189,8 +191,12 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr
// <3> Set the previous stats (if they exist), initialize the indexer, start the task (If it is STOPPED)
// Since we don't create the task until `_start` is called, if we see that the task state is stopped, attempt to start
// Schedule execution regardless
ActionListener<DataFrameTransformStoredDoc> transformStatsActionListener = ActionListener.wrap(
stateAndStats -> {
ActionListener<Tuple<DataFrameTransformStoredDoc, SeqNoPrimaryTermAndIndex>> transformStatsActionListener = ActionListener.wrap(
stateAndStatsAndSeqNoPrimaryTermAndIndex -> {
DataFrameTransformStoredDoc stateAndStats = stateAndStatsAndSeqNoPrimaryTermAndIndex.v1();
SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex = stateAndStatsAndSeqNoPrimaryTermAndIndex.v2();
// Since we have not set the value for this yet, it SHOULD be null
buildTask.updateSeqNoPrimaryTermAndIndex(null, seqNoPrimaryTermAndIndex);
logger.trace("[{}] initializing state and stats: [{}]", transformId, stateAndStats.toString());
indexerBuilder.setInitialStats(stateAndStats.getTransformStats())
.setInitialPosition(stateAndStats.getTransformState().getPosition())
Expand All @@ -217,10 +223,10 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr
String msg = DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_STATE, transformId);
logger.error(msg, error);
markAsFailed(buildTask, msg);
} else {
logger.trace("[{}] No stats found (new transform), starting the task", transformId);
startTask(buildTask, indexerBuilder, null, startTaskListener);
}

logger.trace("[{}] No stats found(new transform), starting the task", transformId);
startTask(buildTask, indexerBuilder, null, startTaskListener);
}
);

Expand Down
Loading