Skip to content

Reset LiveVersionMap on sync commit #27534

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,24 @@ public long ramBytesUsed() {
return BASE_RAM_BYTES_USED;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;

DeleteVersionValue that = (DeleteVersionValue) o;

return time == that.time;
}

@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + (int) (time ^ (time >>> 32));
return result;
}

@Override
public String toString() {
return "DeleteVersionValue{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> search
ensureOpen();
SearcherScope scope;
if (get.realtime()) {
VersionValue versionValue = versionMap.getUnderLock(get.uid());
VersionValue versionValue = versionMap.getUnderLock(get.uid().bytes());
if (versionValue != null) {
if (versionValue.isDelete()) {
return GetResult.NOT_EXISTS;
Expand Down Expand Up @@ -598,7 +598,7 @@ enum OpVsLuceneDocStatus {
private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException {
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found";
final OpVsLuceneDocStatus status;
final VersionValue versionValue = versionMap.getUnderLock(op.uid());
final VersionValue versionValue = versionMap.getUnderLock(op.uid().bytes());
assert incrementVersionLookup();
if (versionValue != null) {
if (op.seqNo() > versionValue.seqNo ||
Expand Down Expand Up @@ -635,7 +635,7 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
/** resolves the current version of the document, returning null if not found */
private VersionValue resolveDocVersion(final Operation op) throws IOException {
assert incrementVersionLookup(); // used for asserting in tests
VersionValue versionValue = versionMap.getUnderLock(op.uid());
VersionValue versionValue = versionMap.getUnderLock(op.uid().bytes());
if (versionValue == null) {
assert incrementIndexVersionLookup(); // used for asserting in tests
final long currentVersion = loadCurrentVersionFromIndex(op.uid());
Expand Down Expand Up @@ -1048,7 +1048,7 @@ static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted,
* Asserts that the doc in the index operation really doesn't exist
*/
private boolean assertDocDoesNotExist(final Index index, final boolean allowDeleted) throws IOException {
final VersionValue versionValue = versionMap.getUnderLock(index.uid());
final VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
if (versionValue != null) {
if (versionValue.isDelete() == false || allowDeleted == false) {
throw new AssertionError("doc [" + index.type() + "][" + index.id() + "] exists in version map (version " + versionValue + ")");
Expand Down Expand Up @@ -1376,6 +1376,8 @@ public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) thr
commitIndexWriter(indexWriter, translog, syncId);
logger.debug("successfully sync committed. sync id [{}].", syncId);
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
// we are guaranteed to have no operations in the version map here!
versionMap.adjustMapSizeUnderLock();
return SyncedFlushResult.SUCCESS;
} catch (IOException ex) {
maybeFailEngine("sync commit", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.index.engine;

import org.apache.lucene.index.Term;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef;
Expand All @@ -35,6 +34,18 @@
/** Maps _uid value to its version information. */
class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {

/**
* Resets the internal map and adjusts it's capacity as if there were no indexing operations.
* This must be called under write lock in the engine
*/
void adjustMapSizeUnderLock() {
if (maps.current.isEmpty() == false || maps.old.isEmpty() == false) {
assert false : "map must be empty"; // fail hard if not empty and fail with assertion in tests to ensure we never swallow it
throw new IllegalStateException("map must be empty");
}
maps = new Maps();
}

private static class Maps {

// All writes (adds and deletes) go into here:
Expand All @@ -50,7 +61,7 @@ private static class Maps {

Maps() {
this(ConcurrentCollections.<BytesRef,VersionValue>newConcurrentMapWithAggressiveConcurrency(),
ConcurrentCollections.<BytesRef,VersionValue>newConcurrentMapWithAggressiveConcurrency());
Collections.emptyMap());
}
}

Expand Down Expand Up @@ -121,21 +132,21 @@ public void afterRefresh(boolean didRefresh) throws IOException {
}

/** Returns the live version (add or delete) for this uid. */
VersionValue getUnderLock(final Term uid) {
VersionValue getUnderLock(final BytesRef uid) {
Maps currentMaps = maps;

// First try to get the "live" value:
VersionValue value = currentMaps.current.get(uid.bytes());
VersionValue value = currentMaps.current.get(uid);
if (value != null) {
return value;
}

value = currentMaps.old.get(uid.bytes());
value = currentMaps.old.get(uid);
if (value != null) {
return value;
}

return tombstones.get(uid.bytes());
return tombstones.get(uid);
}

/** Adds this uid/version to the pending adds map. */
Expand Down Expand Up @@ -250,4 +261,8 @@ public Collection<Accountable> getChildResources() {
// TODO: useful to break down RAM usage here?
return Collections.emptyList();
}
}

/** Returns the current internal versions as a point in time snapshot*/
Map<BytesRef, VersionValue> getAllCurrent() {
return maps.current;
}}
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,31 @@ public Collection<Accountable> getChildResources() {
return Collections.emptyList();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

VersionValue that = (VersionValue) o;

if (version != that.version) return false;
if (seqNo != that.seqNo) return false;
return term == that.term;
}

@Override
public int hashCode() {
int result = (int) (version ^ (version >>> 32));
result = 31 * result + (int) (seqNo ^ (seqNo >>> 32));
result = 31 * result + (int) (term ^ (term >>> 32));
return result;
}

@Override
public String toString() {
return "VersionValue{" +
"version=" + version +

", seqNo=" + seqNo +
", term=" + term +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,25 @@

package org.elasticsearch.index.engine;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.RamUsageTester;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.Assertions;
import org.elasticsearch.bootstrap.JavaVersion;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

public class LiveVersionMapTests extends ESTestCase {

public void testRamBytesUsed() throws Exception {
Expand Down Expand Up @@ -57,4 +70,151 @@ public void testRamBytesUsed() throws Exception {
assertEquals(actualRamBytesUsed, estimatedRamBytesUsed, actualRamBytesUsed / 4);
}

private BytesRef uid(String string) {
BytesRefBuilder builder = new BytesRefBuilder();
builder.copyChars(string);
// length of the array must be the same as the len of the ref... there is an assertion in LiveVersionMap#putUnderLock
return BytesRef.deepCopyOf(builder.get());
}

public void testBasics() throws IOException {
LiveVersionMap map = new LiveVersionMap();
map.putUnderLock(uid("test"), new VersionValue(1,1,1));
assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test")));
map.beforeRefresh();
assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test")));
map.afterRefresh(randomBoolean());
assertNull(map.getUnderLock(uid("test")));


map.putUnderLock(uid("test"), new DeleteVersionValue(1,1,1, Long.MAX_VALUE));
assertEquals(new DeleteVersionValue(1,1,1, Long.MAX_VALUE), map.getUnderLock(uid("test")));
map.beforeRefresh();
assertEquals(new DeleteVersionValue(1,1,1, Long.MAX_VALUE), map.getUnderLock(uid("test")));
map.afterRefresh(randomBoolean());
assertEquals(new DeleteVersionValue(1,1,1, Long.MAX_VALUE), map.getUnderLock(uid("test")));
map.removeTombstoneUnderLock(uid("test"));
assertNull(map.getUnderLock(uid("test")));
}


public void testAdjustMapSizeUnderLock() throws IOException {
LiveVersionMap map = new LiveVersionMap();
map.putUnderLock(uid("test"), new VersionValue(1,1,1));
boolean withinRefresh = randomBoolean();
if (withinRefresh) {
map.beforeRefresh();
}
assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test")));
final String msg;
if (Assertions.ENABLED) {
msg = expectThrows(AssertionError.class, map::adjustMapSizeUnderLock).getMessage();
} else {
msg = expectThrows(IllegalStateException.class, map::adjustMapSizeUnderLock).getMessage();
}
assertEquals("map must be empty", msg);
assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test")));
if (withinRefresh == false) {
map.beforeRefresh();
}
map.afterRefresh(randomBoolean());
Map<BytesRef, VersionValue> allCurrent = map.getAllCurrent();
map.adjustMapSizeUnderLock();
assertNotSame(allCurrent, map.getAllCurrent());
}

public void testConcurrently() throws IOException, InterruptedException {
HashSet<BytesRef> keySet = new HashSet<>();
int numKeys = randomIntBetween(50, 200);
for (int i = 0; i < numKeys; i++) {
keySet.add(uid(TestUtil.randomSimpleString(random(), 10, 20)));
}
List<BytesRef> keyList = new ArrayList<>(keySet);
ConcurrentHashMap<BytesRef, VersionValue> values = new ConcurrentHashMap<>();
KeyedLock<BytesRef> keyedLock = new KeyedLock<>();
LiveVersionMap map = new LiveVersionMap();
int numThreads = randomIntBetween(2, 5);

Thread[] threads = new Thread[numThreads];
CountDownLatch startGun = new CountDownLatch(numThreads);
CountDownLatch done = new CountDownLatch(numThreads);
int randomValuesPerThread = randomIntBetween(5000, 20000);
for (int j = 0; j < threads.length; j++) {
threads[j] = new Thread(() -> {
startGun.countDown();
try {
startGun.await();
} catch (InterruptedException e) {
done.countDown();
throw new AssertionError(e);
}
try {
for (int i = 0; i < randomValuesPerThread; ++i) {
BytesRef bytesRef = randomFrom(random(), keyList);
try (Releasable r = keyedLock.acquire(bytesRef)) {
VersionValue versionValue = values.computeIfAbsent(bytesRef,
v -> new VersionValue(randomLong(), randomLong(), randomLong()));
boolean isDelete = versionValue instanceof DeleteVersionValue;
if (isDelete) {
map.removeTombstoneUnderLock(bytesRef);
}
if (isDelete == false && rarely()) {
versionValue = new DeleteVersionValue(versionValue.version + 1, versionValue.seqNo + 1,
versionValue.term, Long.MAX_VALUE);
} else {
versionValue = new VersionValue(versionValue.version + 1, versionValue.seqNo + 1, versionValue.term);
}
values.put(bytesRef, versionValue);
map.putUnderLock(bytesRef, versionValue);
}
}
} finally {
done.countDown();
}
});
threads[j].start();


}
do {
Map<BytesRef, VersionValue> valueMap = new HashMap<>(map.getAllCurrent());
map.beforeRefresh();
valueMap.forEach((k, v) -> {
VersionValue actualValue = map.getUnderLock(k);
assertNotNull(actualValue);
assertTrue(v.version <= actualValue.version);
});
map.afterRefresh(randomBoolean());
valueMap.forEach((k, v) -> {
VersionValue actualValue = map.getUnderLock(k);
if (actualValue != null) {
if (actualValue instanceof DeleteVersionValue) {
assertTrue(v.version <= actualValue.version); // deletes can be the same version
} else {
assertTrue(v.version < actualValue.version);
}

}
});
if (randomBoolean()) {
Thread.yield();
}
} while (done.getCount() != 0);

for (int j = 0; j < threads.length; j++) {
threads[j].join();
}
map.getAllCurrent().forEach((k, v) -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a check that all deletes are tracked in the tombstone?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DONE!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

THANKS! ;)

VersionValue versionValue = values.get(k);
assertNotNull(versionValue);
assertEquals(v, versionValue);
});

map.getAllTombstones().forEach(e -> {
VersionValue versionValue = values.get(e.getKey());
assertNotNull(versionValue);
assertEquals(e.getValue(), versionValue);
assertTrue(versionValue instanceof DeleteVersionValue);
});
}
}