Skip to content

Commit 407a5cd

Browse files
committed
Merge pull request #30 from ryblovAV/chapter7
add answers to exercise 1-5 chapter 7
2 parents 8d5f4aa + 1f21dc8 commit 407a5cd

File tree

5 files changed

+373
-0
lines changed

5 files changed

+373
-0
lines changed
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package org.learningconcurrency
2+
package exercises
3+
package ch7
4+
5+
/**
6+
* Implement the transactional pair abstraction, represented with the TPair class:
7+
*
8+
* class TPair[P, Q](pinit: P, qinit: Q) {
9+
* def first(implicit txn: InTxn): P = ???
10+
* def first_=(x: P)(implicit txn: InTxn): P = ???
11+
* def second(implicit txn: InTxn): Q = ???
12+
* def second_=(x: Q)(implicit txn: InTxn): Q = ???
13+
* def swap()(implicit e: P =:= Q, txn: InTxn): Unit = ???
14+
* }
15+
*
16+
* In addition to getters and setters for the two fields,
17+
* the transactional pair defines the swap method that swaps the fields,
18+
* and can only be called if the types P and Q are the same.
19+
*/
20+
object Ex1 extends App {
21+
22+
import scala.concurrent._
23+
import ExecutionContext.Implicits.global
24+
import scala.concurrent.stm._
25+
26+
class TPair[P, Q](pinit: P, qinit: Q) {
27+
28+
private val rFirst = Ref[P](pinit)
29+
private val rSecond = Ref[Q](qinit)
30+
31+
32+
def first(implicit txn: InTxn): P = rFirst.single()
33+
34+
def first_=(x: P)(implicit txn: InTxn) = rFirst.single.transform(old => x)
35+
36+
def second(implicit txn: InTxn): Q = rSecond.single()
37+
38+
def second_=(x: Q)(implicit txn: InTxn) = rSecond.single.transform(old => x)
39+
40+
def swap()(implicit e: P =:= Q, txn: InTxn): Unit = {
41+
val old = first
42+
first = second.asInstanceOf[P]
43+
second = e(old)
44+
}
45+
}
46+
47+
//test
48+
val p = new TPair[String,String]("first value","second value")
49+
50+
def swapOne = atomic { implicit txn =>
51+
p.swap
52+
53+
val vF = p.first
54+
val vS = p.second
55+
56+
Txn.afterCommit { _ =>
57+
assert(vS != vF)
58+
}
59+
}
60+
61+
(1 to 1001).map(_ => Future {
62+
swapOne
63+
})
64+
65+
Thread.sleep(2000)
66+
67+
atomic {implicit txn =>
68+
log(s"Result: first = '${p.first}' vSecond = '${p.second}'")
69+
}
70+
71+
72+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package org.learningconcurrency
2+
package exercises
3+
package ch7
4+
5+
/**
6+
* Use ScalaSTM to implement the mutable location abstraction from Haskell,
7+
* represented with the MVar class:
8+
*
9+
* class MVar[T] {
10+
* def put(x: T)(implicit txn: InTxn): Unit = ???
11+
* def take()(implicit txn: InTxn): T = ???
12+
* }
13+
*
14+
* An MVar object can be either full or empty.
15+
* Calling put on a full MVar object blocks until the MVar object becomes empty,
16+
* and adds an element.
17+
*
18+
* Similarly, calling take on an empty MVar object blocks until the MVar object becomes full,
19+
* and removes the element.
20+
*
21+
* Now, implement a method called swap, which takes two MVar objects and swaps their values:
22+
*
23+
* def swap[T](a: MVar[T], b: MVar[T])(implicit txn: InTxn) = ???
24+
*
25+
* Contrast the MVar class with the SyncVar class from Chapter 2,
26+
* Concurrency on the JVM and the Java Memory Model.
27+
* Is it possible to implement the swap method for SyncVar objects
28+
* without modifying the internal implementation of the SyncVar class?
29+
*
30+
*/
31+
object Ex2 extends App {
32+
33+
import java.util.concurrent.atomic.AtomicInteger
34+
import scala.concurrent._
35+
import ExecutionContext.Implicits.global
36+
import scala.concurrent.stm._
37+
38+
class MVar[T] {
39+
40+
private val rx = Ref[Option[T]](None)
41+
42+
def put(x: T)(implicit txn: InTxn): Unit = rx() match {
43+
case Some(_) => retry
44+
case None => rx() = Some(x)
45+
}
46+
47+
def take()(implicit txn: InTxn): T = rx() match {
48+
case Some(x) => {
49+
rx() = None
50+
x
51+
}
52+
case None => retry
53+
}
54+
}
55+
56+
def swap[T](a: MVar[T], b: MVar[T])(implicit txn: InTxn) = {
57+
val old = a.take
58+
a.put(b.take())
59+
b.put(old)
60+
}
61+
62+
//test
63+
val mVar = new MVar[Integer]
64+
65+
val l = 1 to 1001
66+
67+
l.map(
68+
i => Future {
69+
atomic {implicit txn =>
70+
mVar.put(i)
71+
}
72+
}
73+
)
74+
75+
val sum = new AtomicInteger(0)
76+
77+
l.map(
78+
i => Future {
79+
atomic {implicit txn =>
80+
val i = mVar.take
81+
Txn.afterCommit(_ => sum.addAndGet(i))
82+
}
83+
}
84+
)
85+
86+
Thread.sleep(5000)
87+
88+
if (l.sum != sum.get) log(s"Error !!!! ${l.sum} != $sum")
89+
90+
log(s"summ = ${sum.get}")
91+
92+
//test swap
93+
log("--- test swap ------------")
94+
95+
val mva = new MVar[String]
96+
val mvb = new MVar[String]
97+
atomic {implicit txn =>
98+
mva.put("a")
99+
mvb.put("b")
100+
}
101+
102+
l.map(i =>
103+
Future{
104+
atomic {implicit txn =>
105+
swap(mva, mvb)
106+
}
107+
}
108+
)
109+
110+
Thread.sleep(5000)
111+
112+
atomic {implicit txn =>
113+
val a = mva.take
114+
val b = mvb.take
115+
116+
Txn.afterCommit( _ => log(s"a= $a, b = $b"))
117+
}
118+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package org.learningconcurrency
2+
package exercises
3+
package ch7
4+
5+
/**
6+
* Implement the atomicRollbackCount method, which is used to track how many times
7+
* a transaction was rolled back before it completed successfully:
8+
*
9+
* def atomicRollbackCount[T](block: InTxn => T): (T, Int) = ???
10+
*/
11+
object Ex3 extends App {
12+
13+
import scala.concurrent.ExecutionContext
14+
import scala.concurrent.Future
15+
import scala.concurrent.stm._
16+
import ExecutionContext.Implicits.global
17+
import scala.util.Random
18+
19+
def atomicRollbackCount[T](block: InTxn => T): (T, Int) = {
20+
var cnt = 0
21+
atomic { implicit txn =>
22+
Txn.afterRollback(_ =>
23+
cnt += 1
24+
)
25+
(block(txn), cnt)
26+
}
27+
}
28+
29+
//test
30+
val r = Ref(10)
31+
32+
def block(txn: InTxn): Int = {
33+
var x: Int = r.get(txn)
34+
x = Random.nextInt(10000)
35+
Thread.sleep(10)
36+
r.set(x)(txn)
37+
x
38+
}
39+
40+
(1 to 100).map(i =>
41+
Future {
42+
atomicRollbackCount[Int](block) match {
43+
case (_, cnt) => log(s"Transaction: $i, retries = $cnt")
44+
case _ => log("???")
45+
}
46+
}
47+
)
48+
49+
Thread.sleep(3000)
50+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package org.learningconcurrency
2+
package exercises
3+
package ch7
4+
5+
/**
6+
* Implement the atomicWithRetryMax method,
7+
* which is used to start a transaction that can be retried at most n times:
8+
*
9+
* def atomicWithRetryMax[T](n: Int)(block: InTxn => T): T = ???
10+
*
11+
* Reaching the maximum number of retries throws an exception.
12+
*/
13+
object Ex4 extends App {
14+
15+
import scala.concurrent._
16+
import ExecutionContext.Implicits.global
17+
import scala.concurrent.stm._
18+
import scala.util.Random
19+
20+
case class ReachMaxNumberException(cntRetries: Int) extends Exception
21+
22+
def atomicWithRetryMax[T](n: Int)(block: InTxn => T): T = {
23+
24+
var cntRetries = 0
25+
26+
atomic{ implicit txn =>
27+
Txn.afterRollback(_ => cntRetries += 1)
28+
29+
if (cntRetries > n) {
30+
throw ReachMaxNumberException(cntRetries)
31+
}
32+
33+
block(txn)
34+
}
35+
}
36+
37+
//test
38+
val r = Ref(10)
39+
40+
def block(txn: InTxn): Int = {
41+
var x: Int = r.get(txn)
42+
Thread.sleep(10)
43+
x += Random.nextInt(100)
44+
r.set(x)(txn)
45+
x
46+
}
47+
48+
(1 to 100).map(i =>
49+
Future {
50+
try {
51+
atomicWithRetryMax[Int](3)(block)
52+
log(s"Transaction: $i - ok")
53+
} catch {
54+
case ReachMaxNumberException(cntRetries) => log(s"Transaction: $i (retries = $cntRetries)")
55+
}
56+
}
57+
)
58+
59+
Thread.sleep(3000)
60+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package org.learningconcurrency
2+
package exercises
3+
package ch7
4+
5+
/**
6+
* Implement a transactional First In First Out (FIFO) queue,
7+
* represented with the TQueue class:
8+
*
9+
* class TQueue[T] {
10+
* def enqueue(x: T)(implicit txn: InTxn): Unit = ???
11+
* def dequeue()(implicit txn: InTxn): T = ???
12+
* }
13+
*
14+
* The TQueue class has similar semantics as scala.collection.mutable. Queue,
15+
* but calling dequeue on an empty queue blocks until a value becomes available.
16+
*/
17+
object Ex5 extends App {
18+
19+
import scala.collection.immutable.Queue
20+
import scala.concurrent.stm._
21+
import scala.concurrent.{ExecutionContext, Future}
22+
import ExecutionContext.Implicits.global
23+
24+
class TQueue[T] {
25+
26+
private val r = Ref[Queue[T]](Queue.empty[T])
27+
28+
def enqueue(x: T)(implicit txn: InTxn): Unit = {
29+
r() = r() :+ x
30+
}
31+
32+
def dequeue()(implicit txn: InTxn): T = {
33+
r().dequeueOption match {
34+
case None => retry
35+
case Some((x,q)) => {
36+
r() = q
37+
x
38+
}
39+
}
40+
}
41+
}
42+
43+
//test
44+
val tQueue = new TQueue[Integer]
45+
46+
val l = 1 to 20
47+
48+
l.map { i =>
49+
Future {
50+
atomic {implicit txn =>
51+
val x = tQueue.dequeue
52+
53+
Txn.afterCommit{_ =>
54+
log(s"dequeu: $x")
55+
}
56+
}
57+
}
58+
}
59+
60+
l.map { i =>
61+
Future {
62+
atomic {implicit txn =>
63+
tQueue.enqueue(i)
64+
65+
Txn.afterCommit { _ =>
66+
log(s"enque: $i")
67+
}
68+
}
69+
}
70+
}
71+
72+
Thread.sleep(1000)
73+
}

0 commit comments

Comments
 (0)