-
Notifications
You must be signed in to change notification settings - Fork 105
add answers to exercise 1-4 chapter 6 #24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
axel22
merged 6 commits into
concurrent-programming-in-scala:master
from
ryblovAV:chapter6
Jan 11, 2016
Merged
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
097906a
add answers to exercise 1-4 chapter 6
ryblovAV 33279a0
added detection when any thread was started
ryblovAV f8405fa
grouped import statements together, made 2 spaces line indentation
ryblovAV 08f5c4f
grouped import statements together
ryblovAV cbbc39e
added tail instead filter
ryblovAV a11d35d
fixed error (the resulting signal will not emit mapped events after c…
ryblovAV File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
79 changes: 79 additions & 0 deletions
79
src/main/scala/org/learningconcurrency/exercises/ch6/Ex1.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
package org.learningconcurrency | ||
package exercises | ||
package ch6 | ||
|
||
/** | ||
* Implement a custom Observable[Thread] object that emits an event when it detects that a thread was started. | ||
* The implementation is allowed to miss some of the events. | ||
*/ | ||
|
||
import java.util.Calendar | ||
|
||
import rx.lang.scala.Observable | ||
|
||
import scala.annotation.tailrec | ||
import scala.concurrent.duration._ | ||
|
||
object Ex1 extends App { | ||
|
||
val rootThreadGroup = getRootThread(Thread.currentThread.getThreadGroup) | ||
|
||
var existsThreads = Set.empty[Thread] | ||
|
||
@tailrec | ||
def getRootThread(t: ThreadGroup):ThreadGroup = { | ||
val parent = t.getParent | ||
if (parent == null) t else getRootThread(parent) | ||
} | ||
|
||
def getCurrentThreads = { | ||
val threads = new Array[Thread](rootThreadGroup.activeCount()) | ||
rootThreadGroup.enumerate(threads,true) | ||
|
||
threads.filter(_ != null) | ||
} | ||
|
||
def getNewThreads = { | ||
val currentThreads = getCurrentThreads | ||
val newThreads = currentThreads.filter(!existsThreads.contains(_)) | ||
|
||
//save threads | ||
existsThreads = currentThreads.toSet | ||
|
||
newThreads | ||
} | ||
|
||
def createObservableNewThreads: Observable[Thread] = { | ||
Observable[Thread] { | ||
(s) => { | ||
getNewThreads.foreach(s.onNext _) | ||
} | ||
} | ||
} | ||
|
||
//create Observable | ||
val o = for { | ||
_ <- Observable.interval(1 seconds) | ||
j <- createObservableNewThreads | ||
} yield j | ||
|
||
o.subscribe((t) => log(s"${Calendar.getInstance().getTime()}: ${t.toString}")) | ||
|
||
//test | ||
|
||
def createTestThread(name:String): Unit = { | ||
val t = new Thread(name) { | ||
override def run(): Unit = { | ||
Thread.sleep(5000) | ||
} | ||
} | ||
t.start() | ||
} | ||
|
||
Thread.sleep(2000) | ||
createTestThread("A") | ||
Thread.sleep(3000) | ||
createTestThread("B") | ||
|
||
Thread.sleep(10000) | ||
} |
33 changes: 33 additions & 0 deletions
33
src/main/scala/org/learningconcurrency/exercises/ch6/Ex2.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
package org.learningconcurrency | ||
package exercises | ||
package ch6 | ||
|
||
/** | ||
* Implement an Observable object that emits an event every 5 seconds and every 12 seconds, | ||
* but not if the elapsed time is a multiple of 30 seconds. | ||
* Use functional combinators on Observable objects. | ||
*/ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we group |
||
|
||
import rx.lang.scala.Observable | ||
import scala.concurrent.duration._ | ||
|
||
object Ex2A extends App { | ||
|
||
val a = Observable.interval(5 seconds).map(_ * 5) | ||
val b = Observable.interval(12 seconds).map(_ * 12) | ||
|
||
val c = (a merge b distinct) filter (_ % 30 != 0) | ||
|
||
c.subscribe((s) => log(s.toString)) | ||
|
||
Thread.sleep(70000) | ||
} | ||
|
||
object Ex2B extends App { | ||
|
||
val d = Observable.interval(1 seconds).filter((l) => (l % 30 != 0) && ((l % 5 == 0) || (l % 12 == 0))) | ||
d.subscribe((s) => log(s.toString)) | ||
|
||
Thread.sleep(70000) | ||
|
||
} |
38 changes: 38 additions & 0 deletions
38
src/main/scala/org/learningconcurrency/exercises/ch6/Ex3.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
package org.learningconcurrency | ||
package exercises | ||
package ch6 | ||
|
||
import rx.lang.scala._ | ||
|
||
import scala.annotation.tailrec | ||
import scala.util.Random | ||
import scala.concurrent.duration._ | ||
|
||
/** | ||
* Use the randomQuote method from this section in order to create an Observable object | ||
* with the moving average of the quote lengths. | ||
* Each time a new quote arrives, a new average value should be emitted. | ||
*/ | ||
object Ex3 extends App { | ||
|
||
|
||
@tailrec | ||
def randomString(length: Int, l: List[Char] = List.empty[Char]):List[Char] = { | ||
if (length == 1) util.Random.nextPrintableChar :: l | ||
else randomString(length-1,util.Random.nextPrintableChar :: l) | ||
} | ||
|
||
def randomQuoteMock = Observable.interval(1 seconds).map((l) => randomString(Random.nextInt(10)+1)) | ||
|
||
randomQuoteMock.scan((0D,0)) { | ||
(n, q) => n match { | ||
case (s, c) => (s + q.length, c + 1) | ||
} | ||
} | ||
.tail | ||
.map((e) => e._1 / e._2) | ||
.subscribe((e) => log(s"avg = $e")) | ||
|
||
Thread.sleep(10000) | ||
|
||
} |
90 changes: 90 additions & 0 deletions
90
src/main/scala/org/learningconcurrency/exercises/ch6/Ex4.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
package org.learningconcurrency | ||
package exercises | ||
package ch6 | ||
|
||
/** | ||
Implement the reactive signal abstraction, represented with the Signal[T] type. | ||
|
||
The Signal[T] type comes with the method apply, used to query the last event emitted by this signal, | ||
and several combinators with the same semantics as the corresponding Observable methods: | ||
|
||
class Signal[T] { | ||
def apply(): T = ??? | ||
def map(f: T => S): Signal[S] = ??? | ||
def zip[S](that: Signal[S]): Signal[(T, S)] = ??? | ||
def scan[S](z: S)(f: (S, T) => S) = ??? | ||
} | ||
|
||
Then, add the method toSignal to the Observable[T] type, which converts | ||
an Observable object to a reactive signal: def toSignal: Signal[T] = ??? | ||
|
||
Consider using Rx subjects for this task. | ||
*/ | ||
|
||
import rx.lang.scala._ | ||
|
||
object Ex4 extends App { | ||
|
||
implicit class ObserverableAdditional[T](val self:Observable[T]) extends AnyVal { | ||
|
||
def toSignal:Signal[T] = { | ||
new Signal[T](self) | ||
} | ||
|
||
} | ||
|
||
class Signal[T] { | ||
|
||
var lastEvent:T = _ | ||
|
||
var observable: Observable[T] = _ | ||
|
||
val subject = Subject[T]() | ||
subject.subscribe(lastEvent = _) | ||
|
||
def this(observable: Observable[T]) = { | ||
this() | ||
|
||
this.observable = observable.last | ||
this.observable.subscribe(subject) | ||
} | ||
|
||
def apply(): T = lastEvent | ||
|
||
def map[S](f: T => S): Signal[S] = | ||
this.observable.map(f).toSignal | ||
|
||
def zip[S](that: Signal[S]): Signal[(T, S)] = | ||
this.observable.zip(that.observable).toSignal | ||
|
||
def scan[S](z: S)(f: (S, T) => S):Signal[S] = | ||
this.observable.scan(z)(f).toSignal | ||
|
||
} | ||
|
||
//test | ||
def test = { | ||
|
||
val o = Observable.from(List(1,2,3,4,5)) | ||
val o2 = Observable.from(List("A","B","C","D","E")) | ||
|
||
val s1 = o.toSignal | ||
val s2 =o2.toSignal | ||
|
||
log(s"s1: element = ${s1()}") | ||
log(s"s2: element = ${s2()}") | ||
|
||
val sMap = s1.map(_+"~") | ||
log(s"sMap: element = ${sMap()}") | ||
|
||
val sZip = s1.zip(s2) | ||
log(s"sZip: element = ${sZip()}") | ||
|
||
val sScan = s1.scan(2)((s,t)=>s*t) | ||
log(s"sScan: element = ${sScan()}") | ||
} | ||
|
||
test | ||
Thread.sleep(5000) | ||
|
||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe the problem statement is not super-precise in the book, but the intention was to detect when any thread whatsoever in that JVM process was started, not just one specific thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to expand on my previous comment a bit:
Note that the problem statement says that you can miss some of the events. This is because JVM does not have a callback API for creation and starting of new threads. It means that you will have to periodically poll to find out which threads that exist in the runtime, and emit events when you observe differences.