Skip to content

Commit 8d5f4aa

Browse files
committed
Merge pull request #29 from ssmylh/ch6
Answer to Exercise 5-7 of Chapter 6
2 parents 27c707e + 1df1106 commit 8d5f4aa

File tree

3 files changed

+190
-0
lines changed

3 files changed

+190
-0
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package org.learningconcurrency
2+
package exercises
3+
package ch6
4+
5+
/**
6+
* Implement the reactive cell abstraction, represented with the RCell[T] type:
7+
* class RCell[T] extends Signal[T] {
8+
* def :=(x: T): Unit = ???
9+
* }
10+
*
11+
* A reactive cell is simultaneously a reactive signal from the previous exercise.
12+
* Calling the := method sets a new value to the reactive cell, and emits an event.
13+
*/
14+
15+
import rx.lang.scala._
16+
17+
object Ex5 extends App {
18+
19+
class RCell[T] extends Ex4.Signal[T] {
20+
private[this] val subject = Subject[T]()
21+
setObservable(subject)
22+
23+
def :=(x: T): Unit = {
24+
subject.onNext(x)
25+
}
26+
}
27+
28+
val rc1 = new RCell[Int]()
29+
rc1 := 1
30+
assert(rc1() == 1)
31+
32+
val rc2 = new RCell[Int]()
33+
rc2 := 1
34+
val increment = rc2.map(_ + 1)
35+
assert(increment() == 2)
36+
rc2 := 2
37+
assert(increment() == 3)
38+
39+
val rc31 = new RCell[Int]()
40+
val rc32 = new RCell[String]()
41+
rc31 := 1
42+
rc32 := "a"
43+
val zipped = rc31.zip(rc32)
44+
assert(zipped() == (1, "a"))
45+
rc31 := 2
46+
rc32 := "b"
47+
assert(zipped() == (2, "b"))
48+
49+
val rc4 = new RCell[Int]()
50+
rc4 := 1
51+
val sum = rc4.scan(10)(_ + _)
52+
assert(sum() == 10)
53+
rc4 := 2
54+
assert(sum() == 12)
55+
rc4 := 3
56+
assert(sum() == 15)
57+
58+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package org.learningconcurrency
2+
package exercises
3+
package ch6
4+
5+
/**
6+
* Implement the reactive map collection, represented with the RMap class:
7+
* class RMap[K, V] {
8+
* def update(k: K, v: V): Unit
9+
* def apply(k: K): Observable[V]
10+
* }
11+
*
12+
* The update method behaves like the update on a regular Map collection.
13+
* Calling apply on a reactive map returns an Observable object with all the subsequent updates of the specific key.
14+
*/
15+
16+
import rx.lang.scala._
17+
18+
object Ex6 extends App {
19+
20+
class RMap[K, V] {
21+
import scala.collection._
22+
private[this] val allSubscribers = mutable.Map[K, (Subject[V], mutable.Set[Subscriber[V]])]()
23+
private[this] val map = mutable.Map[K, V]()
24+
25+
def update(k: K, v: V): Unit = {
26+
map(k) = v
27+
allSubscribers.get(k) match {
28+
case Some(s) => s._1.onNext(v)
29+
case _ =>
30+
}
31+
}
32+
33+
def apply(k: K): Observable[V] = Observable[V] { subscriber =>
34+
val (subject, subscribers) =
35+
allSubscribers.getOrElseUpdate(k, (Subject[V](), mutable.Set.empty[Subscriber[V]]))
36+
subscribers += subscriber
37+
38+
val subscription = subject.subscribe(subscriber)
39+
40+
subscriber.add(Subscription {
41+
subscription.unsubscribe()
42+
43+
subscribers -= subscriber
44+
if (subscribers.isEmpty) {
45+
allSubscribers -= k
46+
}
47+
})
48+
}
49+
50+
/* return true if there is at least one subscriber which subscribes to the updates of the specific key. */
51+
def hasSubscribers(k: K): Boolean = allSubscribers.get(k).isDefined
52+
}
53+
54+
import scala.collection.mutable.ListBuffer
55+
56+
val rmap = new RMap[String, Int]()
57+
58+
val key = "a"
59+
val o = rmap(key)
60+
assert(rmap.hasSubscribers(key) == false)
61+
62+
val buf1 = ListBuffer.empty[Int]
63+
val subscription1 = o.subscribe(buf1 += _)
64+
val buf2 = ListBuffer.empty[Int]
65+
val subscription2 = o.subscribe(buf2 += _)
66+
67+
rmap(key) = 1
68+
rmap(key) = 2
69+
assert(buf1 == ListBuffer(1, 2), buf1)
70+
assert(buf2 == ListBuffer(1, 2), buf2)
71+
72+
subscription1.unsubscribe()
73+
assert(rmap.hasSubscribers(key))
74+
subscription2.unsubscribe()
75+
assert(rmap.hasSubscribers(key) == false)
76+
77+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package org.learningconcurrency
2+
package exercises
3+
package ch6
4+
5+
/**
6+
* Implement the reactive priority queue, represented with the RPriorityQueue class:
7+
* class RPriorityQueue[T] {
8+
* def add(x: T): Unit = ???
9+
* def pop(): T = ???
10+
* def popped: Observable[T] = ???
11+
* }
12+
*
13+
* The reactive priority queue exposes the Observable object popped,
14+
* which emits events whenever the smallest element in the priority queue gets removed by calling pop.
15+
*/
16+
17+
import rx.lang.scala._
18+
19+
object Ex7 extends App {
20+
21+
class RPriorityQueue[T](implicit val ord: Ordering[T]) {
22+
private[this] val pq = new scala.collection.mutable.PriorityQueue[T]()(ord.reverse)
23+
private[this] val subject = Subject[T]()
24+
25+
def add(x: T): Unit = {
26+
pq += x
27+
}
28+
29+
/* This method throws `NoSuchElementException` if the queue is empty. */
30+
def pop(): T = {
31+
val x = pq.dequeue()
32+
subject.onNext(x)
33+
x
34+
}
35+
36+
def popped: Observable[T] = subject
37+
}
38+
39+
import scala.collection.mutable.ListBuffer
40+
41+
val rqueue = new RPriorityQueue[Int]()
42+
rqueue.add(3)
43+
rqueue.add(1)
44+
rqueue.add(2)
45+
46+
val o = rqueue.popped
47+
val buf = ListBuffer.empty[Int]
48+
o.subscribe(buf += _)
49+
50+
assert(rqueue.pop() == 1)
51+
assert(rqueue.pop() == 2)
52+
assert(rqueue.pop() == 3)
53+
assert(buf == ListBuffer(1, 2, 3))
54+
55+
}

0 commit comments

Comments
 (0)