Skip to content

Commit ffbe047

Browse files
committed
Revert "Add more flexibility to MovingFunction window alignment (#44360)"
This reverts commit 1a58a48.
1 parent 1a58a48 commit ffbe047

File tree

5 files changed

+25
-112
lines changed

5 files changed

+25
-112
lines changed

docs/reference/aggregations/pipeline/movfn-aggregation.asciidoc

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,14 @@ A `moving_fn` aggregation looks like this in isolation:
2424
--------------------------------------------------
2525
// NOTCONSOLE
2626

27-
[[moving-fn-params]]
28-
.`moving_fn` Parameters
27+
[[moving-avg-params]]
28+
.`moving_avg` Parameters
2929
[options="header"]
3030
|===
3131
|Parameter Name |Description |Required |Default Value
3232
|`buckets_path` |Path to the metric of interest (see <<buckets-path-syntax, `buckets_path` Syntax>> for more details |Required |
3333
|`window` |The size of window to "slide" across the histogram. |Required |
3434
|`script` |The script that should be executed on each window of data |Required |
35-
|`shift` |<<shift-parameter, Shift>> of window position. |Optional | 0
3635
|===
3736

3837
`moving_fn` aggregations must be embedded inside of a `histogram` or `date_histogram` aggregation. They can be
@@ -170,18 +169,6 @@ POST /_search
170169
// CONSOLE
171170
// TEST[setup:sales]
172171

173-
[[shift-parameter]]
174-
==== shift parameter
175-
176-
By default (with `shift = 0`), the window that is offered for calculation is the last `n` values excluding the current bucket.
177-
Increasing `shift` by 1 moves starting window position by `1` to the right.
178-
179-
- To include current bucket to the window, use `shift = 1`.
180-
- For center alignment (`n / 2` values before and after the current bucket), use `shift = window / 2`.
181-
- For right alignment (`n` values after the current bucket), use `shift = window`.
182-
183-
If either of window edges moves outside the borders of data series, the window shrinks to include available values only.
184-
185172
==== Pre-built Functions
186173

187174
For convenience, a number of functions have been prebuilt and are available inside the `moving_fn` script context:

server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilder.java

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.elasticsearch.search.aggregations.pipeline;
2121

22-
import org.elasticsearch.Version;
2322
import org.elasticsearch.common.ParseField;
2423
import org.elasticsearch.common.Strings;
2524
import org.elasticsearch.common.io.stream.StreamInput;
@@ -49,14 +48,12 @@
4948
public class MovFnPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<MovFnPipelineAggregationBuilder> {
5049
public static final String NAME = "moving_fn";
5150
private static final ParseField WINDOW = new ParseField("window");
52-
private static final ParseField SHIFT = new ParseField("shift");
5351

5452
private final Script script;
5553
private final String bucketsPathString;
5654
private String format = null;
5755
private GapPolicy gapPolicy = GapPolicy.SKIP;
5856
private int window;
59-
private int shift;
6057

6158
private static final Function<String, ConstructingObjectParser<MovFnPipelineAggregationBuilder, Void>> PARSER
6259
= name -> {
@@ -71,7 +68,6 @@ public class MovFnPipelineAggregationBuilder extends AbstractPipelineAggregation
7168
(p, c) -> Script.parse(p), Script.SCRIPT_PARSE_FIELD, ObjectParser.ValueType.OBJECT_OR_STRING);
7269
parser.declareInt(ConstructingObjectParser.constructorArg(), WINDOW);
7370

74-
parser.declareInt(MovFnPipelineAggregationBuilder::setShift, SHIFT);
7571
parser.declareString(MovFnPipelineAggregationBuilder::format, FORMAT);
7672
parser.declareField(MovFnPipelineAggregationBuilder::gapPolicy, p -> {
7773
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
@@ -101,11 +97,6 @@ public MovFnPipelineAggregationBuilder(StreamInput in) throws IOException {
10197
format = in.readOptionalString();
10298
gapPolicy = GapPolicy.readFrom(in);
10399
window = in.readInt();
104-
if (in.getVersion().onOrAfter(Version.V_8_0_0)) { // TODO change this after backport
105-
shift = in.readInt();
106-
} else {
107-
shift = 0;
108-
}
109100
}
110101

111102
@Override
@@ -115,9 +106,6 @@ protected void doWriteTo(StreamOutput out) throws IOException {
115106
out.writeOptionalString(format);
116107
gapPolicy.writeTo(out);
117108
out.writeInt(window);
118-
if (out.getVersion().onOrAfter(Version.V_8_0_0)) { // TODO change this after backport
119-
out.writeInt(shift);
120-
}
121109
}
122110

123111
/**
@@ -180,13 +168,9 @@ public void setWindow(int window) {
180168
this.window = window;
181169
}
182170

183-
public void setShift(int shift) {
184-
this.shift = shift;
185-
}
186-
187171
@Override
188172
public void doValidate(AggregatorFactory parent, Collection<AggregationBuilder> aggFactories,
189-
Collection<PipelineAggregationBuilder> pipelineAggregatorFactories) {
173+
Collection<PipelineAggregationBuilder> pipelineAggregatoractories) {
190174
if (window <= 0) {
191175
throw new IllegalArgumentException("[" + WINDOW.getPreferredName() + "] must be a positive, non-zero integer.");
192176
}
@@ -196,7 +180,7 @@ public void doValidate(AggregatorFactory parent, Collection<AggregationBuilder>
196180

197181
@Override
198182
protected PipelineAggregator createInternal(Map<String, Object> metaData) {
199-
return new MovFnPipelineAggregator(name, bucketsPathString, script, window, shift, formatter(), gapPolicy, metaData);
183+
return new MovFnPipelineAggregator(name, bucketsPathString, script, window, formatter(), gapPolicy, metaData);
200184
}
201185

202186
@Override
@@ -208,7 +192,6 @@ protected XContentBuilder internalXContent(XContentBuilder builder, Params param
208192
}
209193
builder.field(GAP_POLICY.getPreferredName(), gapPolicy.getName());
210194
builder.field(WINDOW.getPreferredName(), window);
211-
builder.field(SHIFT.getPreferredName(), shift);
212195
return builder;
213196
}
214197

@@ -242,7 +225,7 @@ protected boolean overrideBucketsPath() {
242225

243226
@Override
244227
public int hashCode() {
245-
return Objects.hash(super.hashCode(), bucketsPathString, script, format, gapPolicy, window, shift);
228+
return Objects.hash(super.hashCode(), bucketsPathString, script, format, gapPolicy, window);
246229
}
247230

248231
@Override
@@ -255,8 +238,7 @@ public boolean equals(Object obj) {
255238
&& Objects.equals(script, other.script)
256239
&& Objects.equals(format, other.format)
257240
&& Objects.equals(gapPolicy, other.gapPolicy)
258-
&& Objects.equals(window, other.window)
259-
&& Objects.equals(shift, other.shift);
241+
&& Objects.equals(window, other.window);
260242
}
261243

262244
@Override

server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java

Lines changed: 6 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
package org.elasticsearch.search.aggregations.pipeline;
2121

22-
import org.elasticsearch.Version;
22+
import org.elasticsearch.common.collect.EvictingQueue;
2323
import org.elasticsearch.common.io.stream.StreamInput;
2424
import org.elasticsearch.common.io.stream.StreamOutput;
2525
import org.elasticsearch.script.Script;
@@ -63,17 +63,15 @@ public class MovFnPipelineAggregator extends PipelineAggregator {
6363
private final Script script;
6464
private final String bucketsPath;
6565
private final int window;
66-
private final int shift;
6766

68-
MovFnPipelineAggregator(String name, String bucketsPath, Script script, int window, int shift, DocValueFormat formatter,
67+
MovFnPipelineAggregator(String name, String bucketsPath, Script script, int window, DocValueFormat formatter,
6968
BucketHelpers.GapPolicy gapPolicy, Map<String, Object> metadata) {
7069
super(name, new String[]{bucketsPath}, metadata);
7170
this.bucketsPath = bucketsPath;
7271
this.script = script;
7372
this.formatter = formatter;
7473
this.gapPolicy = gapPolicy;
7574
this.window = window;
76-
this.shift = shift;
7775
}
7876

7977
public MovFnPipelineAggregator(StreamInput in) throws IOException {
@@ -83,11 +81,6 @@ public MovFnPipelineAggregator(StreamInput in) throws IOException {
8381
gapPolicy = BucketHelpers.GapPolicy.readFrom(in);
8482
bucketsPath = in.readString();
8583
window = in.readInt();
86-
if (in.getVersion().onOrAfter(Version.V_8_0_0)) { // TODO change this after backport
87-
shift = in.readInt();
88-
} else {
89-
shift = 0;
90-
}
9184
}
9285

9386
@Override
@@ -97,9 +90,6 @@ protected void doWriteTo(StreamOutput out) throws IOException {
9790
gapPolicy.writeTo(out);
9891
out.writeString(bucketsPath);
9992
out.writeInt(window);
100-
if (out.getVersion().onOrAfter(Version.V_8_0_0)) { // TODO change this after backport
101-
out.writeInt(shift);
102-
}
10393
}
10494

10595
@Override
@@ -116,6 +106,7 @@ public InternalAggregation reduce(InternalAggregation aggregation, InternalAggre
116106
HistogramFactory factory = (HistogramFactory) histo;
117107

118108
List<MultiBucketsAggregation.Bucket> newBuckets = new ArrayList<>();
109+
EvictingQueue<Double> values = new EvictingQueue<>(this.window);
119110

120111
// Initialize the script
121112
MovingFunctionScript.Factory scriptFactory = reduceContext.scriptService().compile(script, MovingFunctionScript.CONTEXT);
@@ -126,53 +117,30 @@ public InternalAggregation reduce(InternalAggregation aggregation, InternalAggre
126117

127118
MovingFunctionScript executableScript = scriptFactory.newInstance();
128119

129-
List<Double> values = buckets.stream()
130-
.map(b -> resolveBucketValue(histo, b, bucketsPaths()[0], gapPolicy))
131-
.filter(v -> v != null && v.isNaN() == false)
132-
.collect(Collectors.toList());
133-
134-
int index = 0;
135120
for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
136121
Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], gapPolicy);
137122

138123
// Default is to reuse existing bucket. Simplifies the rest of the logic,
139124
// since we only change newBucket if we can add to it
140125
MultiBucketsAggregation.Bucket newBucket = bucket;
141126

142-
if (thisBucketValue != null && thisBucketValue.isNaN() == false) {
127+
if (thisBucketValue != null && thisBucketValue.equals(Double.NaN) == false) {
143128

144129
// The custom context mandates that the script returns a double (not Double) so we
145130
// don't need null checks, etc.
146-
int fromIndex = clamp(index - window + shift, values);
147-
int toIndex = clamp(index + shift, values);
148-
double movavg = executableScript.execute(
149-
vars,
150-
values.subList(fromIndex, toIndex).stream()
151-
.mapToDouble(Double::doubleValue)
152-
.toArray()
153-
);
131+
double movavg = executableScript.execute(vars, values.stream().mapToDouble(Double::doubleValue).toArray());
154132

155133
List<InternalAggregation> aggs = StreamSupport
156134
.stream(bucket.getAggregations().spliterator(), false)
157135
.map(InternalAggregation.class::cast)
158136
.collect(Collectors.toList());
159137
aggs.add(new InternalSimpleValue(name(), movavg, formatter, new ArrayList<>(), metaData()));
160138
newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), new InternalAggregations(aggs));
161-
index++;
139+
values.offer(thisBucketValue);
162140
}
163141
newBuckets.add(newBucket);
164142
}
165143

166144
return factory.createAggregation(newBuckets);
167145
}
168-
169-
private int clamp(int index, List<Double> list) {
170-
if (index < 0) {
171-
return 0;
172-
}
173-
if (index > list.size()) {
174-
return list.size();
175-
}
176-
return index;
177-
}
178146
}

server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilderSerializationTests.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.common.io.stream.Writeable;
2323
import org.elasticsearch.common.xcontent.XContentParser;
2424
import org.elasticsearch.script.Script;
25+
import org.elasticsearch.search.aggregations.pipeline.MovFnPipelineAggregationBuilder;
2526
import org.elasticsearch.test.AbstractSerializingTestCase;
2627

2728
import java.io.IOException;
@@ -30,14 +31,7 @@ public class MovFnPipelineAggregationBuilderSerializationTests extends AbstractS
3031

3132
@Override
3233
protected MovFnPipelineAggregationBuilder createTestInstance() {
33-
MovFnPipelineAggregationBuilder builder = new MovFnPipelineAggregationBuilder(
34-
randomAlphaOfLength(10),
35-
"foo",
36-
new Script("foo"),
37-
randomIntBetween(1, 10)
38-
);
39-
builder.setShift(randomIntBetween(1, 10));
40-
return builder;
34+
return new MovFnPipelineAggregationBuilder(randomAlphaOfLength(10), "foo", new Script("foo"), randomIntBetween(1, 10));
4135
}
4236

4337
@Override

server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnUnitTests.java

Lines changed: 11 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import java.util.Map;
5454
import java.util.Set;
5555
import java.util.function.Consumer;
56-
import java.util.stream.Collectors;
5756

5857
import static org.hamcrest.Matchers.equalTo;
5958
import static org.mockito.Mockito.mock;
@@ -80,42 +79,25 @@ public class MovFnUnitTests extends AggregatorTestCase {
8079
private static final List<Integer> datasetValues = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
8180

8281
public void testMatchAllDocs() throws IOException {
83-
check(0, List.of(Double.NaN, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0));
84-
}
85-
86-
public void testShift() throws IOException {
87-
check(1, List.of(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0));
88-
check(5, List.of(5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 10.0, 10.0, Double.NaN, Double.NaN));
89-
check(-5, List.of(Double.NaN, Double.NaN, Double.NaN, Double.NaN, Double.NaN, Double.NaN, 1.0, 2.0, 3.0, 4.0));
90-
}
91-
92-
public void testWideWindow() throws IOException {
93-
Script script = new Script(Script.DEFAULT_SCRIPT_TYPE, "painless", "test", Collections.emptyMap());
94-
MovFnPipelineAggregationBuilder builder = new MovFnPipelineAggregationBuilder("mov_fn", "avg", script, 100);
95-
builder.setShift(50);
96-
check(builder, script, List.of(10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0));
97-
}
98-
99-
private void check(int shift, List<Double> expected) throws IOException {
82+
Query query = new MatchAllDocsQuery();
10083
Script script = new Script(Script.DEFAULT_SCRIPT_TYPE, "painless", "test", Collections.emptyMap());
101-
MovFnPipelineAggregationBuilder builder = new MovFnPipelineAggregationBuilder("mov_fn", "avg", script, 3);
102-
builder.setShift(shift);
103-
check(builder, script, expected);
104-
}
10584

106-
private void check(MovFnPipelineAggregationBuilder builder, Script script, List<Double> expected) throws IOException {
107-
Query query = new MatchAllDocsQuery();
10885
DateHistogramAggregationBuilder aggBuilder = new DateHistogramAggregationBuilder("histo");
10986
aggBuilder.calendarInterval(DateHistogramInterval.DAY).field(DATE_FIELD);
11087
aggBuilder.subAggregation(new AvgAggregationBuilder("avg").field(VALUE_FIELD));
111-
aggBuilder.subAggregation(builder);
88+
aggBuilder.subAggregation(new MovFnPipelineAggregationBuilder("mov_fn", "avg", script, 3));
11289

11390
executeTestCase(query, aggBuilder, histogram -> {
91+
assertEquals(10, histogram.getBuckets().size());
11492
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
115-
List<Double> actual = buckets.stream()
116-
.map(bucket -> ((InternalSimpleValue) (bucket.getAggregations().get("mov_fn"))).value())
117-
.collect(Collectors.toList());
118-
assertThat(actual, equalTo(expected));
93+
for (int i = 0; i < buckets.size(); i++) {
94+
if (i == 0) {
95+
assertThat(((InternalSimpleValue)(buckets.get(i).getAggregations().get("mov_fn"))).value(), equalTo(Double.NaN));
96+
} else {
97+
assertThat(((InternalSimpleValue)(buckets.get(i).getAggregations().get("mov_fn"))).value(), equalTo(((double) i)));
98+
}
99+
100+
}
119101
}, 1000, script);
120102
}
121103

0 commit comments

Comments
 (0)