Skip to content

Commit

Permalink
Merge pull request #57 from pfcoperez/transactional_manager
Browse files Browse the repository at this point in the history
[DCS-1995] Transaction Manager Component
  • Loading branch information
pfcoperez committed Mar 10, 2017
2 parents 7f942aa + 829d094 commit d6bdcc7
Show file tree
Hide file tree
Showing 10 changed files with 493 additions and 5 deletions.
70 changes: 69 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ Now is possible to create some generic modules:
- Repositories
- Logger
- Configuration
- Transaction managers

Appart from these modules, a functional package has been added in order to provide some extended functionality
Apart from these modules, a functional package has been added in order to provide some extended functionality
regarding scala collections and functional utilities.

Repositories
Expand Down Expand Up @@ -291,4 +292,71 @@ To set a monitor and subscribe it to our silly actor heartbeats you can just do
`testActor` will receive `HeartbeatLost(fallenId)` when the by-that-id identified service has stopped its heartbeat.


Transaction managers
====================

A public component interface aimed to abstract mutual cluster-wide exclusion code areas over a protected set of
resources.

The idea is to apply the principles of JVM's object monitor locks. However its implementors should guarantee that
these exclusion areas are cluster-wide. That is, this interface should serve as a way of seamlessly provide cluster
wide `syncrhonize` operation.

```scala
transactionMgr.atomically("test", DogsTable) {
...
//The code here will won't run at the same time that any other
//code protected with the resource DogsTable at any JVM in the same or other machine
...
}
```

Protected resources should not be shared, instead, they should serve as unique lock identifiers:

```
+-------------------+
| |
+-----------> | Shared Resource | <-----------------------+
| | | |
| +---------+---------+ |
| | |
+------------------------+ | +----------------------+
| Node (A) | | | | Node (B) | |
| | | | | | |
| | | v | | |
| + | | + |
| atomically(Shared Resorce) { +--------+ | atomically(Shared Resorce) {
| | | | |
| //Code @ A, never runs at the | id | | //Code @ B, never runs at the
| //same time than code @ B | | | //same time than code @ A
| | +---+----+ | |
| } | | | } |
| | | | |
| | | | |
| | | | |
| | | | |
+------------------------+ v +----------------------+
XXXXXXX
X X
X X
+--X-----X--+
| +-+ |
| | | |
| | | |
| +-+ |
+-----------+
Cluster Lock
```

Zookeeper
------------------------------------------------

An Zookeeper based implementor is also provided by `ZookeeperRepositoryWithTransactionsComponent` in the form of a
mixed component: Repository + Transaction Manager

A good use example can be found at `ZookeeperIntegrationTest` integration test where several
`ZKTransactionTestClient` application processes compete for the same shared resource: A common output stream.

4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.stratio</groupId>
<artifactId>parent</artifactId>
<version>0.8.2</version>
<version>0.9.0</version>
</parent>

<groupId>com.stratio.common</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ trait ZookeeperRepositoryComponent extends RepositoryComponent[String, Array[Byt

class ZookeeperRepository(path: Option[String] = None) extends Repository {

private def curatorClient: CuratorFramework =
protected def curatorClient: CuratorFramework =
ZookeeperRepository.getInstance(getZookeeperConfig)

def get(entity: String, id: String): Try[Option[Array[Byte]]] =
Expand Down Expand Up @@ -111,7 +111,7 @@ trait ZookeeperRepositoryComponent extends RepositoryComponent[String, Array[Byt
)

def getZookeeperConfig: Config = {
config.getConfig(path.getOrElse(ConfigZookeeper))
config.getConfig(ConfigZookeeper)
.getOrElse(throw new ZookeeperRepositoryException(s"Zookeeper config not found"))
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@

/*
* Copyright (C) 2015 Stratio (http://stratio.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.stratio.common.utils.components.transaction_manager

import com.stratio.common.utils.components.repository.RepositoryComponent
import TransactionResource.WholeRepository

trait TransactionManagerComponent[K,V] {

self: RepositoryComponent[K, V] =>

trait TransactionalRepository extends Repository {

/**
* Code execution exclusion zone over a given set of cluster-wide resources.
*
* @param entity Entity prefix used for the content repository providing synchronization mechanisms
* @param firstResource First resource in the protected resource set
* @param resources Remaining resources in the protected resource set
* @param block Code to execute in the exclusion area over the resources
* @tparam T Return type of the exclusion area block
* @return Exclusion area result after its executions
*/
def atomically[T](entity: String,
firstResource: TransactionResource,
resources: TransactionResource*
)(block: => T): T

/**
* Code execution exclusion zone over a given repository content. The exlucsion area will be
* protected over all the resources managed by the `entity`, that is, the entity itself
*
* @param entity Entity prefix used for the content repository providing synchronization mechanisms
* @param block Code to execute in the exclusion area over the resources
* @tparam T Return type of the exclusion area block
* @return Exclusion area result after its executions
*/
final def atomically[T](entity: String)(block: => T): T = atomically[T](entity, WholeRepository)(block)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@

/*
* Copyright (C) 2015 Stratio (http://stratio.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.stratio.common.utils.components.transaction_manager

trait TransactionResource {
def id: String
}

object TransactionResource {

object WholeRepository extends TransactionResource {
override def id: String = "_all"
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@

/*
* Copyright (C) 2015 Stratio (http://stratio.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.stratio.common.utils.components.transaction_manager.impl

import com.stratio.common.utils.components.config.ConfigComponent
import com.stratio.common.utils.components.logger.LoggerComponent
import com.stratio.common.utils.components.repository.impl.ZookeeperRepositoryComponent
import com.stratio.common.utils.components.transaction_manager.{TransactionManagerComponent, TransactionResource}
import org.apache.curator.framework.recipes.locks.InterProcessMutex

trait ZookeeperRepositoryWithTransactionsComponent extends ZookeeperRepositoryComponent
with TransactionManagerComponent[String, Array[Byte]] {

self: ConfigComponent with LoggerComponent =>

override val repository: ZookeeperRepositoryWithTransactions = new ZookeeperRepositoryWithTransactions(None)

//TODO Improve paths and locksPath behaviour
class ZookeeperRepositoryWithTransactions(path: Option[String] = None) extends ZookeeperRepository(path)
with TransactionalRepository {

//TODO: Improve path option usage
private def acquisitionResource: String = "/locks" + path.map("/" + _).getOrElse("")

private object AcquiredLocks {

import collection.mutable.Map

private val acquisitionLock: InterProcessMutex = new InterProcessMutex(curatorClient, acquisitionResource)

private val path2lock: Map[String, InterProcessMutex] = Map.empty

def acquireResources(paths: Seq[String]): Unit = {
acquisitionLock.acquire()
path2lock.synchronized {
paths foreach { path =>
val lock = path2lock.get(path) getOrElse {
val newLock = new InterProcessMutex(curatorClient, path)
path2lock += (path -> newLock)
newLock
}
lock.acquire()
}
}
acquisitionLock.release()
}

def freeResources(paths: Seq[String]): Unit = path2lock.synchronized {
for {
path <- paths
lock <- path2lock.get(path)
} {
lock.release()
if(!lock.isAcquiredInThisProcess)
path2lock -= path
}
}

}

private def lockPath(entity: String)(resource: TransactionResource): String = {
s"/locks/$entity/${resource.id}"
}

override def atomically[T](
entity: String,
firstResource: TransactionResource,
resources: TransactionResource*)(block: => T): T = {

val paths = (firstResource +: resources).map(lockPath(entity))
AcquiredLocks.acquireResources(paths)
val res = try {
block
} finally {
AcquiredLocks.freeResources(paths)
}
res
}

}

}
83 changes: 83 additions & 0 deletions src/test/scala/com/stratio/common/utils/MultiJVMTestUtils.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@

/*
* Copyright (C) 2015 Stratio (http://stratio.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.stratio.common.utils

import scala.reflect.ClassTag

object MultiJVMTestUtils {

import scala.sys.process._

def externalProcess[T <: App](app: T)(params: String*)(implicit ct: ClassTag[T]): ProcessBuilder = {

val separator = System.getProperty("file.separator")
val javaPath = System.getProperty("java.home")::"bin"::"java"::Nil mkString separator
val classPath = System.getProperty("java.class.path")
val className = ct.runtimeClass.getCanonicalName.reverse.dropWhile(_ == '$').reverse

javaPath :: "-cp" :: classPath :: className :: params.toList

}

class TestBatch private (private val batch: Seq[ProcessBuilder] = Seq.empty) {

def addProcess(process: ProcessBuilder): TestBatch = new TestBatch(process +: batch)

def launchAndWait(): Seq[String] = {

val mergedOutputs = new collection.mutable.Queue[String]()

val pLogger = ProcessLogger { line =>
mergedOutputs.synchronized(mergedOutputs.enqueue(line))
}

val toWait = batch.reverse.map { process =>
process.run(pLogger)
}

toWait.foreach(_.exitValue())

mergedOutputs synchronized mergedOutputs

}

}

object TestBatch {
def apply(): TestBatch = new TestBatch()
}

/* USE EXAMPLE:
import com.stratio.common.utils.integration.ZKTransactionTestClient
val testBatch = TestBatch() addProcess {
externalProcess(ZKTransactionTestClient)("testclient1", "3", "a", "b", "c", "10", "300")
} addProcess {
externalProcess(ZKTransactionTestClient)("testclient2", "2", "c", "d", "10", "200")
} addProcess {
externalProcess(ZKTransactionTestClient)("testclient3", "2", "b", "c", "10", "200")
} addProcess {
externalProcess(ZKTransactionTestClient)("testclient4", "2", "c", "d", "10", "200")
}
testBatch.launchAndWait().foreach(x => println(s"> $x"))
*/


}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

/*
* Copyright (C) 2015 Stratio (http://stratio.com)
*
Expand Down
Loading

0 comments on commit d6bdcc7

Please sign in to comment.