|
23 | 23 | import org.elasticsearch.common.collect.EvictingQueue;
|
24 | 24 | import org.elasticsearch.common.io.stream.StreamInput;
|
25 | 25 | import org.elasticsearch.common.io.stream.StreamOutput;
|
| 26 | +import org.elasticsearch.common.xcontent.XContentBuilder; |
26 | 27 | import org.elasticsearch.search.aggregations.InternalAggregation;
|
27 | 28 | import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
|
28 | 29 | import org.elasticsearch.search.aggregations.InternalAggregation.Type;
|
29 | 30 | import org.elasticsearch.search.aggregations.InternalAggregations;
|
30 | 31 | import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
|
| 32 | +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; |
31 | 33 | import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
|
32 | 34 | import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
33 | 35 | import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
|
34 | 36 | import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
|
| 37 | +import org.elasticsearch.search.aggregations.support.format.ValueFormat; |
35 | 38 | import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
|
36 | 39 | import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
|
37 | 40 |
|
38 | 41 | import java.io.IOException;
|
39 | 42 | import java.util.ArrayList;
|
40 | 43 | import java.util.List;
|
41 | 44 | import java.util.Map;
|
| 45 | +import java.util.Objects; |
42 | 46 | import java.util.stream.Collectors;
|
43 | 47 | import java.util.stream.StreamSupport;
|
44 | 48 |
|
45 |
| -import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; |
46 | 49 | import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
|
47 | 50 |
|
48 | 51 | public class SerialDiffPipelineAggregator extends PipelineAggregator {
|
@@ -144,20 +147,105 @@ public void doWriteTo(StreamOutput out) throws IOException {
|
144 | 147 |
|
145 | 148 | public static class Factory extends PipelineAggregatorFactory {
|
146 | 149 |
|
147 |
| - private final ValueFormatter formatter; |
148 |
| - private GapPolicy gapPolicy; |
149 |
| - private int lag; |
| 150 | + private String format; |
| 151 | + private GapPolicy gapPolicy = GapPolicy.SKIP; |
| 152 | + private int lag = 1; |
150 | 153 |
|
151 |
| - public Factory(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, GapPolicy gapPolicy, int lag) { |
| 154 | + public Factory(String name, String[] bucketsPaths) { |
152 | 155 | super(name, TYPE.name(), bucketsPaths);
|
153 |
| - this.formatter = formatter; |
154 |
| - this.gapPolicy = gapPolicy; |
| 156 | + } |
| 157 | + |
| 158 | + /** |
| 159 | + * Sets the lag to use when calculating the serial difference. |
| 160 | + */ |
| 161 | + public void lag(int lag) { |
155 | 162 | this.lag = lag;
|
156 | 163 | }
|
157 | 164 |
|
| 165 | + /** |
| 166 | + * Gets the lag to use when calculating the serial difference. |
| 167 | + */ |
| 168 | + public int lag() { |
| 169 | + return lag; |
| 170 | + } |
| 171 | + |
| 172 | + /** |
| 173 | + * Sets the format to use on the output of this aggregation. |
| 174 | + */ |
| 175 | + public void format(String format) { |
| 176 | + this.format = format; |
| 177 | + } |
| 178 | + |
| 179 | + /** |
| 180 | + * Gets the format to use on the output of this aggregation. |
| 181 | + */ |
| 182 | + public String format() { |
| 183 | + return format; |
| 184 | + } |
| 185 | + |
| 186 | + /** |
| 187 | + * Sets the GapPolicy to use on the output of this aggregation. |
| 188 | + */ |
| 189 | + public void gapPolicy(GapPolicy gapPolicy) { |
| 190 | + this.gapPolicy = gapPolicy; |
| 191 | + } |
| 192 | + |
| 193 | + /** |
| 194 | + * Gets the GapPolicy to use on the output of this aggregation. |
| 195 | + */ |
| 196 | + public GapPolicy gapPolicy() { |
| 197 | + return gapPolicy; |
| 198 | + } |
| 199 | + |
| 200 | + protected ValueFormatter formatter() { |
| 201 | + if (format != null) { |
| 202 | + return ValueFormat.Patternable.Number.format(format).formatter(); |
| 203 | + } else { |
| 204 | + return ValueFormatter.RAW; |
| 205 | + } |
| 206 | + } |
| 207 | + |
158 | 208 | @Override
|
159 | 209 | protected PipelineAggregator createInternal(Map<String, Object> metaData) throws IOException {
|
160 |
| - return new SerialDiffPipelineAggregator(name, bucketsPaths, formatter, gapPolicy, lag, metaData); |
| 210 | + return new SerialDiffPipelineAggregator(name, bucketsPaths, formatter(), gapPolicy, lag, metaData); |
| 211 | + } |
| 212 | + |
| 213 | + @Override |
| 214 | + protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { |
| 215 | + if (format != null) { |
| 216 | + builder.field(SerialDiffParser.FORMAT.getPreferredName(), format); |
| 217 | + } |
| 218 | + builder.field(SerialDiffParser.GAP_POLICY.getPreferredName(), gapPolicy.getName()); |
| 219 | + builder.field(SerialDiffParser.LAG.getPreferredName(), lag); |
| 220 | + return builder; |
| 221 | + } |
| 222 | + |
| 223 | + @Override |
| 224 | + protected PipelineAggregatorFactory doReadFrom(String name, String[] bucketsPaths, StreamInput in) throws IOException { |
| 225 | + Factory factory = new Factory(name, bucketsPaths); |
| 226 | + factory.format = in.readOptionalString(); |
| 227 | + factory.gapPolicy = GapPolicy.readFrom(in); |
| 228 | + factory.lag = in.readVInt(); |
| 229 | + return factory; |
| 230 | + } |
| 231 | + |
| 232 | + @Override |
| 233 | + protected void doWriteTo(StreamOutput out) throws IOException { |
| 234 | + out.writeOptionalString(format); |
| 235 | + gapPolicy.writeTo(out); |
| 236 | + out.writeVInt(lag); |
| 237 | + } |
| 238 | + |
| 239 | + @Override |
| 240 | + protected int doHashCode() { |
| 241 | + return Objects.hash(format, gapPolicy, lag); |
| 242 | + } |
| 243 | + @Override |
| 244 | + protected boolean doEquals(Object obj) { |
| 245 | + Factory other = (Factory) obj; |
| 246 | + return Objects.equals(format, other.format) |
| 247 | + && Objects.equals(gapPolicy, other.gapPolicy) |
| 248 | + && Objects.equals(lag, other.lag); |
161 | 249 | }
|
162 | 250 |
|
163 | 251 | }
|
|
0 commit comments