Skip to content

Optimize the order of bytes in uuids for better compression. #24615

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jul 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.RoutingMissingException;
Expand Down Expand Up @@ -289,13 +290,11 @@ protected void doRun() throws Exception {
case CREATE:
case INDEX:
IndexRequest indexRequest = (IndexRequest) docWriteRequest;
MappingMetaData mappingMd = null;
final IndexMetaData indexMetaData = metaData.index(concreteIndex);
if (indexMetaData != null) {
mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
}
MappingMetaData mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
Version indexCreated = indexMetaData.getCreationVersion();
indexRequest.resolveRouting(metaData);
indexRequest.process(mappingMd, concreteIndex.getName());
indexRequest.process(indexCreated, mappingMd, concreteIndex.getName());
break;
case UPDATE:
TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ static BulkItemResultHolder executeUpdateRequestOnce(UpdateRequest updateRequest
case UPDATED:
IndexRequest indexRequest = translate.action();
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
indexRequest.process(mappingMd, concreteIndex);
indexRequest.process(metaData.getCreationVersion(), mappingMd, concreteIndex);
result = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater);
break;
case DELETED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.shard.ShardId;

Expand Down Expand Up @@ -484,7 +485,7 @@ public VersionType versionType() {
}


public void process(@Nullable MappingMetaData mappingMd, String concreteIndex) {
public void process(Version indexCreatedVersion, @Nullable MappingMetaData mappingMd, String concreteIndex) {
if (mappingMd != null) {
// might as well check for routing here
if (mappingMd.routing().required() && routing == null) {
Expand All @@ -508,7 +509,13 @@ public void process(@Nullable MappingMetaData mappingMd, String concreteIndex) {
if (id == null) {
assert autoGeneratedTimestamp == -1 : "timestamp has already been generated!";
autoGeneratedTimestamp = Math.max(0, System.currentTimeMillis()); // extra paranoia
id(UUIDs.base64UUID());
String uid;
if (indexCreatedVersion.onOrAfter(Version.V_6_0_0_beta1)) {
uid = UUIDs.base64UUID();
} else {
uid = UUIDs.legacyBase64UUID();
}
id(uid);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common;

import java.util.Base64;
import java.util.concurrent.atomic.AtomicInteger;

/**
* These are essentially flake ids (http://boundary.com/blog/2012/01/12/flake-a-decentralized-k-ordered-unique-id-generator-in-erlang) but
* we use 6 (not 8) bytes for timestamp, and use 3 (not 2) bytes for sequence number.
*/

class LegacyTimeBasedUUIDGenerator implements UUIDGenerator {

// We only use bottom 3 bytes for the sequence number. Paranoia: init with random int so that if JVM/OS/machine goes down, clock slips
// backwards, and JVM comes back up, we are less likely to be on the same sequenceNumber at the same time:
private final AtomicInteger sequenceNumber = new AtomicInteger(SecureRandomHolder.INSTANCE.nextInt());

// Used to ensure clock moves forward:
private long lastTimestamp;

private static final byte[] SECURE_MUNGED_ADDRESS = MacAddressProvider.getSecureMungedAddress();

static {
assert SECURE_MUNGED_ADDRESS.length == 6;
}

/** Puts the lower numberOfLongBytes from l into the array, starting index pos. */
private static void putLong(byte[] array, long l, int pos, int numberOfLongBytes) {
for (int i=0; i<numberOfLongBytes; ++i) {
array[pos+numberOfLongBytes-i-1] = (byte) (l >>> (i*8));
}
}

@Override
public String getBase64UUID() {
final int sequenceId = sequenceNumber.incrementAndGet() & 0xffffff;
long timestamp = System.currentTimeMillis();

synchronized (this) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why we don't make lastTimestamp an AtomicLong and use compareAndSet since we don't care if the compare and set was successful as long as we are moving forward? something like this:

long currentTimestamp = System.currentTimeMillis();
long lastUsedTimestamp = lastTimestamp.get();
long timestamp  = Math.max(currentTimestamp, lastUsedTimestamp);
if (sequenceId == 0) {
  timestamp++;
}
lastTimestamp.compareAndSet(timestamp, lastUsedTimestamp);

if that doesn't work and we really need to block or have consistent values aren't we vulnerable in terms of collisions on the same timestamp?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm working on this for a separate PR if you don't mind.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about it more, it would ensure that lastUsedTimestamp is greater than it was before, but not necessarily greater than timestamp. So there is a possibility of collision?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but don't we have the sequence ID for this purpose? there are 2^31 docs to be indexed before you have a collision. I mean if System.currentTimeMillis() goes backwards we reuse the same timestamp all the time? I am not sure how this would work at all if we can't sustain that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if System.currentTimeMillis() goes backwards we reuse the same timestamp all the time

no, since we force the clock to tick forward whenever the sequence id is 0?

there are 2^31 docs to be indexed before you have a collision

2^24 actually, we only use the 3 lower bytes of the sequence id

// Don't let timestamp go backwards, at least "on our watch" (while this JVM is running). We are still vulnerable if we are
// shut down, clock goes backwards, and we restart... for this we randomize the sequenceNumber on init to decrease chance of
// collision:
timestamp = Math.max(lastTimestamp, timestamp);

if (sequenceId == 0) {
// Always force the clock to increment whenever sequence number is 0, in case we have a long time-slip backwards:
timestamp++;
}

lastTimestamp = timestamp;
}

final byte[] uuidBytes = new byte[15];

// Only use lower 6 bytes of the timestamp (this will suffice beyond the year 10000):
putLong(uuidBytes, timestamp, 0, 6);

// MAC address adds 6 bytes:
System.arraycopy(SECURE_MUNGED_ADDRESS, 0, uuidBytes, 6, SECURE_MUNGED_ADDRESS.length);

// Sequence number adds 3 bytes:
putLong(uuidBytes, sequenceId, 12, 3);

assert 9 + SECURE_MUNGED_ADDRESS.length == uuidBytes.length;

return Base64.getUrlEncoder().withoutPadding().encodeToString(uuidBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import java.util.concurrent.atomic.AtomicInteger;

/** These are essentially flake ids (http://boundary.com/blog/2012/01/12/flake-a-decentralized-k-ordered-unique-id-generator-in-erlang) but
* we use 6 (not 8) bytes for timestamp, and use 3 (not 2) bytes for sequence number. */
* we use 6 (not 8) bytes for timestamp, and use 3 (not 2) bytes for sequence number. We also reorder bytes in a way that does not make ids
* sort in order anymore, but is more friendly to the way that the Lucene terms dictionary is structured. */

class TimeBasedUUIDGenerator implements UUIDGenerator {

Expand All @@ -40,17 +41,20 @@ class TimeBasedUUIDGenerator implements UUIDGenerator {
assert SECURE_MUNGED_ADDRESS.length == 6;
}

/** Puts the lower numberOfLongBytes from l into the array, starting index pos. */
private static void putLong(byte[] array, long l, int pos, int numberOfLongBytes) {
for (int i=0; i<numberOfLongBytes; ++i) {
array[pos+numberOfLongBytes-i-1] = (byte) (l >>> (i*8));
}
// protected for testing
protected long currentTimeMillis() {
return System.currentTimeMillis();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth looking at nanotime which doesn't go backwards?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would not work as we need some absolute times. For instance you switch to a different JVM that uses a different origin for nanotime, you might reuse existing ids.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about initializing to System.currentTimeMillis() on startup and using that time + System.nanoTime() on generation? Regardless, if it is worth doing at all it can wait for a followup.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, maybe we should focus on the order of bytes here and rethink generation separately?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, maybe we should focus on the order of bytes here and rethink generation separately?

++

}

// protected for testing: lets unit tests substitute deterministic mac addresses
protected byte[] macAddress() {
    return SECURE_MUNGED_ADDRESS;
}

@Override
public String getBase64UUID() {
final int sequenceId = sequenceNumber.incrementAndGet() & 0xffffff;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you skip the masking given the way you are using the sequence id?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, because of the if (sequenceId == 0) { check below

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see now.

long timestamp = System.currentTimeMillis();
long timestamp = currentTimeMillis();

synchronized (this) {
// Don't let timestamp go backwards, at least "on our watch" (while this JVM is running). We are still vulnerable if we are
Expand All @@ -67,17 +71,45 @@ public String getBase64UUID() {
}

final byte[] uuidBytes = new byte[15];

// Only use lower 6 bytes of the timestamp (this will suffice beyond the year 10000):
putLong(uuidBytes, timestamp, 0, 6);

// MAC address adds 6 bytes:
System.arraycopy(SECURE_MUNGED_ADDRESS, 0, uuidBytes, 6, SECURE_MUNGED_ADDRESS.length);

// Sequence number adds 3 bytes:
putLong(uuidBytes, sequenceId, 12, 3);

assert 9 + SECURE_MUNGED_ADDRESS.length == uuidBytes.length;
int i = 0;

// We have auto-generated ids, which are usually used for append-only workloads.
// So we try to optimize the order of bytes for indexing speed (by having quite
// unique bytes close to the beginning of the ids so that sorting is fast) and
// compression (by making sure we share common prefixes between enough ids),
// but not necessarily for lookup speed (by having the leading bytes identify
// segments whenever possible)

// Blocks in the block tree have between 25 and 48 terms. So all prefixes that
// are shared by ~30 terms should be well compressed. I first tried putting the
// two lower bytes of the sequence id in the beginning of the id, but compression
// is only triggered when you have at least 30*2^16 ~= 2M documents in a segment,
// which is already quite large. So instead, we are putting the 1st and 3rd byte
// of the sequence number so that compression starts to be triggered with smaller
// segment sizes and still gives pretty good indexing speed. We use the sequenceId
// rather than the timestamp because the distribution of the timestamp depends too
// much on the indexing rate, so it is less reliable.

uuidBytes[i++] = (byte) sequenceId;
// changes every 65k docs, so potentially every second if you have a steady indexing rate
uuidBytes[i++] = (byte) (sequenceId >>> 16);

// Now we start focusing on compression and put bytes that should not change too often.
uuidBytes[i++] = (byte) (timestamp >>> 16); // changes every ~65 secs
uuidBytes[i++] = (byte) (timestamp >>> 24); // changes every ~4.5h
uuidBytes[i++] = (byte) (timestamp >>> 32); // changes every ~50 days
uuidBytes[i++] = (byte) (timestamp >>> 40); // changes every 35 years
byte[] macAddress = macAddress();
assert macAddress.length == 6;
System.arraycopy(macAddress, 0, uuidBytes, i, macAddress.length);
i += macAddress.length;

// Finally we put the remaining bytes, which will likely not be compressed at all.
uuidBytes[i++] = (byte) (timestamp >>> 8);
uuidBytes[i++] = (byte) (sequenceId >>> 8);
uuidBytes[i++] = (byte) timestamp;

assert i == uuidBytes.length;

return Base64.getUrlEncoder().withoutPadding().encodeToString(uuidBytes);
}
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/elasticsearch/common/UUIDs.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
public class UUIDs {

private static final RandomBasedUUIDGenerator RANDOM_UUID_GENERATOR = new RandomBasedUUIDGenerator();
private static final UUIDGenerator LEGACY_TIME_UUID_GENERATOR = new LegacyTimeBasedUUIDGenerator();
private static final UUIDGenerator TIME_UUID_GENERATOR = new TimeBasedUUIDGenerator();

/** Generates a time-based UUID (similar to Flake IDs), which is preferred when generating an ID to be indexed into a Lucene index as
Expand All @@ -32,6 +33,11 @@ public static String base64UUID() {
return TIME_UUID_GENERATOR.getBase64UUID();
}

/**
 * Legacy implementation of {@link #base64UUID()}, kept for indices created before 6.0
 * so that their auto-generated ids keep the original byte layout.
 */
public static String legacyBase64UUID() {
    final UUIDGenerator generator = LEGACY_TIME_UUID_GENERATOR;
    return generator.getBase64UUID();
}

/** Returns a Base64 encoded version of a Version 4.0 compatible UUID as defined here: http://www.ietf.org/rfc/rfc4122.txt, using the
* provided {@code Random} instance */
public static String randomBase64UUID(Random random) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void onResponse(InFlightOpsResponse response) {
actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "[" + inflight + "] ongoing operations on primary"));
} else {
// 3. now send the sync request to all the shards
String syncId = UUIDs.base64UUID();
String syncId = UUIDs.randomBase64UUID();
sendSyncRequests(syncId, activeShards, state, commitIds, shardId, totalShards, actionListener);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public RecoveryTarget(final IndexShard indexShard,
this.indexShard = indexShard;
this.sourceNode = sourceNode;
this.shardId = indexShard.shardId();
this.tempFilePrefix = RECOVERY_PREFIX + UUIDs.base64UUID() + ".";
this.tempFilePrefix = RECOVERY_PREFIX + UUIDs.randomBase64UUID() + ".";
this.store = indexShard.store();
this.ensureClusterStateVersionCallback = ensureClusterStateVersionCallback;
// make sure the store is not released until we are done.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@ public void testWaitForActiveShards() {

public void testAutoGenIdTimestampIsSet() {
IndexRequest request = new IndexRequest("index", "type");
request.process(null, "index");
request.process(Version.CURRENT, null, "index");
assertTrue("expected > 0 but got: " + request.getAutoGeneratedTimestamp(), request.getAutoGeneratedTimestamp() > 0);
request = new IndexRequest("index", "type", "1");
request.process(null, "index");
request.process(Version.CURRENT, null, "index");
assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, request.getAutoGeneratedTimestamp());
}

Expand Down
72 changes: 72 additions & 0 deletions core/src/test/java/org/elasticsearch/common/UUIDTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,24 @@
*/
package org.elasticsearch.common;

import com.carrotsearch.randomizedtesting.generators.RandomPicks;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;

import java.util.HashSet;
import java.util.Random;
import java.util.Set;

public class UUIDTests extends ESTestCase {
Expand Down Expand Up @@ -100,4 +115,61 @@ public void testUUIDThreaded(UUIDGenerator uuidSource) {
}
assertEquals(count*uuids, globalSet.size());
}

/** Checks that auto-generated time-based ids stay compact on disk across several indexing rates. */
public void testCompression() throws Exception {
    final Logger log = Loggers.getLogger(UUIDTests.class);
    // A low doc count keeps the test quick, but the results are more interesting with
    // larger numbers of indexed documents.
    final int docCount = 500000;
    final int nodes = 3;
    assertThat(testCompression(docCount, 10000, nodes, log), Matchers.lessThan(12d)); // ~10.8 in practice
    assertThat(testCompression(docCount, 1000, nodes, log), Matchers.lessThan(13d)); // ~11.5 in practice
    assertThat(testCompression(docCount, 100, nodes, log), Matchers.lessThan(21d)); // ~19.5 in practice
}

/**
 * Indexes {@code numDocs} documents whose only field is an auto-generated time-based id
 * and reports the index size, in bytes per document, after force-merging to one segment.
 *
 * @param numDocs          number of documents to index
 * @param numDocsPerSecond simulated indexing rate, used to advance the fake clock
 * @param numNodes         number of distinct random mac addresses the generator draws from
 * @param logger           destination for the size/timing summary line
 * @return average number of bytes of disk space consumed per indexed document
 */
private static double testCompression(int numDocs, int numDocsPerSecond, int numNodes, Logger logger) throws Exception {
    final double intervalBetweenDocs = 1000. / numDocsPerSecond; // milliseconds
    final byte[][] macAddresses = new byte[numNodes][];
    Random r = random();
    for (int i = 0; i < macAddresses.length; ++i) {
        macAddresses[i] = new byte[6];
        random().nextBytes(macAddresses[i]);
    }
    // Override the test hooks so ids mimic a small cluster (numNodes mac addresses)
    // indexing at roughly numDocsPerSecond docs/s.
    UUIDGenerator generator = new TimeBasedUUIDGenerator() {
        double currentTimeMillis = System.currentTimeMillis();

        @Override
        protected long currentTimeMillis() {
            // Advance by up to twice the expected interval, so the mean matches the target rate.
            currentTimeMillis += intervalBetweenDocs * 2 * r.nextDouble();
            return (long) currentTimeMillis;
        }

        @Override
        protected byte[] macAddress() {
            return RandomPicks.randomFrom(r, macAddresses);
        }
    };
    // Avoid randomization which will slow down things without improving
    // the quality of this test
    Directory dir = newFSDirectory(createTempDir());
    IndexWriter w = new IndexWriter(dir, new IndexWriterConfig().setOpenMode(OpenMode.CREATE));
    Document doc = new Document();
    StringField id = new StringField("_id", "", Store.NO);
    doc.add(id);
    long start = System.nanoTime();
    for (int i = 0; i < numDocs; ++i) {
        id.setStringValue(generator.getBase64UUID());
        w.addDocument(doc);
    }
    // Merge to a single segment so the terms-dictionary compression is fully exercised.
    w.forceMerge(1);
    long time = (System.nanoTime() - start) / 1000 / 1000;
    w.close();
    long size = 0;
    for (String file : dir.listAll()) {
        size += dir.fileLength(file);
    }
    dir.close();
    double bytesPerDoc = (double) size / numDocs;
    logger.info(numDocs + " docs indexed at " + numDocsPerSecond + " docs/s required " + new ByteSizeValue(size)
            + " bytes of disk space, or " + bytesPerDoc + " bytes per document. Took: " + new TimeValue(time) + ".");
    return bytesPerDoc;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ protected void performOnReplica(BulkShardRequest request, IndexShard replica) th
private TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse> executeShardBulkOnPrimary(IndexShard primary, BulkShardRequest request) throws Exception {
for (BulkItemRequest itemRequest : request.items()) {
if (itemRequest.request() instanceof IndexRequest) {
((IndexRequest) itemRequest.request()).process(null, index.getName());
((IndexRequest) itemRequest.request()).process(Version.CURRENT, null, index.getName());
}
}
final TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse> result =
Expand Down
Loading