Skip to content

Answer to Exercise 5-7 of Chapter 6 #29

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch6/Ex5.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package org.learningconcurrency
package exercises
package ch6

/**
* Implement the reactive cell abstraction, represented with the RCell[T] type:
* class RCell[T] extends Signal[T] {
* def :=(x: T): Unit = ???
* }
*
* A reactive cell is simultaneously a reactive signal from the previous exercise.
* Calling the := method sets a new value to the reactive cell, and emits an event.
*/

import rx.lang.scala._

object Ex5 extends App {

class RCell[T] extends Ex4.Signal[T] {
private[this] val subject = Subject[T]()
setObservable(subject)

def :=(x: T): Unit = {
subject.onNext(x)
}
}

val rc1 = new RCell[Int]()
rc1 := 1
assert(rc1() == 1)

val rc2 = new RCell[Int]()
rc2 := 1
val increment = rc2.map(_ + 1)
assert(increment() == 2)
rc2 := 2
assert(increment() == 3)

val rc31 = new RCell[Int]()
val rc32 = new RCell[String]()
rc31 := 1
rc32 := "a"
val zipped = rc31.zip(rc32)
assert(zipped() == (1, "a"))
rc31 := 2
rc32 := "b"
assert(zipped() == (2, "b"))

val rc4 = new RCell[Int]()
rc4 := 1
val sum = rc4.scan(10)(_ + _)
assert(sum() == 10)
rc4 := 2
assert(sum() == 12)
rc4 := 3
assert(sum() == 15)

}
77 changes: 77 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch6/Ex6.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package org.learningconcurrency
package exercises
package ch6

/**
* Implement the reactive map collection, represented with the RMap class:
* class RMap[K, V] {
* def update(k: K, v: V): Unit
* def apply(k: K): Observable[V]
* }
*
* The update method behaves like the update on a regular Map collection.
* Calling apply on a reactive map returns an Observable object with all the subsequent updates of the specific key.
*/

import rx.lang.scala._

object Ex6 extends App {

class RMap[K, V] {
import scala.collection._
private[this] val allSubscribers = mutable.Map[K, (Subject[V], mutable.Set[Subscriber[V]])]()
private[this] val map = mutable.Map[K, V]()

def update(k: K, v: V): Unit = {
map(k) = v
allSubscribers.get(k) match {
case Some(s) => s._1.onNext(v)
case _ =>
}
}

def apply(k: K): Observable[V] = Observable[V] { subscriber =>
val (subject, subscribers) =
allSubscribers.getOrElseUpdate(k, (Subject[V](), mutable.Set.empty[Subscriber[V]]))
subscribers += subscriber

val subscription = subject.subscribe(subscriber)

subscriber.add(Subscription {
subscription.unsubscribe()

subscribers -= subscriber
if (subscribers.isEmpty) {
allSubscribers -= k
}
})
}

/* return true if there is at least one subscriber which subscribes to the updates of the specific key. */
def hasSubscribers(k: K): Boolean = allSubscribers.get(k).isDefined
}

import scala.collection.mutable.ListBuffer

val rmap = new RMap[String, Int]()

val key = "a"
val o = rmap(key)
assert(rmap.hasSubscribers(key) == false)

val buf1 = ListBuffer.empty[Int]
val subscription1 = o.subscribe(buf1 += _)
val buf2 = ListBuffer.empty[Int]
val subscription2 = o.subscribe(buf2 += _)

rmap(key) = 1
rmap(key) = 2
assert(buf1 == ListBuffer(1, 2), buf1)
assert(buf2 == ListBuffer(1, 2), buf2)

subscription1.unsubscribe()
assert(rmap.hasSubscribers(key))
subscription2.unsubscribe()
assert(rmap.hasSubscribers(key) == false)

}
55 changes: 55 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch6/Ex7.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.learningconcurrency
package exercises
package ch6

/**
* Implement the reactive priority queue, represented with the RPriorityQueue class:
* class RPriorityQueue[T] {
* def add(x: T): Unit = ???
* def pop(): T = ???
* def popped: Observable[T] = ???
* }
*
* The reactive priority queue exposes the Observable object popped,
* which emits events whenever the smallest element in the priority queue gets removed by calling pop.
*/

import rx.lang.scala._

object Ex7 extends App {

class RPriorityQueue[T](implicit val ord: Ordering[T]) {
private[this] val pq = new scala.collection.mutable.PriorityQueue[T]()(ord.reverse)
private[this] val subject = Subject[T]()

def add(x: T): Unit = {
pq += x
}

/* This method throws `NoSuchElementException` if the queue is empty. */
def pop(): T = {
val x = pq.dequeue()
subject.onNext(x)
x
}

def popped: Observable[T] = subject
}

import scala.collection.mutable.ListBuffer

val rqueue = new RPriorityQueue[Int]()
rqueue.add(3)
rqueue.add(1)
rqueue.add(2)

val o = rqueue.popped
val buf = ListBuffer.empty[Int]
o.subscribe(buf += _)

assert(rqueue.pop() == 1)
assert(rqueue.pop() == 2)
assert(rqueue.pop() == 3)
assert(buf == ListBuffer(1, 2, 3))

}