Skip to content

Begin moving date_histogram to offset rounding (backport of #50873) #50978

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions server/src/main/java/org/elasticsearch/common/Rounding.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,19 @@ public void writeTo(StreamOutput out) throws IOException {
*/
public abstract long nextRoundingValue(long value);

/**
* How "offset" this rounding is from the traditional "start" of the period.
* @deprecated We're in the process of abstracting offset *into* Rounding
* so keep any usage to migratory shims
*/
@Deprecated
public abstract long offset();

/**
* Strip the {@code offset} from these bounds.
*/
public abstract Rounding withoutOffset();

@Override
public abstract boolean equals(Object obj);

Expand Down Expand Up @@ -425,6 +438,16 @@ public long nextRoundingValue(long utcMillis) {
}
}

@Override
public long offset() {
return 0;
}

@Override
public Rounding withoutOffset() {
return this;
}

@Override
public int hashCode() {
return Objects.hash(unit, timeZone);
Expand Down Expand Up @@ -556,6 +579,16 @@ public long nextRoundingValue(long time) {
.toInstant().toEpochMilli();
}

@Override
public long offset() {
return 0;
}

@Override
public Rounding withoutOffset() {
return this;
}

@Override
public int hashCode() {
return Objects.hash(interval, timeZone);
Expand Down Expand Up @@ -617,8 +650,17 @@ public long round(long value) {

@Override
public long nextRoundingValue(long value) {
// This isn't needed by the current users. We'll implement it when we migrate other users to it.
throw new UnsupportedOperationException("not yet supported");
return delegate.nextRoundingValue(value - offset) + offset;
}

@Override
public long offset() {
return offset;
}

@Override
public Rounding withoutOffset() {
return delegate;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,21 +500,21 @@ protected ValuesSourceAggregatorFactory<ValuesSource> innerBuild(QueryShardConte
Builder subFactoriesBuilder) throws IOException {
final ZoneId tz = timeZone();
// TODO use offset here rather than explicitly in the aggregation
final Rounding rounding = dateHistogramInterval.createRounding(tz, 0);
final Rounding rounding = dateHistogramInterval.createRounding(tz, offset);
final ZoneId rewrittenTimeZone = rewriteTimeZone(queryShardContext);
final Rounding shardRounding;
if (tz == rewrittenTimeZone) {
shardRounding = rounding;
} else {
shardRounding = dateHistogramInterval.createRounding(rewrittenTimeZone, 0);
shardRounding = dateHistogramInterval.createRounding(rewrittenTimeZone, offset);
}

ExtendedBounds roundedBounds = null;
if (this.extendedBounds != null) {
// parse any string bounds to longs and round
roundedBounds = this.extendedBounds.parseAndValidate(name, queryShardContext, config.format()).round(rounding);
}
return new DateHistogramAggregatorFactory(name, config, offset, order, keyed, minDocCount,
return new DateHistogramAggregatorFactory(name, config, order, keyed, minDocCount,
rounding, shardRounding, roundedBounds, queryShardContext, parent, subFactoriesBuilder, metaData);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,16 @@ class DateHistogramAggregator extends BucketsAggregator {
private final ExtendedBounds extendedBounds;

private final LongHash bucketOrds;
private long offset;

DateHistogramAggregator(String name, AggregatorFactories factories, Rounding rounding, Rounding shardRounding,
long offset, BucketOrder order, boolean keyed,
BucketOrder order, boolean keyed,
long minDocCount, @Nullable ExtendedBounds extendedBounds, @Nullable ValuesSource.Numeric valuesSource,
DocValueFormat formatter, SearchContext aggregationContext,
Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {

super(name, factories, aggregationContext, parent, pipelineAggregators, metaData);
this.rounding = rounding;
this.shardRounding = shardRounding;
this.offset = offset;
this.order = InternalOrder.validate(order, this);
this.keyed = keyed;
this.minDocCount = minDocCount;
Expand Down Expand Up @@ -113,7 +111,7 @@ public void collect(int doc, long bucket) throws IOException {
long value = values.nextValue();
// We can use shardRounding here, which is sometimes more efficient
// if daylight saving times are involved.
long rounded = shardRounding.round(value - offset) + offset;
long rounded = shardRounding.round(value);
assert rounded >= previousRounded;
if (rounded == previousRounded) {
continue;
Expand Down Expand Up @@ -150,7 +148,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
? new InternalDateHistogram.EmptyBucketInfo(rounding, buildEmptySubAggregations(), extendedBounds)
: null;
return new InternalDateHistogram(name, buckets, order, minDocCount, offset, emptyBucketInfo, formatter, keyed,
return new InternalDateHistogram(name, buckets, order, minDocCount, rounding.offset(), emptyBucketInfo, formatter, keyed,
pipelineAggregators(), metaData());
}

Expand All @@ -159,8 +157,8 @@ public InternalAggregation buildEmptyAggregation() {
InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
? new InternalDateHistogram.EmptyBucketInfo(rounding, buildEmptySubAggregations(), extendedBounds)
: null;
return new InternalDateHistogram(name, Collections.emptyList(), order, minDocCount, offset, emptyBucketInfo, formatter, keyed,
pipelineAggregators(), metaData());
return new InternalDateHistogram(name, Collections.emptyList(), order, minDocCount, rounding.offset(), emptyBucketInfo, formatter,
keyed, pipelineAggregators(), metaData());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
public final class DateHistogramAggregatorFactory
extends ValuesSourceAggregatorFactory<ValuesSource> {

private final long offset;
private final BucketOrder order;
private final boolean keyed;
private final long minDocCount;
Expand All @@ -48,12 +47,11 @@ public final class DateHistogramAggregatorFactory
private final Rounding shardRounding;

public DateHistogramAggregatorFactory(String name, ValuesSourceConfig<ValuesSource> config,
long offset, BucketOrder order, boolean keyed, long minDocCount,
BucketOrder order, boolean keyed, long minDocCount,
Rounding rounding, Rounding shardRounding, ExtendedBounds extendedBounds, QueryShardContext queryShardContext,
AggregatorFactory parent, AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metaData) throws IOException {
super(name, config, queryShardContext, parent, subFactoriesBuilder, metaData);
this.offset = offset;
this.order = order;
this.keyed = keyed;
this.minDocCount = minDocCount;
Expand Down Expand Up @@ -104,7 +102,7 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource,
private Aggregator createAggregator(ValuesSource.Numeric valuesSource, SearchContext searchContext,
Aggregator parent, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
return new DateHistogramAggregator(name, factories, rounding, shardRounding, offset, order, keyed, minDocCount, extendedBounds,
return new DateHistogramAggregator(name, factories, rounding, shardRounding, order, keyed, minDocCount, extendedBounds,
valuesSource, config.format(), searchContext, parent, pipelineAggregators, metaData);
}

Expand All @@ -113,7 +111,7 @@ private Aggregator createRangeAggregator(ValuesSource.Range valuesSource,
Aggregator parent,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
return new DateRangeHistogramAggregator(name, factories, rounding, shardRounding, offset, order, keyed, minDocCount, extendedBounds,
return new DateRangeHistogramAggregator(name, factories, rounding, shardRounding, order, keyed, minDocCount, extendedBounds,
valuesSource, config.format(), searchContext, parent, pipelineAggregators, metaData);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,9 @@ class DateRangeHistogramAggregator extends BucketsAggregator {
private final ExtendedBounds extendedBounds;

private final LongHash bucketOrds;
private long offset;

DateRangeHistogramAggregator(String name, AggregatorFactories factories, Rounding rounding, Rounding shardRounding,
long offset, BucketOrder order, boolean keyed,
BucketOrder order, boolean keyed,
long minDocCount, @Nullable ExtendedBounds extendedBounds, @Nullable ValuesSource.Range valuesSource,
DocValueFormat formatter, SearchContext aggregationContext,
Aggregator parent, List<PipelineAggregator> pipelineAggregators,
Expand All @@ -79,7 +78,6 @@ class DateRangeHistogramAggregator extends BucketsAggregator {
super(name, factories, aggregationContext, parent, pipelineAggregators, metaData);
this.rounding = rounding;
this.shardRounding = shardRounding;
this.offset = offset;
this.order = InternalOrder.validate(order, this);
this.keyed = keyed;
this.minDocCount = minDocCount;
Expand Down Expand Up @@ -126,8 +124,8 @@ public void collect(int doc, long bucket) throws IOException {
// The encoding should ensure that this assert is always true.
assert from >= previousFrom : "Start of range not >= previous start";
final Long to = (Long) range.getTo();
final long startKey = offsetAwareRounding(shardRounding, from, offset);
final long endKey = offsetAwareRounding(shardRounding, to, offset);
final long startKey = shardRounding.round(from);
final long endKey = shardRounding.round(to);
for (long key = startKey > previousKey ? startKey : previousKey; key <= endKey;
key = shardRounding.nextRoundingValue(key)) {
if (key == previousKey) {
Expand All @@ -153,10 +151,6 @@ public void collect(int doc, long bucket) throws IOException {
};
}

private long offsetAwareRounding(Rounding rounding, long value, long offset) {
return rounding.round(value - offset) + offset;
}

@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0;
Expand All @@ -175,7 +169,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
? new InternalDateHistogram.EmptyBucketInfo(rounding, buildEmptySubAggregations(), extendedBounds)
: null;
return new InternalDateHistogram(name, buckets, order, minDocCount, offset, emptyBucketInfo, formatter, keyed,
return new InternalDateHistogram(name, buckets, order, minDocCount, rounding.offset(), emptyBucketInfo, formatter, keyed,
pipelineAggregators(), metaData());
}

Expand All @@ -184,8 +178,8 @@ public InternalAggregation buildEmptyAggregation() {
InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
? new InternalDateHistogram.EmptyBucketInfo(rounding, buildEmptySubAggregations(), extendedBounds)
: null;
return new InternalDateHistogram(name, Collections.emptyList(), order, minDocCount, offset, emptyBucketInfo, formatter, keyed,
pipelineAggregators(), metaData());
return new InternalDateHistogram(name, Collections.emptyList(), order, minDocCount, rounding.offset(), emptyBucketInfo, formatter,
keyed, pipelineAggregators(), metaData());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,11 @@ ExtendedBounds parseAndValidate(String aggName, QueryShardContext queryShardCont
}

ExtendedBounds round(Rounding rounding) {
return new ExtendedBounds(min != null ? rounding.round(min) : null, max != null ? rounding.round(max) : null);
// Extended bounds shouldn't be effected by the offset
Rounding effectiveRounding = rounding.withoutOffset();
return new ExtendedBounds(
min != null ? effectiveRounding.round(min) : null,
max != null ? effectiveRounding.round(max) : null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ public Number getKey(MultiBucketsAggregation.Bucket bucket) {

@Override
public Number nextKey(Number key) {
return emptyBucketInfo.rounding.nextRoundingValue(key.longValue() - offset) + offset;
return emptyBucketInfo.rounding.nextRoundingValue(key.longValue());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,18 @@ public void testOffsetRounding() {
Rounding rounding = Rounding.builder(Rounding.DateTimeUnit.DAY_OF_MONTH).offset(twoHours).build();
assertThat(rounding.round(0), equalTo(-oneDay + twoHours));
assertThat(rounding.round(twoHours), equalTo(twoHours));
assertThat(rounding.nextRoundingValue(-oneDay), equalTo(-oneDay + twoHours));
assertThat(rounding.nextRoundingValue(0), equalTo(twoHours));
assertThat(rounding.withoutOffset().round(0), equalTo(0L));
assertThat(rounding.withoutOffset().nextRoundingValue(0), equalTo(oneDay));

rounding = Rounding.builder(Rounding.DateTimeUnit.DAY_OF_MONTH).offset(-twoHours).build();
assertThat(rounding.round(0), equalTo(-twoHours));
assertThat(rounding.round(oneDay - twoHours), equalTo(oneDay - twoHours));
assertThat(rounding.nextRoundingValue(-oneDay), equalTo(-twoHours));
assertThat(rounding.nextRoundingValue(0), equalTo(oneDay - twoHours));
assertThat(rounding.withoutOffset().round(0), equalTo(0L));
assertThat(rounding.withoutOffset().nextRoundingValue(0), equalTo(oneDay));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ static AggregatorFactory getRandomSequentiallyOrderedParentAgg() throws IOExcept
new AggregatorFactories.Builder(), Collections.emptyMap());
break;
case 1:
factory = new DateHistogramAggregatorFactory("name", mock(ValuesSourceConfig.class), 0L,
factory = new DateHistogramAggregatorFactory("name", mock(ValuesSourceConfig.class),
mock(InternalOrder.class), false, 0L, mock(Rounding.class), mock(Rounding.class),
mock(ExtendedBounds.class), mock(QueryShardContext.class), mock(AggregatorFactory.class),
new AggregatorFactories.Builder(), Collections.emptyMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void testParentValidations() throws IOException {
// Date Histogram
aggBuilders.clear();
aggBuilders.add(new CumulativeCardinalityPipelineAggregationBuilder("cumulative_card", "sum"));
parent = new DateHistogramAggregatorFactory("name", valuesSourceConfig, 0L,
parent = new DateHistogramAggregatorFactory("name", valuesSourceConfig,
mock(InternalOrder.class), false, 0L, mock(Rounding.class), mock(Rounding.class),
mock(ExtendedBounds.class), mock(QueryShardContext.class), mock(AggregatorFactory.class),
new AggregatorFactories.Builder(), Collections.emptyMap());
Expand Down