Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.logging.LogConfigurator;
import org.elasticsearch.index.codec.Elasticsearch92Lucene103Codec;
import org.elasticsearch.index.codec.tsdb.BinaryDVCompressionMode;
import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand Down Expand Up @@ -257,7 +258,7 @@ private static IndexWriterConfig createIndexWriterConfig(boolean optimizedMergeE
);
config.setLeafSorter(DataStream.TIMESERIES_LEAF_READERS_SORTER);
config.setMergePolicy(new LogByteSizeMergePolicy());
var docValuesFormat = new ES819TSDBDocValuesFormat(4096, 512, optimizedMergeEnabled);
var docValuesFormat = new ES819TSDBDocValuesFormat(4096, 512, optimizedMergeEnabled, BinaryDVCompressionMode.COMPRESSED_WITH_ZSTD);
config.setCodec(new Elasticsearch92Lucene103Codec() {
@Override
public DocValuesFormat getDocValuesFormatForField(String field) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,22 @@ public KnnVectorsFormat getKnnVectorsFormatForField(String field) {
}

public DocValuesFormat getDocValuesFormatForField(String field) {
if (useTSDBDocValuesFormat(field)) {
if (useTSDBDocValuesFormat(field) || isBinaryDocValueField(field)) {
return tsdbDocValuesFormat;
}
return docValuesFormat;
}

boolean isBinaryDocValueField(final String field) {
if (mapperService != null) {
Mapper mapper = mapperService.mappingLookup().getMapper(field);
if (mapper != null && "wildcard".equals(mapper.typeName())) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is not actually how we want to enable this feature, given that we want it for all BinaryDocValues. I'm thinking we need some sort of PerTypeDelegatingDocValueFormat, which would delegate all types to Lucene90DocValuesFormat except for binary would be delegate to ES819TSDBDocValuesFormat. Thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this is for using compressed binary doc values out side of logsdb / tsdb use cases. Let's wait with doing this for now? Let's just always enable compressed binary doc values if tsdb doc values codec is used.

I think we should add the compressed binary doc. values compression to Lucene90DocValuesFormat in Lucene. In order to do this, I think we need to make compression more pluggable (similar to stored fields). Because zstds isn't available in Lucene and I think we need to default to lz4.

return true;
}
}
return false;
}

boolean useTSDBDocValuesFormat(final String field) {
if (excludeFields(field)) {
return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.codec.tsdb;

/**
 * Compression mode for binary doc values in the ES819 TSDB doc values format.
 *
 * <p>The {@link #code} is persisted in the index and resolved again at read time via
 * {@link #fromMode(byte)}, so existing codes must never be renumbered or reused.
 */
public enum BinaryDVCompressionMode {

    /** Binary doc values are stored uncompressed. */
    NO_COMPRESS((byte) 0),

    /** Binary doc values are stored in Zstandard-compressed blocks. */
    COMPRESSED_WITH_ZSTD((byte) 1);

    /** On-disk identifier for this mode. */
    public final byte code;

    BinaryDVCompressionMode(byte code) {
        this.code = code;
    }

    /**
     * Resolves the mode whose {@link #code} was persisted as {@code mode}.
     *
     * @param mode the byte read from the index
     * @return the matching compression mode
     * @throws IllegalStateException if {@code mode} is unknown (e.g. written by a newer version)
     */
    public static BinaryDVCompressionMode fromMode(byte mode) {
        return switch (mode) {
            case 0 -> NO_COMPRESS;
            case 1 -> COMPRESSED_WITH_ZSTD;
            default -> throw new IllegalStateException("unknown compression mode [" + mode + "]");
        };
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.codec.tsdb.es819;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.packed.DirectMonotonicWriter;
import org.elasticsearch.core.IOUtils;

import java.io.Closeable;
import java.io.IOException;

/**
* Like OffsetsAccumulator builds offsets and stores in a DirectMonotonicWriter. But write to temp file
* rather than directly to a DirectMonotonicWriter because the number of values is unknown. If number of
* values if known prefer OffsetsWriter.
*/
final class DelayedOffsetAccumulator implements Closeable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to include block_addresses and block_doc_ranges suffixes to FsDirectoryFactory#avoidDelegateForFdtTempFiles(...) method. Otherwise these tmp fles get mmapped, which we should avoid here.

private final Directory dir;
private final long startOffset;

private int numValues = 0;
private final IndexOutput tempOutput;
private final String suffix;

DelayedOffsetAccumulator(Directory dir, IOContext context, IndexOutput data, String suffix, long startOffset) throws IOException {
this.dir = dir;
this.startOffset = startOffset;
this.suffix = suffix;

boolean success = false;
try {
tempOutput = dir.createTempOutput(data.getName(), suffix, context);
CodecUtil.writeHeader(tempOutput, ES819TSDBDocValuesFormat.META_CODEC + suffix, ES819TSDBDocValuesFormat.VERSION_CURRENT);
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(this); // self-close because constructor caller can't
}
}
}

public void addDoc(long delta) throws IOException {
tempOutput.writeVLong(delta);
numValues++;
}

public void build(IndexOutput meta, IndexOutput data) throws IOException {
CodecUtil.writeFooter(tempOutput);
IOUtils.close(tempOutput);

// write the offsets info to the meta file by reading from temp file
try (ChecksumIndexInput tempInput = dir.openChecksumInput(tempOutput.getName());) {
CodecUtil.checkHeader(
tempInput,
ES819TSDBDocValuesFormat.META_CODEC + suffix,
ES819TSDBDocValuesFormat.VERSION_CURRENT,
ES819TSDBDocValuesFormat.VERSION_CURRENT
);
Throwable priorE = null;
try {
final DirectMonotonicWriter writer = DirectMonotonicWriter.getInstance(
meta,
data,
numValues + 1,
ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT
);

long offset = startOffset;
writer.add(offset);
for (int i = 0; i < numValues; ++i) {
offset += tempInput.readVLong();
writer.add(offset);
}
writer.finish();
} catch (Throwable e) {
priorE = e;
} finally {
CodecUtil.checkFooter(tempInput, priorE);
}
}
}

@Override
public void close() throws IOException {
if (tempOutput != null) {
IOUtils.close(tempOutput, () -> dir.deleteFile(tempOutput.getName()));
}
}
}
Loading