@@ -209,6 +209,7 @@ private class TimeSeriesBucketCollector extends BucketCollector {
209
209
private long bucketsCreated ;
210
210
private final RollupBucketBuilder rollupBucketBuilder = new RollupBucketBuilder ();
211
211
long lastTimestamp = Long .MAX_VALUE ;
212
+ long lastHistoTimestamp = Long .MAX_VALUE ;
212
213
BytesRef lastTsid = null ;
213
214
214
215
TimeSeriesBucketCollector (BulkProcessor bulkProcessor ) {
@@ -232,14 +233,18 @@ public void collect(int docId, long owningBucketOrd) throws IOException {
232
233
final BytesRef tsid = aggCtx .getTsid ();
233
234
assert tsid != null : "Document without [" + TimeSeriesIdFieldMapper .NAME + "] field was found." ;
234
235
final long timestamp = aggCtx .getTimestamp ();
235
- final long histoTimestamp = rounding .round (timestamp );
236
+
237
+ boolean tsidChanged = tsid .equals (rollupBucketBuilder .tsid ()) == false ;
238
+ if (tsidChanged || timestamp < lastHistoTimestamp ) {
239
+ lastHistoTimestamp = rounding .round (timestamp );
240
+ }
236
241
237
242
logger .trace (
238
243
"Doc: [{}] - _tsid: [{}], @timestamp: [{}}] -> rollup bucket ts: [{}]" ,
239
244
docId ,
240
245
DocValueFormat .TIME_SERIES_ID .format (tsid ),
241
246
timestampFormat .format (timestamp ),
242
- timestampFormat .format (histoTimestamp )
247
+ timestampFormat .format (lastHistoTimestamp )
243
248
);
244
249
245
250
/*
@@ -262,15 +267,15 @@ public void collect(int docId, long owningBucketOrd) throws IOException {
262
267
lastTsid = BytesRef .deepCopyOf (tsid );
263
268
lastTimestamp = timestamp ;
264
269
265
- if (tsid . equals ( rollupBucketBuilder . tsid ()) == false || rollupBucketBuilder .timestamp () != histoTimestamp ) {
270
+ if (tsidChanged || rollupBucketBuilder .timestamp () != lastHistoTimestamp ) {
266
271
// Flush rollup doc if not empty
267
272
if (rollupBucketBuilder .isEmpty () == false ) {
268
273
Map <String , Object > doc = rollupBucketBuilder .buildRollupDocument ();
269
274
indexBucket (doc );
270
275
}
271
276
272
277
// Create new rollup bucket
273
- rollupBucketBuilder .init (tsid , histoTimestamp );
278
+ rollupBucketBuilder .init (tsid , lastHistoTimestamp );
274
279
bucketsCreated ++;
275
280
}
276
281
0 commit comments