Skip to content

Commit 0e42b07

Browse files
committed
bump zio version, introduce a zio entry stream
GitOrigin-RevId: 391541f992c798d3836a49e104d29d488a73f7dc
1 parent 923b247 commit 0e42b07

File tree

7 files changed

+337
-56
lines changed

7 files changed

+337
-56
lines changed

build.sbt

+10-11
Original file line numberDiff line numberDiff line change
@@ -9,28 +9,27 @@ organizationName := "Sqooba"
99
organizationHomepage := Some(url("https://sqooba.io"))
1010

1111
// Keep the default Scala version in sync with crossScalaVersions so that a plain
// `sbt compile` builds with one of the versions that is actually cross-built.
scalaVersion := "2.13.3"
12-
crossScalaVersions := Seq("2.13.0", "2.12.10")
12+
crossScalaVersions := Seq("2.12.12", "2.13.3")
1313

1414
resolvers += Resolver.bintrayRepo("twittercsl", "sbt-plugins")
1515
resolvers += Resolver.sonatypeRepo("snapshots")
1616

17-
val zioVersion = "1.0.0-RC17"
17+
val zioVersion = "1.0.1"
1818

1919
libraryDependencies ++= Seq(
20-
"com.storm-enroute" %% "scalameter" % "0.19",
21-
"fi.iki.yak" % "compression-gorilla" % "2.1.1",
22-
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.2",
23-
"org.apache.thrift" % "libthrift" % "0.12.0",
20+
"fi.iki.yak" % "compression-gorilla" % "2.1.1",
21+
"org.apache.thrift" % "libthrift" % "0.12.0",
2422
"com.twitter" %% "scrooge-core" % "19.10.0",
23+
"org.scala-lang.modules" %% "scala-collection-compat" % "2.1.2",
2524
"dev.zio" %% "zio" % zioVersion,
2625
"dev.zio" %% "zio-streams" % zioVersion,
26+
"com.storm-enroute" %% "scalameter" % "0.19" % Test,
2727
"dev.zio" %% "zio-test" % zioVersion % Test,
2828
"dev.zio" %% "zio-test-sbt" % zioVersion % Test,
29-
"junit" % "junit" % "4.12" % Test,
30-
"org.scalactic" %% "scalactic" % "3.1.2",
31-
"org.scalatest" %% "scalatest" % "3.1.2" % Test,
32-
"org.scala-lang.modules" %% "scala-collection-compat" % "2.1.2",
33-
"io.dropwizard.metrics" % "metrics-core" % "4.0.0" % Test
29+
"junit" % "junit" % "4.12" % Test,
30+
"org.scalactic" %% "scalactic" % "3.1.2" % Test,
31+
"org.scalatest" %% "scalatest" % "3.1.2" % Test,
32+
"io.dropwizard.metrics" % "metrics-core" % "4.0.0" % Test
3433
)
3534

3635
testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework")
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
package io.sqooba.oss.timeseries
22

3-
import zio.IO
4-
import zio.stream.ZStream.Pull
5-
import zio.stream.{Stream, Take}
63

74
object StreamMerger {
85

@@ -15,47 +12,48 @@ object StreamMerger {
1512
* @param rightStream ordered
1613
* @return stream that contains all elements of left and right in the correct order
1714
*/
18-
private[timeseries] def mergeOrderedSeqs[E, A](
19-
leftStream: Stream[E, A],
20-
rightStream: Stream[E, A]
21-
)(implicit o: Ordering[A]): Stream[E, A] = {
22-
import o._
23-
24-
def loop(
25-
leftTake: Option[Take[E, A]],
26-
rightTake: Option[Take[E, A]],
27-
leftPull: Pull[Any, E, A],
28-
rightPull: Pull[Any, E, A]
29-
): IO[E, ((Option[Take[E, A]], Option[Take[E, A]]), Take[E, A])] = (leftTake, rightTake) match {
30-
// both stopped so we stop
31-
case (Some(Take.End), Some(Take.End)) => IO.succeed((leftTake, rightTake), Take.End)
32-
33-
// errors get forwarded
34-
case (Some(Take.Fail(e)), _) => IO.succeed((None, None), Take.Fail(e))
35-
case (_, Some(Take.Fail(e))) => IO.succeed((None, None), Take.Fail(e))
36-
37-
// If one side is empty, we evaluate it and recurse
38-
// TODO: optimise with parallel evaluation if both takes are not present
39-
case (None, _) => Take.fromPull(leftPull).flatMap(l => loop(Some(l), rightTake, leftPull, rightPull))
40-
case (_, None) => Take.fromPull(rightPull).flatMap(r => loop(leftTake, Some(r), leftPull, rightPull))
41-
42-
// If one has ended, we always take the other
43-
case (Some(Take.End), Some(Take.Value(r))) => IO.succeed((leftTake, None), Take.Value(r))
44-
case (Some(Take.Value(l)), Some(Take.End)) => IO.succeed((None, rightTake), Take.Value(l))
45-
46-
// If both are present, we take the smaller
47-
case (Some(Take.Value(l)), Some(Take.Value(r))) if l <= r =>
48-
IO.succeed((None, rightTake), Take.Value(l))
49-
50-
case (Some(Take.Value(_)), Some(Take.Value(r))) =>
51-
IO.succeed((leftTake, None), Take.Value(r))
52-
}
53-
54-
leftStream.combine(rightStream)(
55-
(Option.empty[Take[E, A]], Option.empty[Take[E, A]])
56-
) {
57-
case ((leftTake, rightTake), leftPull, rightPull) => loop(leftTake, rightTake, leftPull, rightPull)
58-
}
59-
}
15+
// private[timeseries] def mergeOrderedSeqs[E, A](
16+
// leftStream: Stream[E, A],
17+
// rightStream: Stream[E, A]
18+
// )(implicit o: Ordering[A]): Stream[E, A] = {
19+
// import o._
20+
//
21+
// def loop(
22+
// leftTake: Option[Take[E, A]],
23+
// rightTake: Option[Take[E, A]],
24+
// leftPull: Pull[Any, E, A],
25+
// rightPull: Pull[Any, E, A]
26+
// ): IO[E, ((Option[Take[E, A]], Option[Take[E, A]]), Take[E, A])] = (leftTake, rightTake) match {
27+
// // both stopped so we stop
28+
// case (Some(Take.end), Some(Take.end)) => IO.succeed((leftTake, rightTake), Take.end)
29+
//
30+
// // errors get forwarded
31+
// case (Some(Fail(e)), _) => IO.succeed((None, None), Take.fail(e))
32+
// case (_, Some(Fail(e))) => IO.succeed((None, None), Take.fail(e))
33+
//
34+
// // If one side is empty, we evaluate it and recurse
35+
// // TODO: optimise with parallel evaluation if both takes are not present
36+
// case (None, _) => Take.fromPull(leftPull).flatMap(l => loop(Some(l), rightTake, leftPull, rightPull))
37+
// case (_, None) =>
38+
// Take.fromPull(rightPull).flatMap(r => loop(leftTake, Some(r), leftPull, rightPull))
39+
//
40+
// // If one has ended, we always take the other
41+
// case (Some(Take.end), Some(Take(Success(r)))) => IO.succeed((leftTake, None), Take.single(r))
42+
// case (Some(Take(Success(l))), Some(Take.end)) => IO.succeed((None, rightTake), Take.single(l))
43+
//
44+
// // If both are present, we take the smaller
45+
// case (Some(Take(Success(NonEmptyChunk))), Some(Take(Success(r)))) if l <= r =>
46+
// IO.succeed((None, rightTake), Take.single(l))
47+
//
48+
// case (Some(Take(Success(_))), Some(Take(Success(Chunk(r))))) =>
49+
// IO.succeed((leftTake, None), Take.single(r))
50+
// }
51+
//
52+
// leftStream.combine(rightStream)(
53+
// (Option.empty[Take[E, A]], Option.empty[Take[E, A]])
54+
// ) {
55+
// case ((leftTake, rightTake), leftPull, rightPull) => loop(leftTake, rightTake, leftPull, rightPull)
56+
// }
57+
// }
6058

6159
}

src/main/scala/io/sqooba/oss/timeseries/validation/TimestampValidator.scala

+2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.sqooba.oss.timeseries.validation
22

3+
import zio.ZIO
4+
35
/**
46
* This object specifies and checks all the different types of constraints that
57
* are imposed on timestamps of timeseries.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package io.sqooba.oss.timeseries.zio
2+
3+
import io.sqooba.oss.timeseries.immutable.TSEntry
4+
import zio.stream._
5+
import zio.{Queue, Task, UIO, ZIO}
6+
7+
/**
  * Accepts entries one at a time, runs each through a [[ZEntryFitter]] and offers
  * every entry the fitter hands back to `finalizedSink`.
  *
  * @param finalizedSink    queue that receives a `Take` for each finalized entry, and
  *                         the terminating `Take` once [[close]] is called
  * @param finalizedEntries stream exposing the finalized entries; expected to be fed by
  *                         `finalizedSink` (this is how `AppendableEntryStream.unbounded`
  *                         wires it up)
  * @param fitter           fits each added entry against the previously added one
  */
class AppendableEntryStream[T](
    finalizedSink: Queue[Take[Nothing, TSEntry[T]]],
    val finalizedEntries: Stream[Nothing, TSEntry[T]],
    fitter: ZEntryFitter[T]
) {

  /** Alias for [[addOne]]. */
  def +=(elem: TSEntry[T]): Task[Unit] = addOne(elem)

  /**
    * Adds all entries of `xs`, one after the other and in order.
    *
    * `foreach_` is used because the per-element results are all `Unit`:
    * there is no point in collecting them.
    */
  def ++=(xs: Seq[TSEntry[T]]): Task[Unit] =
    ZIO.foreach_(xs)(addOne)

  /**
    * Hands `elem` to the fitter and, if that produces a finalized entry,
    * offers it to the sink.
    *
    * @return a task that fails if the fitter rejects `elem`
    */
  def addOne(elem: TSEntry[T]): Task[Unit] =
    fitter.addAndFitLast(elem).flatMap(offerIfDefined)

  /**
    * Appends the last entry present in the fitter (if any)
    * and emits a terminating 'Take' to the queue.
    *
    * No entries should subsequently be added.
    */
  def close(): UIO[Unit] =
    appendLastEntryIfRequired() *> finalizedSink.offer(Take.end).unit

  // The fitter always keeps the most recently added entry back:
  // it has to be pushed out explicitly when closing.
  private def appendLastEntryIfRequired(): UIO[Unit] =
    fitter.lastEntry.flatMap(offerIfDefined)

  // Offers the entry to the sink when there is one, does nothing otherwise.
  private def offerIfDefined(maybeEntry: Option[TSEntry[T]]): UIO[Unit] =
    maybeEntry match {
      case Some(entry) => finalizedSink.offer(Take.single(entry)).unit
      case None        => ZIO.unit
    }
}
44+
45+
object AppendableEntryStream {

  /**
    * Builds an [[AppendableEntryStream]] on top of an unbounded queue.
    *
    * @param compress passed on to the [[ZEntryFitter]] that fits the added entries
    */
  def unbounded[T](compress: Boolean): UIO[AppendableEntryStream[T]] =
    // The queue carries Takes rather than plain entries.
    Queue.unbounded[Take[Nothing, TSEntry[T]]].flatMap { sink =>
      // `flattenTake` turns the queued Takes back into entries, which lets the
      // stream be terminated by offering a Take.end to the queue.
      // `fromQueueWithShutdown` additionally shuts the queue down once the
      // stream has received that terminating element.
      val entries = Stream.fromQueueWithShutdown(sink).flattenTake

      ZEntryFitter
        .init[T](compress)
        .map(entryFitter => new AppendableEntryStream[T](sink, entries, entryFitter))
    }

}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package io.sqooba.oss.timeseries.zio
2+
3+
import io.sqooba.oss.timeseries.immutable.TSEntry
4+
import io.sqooba.oss.timeseries.validation.TimestampValidator
5+
import zio.{Ref, Task, UIO, ZIO}
6+
7+
/**
8+
* This encapsulates the logic of fitting TSEntries one after another. It is used to take
9+
* care of trimming, compressing and checking the sanity of consecutive entries.
10+
*
11+
* @param compress Whether consecutive entries of equal value should be compressed into one or not.
12+
*/
13+
class ZEntryFitter[T](
    compress: Boolean,
    // Contains the last added entry: we need to keep it around
    // as it may be subject to trimming or extension
    lastAddedRef: Ref[Option[TSEntry[T]]],
    isDomainContRef: Ref[Boolean]
) {

  /**
    * Trim or extend the previous entry based on the passed entry and depending
    * on the compress flag.
    *
    * Note that the stored state is read and written in separate steps.
    *
    * @param addMe the entry to add; its timestamp must be strictly greater than
    *              the timestamp of the previously added entry
    * @return a finalized entry if adding `addMe` produced one, None otherwise.
    *         The task fails with an IllegalArgumentException if the timestamps
    *         are not strictly increasing; the stored state is left untouched then.
    */
  def addAndFitLast(addMe: TSEntry[T]): Task[Option[TSEntry[T]]] =
    for {
      previouslyAdded                  <- lastAddedRef.get
      (newlyAdded, fitted, continuous) <- trimOrExpand(previouslyAdded, addMe)
      _                                <- lastAddedRef.set(newlyAdded)
      // Once a hole has been seen, the domain stays discontinuous.
      _ <- isDomainContRef.update(_ && continuous)
    } yield fitted

  // Return a triple of: 'last added entry', 'fitted entry', 'domain continuous'
  // (Or a failed task if the added entry is invalid)
  private def trimOrExpand(
      last: Option[TSEntry[T]],
      added: TSEntry[T]
  ): Task[(Option[TSEntry[T]], Option[TSEntry[T]], Boolean)] =
    last match {
      // First Entry: save it and return no fitted entry
      case None => ZIO.succeed((Some(added), None, true))

      // A previous entry exists: attempt to append the new one
      case Some(previous) =>
        validateTimestamps(previous.timestamp, added.timestamp).map { _ =>
          // No hole as long as the previous entry is defined up to the new one.
          val isDomainCont = previous.definedUntil >= added.timestamp

          previous.appendEntry(added, compress) match {
            // A compression occurred. Keep that entry around, return no fitted
            case Seq(compressed) => (Some(compressed), None, isDomainCont)

            // No further compression:
            //  - return the finished entry
            //  - keep the second around
            case Seq(finished, second) =>
              (Some(second), Some(finished), isDomainCont)
          }
        }
    }

  // Succeeds if currentTs is strictly after lastTs, fails with an
  // IllegalArgumentException otherwise.
  private def validateTimestamps(lastTs: Long, currentTs: Long): Task[Unit] =
    if (lastTs < currentTs) {
      ZIO.unit
    } else {
      ZIO.fail(
        new IllegalArgumentException(
          s"The timestamps need to be strictly increasing, was $currentTs before $lastTs."
        )
      )
    }

  /** @return the last entry that was added to the fitter. This entry can still change
    *         if more entries are added (it might be compressed/trimmed).
    */
  def lastEntry: UIO[Option[TSEntry[T]]] = lastAddedRef.get

  /** @return whether all added entries so far were either contiguous or overlapping.
    *         I.e. there were no holes in the domain of definition of the entries seen so far.
    */
  def isDomainContinuous: UIO[Boolean] = isDomainContRef.get

}
93+
94+
object ZEntryFitter {

  /**
    * Creates a fitter that has not seen any entry yet: there is no previous
    * entry and the domain is considered continuous.
    *
    * @param compress Whether consecutive entries of equal value should be compressed into one or not.
    */
  def init[T](compress: Boolean): UIO[ZEntryFitter[T]] =
    Ref.make(Option.empty[TSEntry[T]]).flatMap { lastAdded =>
      Ref
        .make(true)
        .map(domainContinuous => new ZEntryFitter[T](compress, lastAdded, domainContinuous))
    }

}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package io.sqooba.oss.timeseries.zio
2+
3+
import io.sqooba.oss.timeseries.immutable.TSEntry
4+
import zio.Chunk
5+
import zio.test.Assertion.{anything, equalTo, isSubtype}
6+
import zio.test.{DefaultRunnableSpec, ZSpec, suite, testM, assert}
7+
8+
/**
  * Checks which entries come out of an [[AppendableEntryStream]] after a few
  * entries were added and the stream was closed.
  */
object AppendableEntryStreamSpec extends DefaultRunnableSpec {

  override def spec: ZSpec[_root_.zio.test.environment.TestEnvironment, Any] =
    suite("AppendableEntryStream")(
      testM("works as expected without compression") {
        AppendableEntryStream.unbounded[String](false).flatMap { stream =>
          val feedAndClose =
            stream.addOne(TSEntry(0, "One", 1000)) *>
              stream.addOne(TSEntry(500, "One", 1000)) *>
              stream.close()

          (feedAndClose *> stream.finalizedEntries.runCollect).map { collected =>
            assert(collected)(
              equalTo(Chunk(TSEntry(0, "One", 500), TSEntry(500, "One", 1000)))
            )
          }
        }
      },
      testM("works as expected with compression") {
        AppendableEntryStream.unbounded[String](true).flatMap { stream =>
          val feedAndClose =
            stream.addOne(TSEntry(0, "One", 1000)) *>
              stream.addOne(TSEntry(500, "One", 1000)) *>
              stream.addOne(TSEntry(1000, "Two", 1000)) *>
              stream.close()

          (feedAndClose *> stream.finalizedEntries.runCollect).map { collected =>
            assert(collected)(
              equalTo(Chunk(TSEntry(0, "One", 1000), TSEntry(1000, "Two", 1000)))
            )
          }
        }
      },
      testM("is not broken by a faulty input") {
        AppendableEntryStream.unbounded[String](true).flatMap { stream =>
          val feedAndClose =
            stream.addOne(TSEntry(500, "One", 1000)) *>
              // Adding the same timestamp again has to fail: the effect is flipped
              // so that an unexpected success makes the test fail.
              stream.addOne(TSEntry(500, "One", 1000)).flip *>
              stream.addOne(TSEntry(1000, "Two", 1000)) *>
              stream.close()

          (feedAndClose *> stream.finalizedEntries.runCollect).map { collected =>
            assert(collected)(
              equalTo(Chunk(TSEntry(500, "One", 500), TSEntry(1000, "Two", 1000)))
            )
          }
        }
      }
    )
}

0 commit comments

Comments
 (0)