Skip to content

Commit 4bc77ba

Browse files
yannbolliger authored and Shastick committed
Rework window aggregation and add dynamic windowing.
GitOrigin-RevId: 389b238e3b0782f7a3a081e1035be4bcb126180a
1 parent e3b23e1 commit 4bc77ba

16 files changed

+619
-235
lines changed

src/main/scala/io/sqooba/oss/timeseries/NumericTimeSeries.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package io.sqooba.oss.timeseries
33
import java.util.concurrent.TimeUnit
44

55
import io.sqooba.oss.timeseries.immutable.TSEntry
6-
import io.sqooba.oss.timeseries.windowing.{IntegratingAggregator, WindowSlider}
6+
import io.sqooba.oss.timeseries.window.{Aggregator, IntegralAggregator, WindowSlider}
77

88
import scala.annotation.tailrec
99
import scala.collection.mutable.Builder
@@ -102,7 +102,7 @@ object NumericTimeSeries {
102102
.window(
103103
entries.toStream,
104104
window,
105-
new IntegratingAggregator[T](timeUnit)
105+
Aggregator.integral[T](timeUnit)
106106
)
107107
.map {
108108
// Drop the content of the window, just keep the integral's result.

src/main/scala/io/sqooba/oss/timeseries/archive/package.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import fi.iki.yak.ts.compression.gorilla.{GorillaDecompressor, LongArrayInput}
66

77
import scala.util.Try
88

9-
/** Provides abstraction and tools for compressing/archiving timeseries data. The
9+
/** Provides abstraction and tools for compressing/archiving time series data. The
1010
* compression used is Gorilla TSC encoding implemented by the Java library
1111
* [[fi.iki.yak.ts.compression.gorilla]].
1212
*

src/main/scala/io/sqooba/oss/timeseries/immutable/ColumnTimeSeries.scala

-13
Original file line numberDiff line numberDiff line change
@@ -222,19 +222,6 @@ case class ColumnTimeSeries[+T] private (
222222
}
223223
}
224224

225-
override def slidingIntegral[U >: T](
226-
window: Long,
227-
timeUnit: TimeUnit = TimeUnit.MILLISECONDS
228-
)(implicit n: Numeric[U]): TimeSeries[Double] =
229-
if (this.size < 2) {
230-
this.map[Double](n.toDouble)
231-
} else {
232-
// TODO: have slidingSum return compressed output so we can use the unsafe constructor and save an iteration.
233-
// TODO: don't use entries but directly operate on the column vectors.
234-
ColumnTimeSeries
235-
.ofOrderedEntriesSafe(NumericTimeSeries.slidingIntegral[U](this.entries, window, timeUnit))
236-
}
237-
238225
def looseDomain: TimeDomain = ContiguousTimeDomain(timestamps.head, timestamps.last + validities.last)
239226

240227
lazy val supportRatio: Double = validities.sum.toDouble / looseDomain.size
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package io.sqooba.oss.timeseries.window
2+
3+
import io.sqooba.oss.timeseries.immutable.TSEntry
4+
5+
import scala.collection.immutable.Queue
6+
import scala.concurrent.duration.TimeUnit
7+
8+
/** State holder used to compute aggregates over sliding windows incrementally,
  * in contexts where this is possible.
  *
  * Rather than re-aggregating a large slice of a time series for every window,
  * an implementation updates its aggregated value each time an entry is added.
  * For an addition, for example, this means only the running sum has to be kept
  * instead of all the entries of the window.
  *
  * The aggregator is applied sequentially, so it may keep track of any state
  * from one entry to the next.
  *
  * @tparam T the type of the entries being aggregated over
  * @tparam A the type of the aggregated value
  */
trait Aggregator[T, A] {

  /** @return the current aggregated value or None */
  def currentValue: Option[A]

  /** Update the internal aggregated value according to the entry that is about
    * to be added to the window.
    *
    * @param e the entry that is about to enter the window
    */
  // TODO: consider returning the resulting aggregated value?
  def addEntry(e: TSEntry[T]): Unit

  /** Update the internal aggregated value according to the entry that is about
    * to be added to the window.
    *
    * @note The default implementation ignores `currentWindow` and simply
    *       delegates to the single-argument `addEntry`. Override this method
    *       if the aggregation needs to look at the entire window.
    *
    * @param e             the entry that is about to enter the window
    * @param currentWindow the current content of the window: it does not
    *                      include 'e' at this point.
    */
  // TODO: consider returning the resulting aggregated value?
  def addEntry(e: TSEntry[T], currentWindow: Queue[TSEntry[T]]): Unit = {
    // The window content is not needed by default: forward the entry only.
    addEntry(e)
  }
}
48+
49+
object Aggregator {

  /** Factory for aggregators that need to act on the entire window, like median.
    *
    * The returned aggregator evaluates `f` on the values it currently holds
    * every time `currentValue` is requested.
    *
    * @note Aggregating this way is a lot less efficient for computations that only
    *       need little intermediary state. Rather define your own Aggregator for
    *       those cases (see for example [[SumAggregator]], [[MinAggregator]]).
    *
    * @param f aggregation function from a sequence of values to an option
    * @return a reversible aggregator
    */
  def queueAggregator[T, A](f: Seq[T] => Option[A]): QueueAggregator[T, A] =
    new QueueAggregator[T, A] {
      override def currentValue: Option[A] = {
        val windowValues = queue.toSeq
        f(windowValues)
      }
    }

  /** See [[SumAggregator]] */
  def sum[T: Numeric]: SumAggregator[T] =
    new SumAggregator[T]()

  /** See [[MeanAggregator]] */
  def mean[T: Numeric]: MeanAggregator[T] =
    new MeanAggregator[T]()

  /** Aggregator that returns the minimum of all values in the window.
    * See [[MinAggregator]].
    */
  def min[T: Ordering]: TimeUnawareReversibleAggregator[T, T] =
    new MinAggregator[T]()

  /** Aggregator that returns the maximum of all values in the window.
    * It is a [[MinAggregator]] that is given the reversed ordering.
    */
  def max[T](implicit ordering: Ordering[T]): TimeUnawareReversibleAggregator[T, T] = {
    val reversed = ordering.reverse
    new MinAggregator[T]()(reversed)
  }

  /** See [[StdAggregator]] */
  def std[T: Numeric]: StdAggregator[T] =
    new StdAggregator[T]()

  /** See [[IntegralAggregator]] */
  def integral[T: Numeric](timeunit: TimeUnit, initialValue: Double = .0): IntegralAggregator[T] =
    new IntegralAggregator[T](timeunit, initialValue)
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
package io.sqooba.oss.timeseries.window
2+
3+
import io.sqooba.oss.timeseries.immutable.TSEntry
4+
5+
import scala.collection.mutable
6+
7+
/** An aggregator without any state: it never yields a value and ignores every
  * entry that is added or dropped. Used for window creation without aggregation.
  */
object DoNothingAggregator extends TimeUnawareReversibleAggregator[Nothing, Nothing] {

  /** @return always None, as nothing is ever aggregated */
  def currentValue: Option[Nothing] = None

  /** Ignores the added entry. */
  def addEntry(entry: TSEntry[Nothing]): Unit = {}

  /** Ignores the dropped entry. */
  def dropEntry(entry: TSEntry[Nothing]): Unit = {}
}
16+
17+
/** A reversible aggregator that keeps track of the total sum of the values
  * present in each entry that is at least partially within the window's domain.
  *
  * Only the entries' values are used: discontinuities in the domain of
  * definition between entries are completely ignored.
  */
class SumAggregator[T](implicit n: Numeric[T]) extends TimeUnawareReversibleAggregator[T, T] {

  // Running total of the values of all entries added and not yet dropped.
  private var total: T = n.zero

  /** @return the current sum, which is the numeric zero as long as nothing was added */
  def currentValue: Option[T] = Some(total)

  /** Adds the value of the given entry to the running total. */
  def addEntry(entry: TSEntry[T]): Unit = {
    total = n.plus(total, entry.value)
  }

  /** Subtracts the value of the given entry from the running total. */
  def dropEntry(entry: TSEntry[T]): Unit = {
    total = n.minus(total, entry.value)
  }
}
36+
37+
/** Reversible aggregator that calculates the mean of the values in the window
  * weighted by time of validity. It is therefore time-aware and needs entries
  * to be contained in the window.
  */
class MeanAggregator[T](implicit n: Numeric[T]) extends TimeAwareReversibleAggregator[T, Double] {

  // Sum of X_i * d_i
  private var weightedSum: Double = 0.0
  // Sum of d_i
  private var totalDuration: Long = 0L

  /** @return Sum of X_i * d_i / Sum of d_i, or None if the summed validity is not positive */
  def currentValue: Option[Double] =
    if (totalDuration <= 0) None
    else Some(weightedSum / totalDuration)

  /** Accounts for the given entry's value, weighted by its validity. */
  def addEntry(entry: TSEntry[T]): Unit = {
    weightedSum += contribution(entry)
    totalDuration += entry.validity
  }

  /** Reverts the contribution of the given entry. */
  def dropEntry(entry: TSEntry[T]): Unit = {
    weightedSum -= contribution(entry)
    totalDuration -= entry.validity
  }

  // X_i * d_i for a single entry.
  private def contribution(e: TSEntry[T]): Double =
    n.toDouble(e.value) * e.validity
}
65+
66+
/** Reversible aggregator that calculates the (biased) standard deviation (i.e.
  * the square root of the variance) of the values in the window, weighted by
  * time of validity. It is therefore time-aware and needs entries to be
  * contained in the window.
  *
  * The variance is computed as E_w[X^2] - E_w[X]^2 from two weighted means that
  * are both maintained incrementally.
  */
class StdAggregator[T](implicit n: Numeric[T]) extends TimeAwareReversibleAggregator[T, Double] {
  import n._

  // Weighted mean of squares E_w[X^2]
  private val squareMean = new MeanAggregator[T]()
  // Weighted mean E_w[X]
  private val mean = new MeanAggregator[T]()

  /** @return the weighted standard deviation, or None if one of the underlying
    *         means is not defined
    */
  def currentValue: Option[Double] =
    for {
      m       <- mean.currentValue
      squares <- squareMean.currentValue
    } yield {
      // std = sqrt{ E_w[X^2] - E_w[X]^2 }
      // Floating point cancellation (and the drift of the incrementally updated
      // sums) can make the difference slightly negative for (nearly) constant
      // values, for which sqrt would return NaN. Clamp it to zero. Math.max
      // still propagates NaN, so genuinely undefined inputs are not hidden.
      Math.sqrt(Math.max(0.0, squares - m * m))
    }

  /** Accounts for the given entry in both underlying means. */
  def addEntry(entry: TSEntry[T]): Unit = {
    squareMean.addEntry(entry.map(v => v * v))
    mean.addEntry(entry)
  }

  /** Reverts the contribution of the given entry in both underlying means. */
  def dropEntry(entry: TSEntry[T]): Unit = {
    squareMean.dropEntry(entry.map(v => v * v))
    mean.dropEntry(entry)
  }
}
97+
98+
/** A reversible aggregator that keeps track of the minimum of the values
  * present in the window. You can get a maximum aggregator by simply
  * reversing the ordering passed as an implicit.
  *
  * The aggregator uses an ordered internal queue and discards values that
  * can never be the minimum: the head of the queue is always the current
  * minimum and the values are non-decreasing towards the tail.
  */
class MinAggregator[T](implicit n: Ordering[T]) extends TimeUnawareReversibleAggregator[T, T] {
  import n._

  private var minQueue = mutable.Queue[T]()

  /** @return the smallest value currently tracked, or None if there is none */
  override def currentValue: Option[T] = minQueue.headOption

  /** Registers the value of the entry that enters the window.
    *
    * Every queued value that is strictly greater than the new one is discarded:
    * it is older and larger, so it can never be the minimum again.
    */
  def addEntry(e: TSEntry[T]): Unit = {
    // In Scala 2.13, this can be more elegantly solved:
    //   minQueue.takeWhileInPlace(_ <= e.value).append(e.value)
    // As the queue is ordered, the prefix of values <= e.value is exactly the
    // set of values to keep.
    minQueue = minQueue.takeWhile(_ <= e.value)
    minQueue.enqueue(e.value)
  }

  /** Removes the value of the entry that leaves the window, if it is still tracked.
    *
    * The dropped value is only still queued if it is the current minimum, i.e.
    * the head. Otherwise it was already discarded by a later, smaller value.
    * Dropping while no value is tracked is a no-op rather than an error.
    */
  def dropEntry(entry: TSEntry[T]): Unit = {
    // headOption guards against an empty queue, where `head` would throw.
    if (minQueue.headOption.contains(entry.value)) minQueue.dequeue()
  }
}
123+
124+
/** Template aggregator that stores the values of all the entries of the window.
  * It is therefore not efficient for most calculations.
  *
  * Subclasses define `currentValue` from the content of `queue`.
  */
abstract class QueueAggregator[T, A] extends TimeUnawareReversibleAggregator[T, A] {

  // Values of the entries added and not yet dropped, oldest first.
  private[window] val queue: mutable.Queue[T] = mutable.Queue.empty[T]

  /** Appends the value of the given entry to the queue. */
  def addEntry(e: TSEntry[T]): Unit = {
    queue.enqueue(e.value)
  }

  /** Removes the oldest value from the queue. The passed entry itself is not
    * inspected.
    */
  def dropEntry(entry: TSEntry[T]): Unit = {
    queue.dequeue()
  }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package io.sqooba.oss.timeseries.window
2+
3+
import io.sqooba.oss.timeseries.immutable.TSEntry
4+
5+
import scala.collection.immutable.Queue
6+
import scala.concurrent.duration.TimeUnit
7+
8+
/** An aggregator that relies on the passed entries' integral function.
  *
  * Very similar to the [[SumAggregator]], but takes the validity of each entry
  * into account. It is therefore time-aware and needs entries to be contained
  * in the window.
  *
  * @param timeunit     that will be passed to the entries' integral function
  * @param initialValue to initialise the aggregator with.
  */
class IntegralAggregator[T](
    timeunit: TimeUnit,
    initialValue: Double = .0
)(implicit n: Numeric[T])
    extends TimeAwareReversibleAggregator[T, Double] {

  // Initial value plus the integrals of all entries added and not yet dropped.
  private var runningIntegral: Double = initialValue

  /** @return the current value of the integral, always defined */
  def currentValue: Option[Double] = Some(runningIntegral)

  /** Adds the integral of the given entry to the running value. */
  def addEntry(entry: TSEntry[T]): Unit = {
    runningIntegral = runningIntegral + entry.integral(timeunit)
  }

  /** Subtracts the integral of the given entry from the running value. */
  def dropEntry(entry: TSEntry[T]): Unit = {
    runningIntegral = runningIntegral - entry.integral(timeunit)
  }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package io.sqooba.oss.timeseries.window
2+
3+
import io.sqooba.oss.timeseries.immutable.TSEntry
4+
5+
import scala.collection.immutable.Queue
6+
7+
/** Extension to the Aggregator that also supports removing entries from the
  * aggregated value.
  *
  * Aggregating the content of each different window returned by WindowSlider
  * from scratch requires many iterations. Depending on the aggregation function,
  * this is however not required: for simple cases like addition or
  * multiplication, and any situation where the contribution of a single entry to
  * the aggregated value may be reversed, we can compute an aggregated value for
  * each window in linear time.
  *
  * The reversible aggregator will be applied sequentially, so it may keep track
  * of any state from one addition or removal to the next.
  *
  * Some aggregations depend on the duration of the entries, like integration or
  * averaging; others, like min and max, don't. To keep those types of
  * aggregations well separated, implementations need to extend either the
  * time-aware or the time-unaware subtrait. This allows us to use different
  * windowing functions for the two types.
  *
  * @tparam T the type of the entries being aggregated over
  * @tparam A the type of the aggregated value
  */
sealed trait ReversibleAggregator[T, A] extends Aggregator[T, A] {

  /** Updates the aggregated value according to the fact that
    * this entry is being removed.
    *
    * @param entry to remove from the head of the window
    */
  // TODO: consider returning the resulting aggregated value?
  def dropEntry(entry: TSEntry[T]): Unit

  /** Updates the aggregated value according to the fact that
    * the head of the currentWindow is being removed.
    *
    * @param currentWindow the current content of the window. It still
    *                      contains the entry that has to be removed
    */
  // TODO: consider returning the resulting aggregated value?
  def dropHead(currentWindow: Queue[TSEntry[T]]): Unit = {
    val oldest = currentWindow.head
    dropEntry(oldest)
  }

  /** Combine the addition and the removal of entries from the aggregated value.
    *
    * @param add           the value that will be added
    * @param currentWindow the current window, from which we will drop the first
    *                      entry. Note that it does not yet contain 'add'
    */
  def addAndDrop(add: TSEntry[T], currentWindow: Queue[TSEntry[T]]): Unit = {
    dropHead(currentWindow)
    // addEntry needs to work on the updated window, i.e. without its former head
    val remaining = currentWindow.tail
    addEntry(add, remaining)
  }

  /** Combine the addition and the removal of entries from the aggregated value.
    * The removal is applied before the addition.
    *
    * @param add    the entry that will be added at the tail
    * @param remove the entry that will be removed at the head
    */
  def addAndDrop(add: TSEntry[T], remove: TSEntry[T]): Unit = {
    dropEntry(remove)
    addEntry(add)
  }
}
71+
72+
/** Marker trait for reversible aggregators whose calculation depends on the
  * time/duration of the entries, like integration, averaging over time etc.
  * It adds no members to [[ReversibleAggregator]].
  */
trait TimeAwareReversibleAggregator[T, A] extends ReversibleAggregator[T, A]
76+
77+
/** Marker trait for reversible aggregators whose calculation does not depend on
  * the duration of the entries, like min, max, median.
  * It adds no members to [[ReversibleAggregator]].
  */
trait TimeUnawareReversibleAggregator[T, A] extends ReversibleAggregator[T, A]

0 commit comments

Comments
 (0)