Skip to content

add answers to exercise 1-4 chapter 6 #24

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jan 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch6/Ex1.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package org.learningconcurrency
package exercises
package ch6

/**
* Implement a custom Observable[Thread] object that emits an event when it detects that a thread was started.
* The implementation is allowed to miss some of the events.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the problem statement is not super-precise in the book, but the intention was to detect when any thread whatsoever in that JVM process was started, not just one specific thread.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to expand on my previous comment a bit:

Note that the problem statement says that you can miss some of the events. This is because JVM does not have a callback API for creation and starting of new threads. It means that you will have to periodically poll to find out which threads that exist in the runtime, and emit events when you observe differences.

*/

import java.util.Calendar

import rx.lang.scala.Observable

import scala.annotation.tailrec
import scala.concurrent.duration._

object Ex1 extends App {

val rootThreadGroup = getRootThread(Thread.currentThread.getThreadGroup)

var existsThreads = Set.empty[Thread]

@tailrec
def getRootThread(t: ThreadGroup):ThreadGroup = {
val parent = t.getParent
if (parent == null) t else getRootThread(parent)
}

def getCurrentThreads = {
val threads = new Array[Thread](rootThreadGroup.activeCount())
rootThreadGroup.enumerate(threads,true)

threads.filter(_ != null)
}

def getNewThreads = {
val currentThreads = getCurrentThreads
val newThreads = currentThreads.filter(!existsThreads.contains(_))

//save threads
existsThreads = currentThreads.toSet

newThreads
}

def createObservableNewThreads: Observable[Thread] = {
Observable[Thread] {
(s) => {
getNewThreads.foreach(s.onNext _)
}
}
}

//create Observable
val o = for {
_ <- Observable.interval(1 seconds)
j <- createObservableNewThreads
} yield j

o.subscribe((t) => log(s"${Calendar.getInstance().getTime()}: ${t.toString}"))

//test

def createTestThread(name:String): Unit = {
val t = new Thread(name) {
override def run(): Unit = {
Thread.sleep(5000)
}
}
t.start()
}

Thread.sleep(2000)
createTestThread("A")
Thread.sleep(3000)
createTestThread("B")

Thread.sleep(10000)
}
33 changes: 33 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch6/Ex2.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.learningconcurrency
package exercises
package ch6

/**
* Implement an Observable object that emits an event every 5 seconds and every 12 seconds,
* but not if the elapsed time is a multiple of 30 seconds.
* Use functional combinators on Observable objects.
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we group import statements together?


import rx.lang.scala.Observable
import scala.concurrent.duration._

object Ex2A extends App {

val a = Observable.interval(5 seconds).map(_ * 5)
val b = Observable.interval(12 seconds).map(_ * 12)

val c = (a merge b distinct) filter (_ % 30 != 0)

c.subscribe((s) => log(s.toString))

Thread.sleep(70000)
}

object Ex2B extends App {

val d = Observable.interval(1 seconds).filter((l) => (l % 30 != 0) && ((l % 5 == 0) || (l % 12 == 0)))
d.subscribe((s) => log(s.toString))

Thread.sleep(70000)

}
38 changes: 38 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch6/Ex3.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.learningconcurrency
package exercises
package ch6

import rx.lang.scala._

import scala.annotation.tailrec
import scala.util.Random
import scala.concurrent.duration._

/**
* Use the randomQuote method from this section in order to create an Observable object
* with the moving average of the quote lengths.
* Each time a new quote arrives, a new average value should be emitted.
*/
object Ex3 extends App {


@tailrec
def randomString(length: Int, l: List[Char] = List.empty[Char]):List[Char] = {
if (length == 1) util.Random.nextPrintableChar :: l
else randomString(length-1,util.Random.nextPrintableChar :: l)
}

def randomQuoteMock = Observable.interval(1 seconds).map((l) => randomString(Random.nextInt(10)+1))

randomQuoteMock.scan((0D,0)) {
(n, q) => n match {
case (s, c) => (s + q.length, c + 1)
}
}
.tail
.map((e) => e._1 / e._2)
.subscribe((e) => log(s"avg = $e"))

Thread.sleep(10000)

}
90 changes: 90 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch6/Ex4.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package org.learningconcurrency
package exercises
package ch6

/**
Implement the reactive signal abstraction, represented with the Signal[T] type.

The Signal[T] type comes with the method apply, used to query the last event emitted by this signal,
and several combinators with the same semantics as the corresponding Observable methods:

class Signal[T] {
def apply(): T = ???
def map(f: T => S): Signal[S] = ???
def zip[S](that: Signal[S]): Signal[(T, S)] = ???
def scan[S](z: S)(f: (S, T) => S) = ???
}

Then, add the method toSignal to the Observable[T] type, which converts
an Observable object to a reactive signal: def toSignal: Signal[T] = ???

Consider using Rx subjects for this task.
*/

import rx.lang.scala._

object Ex4 extends App {

implicit class ObserverableAdditional[T](val self:Observable[T]) extends AnyVal {

def toSignal:Signal[T] = {
new Signal[T](self)
}

}

class Signal[T] {

var lastEvent:T = _

var observable: Observable[T] = _

val subject = Subject[T]()
subject.subscribe(lastEvent = _)

def this(observable: Observable[T]) = {
this()

this.observable = observable.last
this.observable.subscribe(subject)
}

def apply(): T = lastEvent

def map[S](f: T => S): Signal[S] =
this.observable.map(f).toSignal

def zip[S](that: Signal[S]): Signal[(T, S)] =
this.observable.zip(that.observable).toSignal

def scan[S](z: S)(f: (S, T) => S):Signal[S] =
this.observable.scan(z)(f).toSignal

}

//test
def test = {

val o = Observable.from(List(1,2,3,4,5))
val o2 = Observable.from(List("A","B","C","D","E"))

val s1 = o.toSignal
val s2 =o2.toSignal

log(s"s1: element = ${s1()}")
log(s"s2: element = ${s2()}")

val sMap = s1.map(_+"~")
log(s"sMap: element = ${sMap()}")

val sZip = s1.zip(s2)
log(s"sZip: element = ${sZip()}")

val sScan = s1.scan(2)((s,t)=>s*t)
log(s"sScan: element = ${sScan()}")
}

test
Thread.sleep(5000)

}