Skip to content

Rename seq# powered optimistic concurrency control parameters to ifSeqNo/ifPrimaryTerm #36757

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Dec 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@
"type" : "time",
"description" : "Explicit operation timeout"
},
"if_seq_no_match" : {
"if_seq_no" : {
"type" : "number",
"description" : "only perform the delete operation if the last operation that has changed the document has the specified sequence number"
},
"if_primary_term_match" : {
"if_primary_term" : {
"type" : "number",
"description" : "only perform the delete operation if the last operation that has changed the document has the specified primary term"
},
Expand Down
4 changes: 2 additions & 2 deletions rest-api-spec/src/main/resources/rest-api-spec/api/index.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@
"options" : ["internal", "external", "external_gte", "force"],
"description" : "Specific version type"
},
"if_seq_no_match" : {
"if_seq_no" : {
"type" : "number",
"description" : "only perform the index operation if the last operation that has changed the document has the specified sequence number"
},
"if_primary_term_match" : {
"if_primary_term" : {
"type" : "number",
"description" : "only perform the index operation if the last operation that has changed the document has the specified primary term"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,25 @@
index:
index: test_1
id: 1
if_seq_no_match: 10000
if_primary_term_match: $primary_term
if_seq_no: 10000
if_primary_term: $primary_term
body: { foo: bar2 }

- do:
catch: conflict
index:
index: test_1
id: 1
if_seq_no_match: $seqno
if_primary_term_match: 1000
if_seq_no: $seqno
if_primary_term: 1000
body: { foo: bar2 }

- do:
index:
index: test_1
id: 1
if_seq_no_match: $seqno
if_primary_term_match: $primary_term
if_seq_no: $seqno
if_primary_term: $primary_term
body: { foo: bar2 }

- match: { _version: 2 }
25 changes: 13 additions & 12 deletions server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
private static final ParseField RETRY_ON_CONFLICT = new ParseField("retry_on_conflict");
private static final ParseField PIPELINE = new ParseField("pipeline");
private static final ParseField SOURCE = new ParseField("_source");
private static final ParseField IF_SEQ_NO_MATCH = new ParseField("if_seq_no_match");
private static final ParseField IF_PRIMARY_TERM_MATCH = new ParseField("if_primary_term_match");
private static final ParseField IF_SEQ_NO = new ParseField("if_seq_no");
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");

/**
* Requests that are part of this request. It is only possible to add things that are both {@link ActionRequest}s and
Expand Down Expand Up @@ -350,8 +350,8 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
String opType = null;
long version = Versions.MATCH_ANY;
VersionType versionType = VersionType.INTERNAL;
long ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO;
long ifPrimaryTermMatch = 0;
long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
long ifPrimaryTerm = 0;
int retryOnConflict = 0;
String pipeline = valueOrDefault(defaultPipeline, globalPipeline);

Expand Down Expand Up @@ -382,10 +382,10 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
version = parser.longValue();
} else if (VERSION_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
versionType = VersionType.fromString(parser.text());
} else if (IF_SEQ_NO_MATCH.match(currentFieldName, parser.getDeprecationHandler())) {
ifSeqNoMatch = parser.longValue();
} else if (IF_PRIMARY_TERM_MATCH.match(currentFieldName, parser.getDeprecationHandler())) {
ifPrimaryTermMatch = parser.longValue();
} else if (IF_SEQ_NO.match(currentFieldName, parser.getDeprecationHandler())) {
ifSeqNo = parser.longValue();
} else if (IF_PRIMARY_TERM.match(currentFieldName, parser.getDeprecationHandler())) {
ifPrimaryTerm = parser.longValue();
} else if (RETRY_ON_CONFLICT.match(currentFieldName, parser.getDeprecationHandler())) {
retryOnConflict = parser.intValue();
} else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
Expand Down Expand Up @@ -414,7 +414,7 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null

if ("delete".equals(action)) {
add(new DeleteRequest(index, type, id).routing(routing)
.version(version).versionType(versionType).setIfMatch(ifSeqNoMatch, ifPrimaryTermMatch), payload);
.version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm), payload);
} else {
nextMarker = findNextMarker(marker, from, data, length);
if (nextMarker == -1) {
Expand All @@ -427,16 +427,17 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
if ("index".equals(action)) {
if (opType == null) {
internalAdd(new IndexRequest(index, type, id).routing(routing).version(version).versionType(versionType)
.setPipeline(pipeline).ifMatch(ifSeqNoMatch, ifPrimaryTermMatch)
.setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker,xContentType), xContentType), payload);
} else {
internalAdd(new IndexRequest(index, type, id).routing(routing).version(version).versionType(versionType)
.create("create".equals(opType)).setPipeline(pipeline).ifMatch(ifSeqNoMatch, ifPrimaryTermMatch)
.create("create".equals(opType)).setPipeline(pipeline)
.setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload);
}
} else if ("create".equals(action)) {
internalAdd(new IndexRequest(index, type, id).routing(routing).version(version).versionType(versionType)
.create(true).setPipeline(pipeline).ifMatch(ifSeqNoMatch, ifPrimaryTermMatch)
.create(true).setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload);
} else if ("update".equals(action)) {
UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).retryOnConflict(retryOnConflict)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ private static void executeIndexRequestOnPrimary(BulkPrimaryExecutionContext con
executeOnPrimaryWhileHandlingMappingUpdates(context,
() ->
primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse,
request.ifSeqNoMatch(), request.ifPrimaryTermMatch(), request.getAutoGeneratedTimestamp(), request.isRetry()),
request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry()),
e -> primary.getFailedIndexResult(e, request.version()),
context::markOperationAsExecuted,
mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type()));
Expand All @@ -474,7 +474,7 @@ private static void executeDeleteRequestOnPrimary(BulkPrimaryExecutionContext co
final IndexShard primary = context.getPrimary();
executeOnPrimaryWhileHandlingMappingUpdates(context,
() -> primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType(),
request.ifSeqNoMatch(), request.ifPrimaryTermMatch()),
request.ifSeqNo(), request.ifPrimaryTerm()),
e -> primary.getFailedDeleteResult(e, request.version()),
context::markOperationAsExecuted,
mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
private String routing;
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
private long ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long ifPrimaryTermMatch = 0;
private long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = 0;

public DeleteRequest() {
}
Expand Down Expand Up @@ -116,11 +116,20 @@ public ActionRequestValidationException validate() {
validationException = addValidationError("version type [force] may no longer be used", validationException);
}

if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO && (
if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO && (
versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY
)) {
validationException = addValidationError("compare and write operations can not use versioning", validationException);
}

if (ifPrimaryTerm == 0 && ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException);
}
if (ifPrimaryTerm != 0 && ifSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
validationException =
addValidationError("ifSeqNo is unassigned, but primary term is [" + ifPrimaryTerm + "]", validationException);
}

return validationException;
}

Expand Down Expand Up @@ -203,29 +212,52 @@ public DeleteRequest versionType(VersionType versionType) {
return this;
}

public long ifSeqNoMatch() {
return ifSeqNoMatch;
/**
* If set, only perform this delete request if the document was last modification was assigned this sequence number.
* If the document last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public long ifSeqNo() {
return ifSeqNo;
}

public long ifPrimaryTermMatch() {
return ifPrimaryTermMatch;
/**
* If set, only perform this delete request if the document was last modification was assigned this primary term.
*
* If the document last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public long ifPrimaryTerm() {
return ifPrimaryTerm;
}

public DeleteRequest setIfMatch(long seqNo, long term) {
if (term == 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("seqNo is set, but primary term is [0]");
}
if (term != 0 && seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("seqNo is unassigned, but primary term is [" + term + "]");
}
/**
* only perform this delete request if the document was last modification was assigned the given
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
*
* If the document last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public DeleteRequest setIfSeqNo(long seqNo) {
if (seqNo < 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "].");
}
ifSeqNo = seqNo;
return this;
}

/**
* only perform this delete request if the document was last modification was assigned the given
* primary term. Must be used in combination with {@link #setIfSeqNo(long)}
*
* If the document last modification was assigned a different primary term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public DeleteRequest setIfPrimaryTerm(long term) {
if (term < 0) {
throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]");
}
ifSeqNoMatch = seqNo;
ifPrimaryTermMatch = term;
ifPrimaryTerm = term;
return this;
}

Expand All @@ -251,11 +283,11 @@ public void readFrom(StreamInput in) throws IOException {
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
ifSeqNoMatch = in.readZLong();
ifPrimaryTermMatch = in.readVLong();
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
} else {
ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO;
ifPrimaryTermMatch = 0;
ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
ifPrimaryTerm = 0;
}
}

Expand All @@ -271,10 +303,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(version);
out.writeByte(versionType.getValue());
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeZLong(ifSeqNoMatch);
out.writeVLong(ifPrimaryTermMatch);
} else if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTermMatch != 0) {
assert false : "setIfMatch [" + ifSeqNoMatch + "], currentDocTem [" + ifPrimaryTermMatch + "]";
out.writeZLong(ifSeqNo);
out.writeVLong(ifPrimaryTerm);
} else if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTerm != 0) {
assert false : "setIfMatch [" + ifSeqNo + "], currentDocTem [" + ifPrimaryTerm + "]";
throw new IllegalStateException(
"sequence number based compare and write is not supported until all nodes are on version 7.0 or higher. " +
"Stream version [" + out.getVersion() + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,26 @@ public DeleteRequestBuilder setVersionType(VersionType versionType) {
}

/**
* only performs this delete request if the document was last modification was assigned the given
* sequence number and primary term
* only perform this delete request if the document was last modification was assigned the given
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
*
* If the document last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public DeleteRequestBuilder setIfMatch(long seqNo, long term) {
request.setIfMatch(seqNo, term);
public DeleteRequestBuilder setIfSeqNo(long seqNo) {
request.setIfSeqNo(seqNo);
return this;
}

/**
* only perform this delete request if the document was last modification was assigned the given
* primary term. Must be used in combination with {@link #setIfSeqNo(long)}
*
* If the document last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public DeleteRequestBuilder setIfPrimaryTerm(long term) {
request.setIfPrimaryTerm(term);
return this;
}

Expand Down
Loading