Skip to content

Answer to exercise 3-10 ch.2 #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Aug 19, 2015
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch2/Ex10.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.learningconcurrency
package exercises
package ch2

import scala.collection.mutable

object Ex10 extends App {

class PriorityTaskPool(val p:Int, val important: Int) {

implicit val ord: Ordering[(Int,() => Unit)] = Ordering.by(_._1)

private val tasks = mutable.PriorityQueue[(Int,() => Unit)]()

@volatile
private var terminated = false

def asynchronous(priority: Int)(task: => Unit):Unit = tasks synchronized {
tasks.enqueue((priority,() => task))
tasks.notify
}

class Worker extends Thread {

def poll() = tasks.synchronized {
while (tasks.isEmpty) {
tasks.wait()
}
log(s"queue: " + tasks.foldLeft("")((s,t)=>s"$s${t._1},"))
tasks.dequeue()
}

override def run() = {
while (true) {
poll() match {
case (p, task) if (p > important) || (!terminated) => task()
case _ =>
}
}
}
}

def shutdown() = tasks.synchronized {
terminated = true
tasks.notify()
}

(1 to p).map((i) => new Worker()).map(_.start)

}

val tasks = new PriorityTaskPool(10, 300)

(1 to 1000).foreach((i) => {
val a = (Math.random*1000).toInt
tasks.asynchronous(a)({log(s"<- $a")})
})

Thread.sleep(1)
tasks.shutdown()

}
31 changes: 31 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch2/ex3.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.learningconcurrency
package exercises
package ch2

object Ex3 extends App {

class SyncVar[T] {

private var empty:Boolean = true

private var x:T = null.asInstanceOf[T]

def get(): T = this.synchronized {
if (empty) throw new Exception("must be non-empty")
else {
empty = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: here you might want to set x back to null, to avoid a potential memory leak.

x
}
}

def put(x: T):Unit = this.synchronized {
if (!empty) throw new Exception("must be empty")
else {
empty = false
this.x = x
}
}

}

}
72 changes: 72 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch2/ex4.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package org.learningconcurrency
package exercises
package ch2

object Ex4 extends App {

class SyncVar[T] {

private var empty: Boolean = true

private var x: T = null.asInstanceOf[T]

def get(): T = this.synchronized {
if (empty) throw new Exception("must be non-empty")
else {
empty = true
x
}
}

def put(x: T): Unit = this.synchronized {
if (!empty) throw new Exception("must be empty")
else {
empty = false
this.x = x
}
}

def isEmpty = empty
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you would need to use synchronization when checking the empty field. Otherwise, a consumer risks reading false after the producer sets empty to false and before the field x was actually updated, in the put method, even if empty is made volatile.
If it is not made volatile, there is a data race, which could be even more harmful and unpredictable.


def nonEmpty = !empty
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nonEmpty should also use the synchronized statement here.


}

import org.learningconcurrency.ch2.thread

val lock = new AnyRef
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it was not made explicit enough in the text, but the idea in this exercise was to have the producer transfer the elements to the consumer without using any synchronization primitives other than the SyncVar.
The only way to do that in Exercise 4 is through busy waiting with the isEmpty and nonEmpty methods.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, the modification would be to remove the lock primitive, and instead modify the while loops to call isEmpty and nonEmpty methods along with checking if x < 15.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@axel22 should I stop consumer thread after it prints last number equals 14 ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stopping the thread would be a reasonable thing to do, otherwise, the thread would be busy-waiting forever.

The thread should be able to stop on its own after the condition in its loop is met. For example, the consumer should stop on its own when x == 15.


val syncVar = new SyncVar[Int]

val producer = thread {
var x = 0
while(x < 15) {

lock.synchronized {
if (syncVar.isEmpty) {
syncVar.put(x)
x = x + 1
}
lock.wait
}

}
}

val consumer = thread {
var x = -1
while(x < 14) {
lock.synchronized {
if (syncVar.nonEmpty) {
x = syncVar.get
log(s"get = $x")
}
lock.notify
}
}
}

producer.join
consumer.join

}
61 changes: 61 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch2/ex5.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package org.learningconcurrency
package exercises
package ch2

object Ex5 extends App {

class SyncVar[T] {

private var empty: Boolean = true

private var x: T = null.asInstanceOf[T]

def isEmpty = empty
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Access to empty should here too require synchronized.


def nonEmpty = !empty

def getWait():T = this.synchronized {
while (empty)
this.wait

empty = true
this.notify
x
}

def putWait(x: T): Unit = this.synchronized {
while (!empty)
this.wait()

empty = false
this.x = x
this.notify()
}


}

import org.learningconcurrency.ch2.thread

val syncVar = new SyncVar[Int]

val producer = thread {
var x = 0
while(x < 15) {
syncVar.putWait(x)
x = x + 1
}
}

val consumer = thread {
var x = -1
while(x < 14) {
x = syncVar.getWait
log(s"get: $x")
}
}

producer.join
consumer.join

}
56 changes: 56 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch2/ex6.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.learningconcurrency
package exercises
package ch2

import scala.collection.mutable._


object Ex6 extends App {

class SyncQueue[T](val n:Int) {

private var qX = Queue[T]()

def getWait():T = this.synchronized {
while (qX.isEmpty) {
this.notify
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not run this, but it still seems to me that this notify is not necessary.
The notify should occur only when the state of the queue changes in such a way that it could allow some other thread to progress. Here, the isEmpty does not change the state of the queue in any way, so whichever thread waited as a result of some condition, will see the queue in the same state after it wakes up.

this.wait
}

qX.dequeue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that the notify call should happen immediately after you dequeue an element:

val x = qX.dequeue()
this.notify()
x

If there are producer threads waiting to enqueue elements, then they will be blocked until some element is removed, to make more space.

In the current solution, if:

  1. the queue becomes full and the producer waits until the queue becomes empty.
  2. the consumer then empties the queue, but never calls getWait when the queue is actually empty.

then the producer becomes permanently blocked.

}

def putWait(x: T): Unit = this.synchronized {
while (qX.length == n)
this.wait()

qX += x
this.notify()
}

}

import org.learningconcurrency.ch2.thread

val syncVar = new SyncQueue[Int](10)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: it would be more consistent to rename this to syncQueue.


val producer = thread {
var x = 0
while(x < 15) {
syncVar.putWait(x)
x = x + 1
}
}

val consumer = thread {
var x = -1
while(x < 14) {
x = syncVar.getWait
log(s"get: $x")
}
}

producer.join
consumer.join

}
46 changes: 46 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch2/ex7.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.learningconcurrency
package exercises
package ch2

import org.learningconcurrency.ch2._

object Ex7 extends App {

import SynchronizedProtectedUid._

class Account(val name: String, var money: Int) {
val uid = getUniqueId()
}

def send(a1: Account, a2: Account, n: Int) {
def adjust() {
a1.money -= n
a2.money += n
}

a1.synchronized {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this could result in a deadlock if two threads T1 and T2 invoke send at the same time with the order of their arguments reversed.

The send method should instead use the uid in the Account object to decide which account to lock out first (see page 46 in the book).

a2.synchronized {
adjust()
}
}

}

def sendAll(accounts: Set[Account], target: Account):Unit = {
accounts.map((a) => thread {
send(a,target,a.money)
} ).foreach(_.join)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are three separate things that we should fix in this code - I'll put them in separate comments.

First, this could be an eventually consistent implementation of the sendAll method.
Eventually, after all threads complete, the target account will have the amount of money on it that corresponds to the amount deducted from all other accounts in the accounts set.
For example, if some other thread now repetitively checks the amount on the target account, it will see:

target.amount: 0 -> 10 -> 20 -> 30 -> ... -> 90 -> 100

This means that it will notice all the intermediate account states on the target account, resulting from adding partial values from different accounts to target.
The change is not atomic.

If you now want to add some validation to the different accounts in the accounts set, for example, to check that each of those accounts has an amount greater than zero, it could happen that after your initial check that a.amount > 0 for each account, some other thread calls send on that account, and makes the amount negative.

In distributed computing, we sometimes have to live with this weaker consistency model to ensure better scalability.
However, in a shared-memory system, it is often possible to achieve good scalability and stronger consistency. We would really like the sendAll method to be atomic - to atomically, at once, from the viewpoint of any other thread, take all the money from those other accounts, and transfer it to the target account.

target.amount: 0 -> 100

This means that you should not start multiple threads to do the transfer - the transfer should happen on the caller thread, while all the accounts are simultaneously locked out.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Second, calling a.money when instantiating a separate thread, should be done in a synchronized block. Otherwise, there is a data race with another thread calling send or sendAll.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Last, although there are multiple threads, this code does not prevent a deadlock.
The right solution would be to convert the accounts in the account set into a sequence collection, then sort them according to their uid, and then lock them in that order. This prevents the possibility of a deadlock (it is a generalization of the same trick applied to a pair of accounts, as in the send method).


//accounts.foreach((a) => send(a,target,a.money))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should remove this comment to keep things clean.

}

val accounts = (1 to 100).map((i) => new Account(s"Account: $i",i*10)).toSet
val target = new Account("Target account", 0)

sendAll(accounts,target)

accounts.foreach((a) => log(s"${a.name}, money = ${a.money}"))
log(s"${target.name} - money = ${target.money}")


}
56 changes: 56 additions & 0 deletions src/main/scala/org/learningconcurrency/exercises/ch2/ex8.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.learningconcurrency
package exercises
package ch2


import scala.collection.mutable

object Ex8 extends App {

class PriorityTaskPool {

implicit val ord: Ordering[(Int,() => Unit)] = Ordering.by(_._1)

private val tasks = mutable.PriorityQueue[(Int,() => Unit)]()

def asynchronous(priority: Int)(task: => Unit):Unit = tasks synchronized {
tasks.enqueue((priority,() => task))
tasks.notify
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: usual convention in Scala is to put parenthesis after a 0-arg side-effecting method:

tasks.notify()

}


object Worker extends Thread {

setDaemon(true)

def poll() = tasks.synchronized {
while (tasks.isEmpty) {
tasks.wait()
}
log("queue: " + tasks.foldLeft("")((s,t)=>s"$s${t._1},"))
tasks.dequeue()
}

override def run() = {
while (true) {
poll() match {
case (_, task) => task()
}
}
}
}

Worker.start()

}

val tasks = new PriorityTaskPool

(1 to 10).foreach((i) => {
val a = (Math.random*1000).toInt
tasks.asynchronous(a)({log(s"<- $a")})
})

Thread.sleep(10000)

}
Loading