Skip to content

Commit f18b9d5

Browse files
Add segment sorter for data streams (#75195)
It is beneficial to sort the segments within a data stream's index in descending order of their maximum timestamp field value, so that the most recent (in terms of timestamp) segments come first. This speeds up queries sorted on the @timestamp field in descending order, which is the most common type of query for data streams, as we are mostly concerned with recent data. This patch addresses this for writable indices. A segment sorter is different from index sorting: an index sort by itself is concerned with the order of docs within an individual segment (and not with how the segments are organized), while the segment sorter is only used during search and allows doc collection to start with the "right" segment, so that the collection can terminate faster. This PR adds a property `isDataStreamIndex` to IndexShard that shows whether a shard is part of a data stream.
1 parent 574d643 commit f18b9d5

File tree

15 files changed

+212
-20
lines changed

15 files changed

+212
-20
lines changed

server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
5757
config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(),
5858
config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(),
5959
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
60-
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
60+
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
6161
}
6262

6363
@Override

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
*/
88
package org.elasticsearch.cluster.metadata;
99

10+
import org.apache.lucene.document.LongPoint;
11+
import org.apache.lucene.index.LeafReader;
12+
import org.apache.lucene.index.PointValues;
1013
import org.elasticsearch.cluster.AbstractDiffable;
1114
import org.elasticsearch.cluster.Diff;
1215
import org.elasticsearch.core.Nullable;
@@ -25,6 +28,7 @@
2528
import java.util.ArrayList;
2629
import java.util.Collection;
2730
import java.util.Collections;
31+
import java.util.Comparator;
2832
import java.util.HashMap;
2933
import java.util.List;
3034
import java.util.Locale;
@@ -36,6 +40,25 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
3640

3741
public static final String BACKING_INDEX_PREFIX = ".ds-";
3842
public static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("uuuu.MM.dd");
43+
// Datastreams' leaf readers should be sorted by desc order of their timestamp field, as it allows search time optimizations
44+
public static Comparator<LeafReader> DATASTREAM_LEAF_READERS_SORTER =
45+
Comparator.comparingLong(
46+
(LeafReader r) -> {
47+
try {
48+
PointValues points = r.getPointValues(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD);
49+
if (points != null) {
50+
byte[] sortValue = points.getMaxPackedValue();
51+
return LongPoint.decodeDimension(sortValue, 0);
52+
} else if (r.numDocs() == 0) {
53+
// points can be null if the segment contains only deleted documents
54+
return Long.MIN_VALUE;
55+
}
56+
} catch (IOException e) {
57+
}
58+
throw new IllegalStateException("Can't access [" +
59+
DataStream.TimestampField.FIXED_TIMESTAMP_FIELD + "] field for the data stream!");
60+
})
61+
.reversed();
3962

4063
private final LongSupplier timeProvider;
4164
private final String name;

server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.apache.lucene.analysis.Analyzer;
1111
import org.apache.lucene.codecs.Codec;
12+
import org.apache.lucene.index.LeafReader;
1213
import org.apache.lucene.index.MergePolicy;
1314
import org.apache.lucene.search.QueryCache;
1415
import org.apache.lucene.search.QueryCachingPolicy;
@@ -32,6 +33,7 @@
3233
import org.elasticsearch.plugins.IndexStorePlugin;
3334
import org.elasticsearch.threadpool.ThreadPool;
3435

36+
import java.util.Comparator;
3537
import java.util.List;
3638
import java.util.Objects;
3739
import java.util.function.LongSupplier;
@@ -70,6 +72,7 @@ public final class EngineConfig {
7072
private final CircuitBreakerService circuitBreakerService;
7173
private final LongSupplier globalCheckpointSupplier;
7274
private final Supplier<RetentionLeases> retentionLeasesSupplier;
75+
private final Comparator<LeafReader> leafSorter;
7376

7477
/**
7578
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
@@ -131,7 +134,8 @@ public EngineConfig(
131134
LongSupplier globalCheckpointSupplier,
132135
Supplier<RetentionLeases> retentionLeasesSupplier,
133136
LongSupplier primaryTermSupplier,
134-
IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier) {
137+
IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
138+
Comparator<LeafReader> leafSorter) {
135139
this.shardId = shardId;
136140
this.indexSettings = indexSettings;
137141
this.threadPool = threadPool;
@@ -169,6 +173,7 @@ public EngineConfig(
169173
this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
170174
this.primaryTermSupplier = primaryTermSupplier;
171175
this.snapshotCommitSupplier = snapshotCommitSupplier;
176+
this.leafSorter = leafSorter;
172177
}
173178

174179
/**
@@ -353,4 +358,12 @@ public LongSupplier getPrimaryTermSupplier() {
353358
public IndexStorePlugin.SnapshotCommitSupplier getSnapshotCommitSupplier() {
354359
return snapshotCommitSupplier;
355360
}
361+
362+
/**
363+
* Returns how segments should be sorted for reading or {@code null} if no sorting should be applied.
364+
*/
365+
@Nullable
366+
public Comparator<LeafReader> getLeafSorter() {
367+
return leafSorter;
368+
}
356369
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2193,6 +2193,11 @@ private IndexWriterConfig getIndexWriterConfig() {
21932193
if (config().getIndexSort() != null) {
21942194
iwc.setIndexSort(config().getIndexSort());
21952195
}
2196+
// Provide a custom leaf sorter, so that index readers opened from this writer
2197+
// will have their leaves sorted according to the given leaf sorter.
2198+
if (engineConfig.getLeafSorter() != null) {
2199+
iwc.setLeafSorter(engineConfig.getLeafSorter());
2200+
}
21962201
return iwc;
21972202
}
21982203

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,8 @@ protected final ElasticsearchDirectoryReader wrapReader(DirectoryReader reader,
199199
}
200200

201201
protected DirectoryReader open(IndexCommit commit) throws IOException {
202+
// TODO: provide engineConfig.getLeafSorter() when opening a DirectoryReader from a commit
203+
// should be available from Lucene v 8.10
202204
assert Transports.assertNotTransportThread("opening index commit of a read-only engine");
203205
if (lazilyLoadSoftDeletes) {
204206
return new LazySoftDeletesDirectoryReaderWrapper(DirectoryReader.open(commit), Lucene.SOFT_DELETES_FIELD);

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.index.shard;
1010

1111
import com.carrotsearch.hppc.ObjectLongMap;
12+
1213
import org.apache.logging.log4j.Logger;
1314
import org.apache.logging.log4j.message.ParameterizedMessage;
1415
import org.apache.lucene.analysis.Analyzer;
@@ -185,6 +186,7 @@
185186
import java.util.stream.Collectors;
186187
import java.util.stream.StreamSupport;
187188

189+
import static org.elasticsearch.cluster.metadata.DataStream.DATASTREAM_LEAF_READERS_SORTER;
188190
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
189191
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
190192

@@ -283,6 +285,7 @@ Runnable getGlobalCheckpointSyncer() {
283285
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
284286
private final RefreshPendingLocationListener refreshPendingLocationListener;
285287
private volatile boolean useRetentionLeasesInPeerRecovery;
288+
private final boolean isDataStreamIndex; // if a shard is a part of data stream
286289

287290
public IndexShard(
288291
final ShardRouting shardRouting,
@@ -387,6 +390,7 @@ public boolean shouldCache(Query query) {
387390
persistMetadata(path, indexSettings, shardRouting, null, logger);
388391
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
389392
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
393+
this.isDataStreamIndex = mapperService == null ? false : mapperService.mappingLookup().isDataStreamTimestampFieldEnabled();
390394
}
391395

392396
public ThreadPool getThreadPool() {
@@ -2912,7 +2916,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
29122916
globalCheckpointSupplier,
29132917
replicationTracker::getRetentionLeases,
29142918
this::getOperationPrimaryTerm,
2915-
snapshotCommitSupplier);
2919+
snapshotCommitSupplier,
2920+
isDataStreamIndex ? DATASTREAM_LEAF_READERS_SORTER : null);
29162921
}
29172922

29182923
/**

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2948,7 +2948,8 @@ public void testRecoverFromForeignTranslog() throws IOException {
29482948
() -> UNASSIGNED_SEQ_NO,
29492949
() -> RetentionLeases.EMPTY,
29502950
primaryTerm::get,
2951-
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER);
2951+
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
2952+
null);
29522953
expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig));
29532954

29542955
engine = createEngine(store, primaryTranslogDir); // and recover again!
@@ -6021,7 +6022,7 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception {
60216022
config.getQueryCachingPolicy(), translogConfig, config.getFlushMergesAfter(),
60226023
config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(),
60236024
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
6024-
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
6025+
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
60256026
try (InternalEngine engine = createEngine(configWithWarmer)) {
60266027
assertThat(warmedUpReaders, empty());
60276028
assertThat(expectThrows(Throwable.class, () -> engine.acquireSearcher("test")).getMessage(),

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3928,7 +3928,7 @@ public void testCloseShardWhileEngineIsWarming() throws Exception {
39283928
config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(),
39293929
config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(),
39303930
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
3931-
config.getPrimaryTermSupplier(), IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER);
3931+
config.getPrimaryTermSupplier(), IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, config.getLeafSorter());
39323932
return new InternalEngine(configWithWarmer);
39333933
});
39343934
Thread recoveryThread = new Thread(() -> expectThrows(AlreadyClosedException.class, () -> recoverShardFromStore(shard)));

server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
139139
() -> SequenceNumbers.NO_OPS_PERFORMED,
140140
() -> RetentionLeases.EMPTY,
141141
() -> primaryTerm,
142-
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER);
142+
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
143+
null);
143144
engine = new InternalEngine(config);
144145
engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE);
145146
listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation);

server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ EngineConfig configWithRefreshListener(EngineConfig config, ReferenceManager.Ref
371371
config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(),
372372
config.getExternalRefreshListener(), internalRefreshListener, config.getIndexSort(),
373373
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
374-
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
374+
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
375375
}
376376

377377
ThreadPoolStats.Stats getRefreshThreadPoolStats() {

test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ public EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSuppl
238238
config.getTranslogConfig(), config.getFlushMergesAfter(),
239239
config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(),
240240
config.getCircuitBreakerService(), globalCheckpointSupplier, config.retentionLeasesSupplier(),
241-
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
241+
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
242242
}
243243

244244
public EngineConfig copy(EngineConfig config, Analyzer analyzer) {
@@ -248,7 +248,7 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) {
248248
config.getTranslogConfig(), config.getFlushMergesAfter(),
249249
config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(),
250250
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
251-
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
251+
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
252252
}
253253

254254
public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) {
@@ -258,7 +258,7 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) {
258258
config.getTranslogConfig(), config.getFlushMergesAfter(),
259259
config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(),
260260
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
261-
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
261+
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
262262
}
263263

264264
@Override
@@ -669,7 +669,8 @@ public EngineConfig config(
669669
globalCheckpointSupplier,
670670
retentionLeasesSupplier,
671671
primaryTerm,
672-
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER);
672+
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
673+
null);
673674
}
674675

675676
protected EngineConfig config(EngineConfig config, Store store, Path translogPath) {
@@ -683,7 +684,7 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat
683684
translogConfig, config.getFlushMergesAfter(), config.getExternalRefreshListener(),
684685
config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(),
685686
config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
686-
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier());
687+
config.getPrimaryTermSupplier(), config.getSnapshotCommitSupplier(), config.getLeafSorter());
687688
}
688689

689690
protected EngineConfig noOpConfig(IndexSettings indexSettings, Store store, Path translogPath) {

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,8 @@ public void onFailedEngine(String reason, Exception e) {
272272
globalCheckpoint::longValue,
273273
() -> RetentionLeases.EMPTY,
274274
() -> primaryTerm.get(),
275-
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER);
275+
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
276+
null);
276277
}
277278

278279
private static Store createStore(

x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@
6262
import org.elasticsearch.indices.InvalidAliasNameException;
6363
import org.elasticsearch.plugins.Plugin;
6464
import org.elasticsearch.rest.RestStatus;
65+
import org.elasticsearch.search.SearchHit;
66+
import org.elasticsearch.search.builder.SearchSourceBuilder;
67+
import org.elasticsearch.search.fetch.subphase.FieldAndFormat;
6568
import org.elasticsearch.test.ESIntegTestCase;
6669
import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
6770
import org.elasticsearch.xpack.core.action.DeleteDataStreamAction;
@@ -1213,10 +1216,6 @@ public void testGetDataStream() throws Exception {
12131216
assertThat(metricsFooDataStream.getIlmPolicy(), is(nullValue()));
12141217
}
12151218

1216-
private static void assertBackingIndex(String backingIndex, String timestampFieldPathInMapping) {
1217-
assertBackingIndex(backingIndex, timestampFieldPathInMapping, Map.of("type", "date"));
1218-
}
1219-
12201219
private static void assertBackingIndex(String backingIndex, String timestampFieldPathInMapping, Map<?, ?> expectedMapping) {
12211220
GetIndexResponse getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest().indices(backingIndex)).actionGet();
12221221
assertThat(getIndexResponse.getSettings().get(backingIndex), notNullValue());
@@ -1481,6 +1480,42 @@ public void testMultiThreadedRollover() throws Exception {
14811480
);
14821481
}
14831482

1483+
// Test that datastream's segments by default are sorted on @timestamp desc
1484+
public void testSegmentsSortedOnTimestampDesc() throws Exception {
1485+
Settings settings = Settings.builder()
1486+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
1487+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
1488+
.build();
1489+
putComposableIndexTemplate("template_for_foo", null, List.of("metrics-foo*"), settings, null);
1490+
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("metrics-foo");
1491+
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
1492+
1493+
// We index data in the increasing order of @timestamp field
1494+
int numDocs1 = randomIntBetween(2, 10);
1495+
indexDocs("metrics-foo", numDocs1); // 1st segment
1496+
int numDocs2 = randomIntBetween(2, 10);
1497+
indexDocs("metrics-foo", numDocs2); // 2nd segment
1498+
int numDocs3 = randomIntBetween(2, 10);
1499+
indexDocs("metrics-foo", numDocs3); // 3rd segment
1500+
int totalDocs = numDocs1 + numDocs2 + numDocs3;
1501+
1502+
SearchSourceBuilder source = new SearchSourceBuilder();
1503+
source.fetchField(new FieldAndFormat(DEFAULT_TIMESTAMP_FIELD, "epoch_millis"));
1504+
source.size(totalDocs);
1505+
SearchRequest searchRequest = new SearchRequest(new String[] { "metrics-foo" }, source);
1506+
SearchResponse searchResponse = client().search(searchRequest).actionGet();
1507+
assertEquals(totalDocs, searchResponse.getHits().getTotalHits().value);
1508+
SearchHit[] hits = searchResponse.getHits().getHits();
1509+
assertEquals(totalDocs, hits.length);
1510+
1511+
// Test that when we read data, segments come in the reverse order with a segment with the latest date first
1512+
long timestamp1 = Long.valueOf(hits[0].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of 1st seg
1513+
long timestamp2 = Long.valueOf(hits[0 + numDocs3].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of the 2nd seg
1514+
long timestamp3 = Long.valueOf(hits[0 + numDocs3 + numDocs2].field(DEFAULT_TIMESTAMP_FIELD).getValue()); // 1st doc of the 3rd seg
1515+
assertTrue(timestamp1 > timestamp2);
1516+
assertTrue(timestamp2 > timestamp3);
1517+
}
1518+
14841519
private static void verifyResolvability(String dataStream, ActionRequestBuilder<?, ?> requestBuilder, boolean fail) {
14851520
verifyResolvability(dataStream, requestBuilder, fail, 0);
14861521
}

x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/MigrateToDataTiersIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,6 @@
6060
import static org.hamcrest.Matchers.nullValue;
6161

6262
public class MigrateToDataTiersIT extends ESRestTestCase {
63-
private static final Logger logger = LogManager.getLogger(MigrateToDataTiersIT.class);
64-
6563
private String index;
6664
private String policy;
6765
private String alias;

0 commit comments

Comments
 (0)