Skip to content

Commit

Permalink
Merge pull request #58 from pfcoperez/transactionalmgr_simplified_iface
Browse files Browse the repository at this point in the history
Transactional Manager: Simplified  interface in order to avoid portential pitfalls in its use
  • Loading branch information
pfcoperez authored Mar 12, 2017
2 parents d6bdcc7 + 1836671 commit 0cbf2bb
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,12 @@ trait TransactionManagerComponent[K,V] {
* Code execution exclusion zone over a given set of cluster-wide resources.
*
* @param entity Entity prefix used for the content repository providing synchronization mechanisms
* @param firstResource First resource in the protected resource set
* @param resources Remaining resources in the protected resource set
* @param resource Protected resource
* @param block Code to execute in the exclusion area over the resources
* @tparam T Return type of the exclusion area block
* @return Exclusion area result after its executions
*/
def atomically[T](entity: String,
firstResource: TransactionResource,
resources: TransactionResource*
)(block: => T): T
def atomically[T](entity: String, resource: TransactionResource)(block: => T): T

/**
* Code execution exclusion zone over a given repository content. The exlucsion area will be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,37 +33,24 @@ trait ZookeeperRepositoryWithTransactionsComponent extends ZookeeperRepositoryCo
class ZookeeperRepositoryWithTransactions(path: Option[String] = None) extends ZookeeperRepository(path)
with TransactionalRepository {

//TODO: Improve path option usage
private def acquisitionResource: String = "/locks" + path.map("/" + _).getOrElse("")

private object AcquiredLocks {

import collection.mutable.Map

private val acquisitionLock: InterProcessMutex = new InterProcessMutex(curatorClient, acquisitionResource)

private val path2lock: Map[String, InterProcessMutex] = Map.empty

def acquireResources(paths: Seq[String]): Unit = {
acquisitionLock.acquire()
def acquireResource(path: String): Unit =
path2lock.synchronized {
paths foreach { path =>
val lock = path2lock.get(path) getOrElse {
val newLock = new InterProcessMutex(curatorClient, path)
path2lock += (path -> newLock)
newLock
}
lock.acquire()
val lock = path2lock.get(path) getOrElse {
val newLock = new InterProcessMutex(curatorClient, path)
path2lock += (path -> newLock)
newLock
}
lock.acquire()
}
acquisitionLock.release()
}

def freeResources(paths: Seq[String]): Unit = path2lock.synchronized {
for {
path <- paths
lock <- path2lock.get(path)
} {
def freeResource(path: String): Unit = path2lock.synchronized {
path2lock.get(path) foreach { lock =>
lock.release()
if(!lock.isAcquiredInThisProcess)
path2lock -= path
Expand All @@ -76,19 +63,20 @@ trait ZookeeperRepositoryWithTransactionsComponent extends ZookeeperRepositoryCo
s"/locks/$entity/${resource.id}"
}

override def atomically[T](
entity: String,
firstResource: TransactionResource,
resources: TransactionResource*)(block: => T): T = {
override def atomically[T](entity: String, resource: TransactionResource)(block: => T): T = {

val paths = (firstResource +: resources).map(lockPath(entity))
AcquiredLocks.acquireResources(paths)
import AcquiredLocks._

val path = lockPath(entity)(resource)

acquireResource(path)
val res = try {
block
} finally {
AcquiredLocks.freeResources(paths)
freeResource(path)
}
res

}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,14 @@ object ZKTransactionTestClient extends App {
val segments = segmentsStr.grouped(2)


def mayBeProtected(block: => Unit): Unit =
if(resources.isEmpty) block
else transactionManager.repository.atomically("test", resources.head, resources.tail:_*)(block)
def mayBeProtected(block: => Unit): Unit = {
def nestedAtomicAreas(resources: Seq[TransactionResource]): Unit =
if(resources.isEmpty) block
else transactionManager.repository.atomically("test", resources.head) {
nestedAtomicAreas(resources.tail)
}
nestedAtomicAreas(resources)
}

val transactionManager = new ZookeeperRepositoryWithTransactionsComponent
with TypesafeConfigComponent with Slf4jLoggerComponent
Expand Down

0 comments on commit 0cbf2bb

Please sign in to comment.