Skip to content

Remove LocalCheckpointTracker#resetCheckpoint #34667

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Dec 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,6 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) {
}
}

/**
* Resets the checkpoint to the specified value.
*
* @param checkpoint the local checkpoint to reset this tracker to
*/
public synchronized void resetCheckpoint(final long checkpoint) {
// TODO: remove this method as after we restore the local history on promotion.
assert checkpoint != SequenceNumbers.UNASSIGNED_SEQ_NO;
assert checkpoint <= this.checkpoint;
processedSeqNo.clear();
this.checkpoint = checkpoint;
}

/**
* The current checkpoint which can be advanced by {@link #markSeqNoAsCompleted(long)}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5464,7 +5464,7 @@ public void testOpenSoftDeletesIndexWithSoftDeletesDisabled() throws Exception {
final List<DocIdSeqNoAndTerm> docs;
try (InternalEngine engine = createEngine(
config(softDeletesEnabled, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get))) {
List<Engine.Operation> ops = generateReplicaHistory(between(1, 100), randomBoolean());
List<Engine.Operation> ops = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean());
applyOperations(engine, ops);
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
engine.syncTranslog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.translog.SnapshotMatchers;
Expand All @@ -32,7 +31,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -150,35 +148,35 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/34667")
public void testDedupByPrimaryTerm() throws Exception {
Map<Long, Long> latestOperations = new HashMap<>();
List<Integer> terms = Arrays.asList(between(1, 1000), between(1000, 2000));
/**
* If an operation above the local checkpoint is delivered multiple times, an engine will add multiple copies of that operation
* into Lucene (only the first copy is non-stale; others are stale and soft-deleted). Moreover, a nested document is indexed into
* Lucene as multiple documents (only the root document has both seq_no and term, non-root docs only have seq_no). This test verifies
* that {@link LuceneChangesSnapshot} returns exactly one operation per seq_no, and skip non-root nested documents or stale copies.
*/
public void testSkipStaleOrNonRootOfNestedDocuments() throws Exception {
Map<Long, Long> seqNoToTerm = new HashMap<>();
List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean());
int totalOps = 0;
for (long term : terms) {
final List<Engine.Operation> ops = generateSingleDocHistory(true,
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE), term, 2, 20, "1");
primaryTerm.set(Math.max(primaryTerm.get(), term));
engine.rollTranslogGeneration();
for (Engine.Operation op : ops) {
// We need to simulate a rollback here as only ops after local checkpoint get into the engine
if (op.seqNo() <= engine.getLocalCheckpointTracker().getCheckpoint()) {
engine.getLocalCheckpointTracker().resetCheckpoint(randomLongBetween(-1, op.seqNo() - 1));
engine.rollTranslogGeneration();
}
for (Engine.Operation op : operations) {
// Engine skips deletes or indexes below the local checkpoint
if (engine.getLocalCheckpoint() < op.seqNo() || op instanceof Engine.NoOp) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that we process noops differently than indexing / delete ops (w.r.t localcheckpoint) sounds like a bug (different) PR)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, I will make this in a follow up.

seqNoToTerm.put(op.seqNo(), op.primaryTerm());
if (op instanceof Engine.Index) {
engine.index((Engine.Index) op);
} else if (op instanceof Engine.Delete) {
engine.delete((Engine.Delete) op);
}
latestOperations.put(op.seqNo(), op.primaryTerm());
if (rarely()) {
engine.refresh("test");
}
if (rarely()) {
engine.flush();
totalOps += ((Engine.Index) op).docs().size();
} else {
totalOps++;
}
totalOps++;
}
applyOperation(engine, op);
if (rarely()) {
engine.refresh("test");
}
if (rarely()) {
engine.rollTranslogGeneration();
}
if (rarely()) {
engine.flush();
}
}
long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo();
Expand All @@ -188,9 +186,9 @@ public void testDedupByPrimaryTerm() throws Exception {
searcher = null;
Translog.Operation op;
while ((op = snapshot.next()) != null) {
assertThat(op.toString(), op.primaryTerm(), equalTo(latestOperations.get(op.seqNo())));
assertThat(op.toString(), op.primaryTerm(), equalTo(seqNoToTerm.get(op.seqNo())));
}
assertThat(snapshot.skippedOperations(), equalTo(totalOps - latestOperations.size()));
assertThat(snapshot.skippedOperations(), equalTo(totalOps - seqNoToTerm.size()));
} finally {
IOUtils.close(searcher);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@

package org.elasticsearch.index.seqno;

import com.carrotsearch.hppc.LongObjectHashMap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.junit.Before;

import java.util.ArrayList;
Expand Down Expand Up @@ -266,35 +263,6 @@ public void testWaitForOpsToComplete() throws BrokenBarrierException, Interrupte
thread.join();
}

public void testResetCheckpoint() {
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
int maxSeqNo = Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED);
for (int i = 0; i < operations; i++) {
if (!rarely()) {
tracker.markSeqNoAsCompleted(i);
maxSeqNo = i;
}
}

final int localCheckpoint =
randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Math.toIntExact(tracker.getCheckpoint()));
tracker.resetCheckpoint(localCheckpoint);
assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint));
assertThat(tracker.getMaxSeqNo(), equalTo((long) maxSeqNo));
assertThat(tracker.processedSeqNo, new BaseMatcher<LongObjectHashMap<CountedBitSet>>() {
@Override
public boolean matches(Object item) {
return (item instanceof LongObjectHashMap && ((LongObjectHashMap) item).isEmpty());
}

@Override
public void describeTo(Description description) {
description.appendText("empty");
}
});
assertThat(tracker.generateSeqNo(), equalTo((long) (maxSeqNo + 1)));
}

public void testContains() {
final long maxSeqNo = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, 100);
final long localCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -316,18 +316,17 @@ protected static ParsedDocument testParsedDocument(
mappingUpdate);
}

public static CheckedFunction<String, ParsedDocument, IOException> nestedParsedDocFactory() throws Exception {
public static CheckedBiFunction<String, Integer, ParsedDocument, IOException> nestedParsedDocFactory() throws Exception {
final MapperService mapperService = createMapperService("type");
final String nestedMapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type")
.startObject("properties").startObject("nested_field").field("type", "nested").endObject().endObject()
.endObject().endObject());
final DocumentMapper nestedMapper = mapperService.documentMapperParser().parse("type", new CompressedXContent(nestedMapping));
return docId -> {
return (docId, nestedFieldValues) -> {
final XContentBuilder source = XContentFactory.jsonBuilder().startObject().field("field", "value");
final int nestedValues = between(0, 3);
if (nestedValues > 0) {
if (nestedFieldValues > 0) {
XContentBuilder nestedField = source.startObject("nested_field");
for (int i = 0; i < nestedValues; i++) {
for (int i = 0; i < nestedFieldValues; i++) {
nestedField.field("field-" + i, "value-" + i);
}
source.endObject();
Expand Down Expand Up @@ -705,22 +704,36 @@ public static List<Engine.Operation> generateSingleDocHistory(boolean forReplica
return ops;
}

public List<Engine.Operation> generateReplicaHistory(int numOps, boolean allowGapInSeqNo) {
public List<Engine.Operation> generateHistoryOnReplica(int numOps, boolean allowGapInSeqNo, boolean allowDuplicate,
boolean includeNestedDocs) throws Exception {
long seqNo = 0;
List<Engine.Operation> operations = new ArrayList<>(numOps);
final int maxIdValue = randomInt(numOps * 2);
final List<Engine.Operation> operations = new ArrayList<>(numOps);
CheckedBiFunction<String, Integer, ParsedDocument, IOException> nestedParsedDocFactory = nestedParsedDocFactory();
for (int i = 0; i < numOps; i++) {
String id = Integer.toString(between(1, 100));
final ParsedDocument doc = EngineTestCase.createParsedDoc(id, null);
if (randomBoolean()) {
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(),
-1, true));
} else if (randomBoolean()) {
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis()));
} else {
operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA,
threadPool.relativeTimeInMillis(), "test-" + i));
final String id = Integer.toString(randomInt(maxIdValue));
final Engine.Operation.TYPE opType = randomFrom(Engine.Operation.TYPE.values());
final boolean isNestedDoc = includeNestedDocs && opType == Engine.Operation.TYPE.INDEX && randomBoolean();
final int nestedValues = between(0, 3);
final long startTime = threadPool.relativeTimeInMillis();
final int copies = allowDuplicate && rarely() ? between(2, 4) : 1;
for (int copy = 0; copy < copies; copy++) {
final ParsedDocument doc = isNestedDoc ? nestedParsedDocFactory.apply(id, nestedValues) : createParsedDoc(id, null);
switch (opType) {
case INDEX:
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
i, null, Engine.Operation.Origin.REPLICA, startTime, -1, true));
break;
case DELETE:
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
i, null, Engine.Operation.Origin.REPLICA, startTime));
break;
case NO_OP:
operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA, startTime, "test-" + i));
break;
default:
throw new IllegalStateException("Unknown operation type [" + opType + "]");
}
}
seqNo++;
if (allowGapInSeqNo && rarely()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -567,12 +567,12 @@ public void testProcessOnceOnPrimary() throws Exception {
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
final CheckedFunction<String, ParsedDocument, IOException> nestedDocFactory = EngineTestCase.nestedParsedDocFactory();
final CheckedBiFunction<String, Integer, ParsedDocument, IOException> nestedDocFunc = EngineTestCase.nestedParsedDocFactory();
int numOps = between(10, 100);
List<Engine.Operation> operations = new ArrayList<>(numOps);
for (int i = 0; i < numOps; i++) {
String docId = Integer.toString(between(1, 100));
ParsedDocument doc = randomBoolean() ? EngineTestCase.createParsedDoc(docId, null) : nestedDocFactory.apply(docId);
ParsedDocument doc = randomBoolean() ? EngineTestCase.createParsedDoc(docId, null) : nestedDocFunc.apply(docId, randomInt(3));
if (randomBoolean()) {
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, i, primaryTerm.get(), 1L,
VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis(), -1, true));
Expand Down