Skip to content

[7.x] Optimistic concurrency control for updating ingest pipelines #79364

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
---
"Test pipeline versioned updates":
- skip:
version: " - 7.15.99"
reason: "added versioned updates in 7.16.0"

- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}
- match: { acknowledged: true }

# conditional update fails because of missing version
- do:
catch: bad_request
ingest.put_pipeline:
id: "my_pipeline"
if_version: 1
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}

- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"version": 1,
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.get_pipeline:
id: "my_pipeline"
- match: { my_pipeline.version: 1 }

# required version does not match specified version
- do:
catch: bad_request
ingest.put_pipeline:
id: "my_pipeline"
if_version: 99
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}

# may not update to same version
- do:
catch: bad_request
ingest.put_pipeline:
id: "my_pipeline"
if_version: 1
body: >
{
"version": 1,
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}

# cannot conditionally update non-existent pipeline
- do:
catch: bad_request
ingest.put_pipeline:
id: "my_pipeline2"
if_version: 1
body: >
{
"version": 1,
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}

# conditionally update to specified version
- do:
ingest.put_pipeline:
id: "my_pipeline"
if_version: 1
body: >
{
"version": 99,
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.get_pipeline:
id: "my_pipeline"
- match: { my_pipeline.version: 99 }

# conditionally update without specified version
- do:
ingest.put_pipeline:
id: "my_pipeline"
if_version: 99
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.get_pipeline:
id: "my_pipeline"
- match: { my_pipeline.version: 100 }

- do:
ingest.delete_pipeline:
id: "my_pipeline"
- match: { acknowledged: true }
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
]
},
"params":{
"if_version":{
"type":"int",
"description":"Required version for optimistic concurrency control for pipeline updates"
},
"master_timeout":{
"type":"time",
"description":"Explicit operation timeout for connection to master node"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.action.ingest;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -22,27 +23,39 @@

public class PutPipelineRequest extends AcknowledgedRequest<PutPipelineRequest> implements ToXContentObject {

private String id;
private BytesReference source;
private XContentType xContentType;
private final String id;
private final BytesReference source;
private final XContentType xContentType;
private final Integer version;

/**
* Create a new pipeline request with the id and source along with the content type of the source
*/
public PutPipelineRequest(String id, BytesReference source, XContentType xContentType) {
public PutPipelineRequest(String id, BytesReference source, XContentType xContentType, Integer version) {
this.id = Objects.requireNonNull(id);
this.source = Objects.requireNonNull(source);
this.xContentType = Objects.requireNonNull(xContentType);
this.version = version;
}

public PutPipelineRequest(String id, BytesReference source, XContentType xContentType) {
this(id, source, xContentType, null);
}

public PutPipelineRequest(StreamInput in) throws IOException {
super(in);
id = in.readString();
source = in.readBytesReference();
xContentType = in.readEnum(XContentType.class);
if (in.getVersion().onOrAfter(Version.V_7_16_0)) {
version = in.readOptionalInt();
} else {
version = null;
}
}

PutPipelineRequest() {
this(null, null, null, null);
}

@Override
Expand All @@ -62,12 +75,19 @@ public XContentType getXContentType() {
return xContentType;
}

public Integer getVersion() {
return version;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
out.writeBytesReference(source);
out.writeEnum(xContentType);
if (out.getVersion().onOrAfter(Version.V_7_16_0)) {
out.writeOptionalInt(version);
}
}

@Override
Expand Down
59 changes: 57 additions & 2 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -55,7 +56,9 @@
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -394,7 +397,9 @@ public void putPipeline(

Map<String, Object> pipelineConfig = null;
IngestMetadata currentIngestMetadata = state.metadata().custom(IngestMetadata.TYPE);
if (currentIngestMetadata != null && currentIngestMetadata.getPipelines().containsKey(request.getId())) {
if (request.getVersion() == null &&
currentIngestMetadata != null &&
currentIngestMetadata.getPipelines().containsKey(request.getId())) {
pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
PipelineConfiguration currentPipeline = currentIngestMetadata.getPipelines().get(request.getId());
if (currentPipeline.getConfigAsMap().equals(pipelineConfig)) {
Expand Down Expand Up @@ -494,16 +499,66 @@ private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(Compound
return processorMetrics;
}

// visible for testing
public static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) {
IngestMetadata currentIngestMetadata = currentState.metadata().custom(IngestMetadata.TYPE);

BytesReference pipelineSource = request.getSource();
if (request.getVersion() != null) {
PipelineConfiguration currentPipeline = currentIngestMetadata != null
? currentIngestMetadata.getPipelines().get(request.getId())
: null;
if (currentPipeline == null) {
throw new IllegalArgumentException(String.format(
Locale.ROOT,
"version conflict, required version [%s] for pipeline [%s] but no pipeline was found",
request.getVersion(),
request.getId()
));
}

final Integer currentVersion = currentPipeline.getVersion();
if (Objects.equals(request.getVersion(), currentVersion) == false) {
throw new IllegalArgumentException(String.format(
Locale.ROOT,
"version conflict, required version [%s] for pipeline [%s] but current version is [%s]",
request.getVersion(),
request.getId(),
currentVersion
));
}

Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
final Integer specifiedVersion = (Integer) pipelineConfig.get("version");
if (pipelineConfig.containsKey("version") && Objects.equals(specifiedVersion, currentVersion)) {
throw new IllegalArgumentException(String.format(
Locale.ROOT,
"cannot update pipeline [%s] with the same version [%s]",
request.getId(),
request.getVersion()
));
}

// if no version specified in the pipeline definition, inject a version of [request.getVersion() + 1]
if (specifiedVersion == null) {
pipelineConfig.put("version", request.getVersion() == null ? 1 : request.getVersion() + 1);
try {
XContentBuilder builder = XContentBuilder.builder(request.getXContentType().xContent()).map(pipelineConfig);
pipelineSource = BytesReference.bytes(builder);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
}

Map<String, PipelineConfiguration> pipelines;
if (currentIngestMetadata != null) {
pipelines = new HashMap<>(currentIngestMetadata.getPipelines());
} else {
pipelines = new HashMap<>();
}

pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), request.getSource(), request.getXContentType()));
pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), pipelineSource, request.getXContentType()));
ClusterState.Builder newState = ClusterState.builder(currentState);
newState.metadata(Metadata.builder(currentState.getMetadata())
.putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,22 @@ BytesReference getConfig() {
return config;
}

public Integer getVersion() {
Map<String, Object> configMap = getConfigAsMap();
if (configMap.containsKey("version")) {
Object o = configMap.get("version");
if (o == null) {
return null;
} else if (o instanceof Number) {
return ((Number) o).intValue();
} else {
throw new IllegalStateException("unexpected version type [" + o.getClass().getName() + "]");
}
} else {
return null;
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down
Loading