Skip to content

Answer to exercise 3-10 ch.2 #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Aug 19, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch2/Ex10.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.learningconcurrency
package exercises
package ch2

import scala.collection.mutable

object Ex10 extends App {

class PriorityTaskPool(val p:Int, val important: Int) {

implicit val ord: Ordering[(Int,() => Unit)] = Ordering.by(_._1)

private val tasks = mutable.PriorityQueue[(Int,() => Unit)]()

@volatile
private var terminated = false

def asynchronous(priority: Int)(task: => Unit):Unit = tasks synchronized {
tasks.enqueue((priority,() => task))
tasks.notify()
}

class Worker extends Thread {

def poll() = tasks.synchronized {
while (tasks.isEmpty) {
tasks.wait()
}
log(s"queue: " + tasks.foldLeft("")((s,t)=>s"$s${t._1},"))
tasks.dequeue()
}

override def run() = {
while (true) {
poll() match {
case (p, task) if (p > important) || (!terminated) => task()
case _ =>
}
}
}
}

def shutdown() = tasks.synchronized {
terminated = true
tasks.notify()
}

(1 to p).map((i) => new Worker()).map(_.start)

}

val tasks = new PriorityTaskPool(10, 300)

(1 to 1000).foreach((i) => {
val a = (Math.random*1000).toInt
tasks.asynchronous(a)({log(s"<- $a")})
})

Thread.sleep(1)
tasks.shutdown()

}
33 changes: 33 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch2/ex3.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.learningconcurrency
package exercises
package ch2

object Ex3 extends App {

class SyncVar[T] {

private var empty:Boolean = true

private var x:T = null.asInstanceOf[T]

def get(): T = this.synchronized {
if (empty) throw new Exception("must be non-empty")
else {
empty = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: here you might want to set x back to null, to avoid a potential memory leak.

val v = x
x = null.asInstanceOf[T]
v
}
}

def put(x: T):Unit = this.synchronized {
if (!empty) throw new Exception("must be empty")
else {
empty = false
this.x = x
}
}

}

}
69 changes: 69 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch2/ex4.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package org.learningconcurrency
package exercises
package ch2

object Ex4 extends App {

class SyncVar[T] {

private var empty: Boolean = true

private var x: T = null.asInstanceOf[T]

def get(): T = this.synchronized {
if (empty) throw new Exception("must be non-empty")
else {
empty = true
x
}
}

def put(x: T): Unit = this.synchronized {
if (!empty) throw new Exception("must be empty")
else {
empty = false
this.x = x
}
}

def isEmpty = synchronized {
empty
}

def nonEmpty = synchronized {
!empty
}

}

import org.learningconcurrency.ch2.thread

val syncVar = new SyncVar[Int]

val producer = thread {
var x = 0
while (x < 15) {
if (syncVar.isEmpty) {
syncVar.put(x)
x = x + 1
}

}
}

val consumer = thread {
var x = 0
while (x != 15) {
if (syncVar.nonEmpty) {
log(s"get = ${syncVar.get}")
x = x + 1
}
}
}

producer.join()
consumer.join()



}
65 changes: 65 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch2/ex5.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.learningconcurrency
package exercises
package ch2

object Ex5 extends App {

class SyncVar[T] {

private var empty: Boolean = true

private var x: T = null.asInstanceOf[T]

def isEmpty = synchronized {
empty
}

def nonEmpty = synchronized {
!empty
}

def getWait():T = this.synchronized {
while (empty)
this.wait

empty = true
this.notify
x
}

def putWait(x: T): Unit = this.synchronized {
while (!empty)
this.wait()

empty = false
this.x = x
this.notify()
}


}

import org.learningconcurrency.ch2.thread

val syncVar = new SyncVar[Int]

val producer = thread {
var x = 0
while(x < 15) {
syncVar.putWait(x)
x = x + 1
}
}

val consumer = thread {
var x = -1
while(x < 14) {
x = syncVar.getWait
log(s"get: $x")
}
}

producer.join()
consumer.join()

}
58 changes: 58 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch2/ex6.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package org.learningconcurrency
package exercises
package ch2

import scala.collection.mutable._


object Ex6 extends App {

class SyncQueue[T](val n:Int) {

private var syncQueue = Queue[T]()

def getWait():T = this.synchronized {
while (syncQueue.isEmpty) {
this.notify
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not run this, but it still seems to me that this notify is not necessary.
The notify should occur only when the state of the queue changes in such a way that it could allow some other thread to progress. Here, the isEmpty does not change the state of the queue in any way, so whichever thread waited as a result of some condition, will see the queue in the same state after it wakes up.

this.wait
}

val x = syncQueue.dequeue
this.notify()
x
}

def putWait(x: T): Unit = this.synchronized {
while (syncQueue.length == n)
this.wait()

syncQueue += x
this.notify()
}

}

import org.learningconcurrency.ch2.thread

val syncVar = new SyncQueue[Int](10)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: it would be more consistent to rename this to syncQueue.


val producer = thread {
var x = 0
while(x < 15) {
syncVar.putWait(x)
x = x + 1
}
}

val consumer = thread {
var x = -1
while(x < 14) {
x = syncVar.getWait
log(s"get: $x")
}
}

producer.join()
consumer.join()

}
66 changes: 66 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch2/ex7.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.learningconcurrency
package exercises
package ch2

import org.learningconcurrency.ch2._

object Ex7 extends App {

import SynchronizedProtectedUid._

class Account(val name: String, var money: Int) {
val uid = getUniqueId()
}

def send(a1: Account, a2: Account, n: Int) {
def adjust() {
a1.money -= n
a2.money += n
}

if (a1.uid < a2.uid) {
a1.synchronized {
a2.synchronized {
adjust()
}
}
} else {
a2.synchronized {
a1.synchronized {
adjust()
}
}
}
}


def sendAll(accounts: Set[Account], target: Account): Unit = {

def adjust() = {
target.money = accounts.foldLeft(0)((s, a) => {
val money = a.money
a.money = 0
s + money
}
)
}

def sendAllWithSynchronize(la: List[Account]): Unit = la match {
case h :: t => h synchronized {
sendAllWithSynchronize(t)
}
case _ => adjust()
}

sendAllWithSynchronize((target :: accounts.toList).sortBy(_.uid))
}

val accounts = (1 to 100).map((i) => new Account(s"Account: $i",i*10)).toSet
val target = new Account("Target account", 0)

sendAll(accounts,target)

accounts.foreach((a) => log(s"${a.name}, money = ${a.money}"))
log(s"${target.name} - money = ${target.money}")

}
Loading