Skip to content

Commit 6ca1c72

Browse files
committed
Merge pull request #24 from ryblovAV/chapter6
add answers to exercise 1-4 chapter 6
2 parents c32364a + a11d35d commit 6ca1c72

File tree

4 files changed

+240
-0
lines changed

4 files changed

+240
-0
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package org.learningconcurrency
2+
package exercises
3+
package ch6
4+
5+
/**
6+
* Implement a custom Observable[Thread] object that emits an event when it detects that a thread was started.
7+
* The implementation is allowed to miss some of the events.
8+
*/
9+
10+
import java.util.Calendar
11+
12+
import rx.lang.scala.Observable
13+
14+
import scala.annotation.tailrec
15+
import scala.concurrent.duration._
16+
17+
object Ex1 extends App {
18+
19+
val rootThreadGroup = getRootThread(Thread.currentThread.getThreadGroup)
20+
21+
var existsThreads = Set.empty[Thread]
22+
23+
@tailrec
24+
def getRootThread(t: ThreadGroup):ThreadGroup = {
25+
val parent = t.getParent
26+
if (parent == null) t else getRootThread(parent)
27+
}
28+
29+
def getCurrentThreads = {
30+
val threads = new Array[Thread](rootThreadGroup.activeCount())
31+
rootThreadGroup.enumerate(threads,true)
32+
33+
threads.filter(_ != null)
34+
}
35+
36+
def getNewThreads = {
37+
val currentThreads = getCurrentThreads
38+
val newThreads = currentThreads.filter(!existsThreads.contains(_))
39+
40+
//save threads
41+
existsThreads = currentThreads.toSet
42+
43+
newThreads
44+
}
45+
46+
def createObservableNewThreads: Observable[Thread] = {
47+
Observable[Thread] {
48+
(s) => {
49+
getNewThreads.foreach(s.onNext _)
50+
}
51+
}
52+
}
53+
54+
//create Observable
55+
val o = for {
56+
_ <- Observable.interval(1 seconds)
57+
j <- createObservableNewThreads
58+
} yield j
59+
60+
o.subscribe((t) => log(s"${Calendar.getInstance().getTime()}: ${t.toString}"))
61+
62+
//test
63+
64+
def createTestThread(name:String): Unit = {
65+
val t = new Thread(name) {
66+
override def run(): Unit = {
67+
Thread.sleep(5000)
68+
}
69+
}
70+
t.start()
71+
}
72+
73+
Thread.sleep(2000)
74+
createTestThread("A")
75+
Thread.sleep(3000)
76+
createTestThread("B")
77+
78+
Thread.sleep(10000)
79+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package org.learningconcurrency
2+
package exercises
3+
package ch6
4+
5+
/**
6+
* Implement an Observable object that emits an event every 5 seconds and every 12 seconds,
7+
* but not if the elapsed time is a multiple of 30 seconds.
8+
* Use functional combinators on Observable objects.
9+
*/
10+
11+
import rx.lang.scala.Observable
12+
import scala.concurrent.duration._
13+
14+
object Ex2A extends App {
15+
16+
val a = Observable.interval(5 seconds).map(_ * 5)
17+
val b = Observable.interval(12 seconds).map(_ * 12)
18+
19+
val c = (a merge b distinct) filter (_ % 30 != 0)
20+
21+
c.subscribe((s) => log(s.toString))
22+
23+
Thread.sleep(70000)
24+
}
25+
26+
object Ex2B extends App {
27+
28+
val d = Observable.interval(1 seconds).filter((l) => (l % 30 != 0) && ((l % 5 == 0) || (l % 12 == 0)))
29+
d.subscribe((s) => log(s.toString))
30+
31+
Thread.sleep(70000)
32+
33+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package org.learningconcurrency
2+
package exercises
3+
package ch6
4+
5+
import rx.lang.scala._
6+
7+
import scala.annotation.tailrec
8+
import scala.util.Random
9+
import scala.concurrent.duration._
10+
11+
/**
12+
* Use the randomQuote method from this section in order to create an Observable object
13+
* with the moving average of the quote lengths.
14+
* Each time a new quote arrives, a new average value should be emitted.
15+
*/
16+
object Ex3 extends App {
17+
18+
19+
@tailrec
20+
def randomString(length: Int, l: List[Char] = List.empty[Char]):List[Char] = {
21+
if (length == 1) util.Random.nextPrintableChar :: l
22+
else randomString(length-1,util.Random.nextPrintableChar :: l)
23+
}
24+
25+
def randomQuoteMock = Observable.interval(1 seconds).map((l) => randomString(Random.nextInt(10)+1))
26+
27+
randomQuoteMock.scan((0D,0)) {
28+
(n, q) => n match {
29+
case (s, c) => (s + q.length, c + 1)
30+
}
31+
}
32+
.tail
33+
.map((e) => e._1 / e._2)
34+
.subscribe((e) => log(s"avg = $e"))
35+
36+
Thread.sleep(10000)
37+
38+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package org.learningconcurrency
2+
package exercises
3+
package ch6
4+
5+
/**
6+
Implement the reactive signal abstraction, represented with the Signal[T] type.
7+
8+
The Signal[T] type comes with the method apply, used to query the last event emitted by this signal,
9+
and several combinators with the same semantics as the corresponding Observable methods:
10+
11+
class Signal[T] {
12+
def apply(): T = ???
13+
def map(f: T => S): Signal[S] = ???
14+
def zip[S](that: Signal[S]): Signal[(T, S)] = ???
15+
def scan[S](z: S)(f: (S, T) => S) = ???
16+
}
17+
18+
Then, add the method toSignal to the Observable[T] type, which converts
19+
an Observable object to a reactive signal: def toSignal: Signal[T] = ???
20+
21+
Consider using Rx subjects for this task.
22+
*/
23+
24+
import rx.lang.scala._
25+
26+
object Ex4 extends App {
27+
28+
implicit class ObserverableAdditional[T](val self:Observable[T]) extends AnyVal {
29+
30+
def toSignal:Signal[T] = {
31+
new Signal[T](self)
32+
}
33+
34+
}
35+
36+
class Signal[T] {
37+
38+
var lastEvent:T = _
39+
40+
var observable: Observable[T] = _
41+
42+
val subject = Subject[T]()
43+
subject.subscribe(lastEvent = _)
44+
45+
def this(observable: Observable[T]) = {
46+
this()
47+
48+
this.observable = observable.last
49+
this.observable.subscribe(subject)
50+
}
51+
52+
def apply(): T = lastEvent
53+
54+
def map[S](f: T => S): Signal[S] =
55+
this.observable.map(f).toSignal
56+
57+
def zip[S](that: Signal[S]): Signal[(T, S)] =
58+
this.observable.zip(that.observable).toSignal
59+
60+
def scan[S](z: S)(f: (S, T) => S):Signal[S] =
61+
this.observable.scan(z)(f).toSignal
62+
63+
}
64+
65+
//test
66+
def test = {
67+
68+
val o = Observable.from(List(1,2,3,4,5))
69+
val o2 = Observable.from(List("A","B","C","D","E"))
70+
71+
val s1 = o.toSignal
72+
val s2 =o2.toSignal
73+
74+
log(s"s1: element = ${s1()}")
75+
log(s"s2: element = ${s2()}")
76+
77+
val sMap = s1.map(_+"~")
78+
log(s"sMap: element = ${sMap()}")
79+
80+
val sZip = s1.zip(s2)
81+
log(s"sZip: element = ${sZip()}")
82+
83+
val sScan = s1.scan(2)((s,t)=>s*t)
84+
log(s"sScan: element = ${sScan()}")
85+
}
86+
87+
test
88+
Thread.sleep(5000)
89+
90+
}

0 commit comments

Comments
 (0)