Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch6/Ex5.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package org.learningconcurrency
package exercises
package ch6

/**
* Implement the reactive cell abstraction, represented with the RCell[T] type:
* class RCell[T] extends Signal[T] {
* def :=(x: T): Unit = ???
* }
*
* A reactive cell is simultaneously a reactive signal from the previous exercise.
* Calling the := method sets a new value to the reactive cell, and emits an event.
*/

import rx.lang.scala._

object Ex5 extends App {

class RCell[T] extends Ex4.Signal[T] {
private[this] val subject = Subject[T]()
setObservable(subject)

def :=(x: T): Unit = {
subject.onNext(x)
}
}

val rc1 = new RCell[Int]()
rc1 := 1
assert(rc1() == 1)

val rc2 = new RCell[Int]()
rc2 := 1
val increment = rc2.map(_ + 1)
assert(increment() == 2)
rc2 := 2
assert(increment() == 3)

val rc31 = new RCell[Int]()
val rc32 = new RCell[String]()
rc31 := 1
rc32 := "a"
val zipped = rc31.zip(rc32)
assert(zipped() == (1, "a"))
rc31 := 2
rc32 := "b"
assert(zipped() == (2, "b"))

val rc4 = new RCell[Int]()
rc4 := 1
val sum = rc4.scan(10)(_ + _)
assert(sum() == 10)
rc4 := 2
assert(sum() == 12)
rc4 := 3
assert(sum() == 15)

}
49 changes: 49 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch6/Ex6.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.learningconcurrency
package exercises
package ch6

/**
* Implement the reactive map collection, represented with the RMap class:
* class RMap[K, V] {
* def update(k: K, v: V): Unit
* def apply(k: K): Observable[V]
* }
*
* The update method behaves like the update on a regular Map collection.
* Calling apply on a reactive map returns an Observable object with all the subsequent updates of the specific key.
*/

import rx.lang.scala._

object Ex6 extends App {

class RMap[K, V] {
private[this] val map = scala.collection.mutable.Map[K, Subject[V]]()

def update(k: K, v: V): Unit = map.get(k) match {
case Some(s) => s.onNext(v)
case _ =>
val s = Subject[V]()
map(k) = s
s.onNext(v)
}

/* This method throws `NoSuchElementException` if the key does not exist in the map. */
def apply(k: K): Observable[V] = map.get(k).get
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should not throw an exception if there is no such element.

Instead, if there is a subscription, the reactive map should create an observable X such that when somebody subscribes to X, a key with the subject is added to the map if it does not already exist, and the observable subscribes to that subject. If the subscription is canceled (unsubscribe), the subject is removed from the map, unless somebody stored an actual value to that key.

This way, map does not grow indefinitely (no memory leaks), and the semantics do not require that there is actually a key in the map - i.e. one component can subscribes to the key, and wait until the key appears.

}

import scala.collection.mutable.ListBuffer

val rmap = new RMap[String, Int]()
rmap("a") = 1

val o = rmap("a")
val buf = ListBuffer.empty[Int]
o.subscribe(buf += _)

rmap("a") = 2
rmap("a") = 3

assert(buf == ListBuffer(2, 3), buf)

}
55 changes: 55 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch6/Ex7.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.learningconcurrency
package exercises
package ch6

/**
* Implement the reactive priority queue, represented with the RPriorityQueue class:
* class RPriorityQueue[T] {
* def add(x: T): Unit = ???
* def pop(): T = ???
* def popped: Observable[T] = ???
* }
*
* The reactive priority queue exposes the Observable object popped,
* which emits events whenever the smallest element in the priority queue gets removed by calling pop.
*/

import rx.lang.scala._

object Ex7 extends App {

class RPriorityQueue[T](implicit val ord: Ordering[T]) {
private[this] val pq = new scala.collection.mutable.PriorityQueue[T]()(ord.reverse)
private[this] val subject = Subject[T]()

def add(x: T): Unit = {
pq += x
}

/* This method throws `NoSuchElementException` if the queue is empty. */
def pop(): T = {
val x = pq.dequeue()
subject.onNext(x)
x
}

def popped: Observable[T] = subject
}

import scala.collection.mutable.ListBuffer

val rqueue = new RPriorityQueue[Int]()
rqueue.add(3)
rqueue.add(1)
rqueue.add(2)

val o = rqueue.popped
val buf = ListBuffer.empty[Int]
o.subscribe(buf += _)

assert(rqueue.pop() == 1)
assert(rqueue.pop() == 2)
assert(rqueue.pop() == 3)
assert(buf == ListBuffer(1, 2, 3))

}