Skip to content

Commit b0e6958

Browse files
yannbolliger authored and Shastick committed
Clean interface of time series trait.
GitOrigin-RevId: 7a9f42428fa5abf83c282e3c4f1c882b36f07142
1 parent e775d1a commit b0e6958

File tree

7 files changed

+96
-165
lines changed

7 files changed

+96
-165
lines changed

src/main/scala/io/sqooba/oss/timeseries/TimeSeries.scala

+86-77
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import java.util.concurrent.TimeUnit
44

55
import io.sqooba.oss.timeseries.bucketing.TimeBucketer
66
import io.sqooba.oss.timeseries.immutable._
7+
import io.sqooba.oss.timeseries.window.WindowSlider.window
8+
import io.sqooba.oss.timeseries.window.{TimeAwareReversibleAggregator, TimeUnawareReversibleAggregator, WindowSlider}
79

810
import scala.annotation.tailrec
911
import scala.collection.mutable
@@ -92,37 +94,48 @@ trait TimeSeries[+T] {
9294
def isCompressed: Boolean
9395

9496
/** Map the values within the time series.
95-
* the 'compress' parameter allows callers to control whether or not compression should occur.
96-
* If set to false, timestamps and validities remain unchanged. Defaults to true */
97-
def map[O: WeakTypeTag](f: T => O, compress: Boolean = true): TimeSeries[O]
97+
*
98+
* @param compress controls whether or not compression should occur on the output series.
99+
* If set to false, timestamps and validities remain unchanged. Defaults to true.
100+
*/
101+
def map[O: WeakTypeTag](f: T => O, compress: Boolean = true): TimeSeries[O] =
102+
mapWithTime[O]((_, value) => f(value), compress)
98103

99-
/** Map the values within the time series.
100-
* Timestamps and validities of entries remain unchanged,
101-
* but the time is made available for cases where the new value would depend on it. */
104+
/** Map the values within the time series. If not compressing, the timestamps and
105+
* validities of entries remain unchanged, but the time is made available for
106+
* cases where the new value would depend on it.
107+
*
108+
* @param compress controls whether or not compression should occur on the output series.
109+
* Defaults to true.
110+
*/
102111
def mapWithTime[O: WeakTypeTag](f: (Long, T) => O, compress: Boolean = true): TimeSeries[O]
103112

104113
/** Return a time series that will only contain entries for which the passed predicate returned True. */
105114
def filter(predicate: TSEntry[T] => Boolean): TimeSeries[T]
106115

107116
/** Return a time series that will only contain entries containing values for which the passed predicate returned True. */
108-
def filterValues(predicate: T => Boolean): TimeSeries[T]
117+
def filterValues(predicate: T => Boolean): TimeSeries[T] =
118+
filter(tse => predicate(tse.value))
109119

110120
/** Fill the holes in the definition domain of this time series with the passed value.
111121
* The resulting time series will have a single continuous definition domain,
112-
* provided the original time series was non-empty. */
113-
def fill[U >: T](whenUndef: U): TimeSeries[U]
122+
* provided the original time series was non-empty.
123+
*/
124+
def fill[U >: T](whenUndef: U): TimeSeries[U] = {
125+
val (start, end) = (this.head.timestamp, this.last.definedUntil)
126+
this.fallback(TSEntry(start, whenUndef, end - start))
127+
}
114128

115129
/** Return a Seq of the TSEntries representing this time series. */
116130
def entries: Seq[TSEntry[T]]
117131

118132
/** Return a Seq of the values contained by this series, in their chronological order. */
119-
def values: Seq[T] =
120-
entries.map(_.value)
133+
def values: Seq[T] = entries.map(_.value)
121134

122135
/** Return the first (chronological) entry in this time series.
123136
*
124137
* @throws NoSuchElementException if this time series is empty. */
125-
def head: TSEntry[T]
138+
def head: TSEntry[T] = headOption.get
126139

127140
/** Return a filled option containing the first (chronological) entry in this
128141
* time series.
@@ -142,7 +155,7 @@ trait TimeSeries[+T] {
142155
/** Return the last (chronological) entry in this time series.
143156
*
144157
* @throws NoSuchElementException if this time series is empty. */
145-
def last: TSEntry[T]
158+
def last: TSEntry[T] = lastOption.get
146159

147160
/** Return a filled option containing the last (chronological) entry in this
148161
* time series.
@@ -338,6 +351,55 @@ trait TimeSeries[+T] {
338351
.foldLeft(newBuilder[Double]())(_ += _)
339352
.result()
340353

354+
/** Slides a window of size 'windowWidth' over the entries present in this series.
355+
* It calculates some aggregate on each window that does not depend on the time of
356+
* validity of the entries. As the given aggregator is reversible this can be done
357+
* efficiently.
358+
*
359+
* Each returned entry E is calculated from the entries of the original time
360+
* series that intersect with any window that ends in the domain of E.
361+
*
362+
* @note The difference between [[rollup()]] and [[slidingWindow()]] is that
363+
* rollup generates disjoint slices of the time series and aggregates over those,
364+
* whereas for sliding window an entry can be part of multiple windows.
365+
*
366+
* @param windowWidth width of the window
367+
* @param aggregator a reversible aggregator to efficiently compute aggregations over the window
368+
* @return a new series containing all the aggregates as entries
369+
*/
370+
def slidingWindow[U >: T, A](
371+
windowWidth: Long,
372+
aggregator: TimeUnawareReversibleAggregator[U, A]
373+
): TimeSeries[A] =
374+
aggregateStreamToSeries(
375+
WindowSlider.window(this.entries.toStream, windowWidth, aggregator)
376+
)
377+
378+
/** See [[slidingWindow()]]. This function slides a window and uses a time-aware
379+
* aggregator, i.e. the aggregated values can depend on the duration of validity
380+
* of each entry (example: average weighted by time of validity). Therefore it
381+
* samples the entries first.
382+
*
383+
* @param sampleRate to resample the entries
384+
* @param useClosestInWindow whether to sample strictly or not (see [[TimeSeries.sample()]])
385+
*/
386+
def slidingWindow[U >: T, A](
387+
windowWidth: Long,
388+
aggregator: TimeAwareReversibleAggregator[U, A],
389+
sampleRate: Long,
390+
useClosestInWindow: Boolean = true
391+
): TimeSeries[A] =
392+
aggregateStreamToSeries(
393+
WindowSlider.window(this.entries.toStream, windowWidth, aggregator, sampleRate, useClosestInWindow)
394+
)
395+
396+
private def aggregateStreamToSeries[A](seq: Seq[(TSEntry[_], Option[A])]): TimeSeries[A] =
397+
seq.flatMap {
398+
// Drop the content of the window, just keep the aggregator's result.
399+
case (entry, aggregateOpt) => aggregateOpt.map(a => entry.map(_ => a))
400+
}.foldLeft(newBuilder[A]())(_ += _)
401+
.result()
402+
341403
/** Sample this TimeSeries at fixed time intervals of length sampleRate starting at
342404
* the start timestamp. By default, all resulting entries will have the duration
343405
* of sampleRate. If equal contiguous entries are compressed (set the compress flag)
@@ -372,17 +434,21 @@ trait TimeSeries[+T] {
372434
* will generate buckets with domain (([a, b[), ([b, c[), ...)
373435
* Note that it is wise to have 'buckets' start at a meaningfully close point in time
374436
* relative to the time series first entry.
375-
* @return a stream of (bucket-start, timeseries).
437+
* @return a stream of (bucket-start, time series).
376438
*/
377439
def bucket(buckets: Stream[Long]): Stream[(Long, TimeSeries[T])] =
378440
TimeBucketer.bucketEntriesToTimeSeries(buckets, this.entries, newBuilder[T]())
379441

380-
/**
381-
* Given the passed bucket delimiters, apply 'aggregator' for each generated bucket.
442+
/** Given the passed bucket delimiters, apply 'aggregator' for each generated bucket.
443+
*
444+
* Note that the timestamps and validities of the entries present in the returned
445+
* time series are ONLY driven by the boundaries generated by 'buckets': the first
446+
* and last entry may well be defined outside of the domain of definition of this
447+
* time series.
382448
*
383-
* Note that the timestamps and validities of the entries present in the returned timeseries
384-
* are ONLY driven by the boundaries generated by 'buckets': the first and last entry
385-
* may well be defined outside of the domain of definition of this time series
449+
* @note The difference between [[rollup()]] and [[slidingWindow()]] is that
450+
* rollup generates disjoint slices of the time series and aggregates over those,
451+
* whereas for sliding window an entry can be part of multiple windows.
386452
*
387453
* @param buckets a stream generating the bucket boundaries for the rollup/aggregation
388454
* @param aggregator a function that computes an aggregate over a time series
@@ -454,64 +520,7 @@ object TimeSeries {
454520
* The result will be properly fitted and compressed as well.
455521
*/
456522
def fillGaps[T](in: Seq[TSEntry[T]], fillValue: T): Seq[TSEntry[T]] =
457-
if (in.size < 2) {
458-
in
459-
} else {
460-
fillMe(in, fillValue, Seq.newBuilder[TSEntry[T]])
461-
}
462-
463-
@tailrec
464-
private def fillMe[T](in: Seq[TSEntry[T]], fillValue: T, acc: mutable.Builder[TSEntry[T], Seq[TSEntry[T]]]): Seq[TSEntry[T]] =
465-
in match {
466-
case Seq(first, last) =>
467-
// Only two elements remaining: the recursion can end
468-
(acc ++= fillAndCompress(first, last, fillValue)).result()
469-
case Seq(first, second, tail @ _*) =>
470-
// Fill the gap, and check the result
471-
fillAndCompress(first, second, fillValue) match {
472-
// the above may return 1, 2 or 3 entries,
473-
// of which the last one must not yet
474-
// be added to the accumulator,
475-
// instead it is prepended to what is passed to the recursive call
476-
case Seq(compressed) =>
477-
// Nothing to add to acc:
478-
// compressed may still be extended by the next filler
479-
fillMe(compressed +: tail, fillValue, acc)
480-
case Seq(one, two) =>
481-
// The fill value either extended 'first' or advanced 'second':
482-
// we don't need to know and just add first to acc
483-
fillMe(two +: tail, fillValue, acc += one)
484-
case Seq(_, filler, _) =>
485-
// the fill value did not extend the first,
486-
// and did not advance the second
487-
// first and filler are added to the accumulator
488-
fillMe(second +: tail, fillValue, acc ++= Seq(first, filler))
489-
}
490-
}
491-
492-
/** Returns a Sequence of entries such that there is no discontinuity
493-
* between current.timestamp and next.definedUntil, filling the gap
494-
* between the entries and compressing them if necessary. */
495-
def fillAndCompress[T](first: TSEntry[T], second: TSEntry[T], fillValue: T): Seq[TSEntry[T]] = {
496-
if (first.definedUntil == second.timestamp) {
497-
// Entries contiguous.
498-
Seq(first, second)
499-
} else {
500-
// There is space to fill
501-
first.appendEntry(
502-
TSEntry(first.definedUntil, fillValue, second.timestamp - first.definedUntil)
503-
) match {
504-
case Seq(single) =>
505-
// 'first' was extended.
506-
// Check if 'second' can be compressed into the result
507-
single.appendEntry(second)
508-
case Seq(notExtended, filler) =>
509-
// 'first' was not extended.
510-
// Check if 'second' can be advanced with the filling value
511-
notExtended +: filler.appendEntry(second)
512-
}
513-
}
514-
}
523+
TimeSeries.ofOrderedEntriesUnsafe(in).fill(fillValue).entries
515524

516525
/** @see [[TimeSeriesMerger.mergeEntries]] */
517526
def mergeEntries[A, B, C](a: Seq[TSEntry[A]])(b: Seq[TSEntry[B]])(op: (Option[A], Option[B]) => Option[C]): Seq[TSEntry[C]] =

src/main/scala/io/sqooba/oss/timeseries/immutable/ColumnTimeSeries.scala

+3-25
Original file line numberDiff line numberDiff line change
@@ -58,17 +58,14 @@ case class ColumnTimeSeries[+T] private (
5858

5959
private def entryAtIndex(index: Int): TSEntry[T] = TSEntry(timestamps(index), values(index), validities(index))
6060

61-
def head: TSEntry[T] = entryAtIndex(0)
61+
override def head: TSEntry[T] = entryAtIndex(0)
6262

6363
def headOption: Option[TSEntry[T]] = Some(head)
6464

65-
def last: TSEntry[T] = entryAtIndex(timestamps.length - 1)
65+
override def last: TSEntry[T] = entryAtIndex(timestamps.length - 1)
6666

6767
def lastOption: Option[TSEntry[T]] = Some(last)
6868

69-
def map[O: WeakTypeTag](f: T => O, compress: Boolean = true): TimeSeries[O] =
70-
mapWithTime[O]((_, value) => f(value), compress)
71-
7269
def mapWithTime[O: WeakTypeTag](f: (Long, T) => O, compress: Boolean = true): TimeSeries[O] = {
7370
val mappedVs = (timestamps, values).zipped.map(f)
7471

@@ -82,30 +79,11 @@ case class ColumnTimeSeries[+T] private (
8279
}
8380

8481
def filter(predicate: TSEntry[T] => Boolean): TimeSeries[T] =
85-
filterTriples((ts, value, valid) => predicate(TSEntry(ts, value, valid)))
86-
87-
def filterValues(predicate: T => Boolean): TimeSeries[T] =
88-
filterTriples((_, value, _) => predicate(value))
89-
90-
private def filterTriples(predicate: (Long, T, Long) => Boolean): TimeSeries[T] =
9182
// We are not updating entries: no need to order or trim them
9283
ColumnTimeSeries.ofColumnVectorsUnsafe(
93-
(timestamps, values, validities).zipped.filter(predicate)
84+
(timestamps, values, validities).zipped.filter((ts, value, valid) => predicate(TSEntry(ts, value, valid)))
9485
)
9586

96-
def fill[U >: T](whenUndef: U): TimeSeries[U] =
97-
(timestamps, values, validities).zipped
98-
.foldLeft(newBuilder[U]()) {
99-
case (builder, (ts, va, vd)) =>
100-
// if the last entry does not extend until the next entry, we add a filler
101-
if (builder.definedUntil.exists(_ < ts)) {
102-
val fillerTs = builder.definedUntil.get
103-
builder += TSEntry(fillerTs, whenUndef, ts - fillerTs)
104-
}
105-
builder += TSEntry(ts, va, vd)
106-
}
107-
.result()
108-
10987
lazy val size: Int = timestamps.size
11088

11189
def isEmpty: Boolean = false

src/main/scala/io/sqooba/oss/timeseries/immutable/EmptyTimeSeries.scala

+1-9
Original file line numberDiff line numberDiff line change
@@ -27,26 +27,18 @@ case object EmptyTimeSeries extends TimeSeries[Nothing] {
2727

2828
def trimRightDiscrete(at: Long, includeEntry: Boolean): TimeSeries[Nothing] = this
2929

30-
def map[O: WeakTypeTag](f: Nothing => O, compress: Boolean = true): TimeSeries[O] = this
31-
3230
def mapWithTime[O: WeakTypeTag](f: (Long, Nothing) => O, compress: Boolean = true): TimeSeries[O] = this
3331

3432
def filter(predicate: TSEntry[Nothing] => Boolean): TimeSeries[Nothing] = this
3533

36-
def filterValues(predicate: Nothing => Boolean): TimeSeries[Nothing] = this
37-
38-
def fill[U >: Nothing](whenUndef: U): TimeSeries[U] = this
34+
override def fill[U >: Nothing](whenUndef: U): TimeSeries[U] = this
3935

4036
def entries: Seq[TSEntry[Nothing]] = Seq()
4137

4238
override def values: Seq[Nothing] = Seq()
4339

44-
def head: TSEntry[Nothing] = throw new NoSuchElementException()
45-
4640
def headOption: Option[TSEntry[Nothing]] = None
4741

48-
def last: TSEntry[Nothing] = throw new NoSuchElementException()
49-
5042
def lastOption: Option[TSEntry[Nothing]] = None
5143

5244
override def splitEntriesLongerThan(sampleLengthMs: Long): TimeSeries[Nothing] = this

src/main/scala/io/sqooba/oss/timeseries/immutable/GorillaBlockTimeSeries.scala

-17
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,7 @@ case class GorillaBlockTimeSeries private[immutable] (
2222

2323
def entries: Seq[TSEntry[Double]] = block.decompress
2424

25-
def head: TSEntry[Double] = entries.head
26-
2725
def headOption: Option[TSEntry[Double]] = entries.headOption
28-
29-
def last: TSEntry[Double] = entries.last
30-
3126
def lastOption: Option[TSEntry[Double]] = entries.lastOption
3227

3328
def at(t: Long): Option[Double] = entryAt(t).map(_.value)
@@ -46,9 +41,6 @@ case class GorillaBlockTimeSeries private[immutable] (
4641
lazy val supportRatio: Double =
4742
entries.map(_.looseDomain.size).sum.toFloat / looseDomain.size
4843

49-
def map[O: WeakTypeTag](f: Double => O, compress: Boolean = true): TimeSeries[O] =
50-
mapEntries[O](e => f(e.value), compress)
51-
5244
def mapWithTime[O: WeakTypeTag](f: (Long, Double) => O, compress: Boolean = true): TimeSeries[O] =
5345
mapEntries[O](e => f(e.timestamp, e.value), compress)
5446

@@ -63,15 +55,6 @@ case class GorillaBlockTimeSeries private[immutable] (
6355
entries.filter(predicate).toStream
6456
)
6557

66-
def filterValues(predicate: Double => Boolean): TimeSeries[Double] =
67-
filter(e => predicate(e.value))
68-
69-
def fill[U >: Double](whenUndef: U): TimeSeries[U] =
70-
TimeSeries
71-
.fillGaps(entries, whenUndef)
72-
.foldLeft(newBuilder[U]())(_ += _)
73-
.result()
74-
7558
def trimRight(t: Long): TimeSeries[Double] =
7659
// trimRight can handle the case where t is before the timestamp of the entry
7760
// therefore we use it rather than trimEntryRight

src/main/scala/io/sqooba/oss/timeseries/immutable/NestedTimeSeries.scala

+1-9
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,7 @@ case class NestedTimeSeries[+T] private (
3838

3939
override def values: Seq[T] = underlying.values.flatMap(_.values)
4040

41-
def head: TSEntry[T] = underlying.head.value.head
4241
def headOption: Option[TSEntry[T]] = underlying.head.value.headOption
43-
def last: TSEntry[T] = underlying.last.value.last
4442
def lastOption: Option[TSEntry[T]] = underlying.last.value.lastOption
4543

4644
def looseDomain: TimeDomain = ContiguousTimeDomain(head.timestamp, last.definedUntil)
@@ -65,19 +63,13 @@ case class NestedTimeSeries[+T] private (
6563
underlying.map(f).filter(_.value.nonEmpty)
6664
)
6765

68-
def map[O: universe.WeakTypeTag](f: T => O, compress: Boolean): TimeSeries[O] =
69-
mapInnerSeries(_.map(f, compress))
70-
7166
def mapWithTime[O: universe.WeakTypeTag](f: (Long, T) => O, compress: Boolean): TimeSeries[O] =
7267
mapInnerSeries(_.mapWithTime(f, compress))
7368

7469
def filter(predicate: TSEntry[T] => Boolean): TimeSeries[T] =
7570
mapInnerSeries(_.filter(predicate))
7671

77-
def filterValues(predicate: T => Boolean): TimeSeries[T] =
78-
mapInnerSeries(_.filterValues(predicate))
79-
80-
def fill[U >: T](whenUndef: U): TimeSeries[U] =
72+
override def fill[U >: T](whenUndef: U): TimeSeries[U] =
8173
mapInnerSeries(_.fill(whenUndef))
8274

8375
def trimRight(at: Long): TimeSeries[T] =

0 commit comments

Comments
 (0)