Skip to content

Drop rewriting in date_histogram #57836

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -476,30 +476,6 @@ public Relation isFieldWithinQuery(IndexReader reader,
}
}

return isFieldWithinRange(reader, fromInclusive, toInclusive);
}

/**
* Return whether all values of the given {@link IndexReader} are within the range,
* outside the range or cross the range. Unlike {@link #isFieldWithinQuery} this
* accepts values that are out of the range of the {@link #resolution} of this field.
* @param fromInclusive start date, inclusive
* @param toInclusive end date, inclusive
*/
public Relation isFieldWithinRange(IndexReader reader, Instant fromInclusive, Instant toInclusive)
throws IOException {
return isFieldWithinRange(reader,
resolution.convert(resolution.clampToValidRange(fromInclusive)),
resolution.convert(resolution.clampToValidRange(toInclusive)));
}

/**
* Return whether all values of the given {@link IndexReader} are within the range,
* outside the range or cross the range.
* @param fromInclusive start date, inclusive, {@link Resolution#convert(Instant) converted} to the appropriate scale
* @param toInclusive end date, inclusive, {@link Resolution#convert(Instant) converted} to the appropriate scale
*/
private Relation isFieldWithinRange(IndexReader reader, long fromInclusive, long toInclusive) throws IOException {
if (PointValues.size(reader, name()) == 0) {
// no points, so nothing matches
return Relation.DISJOINT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,13 @@

package org.elasticsearch.search.aggregations.bucket.histogram;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.fielddata.LeafNumericFieldData;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MappedFieldType.Relation;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
Expand All @@ -50,10 +41,7 @@
import org.elasticsearch.search.aggregations.support.ValuesSourceType;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.zone.ZoneOffsetTransition;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -401,144 +389,32 @@ public String getType() {
return NAME;
}

/**
* Returns a {@linkplain ZoneId} that functions the same as
* {@link #timeZone()} on the data in the shard referred to by
* {@code context}. It <strong>attempts</strong> to convert zones that
* have non-fixed offsets into fixed offset zones that produce the
* same results on all data in the shard.
* <p>
* We go about this in three phases:
* <ol>
* <li>A bunch of preflight checks to see if we *can* optimize it
* <li>Find the any Instant in shard
* <li>Find the DST transition before and after that Instant
* <li>Round those into the interval
* <li>Check if the rounded value include all values within shard
* <li>If they do then return a fixed offset time zone because it
* will return the same values for all time in the shard as the
* original time zone, but faster
* <li>Otherwise return the original time zone. It'll be slower, but
* correct.
* </ol>
* <p>
* NOTE: this can't be done in rewrite() because the timezone is then also used on the
* coordinating node in order to generate missing buckets, which may cross a transition
* even though data on the shards doesn't.
*/
ZoneId rewriteTimeZone(QueryShardContext context) throws IOException {
final ZoneId tz = timeZone();
if (tz == null || tz.getRules().isFixedOffset()) {
// This time zone is already as fast as it is going to get.
return tz;
}
if (script() != null) {
// We can't be sure what dates the script will return so we don't attempt to optimize anything
return tz;
}
if (field() == null) {
// Without a field we're not going to be able to look anything up.
return tz;
}
MappedFieldType ft = context.fieldMapper(field());
if (ft == null || false == ft instanceof DateFieldMapper.DateFieldType) {
// If the field is unmapped or not a date then we can't get its range.
return tz;
}
DateFieldMapper.DateFieldType dft = (DateFieldMapper.DateFieldType) ft;
final IndexReader reader = context.getIndexReader();
if (reader == null) {
return tz;
}

Instant instant = null;
final IndexNumericFieldData fieldData = context.getForField(ft);
for (LeafReaderContext ctx : reader.leaves()) {
LeafNumericFieldData leafFD = fieldData.load(ctx);
SortedNumericDocValues values = leafFD.getLongValues();
if (values.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
instant = Instant.ofEpochMilli(values.nextValue());
break;
}
}
if (instant == null) {
return tz;
}

ZoneOffsetTransition prevOffsetTransition = tz.getRules().previousTransition(instant);
final long prevTransition;
if (prevOffsetTransition != null) {
prevTransition = prevOffsetTransition.getInstant().toEpochMilli();
} else {
prevTransition = instant.toEpochMilli();
}
ZoneOffsetTransition nextOffsetTransition = tz.getRules().nextTransition(instant);
final long nextTransition;
if (nextOffsetTransition != null) {
nextTransition = nextOffsetTransition.getInstant().toEpochMilli();
} else {
nextTransition = Long.MAX_VALUE; // fixed time-zone after prevTransition
}

// We need all not only values but also rounded values to be within
// [prevTransition, nextTransition].
final long low;

DateIntervalWrapper.IntervalTypeEnum intervalType = dateHistogramInterval.getIntervalType();
if (intervalType.equals(DateIntervalWrapper.IntervalTypeEnum.FIXED)) {
low = Math.addExact(prevTransition, dateHistogramInterval.tryIntervalAsFixedUnit().millis());
} else if (intervalType.equals(DateIntervalWrapper.IntervalTypeEnum.CALENDAR)) {
final Rounding.DateTimeUnit intervalAsUnit = dateHistogramInterval.tryIntervalAsCalendarUnit();
final Rounding rounding = Rounding.builder(intervalAsUnit).timeZone(timeZone()).build();
low = rounding.nextRoundingValue(prevTransition);
} else {
// We're not sure what the interval was originally (legacy) so use old behavior of assuming
// calendar first, then fixed. Required because fixed/cal overlap in places ("1h")
Rounding.DateTimeUnit intervalAsUnit = dateHistogramInterval.tryIntervalAsCalendarUnit();
if (intervalAsUnit != null) {
final Rounding rounding = Rounding.builder(intervalAsUnit).timeZone(timeZone()).build();
low = rounding.nextRoundingValue(prevTransition);
} else {
final TimeValue intervalAsMillis = dateHistogramInterval.tryIntervalAsFixedUnit();
low = Math.addExact(prevTransition, intervalAsMillis.millis());
}
}
// rounding rounds down, so 'nextTransition' is a good upper bound
final long high = nextTransition;

if (dft.isFieldWithinRange(
reader, Instant.ofEpochMilli(low), Instant.ofEpochMilli(high - 1)) == Relation.WITHIN) {
// All values in this reader have the same offset despite daylight saving times.
// This is very common for location-based timezones such as Europe/Paris in
// combination with time-based indices.
return ZoneOffset.ofTotalSeconds(tz.getRules().getOffset(instant).getTotalSeconds());
}
return tz;
}

@Override
protected ValuesSourceAggregatorFactory innerBuild(QueryShardContext queryShardContext,
ValuesSourceConfig config,
AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder) throws IOException {
final ZoneId tz = timeZone();
final Rounding rounding = dateHistogramInterval.createRounding(tz, offset);
// TODO once we optimize TimeIntervalRounding we won't need to rewrite the time zone
final ZoneId rewrittenTimeZone = rewriteTimeZone(queryShardContext);
final Rounding shardRounding;
if (tz == rewrittenTimeZone) {
shardRounding = rounding;
} else {
shardRounding = dateHistogramInterval.createRounding(rewrittenTimeZone, offset);
}

ExtendedBounds roundedBounds = null;
if (this.extendedBounds != null) {
// parse any string bounds to longs and round
roundedBounds = this.extendedBounds.parseAndValidate(name, queryShardContext, config.format()).round(rounding);
}
return new DateHistogramAggregatorFactory(name, config, order, keyed, minDocCount,
rounding, shardRounding, roundedBounds, queryShardContext, parent, subFactoriesBuilder, metadata);
return new DateHistogramAggregatorFactory(
name,
config,
order,
keyed,
minDocCount,
rounding,
roundedBounds,
queryShardContext,
parent,
subFactoriesBuilder,
metadata
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,26 @@ public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
private final long minDocCount;
private final ExtendedBounds extendedBounds;
private final Rounding rounding;
private final Rounding shardRounding;

public DateHistogramAggregatorFactory(String name, ValuesSourceConfig config,
BucketOrder order, boolean keyed, long minDocCount,
Rounding rounding, Rounding shardRounding, ExtendedBounds extendedBounds, QueryShardContext queryShardContext,
AggregatorFactory parent, AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metadata) throws IOException {
public DateHistogramAggregatorFactory(
String name,
ValuesSourceConfig config,
BucketOrder order,
boolean keyed,
long minDocCount,
Rounding rounding,
ExtendedBounds extendedBounds,
QueryShardContext queryShardContext,
AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metadata
) throws IOException {
super(name, config, queryShardContext, parent, subFactoriesBuilder, metadata);
this.order = order;
this.keyed = keyed;
this.minDocCount = minDocCount;
this.extendedBounds = extendedBounds;
this.rounding = rounding;
this.shardRounding = shardRounding;
}

public long minDocCount() {
Expand All @@ -89,7 +95,7 @@ protected Aggregator doCreateInternal(
// TODO: Is there a reason not to get the prepared rounding in the supplier itself?
Rounding.Prepared preparedRounding = config.getValuesSource()
.roundingPreparer(queryShardContext.getIndexReader())
.apply(shardRounding);
.apply(rounding);
return ((DateHistogramAggregationSupplier) aggregatorSupplier).build(
name,
factories,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public void testIsFieldWithinRangeEmptyReader() throws IOException {
DateFieldType ft = new DateFieldType("my_date");
assertEquals(Relation.DISJOINT, ft.isFieldWithinQuery(reader, "2015-10-12", "2016-04-03",
randomBoolean(), randomBoolean(), null, null, context));
assertEquals(Relation.DISJOINT, ft.isFieldWithinRange(reader, instant("2015-10-12"), instant("2016-04-03")));
}

public void testIsFieldWithinQueryDateMillis() throws IOException {
Expand Down Expand Up @@ -109,49 +108,11 @@ public void isFieldWithinRangeTestCase(DateFieldType ft) throws IOException {
doTestIsFieldWithinQuery(ft, reader, DateTimeZone.UTC, alternateFormat);

QueryRewriteContext context = new QueryRewriteContext(xContentRegistry(), writableRegistry(), null, () -> nowInMillis);
assertEquals(Relation.INTERSECTS, ft.isFieldWithinRange(reader, instant("2015-10-09"), instant("2016-01-02")));
assertEquals(Relation.INTERSECTS, ft.isFieldWithinRange(reader, instant("2016-01-02"), instant("2016-06-20")));
assertEquals(Relation.INTERSECTS, ft.isFieldWithinRange(reader, instant("2016-01-02"), instant("2016-02-12")));
assertEquals(Relation.DISJOINT, ft.isFieldWithinRange(reader, instant("2014-01-02"), instant("2015-02-12")));
assertEquals(Relation.DISJOINT, ft.isFieldWithinRange(reader, instant("2016-05-11"), instant("2016-08-30")));
assertEquals(Relation.WITHIN, ft.isFieldWithinRange(reader, instant("2015-09-25"), instant("2016-05-29")));
assertEquals(Relation.WITHIN, ft.isFieldWithinRange(reader, instant("2015-10-12"), instant("2016-04-03")));
assertEquals(Relation.INTERSECTS,
ft.isFieldWithinRange(reader, instant("2015-10-12").plusMillis(1), instant("2016-04-03").minusMillis(1)));
assertEquals(Relation.INTERSECTS,
ft.isFieldWithinRange(reader, instant("2015-10-12").plusMillis(1), instant("2016-04-03")));
assertEquals(Relation.INTERSECTS,
ft.isFieldWithinRange(reader, instant("2015-10-12"), instant("2016-04-03").minusMillis(1)));
assertEquals(Relation.INTERSECTS,
ft.isFieldWithinRange(reader, instant("2015-10-12").plusNanos(1), instant("2016-04-03").minusNanos(1)));
assertEquals(ft.resolution() == Resolution.NANOSECONDS ? Relation.INTERSECTS : Relation.WITHIN, // Millis round down here.
ft.isFieldWithinRange(reader, instant("2015-10-12").plusNanos(1), instant("2016-04-03")));
assertEquals(Relation.INTERSECTS,
ft.isFieldWithinRange(reader, instant("2015-10-12"), instant("2016-04-03").minusNanos(1)));

// Some edge cases
assertEquals(Relation.WITHIN, ft.isFieldWithinRange(reader, Instant.EPOCH, instant("2016-04-03")));
assertEquals(Relation.WITHIN, ft.isFieldWithinRange(reader, Instant.ofEpochMilli(-1000), instant("2016-04-03")));
assertEquals(Relation.WITHIN, ft.isFieldWithinRange(reader, Instant.ofEpochMilli(Long.MIN_VALUE), instant("2016-04-03")));
assertEquals(Relation.WITHIN, ft.isFieldWithinRange(reader, instant("2015-10-12"), Instant.ofEpochMilli(Long.MAX_VALUE)));

// Fields with no value indexed.
DateFieldType ft2 = new DateFieldType("my_date2");

assertEquals(Relation.DISJOINT, ft2.isFieldWithinQuery(reader, "2015-10-09", "2016-01-02", false, false, null, null, context));
assertEquals(Relation.DISJOINT, ft2.isFieldWithinRange(reader, instant("2015-10-09"), instant("2016-01-02")));

// Fire a bunch of random values into isFieldWithinRange to make sure it doesn't crash
for (int iter = 0; iter < 1000; iter++) {
long min = randomLong();
long max = randomLong();
if (min > max) {
long swap = max;
max = min;
min = swap;
}
ft.isFieldWithinRange(reader, Instant.ofEpochMilli(min), Instant.ofEpochMilli(max));
}

IOUtils.close(reader, w, dir);
}
Expand Down
Loading