Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -523,13 +523,12 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse
indexRequest.type(), indexRequest.id(), indexRequest.source(), indexRequest.getContentType())
.routing(indexRequest.routing());
result = replica.applyIndexOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(),
indexRequest.versionType().versionTypeForReplicationAndRecovery(), indexRequest.getAutoGeneratedTimestamp(),
indexRequest.isRetry(), sourceToParse);
indexRequest.getAutoGeneratedTimestamp(), indexRequest.isRetry(), sourceToParse);
break;
case DELETE:
DeleteRequest deleteRequest = (DeleteRequest) docWriteRequest;
result = replica.applyDeleteOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getVersion(),
deleteRequest.type(), deleteRequest.id(), deleteRequest.versionType().versionTypeForReplicationAndRecovery());
deleteRequest.type(), deleteRequest.id());
break;
default:
throw new IllegalStateException("Unexpected request operation type on replica: "
Expand Down
15 changes: 0 additions & 15 deletions server/src/main/java/org/elasticsearch/index/VersionType.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,6 @@ public boolean validateVersionForReads(long version) {
// not allowing Versions.NOT_FOUND as it is not a valid input value.
return version > 0L || version == Versions.MATCH_ANY;
}

@Override
public VersionType versionTypeForReplicationAndRecovery() {
// replicas get the version from the primary after increment. The same version is stored in
// the transaction log. -> the should use the external semantics.
return EXTERNAL;
}
},
EXTERNAL((byte) 1) {
@Override
Expand Down Expand Up @@ -333,14 +326,6 @@ public byte getValue() {
*/
public abstract boolean validateVersionForReads(long version);

/**
* Some version types require different semantics for primary and replicas. This version allows
* the type to override the default behavior.
*/
public VersionType versionTypeForReplicationAndRecovery() {
return this;
}

public static VersionType fromString(String versionType) {
if ("internal".equals(versionType)) {
return INTERNAL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1168,6 +1168,7 @@ public static class Index extends Operation {
public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin,
long startTime, long autoGeneratedIdTimestamp, boolean isRetry) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
this.doc = doc;
this.isRetry = isRetry;
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
Expand Down Expand Up @@ -1245,6 +1246,7 @@ public static class Delete extends Operation {
public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType,
Origin origin, long startTime) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
this.type = Objects.requireNonNull(type);
this.id = Objects.requireNonNull(id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ private boolean canOptimizeAddDocument(Index index) {
return true;
case PEER_RECOVERY:
case REPLICA:
assert index.version() == 1 && index.versionType() == VersionType.EXTERNAL
assert index.version() == 1 && index.versionType() == null
: "version: " + index.version() + " type: " + index.versionType();
return true;
case LOCAL_TRANSLOG_RECOVERY:
Expand All @@ -704,20 +704,6 @@ private boolean canOptimizeAddDocument(Index index) {
return false;
}

private boolean assertVersionType(final Engine.Operation operation) {
if (operation.origin() == Operation.Origin.REPLICA ||
operation.origin() == Operation.Origin.PEER_RECOVERY ||
operation.origin() == Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
// ensure that replica operation has expected version type for replication
// ensure that versionTypeForReplicationAndRecovery is idempotent
assert operation.versionType() == operation.versionType().versionTypeForReplicationAndRecovery()
: "unexpected version type in request from [" + operation.origin().name() + "] " +
"found [" + operation.versionType().name() + "] " +
"expected [" + operation.versionType().versionTypeForReplicationAndRecovery().name() + "]";
}
return true;
}

private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
if (origin == Operation.Origin.PRIMARY) {
assert assertOriginPrimarySequenceNumber(seqNo);
Expand Down Expand Up @@ -757,7 +743,6 @@ public IndexResult index(Index index) throws IOException {
try (ReleasableLock releasableLock = readLock.acquire()) {
ensureOpen();
assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
assert assertVersionType(index);
try (Releasable ignored = versionMap.acquireLock(index.uid().bytes());
Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) {
lastWriteNanos = index.startTime();
Expand Down Expand Up @@ -860,9 +845,6 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio
"max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of index [" + index.seqNo() + "]";
}
versionMap.enforceSafeAccess();
// drop out of order operations
assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() :
"resolving out of order delivery based on versioning but version type isn't fit for it. got [" + index.versionType() + "]";
// unlike the primary, replicas don't really care to about creation status of documents
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return false for the created flag in favor of code simplicity
Expand Down Expand Up @@ -1096,7 +1078,6 @@ private void updateDocs(final Term uid, final List<ParseContext.Document> docs,
public DeleteResult delete(Delete delete) throws IOException {
versionMap.enforceSafeAccess();
assert Objects.equals(delete.uid().field(), IdFieldMapper.NAME) : delete.uid().field();
assert assertVersionType(delete);
assert assertIncomingSequenceNumber(delete.origin(), delete.seqNo());
final DeleteResult deleteResult;
// NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
Expand Down Expand Up @@ -1149,10 +1130,6 @@ public DeleteResult delete(Delete delete) throws IOException {

private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
// drop out of order operations
assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() :
"resolving out of order delivery based on versioning but version type isn't fit for it. got ["
+ delete.versionType() + "]";
maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr));
assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated;" +
"max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]";
Expand Down
23 changes: 11 additions & 12 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -645,22 +645,22 @@ private IndexShardState changeState(IndexShardState newState, String reason) {

public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType versionType, SourceToParse sourceToParse,
long autoGeneratedTimestamp, boolean isRetry) throws IOException {
assert versionType.validateVersionForWrites(version);
return applyIndexOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, versionType, autoGeneratedTimestamp,
isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse);
}

public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, VersionType versionType,
long autoGeneratedTimeStamp, boolean isRetry, SourceToParse sourceToParse)
public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, long autoGeneratedTimeStamp,
boolean isRetry, SourceToParse sourceToParse)
throws IOException {
return applyIndexOperation(seqNo, primaryTerm, version, versionType, autoGeneratedTimeStamp, isRetry,
return applyIndexOperation(seqNo, primaryTerm, version, null, autoGeneratedTimeStamp, isRetry,
Engine.Operation.Origin.REPLICA, sourceToParse);
}

private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, long version, VersionType versionType,
private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, long version, @Nullable VersionType versionType,
long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin,
SourceToParse sourceToParse) throws IOException {
assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
assert versionType.validateVersionForWrites(version);
ensureWriteAllowed(origin);
Engine.Index operation;
try {
Expand Down Expand Up @@ -736,19 +736,18 @@ private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) {

public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType)
throws IOException {
assert versionType.validateVersionForWrites(version);
return applyDeleteOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, type, id, versionType,
Engine.Operation.Origin.PRIMARY);
}

public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id,
VersionType versionType) throws IOException {
return applyDeleteOperation(seqNo, primaryTerm, version, type, id, versionType, Engine.Operation.Origin.REPLICA);
public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id) throws IOException {
return applyDeleteOperation(seqNo, primaryTerm, version, type, id, null, Engine.Operation.Origin.REPLICA);
}

private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, long version, String type, String id,
VersionType versionType, Engine.Operation.Origin origin) throws IOException {
@Nullable VersionType versionType, Engine.Operation.Origin origin) throws IOException {
assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
assert versionType.validateVersionForWrites(version);
ensureWriteAllowed(origin);
// When there is a single type, the unique identifier is only composed of the _id,
// so there is no way to differenciate foo#1 from bar#1. This is especially an issue
Expand Down Expand Up @@ -1211,14 +1210,14 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine
// we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all
// autoGeneratedID docs that are coming from the primary are updated correctly.
result = applyIndexOperation(index.seqNo(), index.primaryTerm(), index.version(),
index.versionType().versionTypeForReplicationAndRecovery(), index.getAutoGeneratedIdTimestamp(), true, origin,
null, index.getAutoGeneratedIdTimestamp(), true, origin,
source(shardId.getIndexName(), index.type(), index.id(), index.source(),
XContentHelper.xContentType(index.source())).routing(index.routing()));
break;
case DELETE:
final Translog.Delete delete = (Translog.Delete) operation;
result = applyDeleteOperation(delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(),
delete.versionType().versionTypeForReplicationAndRecovery(), origin);
null, origin);
break;
case NO_OP:
final Translog.NoOp noOp = (Translog.NoOp) operation;
Expand Down
Loading