-
Notifications
You must be signed in to change notification settings - Fork 105
Answer to exercise 3-10 ch.2 #15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
868c806
cd25d0e
d8b4ce9
3bd27a4
84d99dd
4bc4908
85adcc7
d376e55
860f6b2
df42787
f802534
4f543fe
eade10f
9abbcd5
b4260d9
6374389
94cf9bb
fe25652
f4f07e9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
package org.learningconcurrency | ||
package exercises | ||
package ch2 | ||
|
||
import scala.collection.mutable | ||
|
||
object Ex10 extends App { | ||
|
||
class PriorityTaskPool(val p:Int, val important: Int) { | ||
|
||
implicit val ord: Ordering[(Int,() => Unit)] = Ordering.by(_._1) | ||
|
||
private val tasks = mutable.PriorityQueue[(Int,() => Unit)]() | ||
|
||
@volatile | ||
private var terminated = false | ||
|
||
def asynchronous(priority: Int)(task: => Unit):Unit = tasks synchronized { | ||
tasks.enqueue((priority,() => task)) | ||
tasks.notify | ||
} | ||
|
||
class Worker extends Thread { | ||
|
||
def poll() = tasks.synchronized { | ||
while (tasks.isEmpty) { | ||
tasks.wait() | ||
} | ||
log(s"queue: " + tasks.foldLeft("")((s,t)=>s"$s${t._1},")) | ||
tasks.dequeue() | ||
} | ||
|
||
override def run() = { | ||
while (true) { | ||
poll() match { | ||
case (p, task) if (p > important) || (!terminated) => task() | ||
case _ => | ||
} | ||
} | ||
} | ||
} | ||
|
||
def shutdown() = tasks.synchronized { | ||
terminated = true | ||
tasks.notify() | ||
} | ||
|
||
(1 to p).map((i) => new Worker()).map(_.start) | ||
|
||
} | ||
|
||
val tasks = new PriorityTaskPool(10, 300) | ||
|
||
(1 to 1000).foreach((i) => { | ||
val a = (Math.random*1000).toInt | ||
tasks.asynchronous(a)({log(s"<- $a")}) | ||
}) | ||
|
||
Thread.sleep(1) | ||
tasks.shutdown() | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
package org.learningconcurrency | ||
package exercises | ||
package ch2 | ||
|
||
object Ex3 extends App { | ||
|
||
class SyncVar[T] { | ||
|
||
private var empty:Boolean = true | ||
|
||
private var x:T = null.asInstanceOf[T] | ||
|
||
def get(): T = this.synchronized { | ||
if (empty) throw new Exception("must be non-empty") | ||
else { | ||
empty = true | ||
x | ||
} | ||
} | ||
|
||
def put(x: T):Unit = this.synchronized { | ||
if (!empty) throw new Exception("must be empty") | ||
else { | ||
empty = false | ||
this.x = x | ||
} | ||
} | ||
|
||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
package org.learningconcurrency | ||
package exercises | ||
package ch2 | ||
|
||
object Ex4 extends App { | ||
|
||
class SyncVar[T] { | ||
|
||
private var empty: Boolean = true | ||
|
||
private var x: T = null.asInstanceOf[T] | ||
|
||
def get(): T = this.synchronized { | ||
if (empty) throw new Exception("must be non-empty") | ||
else { | ||
empty = true | ||
x | ||
} | ||
} | ||
|
||
def put(x: T): Unit = this.synchronized { | ||
if (!empty) throw new Exception("must be empty") | ||
else { | ||
empty = false | ||
this.x = x | ||
} | ||
} | ||
|
||
def isEmpty = empty | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here you would need to use synchronization when checking the |
||
|
||
def nonEmpty = !empty | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
} | ||
|
||
import org.learningconcurrency.ch2.thread | ||
|
||
val lock = new AnyRef | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe it was not made explicit enough in the text, but the idea in this exercise was to have the producer transfer the elements to the consumer without using any synchronization primitives other than the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So, the modification would be to remove the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @axel22 should I stop consumer thread after it prints last number equals 14 ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stopping the thread would be a reasonable thing to do, otherwise, the thread would be busy-waiting forever. The thread should be able to stop on its own after the condition in its loop is met. For example, the consumer should stop on its own when |
||
|
||
val syncVar = new SyncVar[Int] | ||
|
||
val producer = thread { | ||
var x = 0 | ||
while(x < 15) { | ||
|
||
lock.synchronized { | ||
if (syncVar.isEmpty) { | ||
syncVar.put(x) | ||
x = x + 1 | ||
} | ||
lock.wait | ||
} | ||
|
||
} | ||
} | ||
|
||
val consumer = thread { | ||
var x = -1 | ||
while(x < 14) { | ||
lock.synchronized { | ||
if (syncVar.nonEmpty) { | ||
x = syncVar.get | ||
log(s"get = $x") | ||
} | ||
lock.notify | ||
} | ||
} | ||
} | ||
|
||
producer.join | ||
consumer.join | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
package org.learningconcurrency | ||
package exercises | ||
package ch2 | ||
|
||
object Ex5 extends App { | ||
|
||
class SyncVar[T] { | ||
|
||
private var empty: Boolean = true | ||
|
||
private var x: T = null.asInstanceOf[T] | ||
|
||
def isEmpty = empty | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Access to |
||
|
||
def nonEmpty = !empty | ||
|
||
def getWait():T = this.synchronized { | ||
while (empty) | ||
this.wait | ||
|
||
empty = true | ||
this.notify | ||
x | ||
} | ||
|
||
def putWait(x: T): Unit = this.synchronized { | ||
while (!empty) | ||
this.wait() | ||
|
||
empty = false | ||
this.x = x | ||
this.notify() | ||
} | ||
|
||
|
||
} | ||
|
||
import org.learningconcurrency.ch2.thread | ||
|
||
val syncVar = new SyncVar[Int] | ||
|
||
val producer = thread { | ||
var x = 0 | ||
while(x < 15) { | ||
syncVar.putWait(x) | ||
x = x + 1 | ||
} | ||
} | ||
|
||
val consumer = thread { | ||
var x = -1 | ||
while(x < 14) { | ||
x = syncVar.getWait | ||
log(s"get: $x") | ||
} | ||
} | ||
|
||
producer.join | ||
consumer.join | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
package org.learningconcurrency | ||
package exercises | ||
package ch2 | ||
|
||
import scala.collection.mutable._ | ||
|
||
|
||
object Ex6 extends App { | ||
|
||
class SyncQueue[T](val n:Int) { | ||
|
||
private var qX = Queue[T]() | ||
|
||
def getWait():T = this.synchronized { | ||
while (qX.isEmpty) { | ||
this.notify | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did not run this, but it still seems to me that this |
||
this.wait | ||
} | ||
|
||
qX.dequeue | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems to me that the
If there are producer threads waiting to enqueue elements, then they will be blocked until some element is removed, to make more space. In the current solution, if:
then the producer becomes permanently blocked. |
||
} | ||
|
||
def putWait(x: T): Unit = this.synchronized { | ||
while (qX.length == n) | ||
this.wait() | ||
|
||
qX += x | ||
this.notify() | ||
} | ||
|
||
} | ||
|
||
import org.learningconcurrency.ch2.thread | ||
|
||
val syncVar = new SyncQueue[Int](10) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: it would be more consistent to rename this to |
||
|
||
val producer = thread { | ||
var x = 0 | ||
while(x < 15) { | ||
syncVar.putWait(x) | ||
x = x + 1 | ||
} | ||
} | ||
|
||
val consumer = thread { | ||
var x = -1 | ||
while(x < 14) { | ||
x = syncVar.getWait | ||
log(s"get: $x") | ||
} | ||
} | ||
|
||
producer.join | ||
consumer.join | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
package org.learningconcurrency | ||
package exercises | ||
package ch2 | ||
|
||
import org.learningconcurrency.ch2._ | ||
|
||
object Ex7 extends App { | ||
|
||
import SynchronizedProtectedUid._ | ||
|
||
class Account(val name: String, var money: Int) { | ||
val uid = getUniqueId() | ||
} | ||
|
||
def send(a1: Account, a2: Account, n: Int) { | ||
def adjust() { | ||
a1.money -= n | ||
a2.money += n | ||
} | ||
|
||
a1.synchronized { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like this could result in a deadlock if two threads T1 and T2 invoke The |
||
a2.synchronized { | ||
adjust() | ||
} | ||
} | ||
|
||
} | ||
|
||
def sendAll(accounts: Set[Account], target: Account):Unit = { | ||
accounts.map((a) => thread { | ||
send(a,target,a.money) | ||
} ).foreach(_.join) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are three separate things that we should fix in this code - I'll put them in separate comments. First, this could be an eventually consistent implementation of the
This means that it will notice all the intermediate account states on the If you now want to add some validation to the different accounts in the In distributed computing, we sometimes have to live with this weaker consistency model to ensure better scalability.
This means that you should not start multiple threads to do the transfer - the transfer should happen on the caller thread, while all the accounts are simultaneously locked out. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Second, calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Last, although there are multiple threads, this code does not prevent a deadlock. |
||
|
||
//accounts.foreach((a) => send(a,target,a.money)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should remove this comment to keep things clean. |
||
} | ||
|
||
val accounts = (1 to 100).map((i) => new Account(s"Account: $i",i*10)).toSet | ||
val target = new Account("Target account", 0) | ||
|
||
sendAll(accounts,target) | ||
|
||
accounts.foreach((a) => log(s"${a.name}, money = ${a.money}")) | ||
log(s"${target.name} - money = ${target.money}") | ||
|
||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
package org.learningconcurrency | ||
package exercises | ||
package ch2 | ||
|
||
|
||
import scala.collection.mutable | ||
|
||
object Ex8 extends App { | ||
|
||
class PriorityTaskPool { | ||
|
||
implicit val ord: Ordering[(Int,() => Unit)] = Ordering.by(_._1) | ||
|
||
private val tasks = mutable.PriorityQueue[(Int,() => Unit)]() | ||
|
||
def asynchronous(priority: Int)(task: => Unit):Unit = tasks synchronized { | ||
tasks.enqueue((priority,() => task)) | ||
tasks.notify | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: usual convention in Scala is to put parenthesis after a 0-arg side-effecting method:
|
||
} | ||
|
||
|
||
object Worker extends Thread { | ||
|
||
setDaemon(true) | ||
|
||
def poll() = tasks.synchronized { | ||
while (tasks.isEmpty) { | ||
tasks.wait() | ||
} | ||
log("queue: " + tasks.foldLeft("")((s,t)=>s"$s${t._1},")) | ||
tasks.dequeue() | ||
} | ||
|
||
override def run() = { | ||
while (true) { | ||
poll() match { | ||
case (_, task) => task() | ||
} | ||
} | ||
} | ||
} | ||
|
||
Worker.start() | ||
|
||
} | ||
|
||
val tasks = new PriorityTaskPool | ||
|
||
(1 to 10).foreach((i) => { | ||
val a = (Math.random*1000).toInt | ||
tasks.asynchronous(a)({log(s"<- $a")}) | ||
}) | ||
|
||
Thread.sleep(10000) | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: here you might want to set
x
back tonull
, to avoid a potential memory leak.