@@ -18,32 +18,60 @@ import rx.lang.scala._
18
18
object Ex6 extends App {
19
19
20
20
class RMap [K , V ] {
21
- private [this ] val map = scala.collection.mutable.Map [K , Subject [V ]]()
22
-
23
- def update (k : K , v : V ): Unit = map.get(k) match {
24
- case Some (s) => s.onNext(v)
25
- case _ =>
26
- val s = Subject [V ]()
27
- map(k) = s
28
- s.onNext(v)
21
+ import scala .collection ._
22
+ private [this ] val allSubscribers = mutable.Map [K , (Subject [V ], mutable.Set [Subscriber [V ]])]()
23
+ private [this ] val map = mutable.Map [K , V ]()
24
+
25
+ def update (k : K , v : V ): Unit = {
26
+ map(k) = v
27
+ allSubscribers.get(k) match {
28
+ case Some (s) => s._1.onNext(v)
29
+ case _ =>
30
+ }
29
31
}
30
32
31
- /* This method throws `NoSuchElementException` if the key does not exist in the map. */
32
- def apply (k : K ): Observable [V ] = map.get(k).get
33
+ def apply (k : K ): Observable [V ] = Observable [V ] { subscriber =>
34
+ val (subject, subscribers) =
35
+ allSubscribers.getOrElseUpdate(k, (Subject [V ](), mutable.Set .empty[Subscriber [V ]]))
36
+ subscribers += subscriber
37
+
38
+ val subscription = subject.subscribe(subscriber)
39
+
40
+ subscriber.add(Subscription {
41
+ subscription.unsubscribe()
42
+
43
+ subscribers -= subscriber
44
+ if (subscribers.isEmpty) {
45
+ allSubscribers -= k
46
+ }
47
+ })
48
+ }
49
+
50
+ /* return true if there is at least one subscriber which subscribes to the updates of the specific key. */
51
+ def hasSubscribers (k : K ): Boolean = allSubscribers.get(k).isDefined
33
52
}
34
53
35
54
import scala .collection .mutable .ListBuffer
36
55
37
56
val rmap = new RMap [String , Int ]()
38
- rmap(" a" ) = 1
39
57
40
- val o = rmap(" a" )
41
- val buf = ListBuffer .empty[Int ]
42
- o.subscribe(buf += _)
58
+ val key = " a"
59
+ val o = rmap(key)
60
+ assert(rmap.hasSubscribers(key) == false )
61
+
62
+ val buf1 = ListBuffer .empty[Int ]
63
+ val subscription1 = o.subscribe(buf1 += _)
64
+ val buf2 = ListBuffer .empty[Int ]
65
+ val subscription2 = o.subscribe(buf2 += _)
43
66
44
- rmap(" a" ) = 2
45
- rmap(" a" ) = 3
67
+ rmap(key) = 1
68
+ rmap(key) = 2
69
+ assert(buf1 == ListBuffer (1 , 2 ), buf1)
70
+ assert(buf2 == ListBuffer (1 , 2 ), buf2)
46
71
47
- assert(buf == ListBuffer (2 , 3 ), buf)
72
+ subscription1.unsubscribe()
73
+ assert(rmap.hasSubscribers(key))
74
+ subscription2.unsubscribe()
75
+ assert(rmap.hasSubscribers(key) == false )
48
76
49
77
}
0 commit comments