Skip to content

Add sequence numbers based optimistic concurrency control support to Engine #36467

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Dec 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ final class PerThreadIDVersionAndSeqNoLookup {
* using the same cache key. Otherwise we'd have to disable caching
* entirely for these readers.
*/
public DocIdAndVersion lookupVersion(BytesRef id, LeafReaderContext context)
public DocIdAndVersion lookupVersion(BytesRef id, boolean loadSeqNo, LeafReaderContext context)
throws IOException {
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
"context's reader is not the same as the reader class was initialized on.";
Expand All @@ -108,7 +108,28 @@ public DocIdAndVersion lookupVersion(BytesRef id, LeafReaderContext context)
if (versions.advanceExact(docID) == false) {
throw new IllegalArgumentException("Document [" + docID + "] misses the [" + VersionFieldMapper.NAME + "] field");
}
return new DocIdAndVersion(docID, versions.longValue(), context.reader(), context.docBase);
final long seqNo;
final long term;
if (loadSeqNo) {
NumericDocValues seqNos = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should clean this up in master once backported. can you put a comment in there?

// remove the null check in 7.0 once we can't read indices with no seq#
if (seqNos != null && seqNos.advanceExact(docID)) {
seqNo = seqNos.longValue();
} else {
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
NumericDocValues terms = context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
if (terms != null && terms.advanceExact(docID)) {
term = terms.longValue();
} else {
term = 0;
}

} else {
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
term = 0;
}
return new DocIdAndVersion(docID, versions.longValue(), seqNo, term, context.reader(), context.docBase);
} else {
return null;
}
Expand Down Expand Up @@ -150,6 +171,7 @@ DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOExcep
final NumericDocValues seqNoDV = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
for (; docID != DocIdSetIterator.NO_MORE_DOCS; docID = docsEnum.nextDoc()) {
final long seqNo;
// remove the null check in 7.0 once we can't read indices with no seq#
if (seqNoDV != null && seqNoDV.advanceExact(docID)) {
seqNo = seqNoDV.longValue();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;

import static org.elasticsearch.common.lucene.uid.Versions.NOT_FOUND;

/** Utility class to resolve the Lucene doc ID, version, seqNo and primaryTerms for a given uid. */
public final class VersionsAndSeqNoResolver {

Expand Down Expand Up @@ -96,12 +94,16 @@ private VersionsAndSeqNoResolver() {
public static class DocIdAndVersion {
public final int docId;
public final long version;
public final long seqNo;
public final long primaryTerm;
public final LeafReader reader;
public final int docBase;

public DocIdAndVersion(int docId, long version, LeafReader reader, int docBase) {
public DocIdAndVersion(int docId, long version, long seqNo, long primaryTerm, LeafReader reader, int docBase) {
this.docId = docId;
this.version = version;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.reader = reader;
this.docBase = docBase;
}
Expand Down Expand Up @@ -129,15 +131,15 @@ public static class DocIdAndSeqNo {
* <li>a doc ID and a version otherwise
* </ul>
*/
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException {
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term, boolean loadSeqNo) throws IOException {
PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
List<LeafReaderContext> leaves = reader.leaves();
// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
final LeafReaderContext leaf = leaves.get(i);
PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
DocIdAndVersion result = lookup.lookupVersion(term.bytes(), leaf);
DocIdAndVersion result = lookup.lookupVersion(term.bytes(), loadSeqNo, leaf);
if (result != null) {
return result;
}
Expand Down Expand Up @@ -175,15 +177,4 @@ public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) thr
}
return latest;
}

/**
* Load the version for the uid from the reader, returning<ul>
* <li>{@link Versions#NOT_FOUND} if no matching doc exists,
* <li>the version associated with the provided uid otherwise
* </ul>
*/
public static long loadVersion(IndexReader reader, Term term) throws IOException {
final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term);
return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version;
}
}
44 changes: 38 additions & 6 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ protected final GetResult getFromSearcher(Get get, BiFunction<String, SearcherSc
final Searcher searcher = searcherFactory.apply("get", scope);
final DocIdAndVersion docIdAndVersion;
try {
docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.reader(), get.uid());
docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.reader(), get.uid(), true);
} catch (Exception e) {
Releasables.closeWhileHandlingException(searcher);
//TODO: A better exception goes here
Expand Down Expand Up @@ -1345,14 +1345,23 @@ public static class Index extends Operation {
private final ParsedDocument doc;
private final long autoGeneratedIdTimestamp;
private final boolean isRetry;
private final long ifSeqNoMatch;
private final long ifPrimaryTermMatch;

public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin,
long startTime, long autoGeneratedIdTimestamp, boolean isRetry) {
long startTime, long autoGeneratedIdTimestamp, boolean isRetry, long ifSeqNoMatch, long ifPrimaryTermMatch) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
assert ifPrimaryTermMatch >= 0 : "ifPrimaryTermMatch [" + ifPrimaryTermMatch + "] must be non negative";
assert ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO || ifSeqNoMatch >=0 :
"ifSeqNoMatch [" + ifSeqNoMatch + "] must be non negative or unset";
assert (origin == Origin.PRIMARY) || (ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO && ifPrimaryTermMatch == 0) :
"cas operations are only allowed if origin is primary. get [" + origin + "]";
this.doc = doc;
this.isRetry = isRetry;
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
this.ifSeqNoMatch = ifSeqNoMatch;
this.ifPrimaryTermMatch = ifPrimaryTermMatch;
}

public Index(Term uid, long primaryTerm, ParsedDocument doc) {
Expand All @@ -1361,7 +1370,7 @@ public Index(Term uid, long primaryTerm, ParsedDocument doc) {

Index(Term uid, long primaryTerm, ParsedDocument doc, long version) {
this(uid, doc, SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, VersionType.INTERNAL,
Origin.PRIMARY, System.nanoTime(), -1, false);
Origin.PRIMARY, System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
} // TEST ONLY

public ParsedDocument parsedDoc() {
Expand Down Expand Up @@ -1417,29 +1426,45 @@ public boolean isRetry() {
return isRetry;
}

public long getIfSeqNoMatch() {
return ifSeqNoMatch;
}

public long getIfPrimaryTermMatch() {
return ifPrimaryTermMatch;
}
}

public static class Delete extends Operation {

private final String type;
private final String id;
private final long ifSeqNoMatch;
private final long ifPrimaryTermMatch;

public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType,
Origin origin, long startTime) {
Origin origin, long startTime, long ifSeqNoMatch, long ifPrimaryTermMatch) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
assert ifPrimaryTermMatch >= 0 : "ifPrimaryTermMatch [" + ifPrimaryTermMatch + "] must be non negative";
assert ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO || ifSeqNoMatch >=0 :
"ifSeqNoMatch [" + ifSeqNoMatch + "] must be non negative or unset";
assert (origin == Origin.PRIMARY) || (ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO && ifPrimaryTermMatch == 0) :
"cas operations are only allowed if origin is primary. get [" + origin + "]";
this.type = Objects.requireNonNull(type);
this.id = Objects.requireNonNull(id);
this.ifSeqNoMatch = ifSeqNoMatch;
this.ifPrimaryTermMatch = ifPrimaryTermMatch;
}

public Delete(String type, String id, Term uid, long primaryTerm) {
this(type, id, uid, SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL,
Origin.PRIMARY, System.nanoTime());
Origin.PRIMARY, System.nanoTime(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
}

public Delete(Delete template, VersionType versionType) {
this(template.type(), template.id(), template.uid(), template.seqNo(), template.primaryTerm(), template.version(),
versionType, template.origin(), template.startTime());
versionType, template.origin(), template.startTime(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
}

@Override
Expand All @@ -1462,6 +1487,13 @@ public int estimatedSizeInBytes() {
return (uid().field().length() + uid().text().length()) * 2 + 20;
}

public long getIfSeqNoMatch() {
return ifSeqNoMatch;
}

public long getIfPrimaryTermMatch() {
return ifPrimaryTermMatch;
}
}

public static class NoOp extends Operation {
Expand Down
Loading