-
Notifications
You must be signed in to change notification settings - Fork 104
Answer to Exercise 5-7 of Chapter 6 #29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 3 commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
58 changes: 58 additions & 0 deletions
58
src/main/scala/org/learningconcurrency/exercises/ch6/Ex5.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
package org.learningconcurrency | ||
package exercises | ||
package ch6 | ||
|
||
/** | ||
* Implement the reactive cell abstraction, represented with the RCell[T] type: | ||
* class RCell[T] extends Signal[T] { | ||
* def :=(x: T): Unit = ??? | ||
* } | ||
* | ||
* A reactive cell is simultaneously a reactive signal from the previous exercise. | ||
* Calling the := method sets a new value to the reactive cell, and emits an event. | ||
*/ | ||
|
||
import rx.lang.scala._ | ||
|
||
object Ex5 extends App { | ||
|
||
class RCell[T] extends Ex4.Signal[T] { | ||
private[this] val subject = Subject[T]() | ||
setObservable(subject) | ||
|
||
def :=(x: T): Unit = { | ||
subject.onNext(x) | ||
} | ||
} | ||
|
||
val rc1 = new RCell[Int]() | ||
rc1 := 1 | ||
assert(rc1() == 1) | ||
|
||
val rc2 = new RCell[Int]() | ||
rc2 := 1 | ||
val increment = rc2.map(_ + 1) | ||
assert(increment() == 2) | ||
rc2 := 2 | ||
assert(increment() == 3) | ||
|
||
val rc31 = new RCell[Int]() | ||
val rc32 = new RCell[String]() | ||
rc31 := 1 | ||
rc32 := "a" | ||
val zipped = rc31.zip(rc32) | ||
assert(zipped() == (1, "a")) | ||
rc31 := 2 | ||
rc32 := "b" | ||
assert(zipped() == (2, "b")) | ||
|
||
val rc4 = new RCell[Int]() | ||
rc4 := 1 | ||
val sum = rc4.scan(10)(_ + _) | ||
assert(sum() == 10) | ||
rc4 := 2 | ||
assert(sum() == 12) | ||
rc4 := 3 | ||
assert(sum() == 15) | ||
|
||
} |
49 changes: 49 additions & 0 deletions
49
src/main/scala/org/learningconcurrency/exercises/ch6/Ex6.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
package org.learningconcurrency | ||
package exercises | ||
package ch6 | ||
|
||
/** | ||
* Implement the reactive map collection, represented with the RMap class: | ||
* class RMap[K, V] { | ||
* def update(k: K, v: V): Unit | ||
* def apply(k: K): Observable[V] | ||
* } | ||
* | ||
* The update method behaves like the update on a regular Map collection. | ||
* Calling apply on a reactive map returns an Observable object with all the subsequent updates of the specific key. | ||
*/ | ||
|
||
import rx.lang.scala._ | ||
|
||
object Ex6 extends App { | ||
|
||
class RMap[K, V] { | ||
private[this] val map = scala.collection.mutable.Map[K, Subject[V]]() | ||
|
||
def update(k: K, v: V): Unit = map.get(k) match { | ||
case Some(s) => s.onNext(v) | ||
case _ => | ||
val s = Subject[V]() | ||
map(k) = s | ||
s.onNext(v) | ||
} | ||
|
||
/* This method throws `NoSuchElementException` if the key does not exist in the map. */ | ||
def apply(k: K): Observable[V] = map.get(k).get | ||
} | ||
|
||
import scala.collection.mutable.ListBuffer | ||
|
||
val rmap = new RMap[String, Int]() | ||
rmap("a") = 1 | ||
|
||
val o = rmap("a") | ||
val buf = ListBuffer.empty[Int] | ||
o.subscribe(buf += _) | ||
|
||
rmap("a") = 2 | ||
rmap("a") = 3 | ||
|
||
assert(buf == ListBuffer(2, 3), buf) | ||
|
||
} |
55 changes: 55 additions & 0 deletions
55
src/main/scala/org/learningconcurrency/exercises/ch6/Ex7.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
package org.learningconcurrency | ||
package exercises | ||
package ch6 | ||
|
||
/** | ||
* Implement the reactive priority queue, represented with the RPriorityQueue class: | ||
* class RPriorityQueue[T] { | ||
* def add(x: T): Unit = ??? | ||
* def pop(): T = ??? | ||
* def popped: Observable[T] = ??? | ||
* } | ||
* | ||
* The reactive priority queue exposes the Observable object popped, | ||
* which emits events whenever the smallest element in the priority queue gets removed by calling pop. | ||
*/ | ||
|
||
import rx.lang.scala._ | ||
|
||
object Ex7 extends App { | ||
|
||
class RPriorityQueue[T](implicit val ord: Ordering[T]) { | ||
private[this] val pq = new scala.collection.mutable.PriorityQueue[T]()(ord.reverse) | ||
private[this] val subject = Subject[T]() | ||
|
||
def add(x: T): Unit = { | ||
pq += x | ||
} | ||
|
||
/* This method throws `NoSuchElementException` if the queue is empty. */ | ||
def pop(): T = { | ||
val x = pq.dequeue() | ||
subject.onNext(x) | ||
x | ||
} | ||
|
||
def popped: Observable[T] = subject | ||
} | ||
|
||
import scala.collection.mutable.ListBuffer | ||
|
||
val rqueue = new RPriorityQueue[Int]() | ||
rqueue.add(3) | ||
rqueue.add(1) | ||
rqueue.add(2) | ||
|
||
val o = rqueue.popped | ||
val buf = ListBuffer.empty[Int] | ||
o.subscribe(buf += _) | ||
|
||
assert(rqueue.pop() == 1) | ||
assert(rqueue.pop() == 2) | ||
assert(rqueue.pop() == 3) | ||
assert(buf == ListBuffer(1, 2, 3)) | ||
|
||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should not throw an exception if there is no such element.
Instead, if there is a subscription, the reactive map should create an observable X such that when somebody subscribes to X, a key with the subject is added to the map if it does not already exist, and the observable subscribes to that subject. If the subscription is canceled (
unsubscribe
), the subject is removed from the map, unless somebody stored an actual value to that key.This way, map does not grow indefinitely (no memory leaks), and the semantics do not require that there is actually a key in the map - i.e. one component can subscribes to the key, and wait until the key appears.