Skip to content

Commit 097906a

Browse files
committed
add answers to exercise 1-4 chapter 6
1 parent e4f3b38 commit 097906a

File tree

4 files changed

+180
-0
lines changed

4 files changed

+180
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package org.learningconcurrency
2+
package exercises
3+
package ch6
4+
5+
import rx.lang.scala._
6+
7+
import org.learningconcurrency.ch2._
8+
9+
/**
10+
* Implement a custom Observable[Thread] object that emits an event when it detects that a thread was started.
11+
* The implementation is allowed to miss some of the events.
12+
*/
13+
object Ex1 extends App {
14+
15+
val observable = Observable.apply[Thread] {
16+
obs =>
17+
18+
val t = new Thread {
19+
override def start(): Unit = {
20+
super.start()
21+
obs.onNext(this)
22+
}
23+
}
24+
25+
obs.onNext(t)
26+
Subscription()
27+
}
28+
29+
observable.subscribe((t) => log(t.toString()))
30+
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package org.learningconcurrency
2+
package exercises
3+
package ch6
4+
5+
import rx.lang.scala.Observable
6+
7+
/**
8+
* Implement an Observable object that emits an event every 5 seconds and every 12 seconds,
9+
* but not if the elapsed time is a multiple of 30 seconds.
10+
* Use functional combinators on Observable objects.
11+
*/
12+
13+
import scala.concurrent.duration._
14+
15+
object Ex2A extends App {
16+
17+
val a = Observable.interval(5 seconds).map(_ * 5)
18+
val b = Observable.interval(12 seconds).map(_ * 12)
19+
20+
val c = (a merge b distinct) filter (_ % 30 != 0)
21+
22+
c.subscribe((s) => log(s.toString))
23+
24+
Thread.sleep(70000)
25+
}
26+
27+
object Ex2B extends App {
28+
29+
val d = Observable.interval(1 seconds).filter((l) => (l % 30 != 0) && ((l % 5 == 0) || (l % 12 == 0)))
30+
d.subscribe((s) => log(s.toString))
31+
32+
Thread.sleep(70000)
33+
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package org.learningconcurrency
2+
package exercises
3+
package ch6
4+
5+
import rx.lang.scala._
6+
import rx.lang.scala.Observable._
7+
8+
import scala.annotation.tailrec
9+
import scala.io.Source
10+
import scala.util.Random
11+
12+
/**
13+
* Use the randomQuote method from this section in order to create an Observable object
14+
* with the moving average of the quote lengths.
15+
* Each time a new quote arrives, a new average value should be emitted.
16+
*/
17+
object Ex3 extends App {
18+
19+
import scala.concurrent.duration._
20+
21+
@tailrec
22+
def randomString(length: Int, l: List[Char] = List.empty[Char]):List[Char] = {
23+
if (length == 1) util.Random.nextPrintableChar :: l
24+
else randomString(length-1,util.Random.nextPrintableChar :: l)
25+
26+
}
27+
28+
def randomQuoteMock = Observable.interval(1 seconds).map((l) => randomString(Random.nextInt(10)+1))
29+
30+
randomQuoteMock.scan((0D,0)){
31+
(n,q) => n match {
32+
case (s,c) => (s + q.length, c + 1)
33+
}
34+
}.filter(_._2 != 0).map((e) => e._1/e._2).subscribe((e) => log(s"avg = $e"))
35+
36+
Thread.sleep(10000)
37+
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package org.learningconcurrency
2+
package exercises
3+
package ch6
4+
5+
/**
6+
Implement the reactive signal abstraction, represented with the Signal[T] type.
7+
8+
The Signal[T] type comes with the method apply, used to query the last event emitted by this signal,
9+
and several combinators with the same semantics as the corresponding Observable methods:
10+
11+
class Signal[T] {
12+
def apply(): T = ???
13+
def map(f: T => S): Signal[S] = ???
14+
def zip[S](that: Signal[S]): Signal[(T, S)] = ???
15+
def scan[S](z: S)(f: (S, T) => S) = ???
16+
}
17+
18+
Then, add the method toSignal to the Observable[T] type, which converts
19+
an Observable object to a reactive signal: def toSignal: Signal[T] = ???
20+
21+
Consider using Rx subjects for this task.
22+
*/
23+
24+
object Ex4 extends App {
25+
26+
import rx.lang.scala._
27+
28+
implicit class ObserverableAdditional[T](val self:Observable[T]) extends AnyVal {
29+
30+
def toSignal:Signal[T] = {
31+
val s = new Signal[T]
32+
self.last.subscribe(s.subject)
33+
s
34+
}
35+
36+
}
37+
38+
class Signal[T] {
39+
40+
def this(t:T) {
41+
this()
42+
a = t
43+
}
44+
45+
var a:T = _
46+
47+
val subject = Subject[T]()
48+
49+
subject.subscribe(a = _)
50+
51+
def apply(): T = a
52+
53+
def map[S](f: T => S): Signal[S] = new Signal[S](f(a))
54+
55+
def zip[S](that: Signal[S]): Signal[(T, S)] = new Signal[(T,S)]((a,that.a))
56+
57+
def scan[S](z: S)(f: (S, T) => S):Signal[S] = new Signal[S](f(z,a))
58+
}
59+
60+
//test
61+
val s1 = Observable.items[String]("A","B","C").toSignal
62+
log(s"element = ${s1()}")
63+
64+
val s2 = Observable.items[Int](1,2,3).toSignal
65+
66+
val sMap = s1.map(_+"~")
67+
log(s"sMap: element = ${sMap()}")
68+
69+
val sZip = s1.zip(s2)
70+
log(s"sZip: element = ${sZip()}")
71+
72+
val sScan = s2.scan(10)((s,t)=>s+t)
73+
log(s"sScan: element = ${sScan()}")
74+
75+
76+
77+
}

0 commit comments

Comments
 (0)