Skip to content

TSDB: fix the time_series in order collect priority #85526

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/85526.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 85526
summary: "TSDB: fix the time_series in order collect priority"
area: TSDB
type: bug
issues: []
22 changes: 18 additions & 4 deletions server/src/main/java/org/elasticsearch/index/IndexSortConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ public final class IndexSortConfig {
Setting.Property.Final
);

/**
 * Fixed sort specification used for indices in time-series mode: the time-series id field first,
 * followed by the data stream timestamp field in descending order.
 */
public static final FieldSortSpec[] TIME_SERIES_SORT;

static {
// Only the timestamp spec gets an explicit order (DESC); the tsid spec keeps whatever
// FieldSortSpec leaves it at after construction.
FieldSortSpec timeStampSpec = new FieldSortSpec(DataStreamTimestampFieldMapper.DEFAULT_PATH);
timeStampSpec.order = SortOrder.DESC;
// Array position matters to readers of this constant: index 0 is the tsid spec, index 1 the timestamp spec.
TIME_SERIES_SORT = new FieldSortSpec[] { new FieldSortSpec(TimeSeriesIdFieldMapper.NAME), timeStampSpec };
}

private static String validateMissingValue(String missing) {
if ("_last".equals(missing) == false && "_first".equals(missing) == false) {
throw new IllegalArgumentException("Illegal missing value:[" + missing + "], " + "must be one of [_last, _first]");
Expand Down Expand Up @@ -138,9 +146,7 @@ public IndexSortConfig(IndexSettings indexSettings) {
this.indexMode = indexSettings.getMode();

if (this.indexMode == IndexMode.TIME_SERIES) {
FieldSortSpec timeStampSpec = new FieldSortSpec(DataStreamTimestampFieldMapper.DEFAULT_PATH);
timeStampSpec.order = SortOrder.DESC;
this.sortSpecs = new FieldSortSpec[] { new FieldSortSpec(TimeSeriesIdFieldMapper.NAME), timeStampSpec };
this.sortSpecs = TIME_SERIES_SORT;
return;
}

Expand Down Expand Up @@ -264,7 +270,7 @@ private static void validateIndexSortField(SortField sortField) {
}
}

static class FieldSortSpec {
public static class FieldSortSpec {
final String field;
SortOrder order;
MultiValueMode mode;
Expand All @@ -273,6 +279,14 @@ static class FieldSortSpec {
FieldSortSpec(String field) {
this.field = field;
}

/**
 * Returns the name of the field this sort spec applies to, as passed to the constructor.
 */
public String getField() {
return field;
}

/**
 * Returns the sort order configured for this field.
 * NOTE(review): the visible constructor only assigns {@code field}, so this appears to be
 * {@code null} until an order is set explicitly; callers should not assume a non-null value.
 */
public SortOrder getOrder() {
return order;
}
}

/** We only allow index sorting on these types */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,20 @@
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.sort.SortOrder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import static org.elasticsearch.index.IndexSortConfig.TIME_SERIES_SORT;

/**
* An IndexSearcher wrapper that executes the searches in time-series indices by traversing them by tsid and timestamp
* TODO: Convert it to use index sort instead of hard-coded tsid and timestamp values
Expand All @@ -43,10 +47,18 @@ public class TimeSeriesIndexSearcher {
// IndexSearcher would most of the time be a ContextIndexSearcher that has important logic related to e.g. document-level security.
private final IndexSearcher searcher;
private final List<Runnable> cancellations;
private final boolean tsidReverse;
private final boolean timestampReverse;

/**
 * Creates a time-series searcher around the given searcher and captures the direction of each
 * key of {@code IndexSortConfig.TIME_SERIES_SORT}.
 *
 * @param searcher      the underlying searcher; stored as-is
 * @param cancellations runnables stored for later use (presumably cancellation checks run
 *                      during search; confirm against the search method)
 */
public TimeSeriesIndexSearcher(IndexSearcher searcher, List<Runnable> cancellations) {
this.searcher = searcher;
this.cancellations = cancellations;

// Sanity checks (active only with assertions enabled): the shared sort must be exactly
// [tsid, timestamp], since the flags below are read by array position.
assert TIME_SERIES_SORT.length == 2;
assert TIME_SERIES_SORT[0].getField().equals(TimeSeriesIdFieldMapper.NAME);
assert TIME_SERIES_SORT[1].getField().equals(DataStreamTimestampFieldMapper.DEFAULT_PATH);
// A key counts as reversed only when its order is DESC; any other value, including an
// unset order, is treated as not reversed.
this.tsidReverse = TIME_SERIES_SORT[0].getOrder() == SortOrder.DESC;
this.timestampReverse = TIME_SERIES_SORT[1].getOrder() == SortOrder.DESC;
}

public void search(Query query, BucketCollector bucketCollector) throws IOException {
Expand Down Expand Up @@ -78,7 +90,11 @@ public void search(Query query, BucketCollector bucketCollector) throws IOExcept
PriorityQueue<LeafWalker> queue = new PriorityQueue<>(searcher.getIndexReader().leaves().size()) {
@Override
protected boolean lessThan(LeafWalker a, LeafWalker b) {
return a.timestamp < b.timestamp;
if (timestampReverse) {
return a.timestamp > b.timestamp;
} else {
return a.timestamp < b.timestamp;
}
}
};

Expand All @@ -103,7 +119,7 @@ protected boolean lessThan(LeafWalker a, LeafWalker b) {
}

// Re-populate the queue with walkers on the same TSID.
private static boolean populateQueue(List<LeafWalker> leafWalkers, PriorityQueue<LeafWalker> queue) throws IOException {
private boolean populateQueue(List<LeafWalker> leafWalkers, PriorityQueue<LeafWalker> queue) throws IOException {
BytesRef currentTsid = null;
assert queue.size() == 0;
Iterator<LeafWalker> it = leafWalkers.iterator();
Expand All @@ -120,17 +136,16 @@ private static boolean populateQueue(List<LeafWalker> leafWalkers, PriorityQueue
currentTsid = tsid;
}
int comp = tsid.compareTo(currentTsid);
if (comp < 0) {
if (comp == 0) {
queue.add(leafWalker);
} else if ((tsidReverse && comp > 0) || (false == tsidReverse && comp < 0)) {
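// We've found a walker on a TSID that sorts before the current one in the
// index sort order (a higher TSID when the tsid sort is reversed, a lower
// one otherwise), so we remove all walkers collected so far from the queue
// and reset our comparison TSID to that value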
queue.clear();
queue.add(leafWalker);
currentTsid = tsid;
}
if (comp == 0) {
queue.add(leafWalker);
}
}
assert queueAllHaveTsid(queue, currentTsid);
// If all walkers are exhausted then nothing will have been added to the queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
Expand All @@ -40,6 +41,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.index.IndexSortConfig.TIME_SERIES_SORT;

public class TimeSeriesIndexSearcherTests extends ESTestCase {

// Index a random set of docs with timestamp and tsid with the tsid/timestamp sort order
Expand All @@ -50,12 +53,13 @@ public void testCollectInOrderAcrossSegments() throws IOException, InterruptedEx

Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setIndexSort(
new Sort(
new SortField(TimeSeriesIdFieldMapper.NAME, SortField.Type.STRING),
new SortField(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, SortField.Type.LONG)
)
boolean tsidReverse = TIME_SERIES_SORT[0].getOrder() == SortOrder.DESC;
boolean timestampReverse = TIME_SERIES_SORT[1].getOrder() == SortOrder.DESC;
Sort sort = new Sort(
new SortField(TimeSeriesIdFieldMapper.NAME, SortField.Type.STRING, tsidReverse),
new SortField(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, SortField.Type.LONG, timestampReverse)
);
iwc.setIndexSort(sort);
RandomIndexWriter iw = new RandomIndexWriter(random(), dir, iwc);

AtomicInteger clock = new AtomicInteger(0);
Expand Down Expand Up @@ -110,9 +114,15 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
BytesRef latestTSID = tsid.lookupOrd(tsid.ordValue());
long latestTimestamp = timestamp.longValue();
if (currentTSID != null) {
assertTrue(currentTSID + "->" + latestTSID.utf8ToString(), latestTSID.compareTo(currentTSID) >= 0);
assertTrue(
currentTSID + "->" + latestTSID.utf8ToString(),
tsidReverse ? latestTSID.compareTo(currentTSID) <= 0 : latestTSID.compareTo(currentTSID) >= 0
);
if (latestTSID.equals(currentTSID)) {
assertTrue(currentTimestamp + "->" + latestTimestamp, latestTimestamp >= currentTimestamp);
assertTrue(
currentTimestamp + "->" + latestTimestamp,
timestampReverse ? latestTimestamp <= currentTimestamp : latestTimestamp >= currentTimestamp
);
}
}
currentTimestamp = latestTimestamp;
Expand Down Expand Up @@ -145,5 +155,4 @@ public ScoreMode scoreMode() {
dir.close();

}

}