2121
2222import org .elasticsearch .common .io .stream .StreamInput ;
2323import org .elasticsearch .common .io .stream .StreamOutput ;
24+ import org .elasticsearch .common .xcontent .XContentBuilder ;
2425import org .elasticsearch .search .aggregations .AggregatorFactory ;
2526import org .elasticsearch .search .aggregations .InternalAggregation ;
2627import org .elasticsearch .search .aggregations .InternalAggregation .ReduceContext ;
3334import org .elasticsearch .search .aggregations .pipeline .PipelineAggregator ;
3435import org .elasticsearch .search .aggregations .pipeline .PipelineAggregatorFactory ;
3536import org .elasticsearch .search .aggregations .pipeline .PipelineAggregatorStreams ;
37+ import org .elasticsearch .search .aggregations .pipeline .bucketmetrics .BucketMetricsParser ;
38+ import org .elasticsearch .search .aggregations .support .format .ValueFormat ;
3639import org .elasticsearch .search .aggregations .support .format .ValueFormatter ;
3740import org .elasticsearch .search .aggregations .support .format .ValueFormatterStreams ;
3841
3942import java .io .IOException ;
4043import java .util .ArrayList ;
4144import java .util .List ;
4245import java .util .Map ;
46+ import java .util .Objects ;
4347import java .util .stream .Collectors ;
4448import java .util .stream .StreamSupport ;
4549
@@ -109,16 +113,37 @@ public void doWriteTo(StreamOutput out) throws IOException {
109113
110114 public static class Factory extends PipelineAggregatorFactory {
111115
112- private final ValueFormatter formatter ;
116+ private String format ;
113117
114- public Factory (String name , String [] bucketsPaths , ValueFormatter formatter ) {
118+ public Factory (String name , String [] bucketsPaths ) {
115119 super (name , TYPE .name (), bucketsPaths );
116- this .formatter = formatter ;
120+ }
121+
122+ /**
123+ * Sets the format to use on the output of this aggregation.
124+ */
125+ public void format (String format ) {
126+ this .format = format ;
127+ }
128+
129+ /**
130+ * Gets the format to use on the output of this aggregation.
131+ */
132+ public String format () {
133+ return format ;
134+ }
135+
136+ protected ValueFormatter formatter () {
137+ if (format != null ) {
138+ return ValueFormat .Patternable .Number .format (format ).formatter ();
139+ } else {
140+ return ValueFormatter .RAW ;
141+ }
117142 }
118143
119144 @ Override
120145 protected PipelineAggregator createInternal (Map <String , Object > metaData ) throws IOException {
121- return new CumulativeSumPipelineAggregator (name , bucketsPaths , formatter , metaData );
146+ return new CumulativeSumPipelineAggregator (name , bucketsPaths , formatter () , metaData );
122147 }
123148
124149 @ Override
@@ -139,5 +164,35 @@ public void doValidate(AggregatorFactory parent, AggregatorFactory[] aggFactorie
139164 }
140165 }
141166
167+ @ Override
168+ protected final XContentBuilder internalXContent (XContentBuilder builder , Params params ) throws IOException {
169+ if (format != null ) {
170+ builder .field (BucketMetricsParser .FORMAT .getPreferredName (), format );
171+ }
172+ return builder ;
173+ }
174+
175+ @ Override
176+ protected final PipelineAggregatorFactory doReadFrom (String name , String [] bucketsPaths , StreamInput in ) throws IOException {
177+ Factory factory = new Factory (name , bucketsPaths );
178+ factory .format = in .readOptionalString ();
179+ return factory ;
180+ }
181+
182+ @ Override
183+ protected final void doWriteTo (StreamOutput out ) throws IOException {
184+ out .writeOptionalString (format );
185+ }
186+
187+ @ Override
188+ protected int doHashCode () {
189+ return Objects .hash (format );
190+ }
191+
192+ @ Override
193+ protected boolean doEquals (Object obj ) {
194+ Factory other = (Factory ) obj ;
195+ return Objects .equals (format , other .format );
196+ }
142197 }
143198}
0 commit comments