diff --git a/README.md b/README.md index 1df327e..9a45898 100644 --- a/README.md +++ b/README.md @@ -11,8 +11,9 @@ Now is possible to create some generic modules: - Repositories - Logger - Configuration +- Transaction managers -Appart from these modules, a functional package has been added in order to provide some extended functionality +Apart from these modules, a functional package has been added in order to provide some extended functionality regarding scala collections and functional utilities. Repositories @@ -291,4 +292,71 @@ To set a monitor and subscribe it to our silly actor heartbeats you can just do `testActor` will receive `HeartbeatLost(fallenId)` when the by-that-id identified service has stopped its heartbeat. +Transaction managers +==================== + +A public component interface aimed to abstract mutual cluster-wide exclusion code areas over a protected set of +resources. + +The idea is to apply the principles of JVM's object monitor locks. However its implementors should guarantee that +these exclusion areas are cluster-wide. That is, this interface should serve as a way of seamlessly provide cluster +wide `syncrhonize` operation. + +```scala +transactionMgr.atomically("test", DogsTable) { + ... + //The code here will won't run at the same time that any other + //code protected with the resource DogsTable at any JVM in the same or other machine + ... +} +``` + +Protected resources should not be shared, instead, they should serve as unique lock identifiers: + +``` + +-------------------+ + | | + +-----------> | Shared Resource | <-----------------------+ + | | | | + | +---------+---------+ | + | | | ++------------------------+ | +----------------------+ +| Node (A) | | | | Node (B) | | +| | | | | | | +| | | v | | | +| + | | + | +| atomically(Shared Resorce) { +--------+ | atomically(Shared Resorce) { +| | | | | +| //Code @ A, never runs at the | id | | //Code @ B, never runs at the +| //same time than code @ B | | | //same time than code @ A +| | +---+----+ | | +| } | | | } | +| | | | | +| | | | | +| | | | | +| | | | | ++------------------------+ v +----------------------+ + + XXXXXXX + X X + X X + +--X-----X--+ + | +-+ | + | | | | + | | | | + | +-+ | + +-----------+ + + Cluster Lock + +``` + +Zookeeper +------------------------------------------------ + +An Zookeeper based implementor is also provided by `ZookeeperRepositoryWithTransactionsComponent` in the form of a +mixed component: Repository + Transaction Manager + +A good use example can be found at `ZookeeperIntegrationTest` integration test where several +`ZKTransactionTestClient` application processes compete for the same shared resource: A common output stream. diff --git a/pom.xml b/pom.xml index 6ac4239..44ab9d6 100644 --- a/pom.xml +++ b/pom.xml @@ -18,13 +18,13 @@ - + 4.0.0 com.stratio parent - 0.8.2 + 0.9.0 com.stratio.common diff --git a/src/main/scala/com/stratio/common/utils/components/repository/impl/ZookeeperRepositoryComponent.scala b/src/main/scala/com/stratio/common/utils/components/repository/impl/ZookeeperRepositoryComponent.scala index 16995d0..1332d44 100644 --- a/src/main/scala/com/stratio/common/utils/components/repository/impl/ZookeeperRepositoryComponent.scala +++ b/src/main/scala/com/stratio/common/utils/components/repository/impl/ZookeeperRepositoryComponent.scala @@ -40,7 +40,7 @@ trait ZookeeperRepositoryComponent extends RepositoryComponent[String, Array[Byt class ZookeeperRepository(path: Option[String] = None) extends Repository { - private def curatorClient: CuratorFramework = + protected def curatorClient: CuratorFramework = ZookeeperRepository.getInstance(getZookeeperConfig) def get(entity: String, id: String): Try[Option[Array[Byte]]] = @@ -111,7 +111,7 @@ trait ZookeeperRepositoryComponent extends RepositoryComponent[String, Array[Byt ) def getZookeeperConfig: Config = { - config.getConfig(path.getOrElse(ConfigZookeeper)) + config.getConfig(ConfigZookeeper) .getOrElse(throw new ZookeeperRepositoryException(s"Zookeeper config not found")) } diff --git a/src/main/scala/com/stratio/common/utils/components/transaction_manager/TransactionManagerComponent.scala b/src/main/scala/com/stratio/common/utils/components/transaction_manager/TransactionManagerComponent.scala new file mode 100644 index 0000000..848ca02 --- /dev/null +++ b/src/main/scala/com/stratio/common/utils/components/transaction_manager/TransactionManagerComponent.scala @@ -0,0 +1,55 @@ + +/* + * Copyright (C) 2015 Stratio (http://stratio.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.stratio.common.utils.components.transaction_manager + +import com.stratio.common.utils.components.repository.RepositoryComponent +import TransactionResource.WholeRepository + +trait TransactionManagerComponent[K,V] { + + self: RepositoryComponent[K, V] => + + trait TransactionalRepository extends Repository { + + /** + * Code execution exclusion zone over a given set of cluster-wide resources. + * + * @param entity Entity prefix used for the content repository providing synchronization mechanisms + * @param firstResource First resource in the protected resource set + * @param resources Remaining resources in the protected resource set + * @param block Code to execute in the exclusion area over the resources + * @tparam T Return type of the exclusion area block + * @return Exclusion area result after its executions + */ + def atomically[T](entity: String, + firstResource: TransactionResource, + resources: TransactionResource* + )(block: => T): T + + /** + * Code execution exclusion zone over a given repository content. The exlucsion area will be + * protected over all the resources managed by the `entity`, that is, the entity itself + * + * @param entity Entity prefix used for the content repository providing synchronization mechanisms + * @param block Code to execute in the exclusion area over the resources + * @tparam T Return type of the exclusion area block + * @return Exclusion area result after its executions + */ + final def atomically[T](entity: String)(block: => T): T = atomically[T](entity, WholeRepository)(block) + } + +} \ No newline at end of file diff --git a/src/main/scala/com/stratio/common/utils/components/transaction_manager/TransactionResource.scala b/src/main/scala/com/stratio/common/utils/components/transaction_manager/TransactionResource.scala new file mode 100644 index 0000000..4102219 --- /dev/null +++ b/src/main/scala/com/stratio/common/utils/components/transaction_manager/TransactionResource.scala @@ -0,0 +1,29 @@ + +/* + * Copyright (C) 2015 Stratio (http://stratio.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.stratio.common.utils.components.transaction_manager + +trait TransactionResource { + def id: String +} + +object TransactionResource { + + object WholeRepository extends TransactionResource { + override def id: String = "_all" + } + +} \ No newline at end of file diff --git a/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala b/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala new file mode 100644 index 0000000..c8d5e53 --- /dev/null +++ b/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala @@ -0,0 +1,96 @@ + +/* + * Copyright (C) 2015 Stratio (http://stratio.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.stratio.common.utils.components.transaction_manager.impl + +import com.stratio.common.utils.components.config.ConfigComponent +import com.stratio.common.utils.components.logger.LoggerComponent +import com.stratio.common.utils.components.repository.impl.ZookeeperRepositoryComponent +import com.stratio.common.utils.components.transaction_manager.{TransactionManagerComponent, TransactionResource} +import org.apache.curator.framework.recipes.locks.InterProcessMutex + +trait ZookeeperRepositoryWithTransactionsComponent extends ZookeeperRepositoryComponent + with TransactionManagerComponent[String, Array[Byte]] { + + self: ConfigComponent with LoggerComponent => + + override val repository: ZookeeperRepositoryWithTransactions = new ZookeeperRepositoryWithTransactions(None) + + //TODO Improve paths and locksPath behaviour + class ZookeeperRepositoryWithTransactions(path: Option[String] = None) extends ZookeeperRepository(path) + with TransactionalRepository { + + //TODO: Improve path option usage + private def acquisitionResource: String = "/locks" + path.map("/" + _).getOrElse("") + + private object AcquiredLocks { + + import collection.mutable.Map + + private val acquisitionLock: InterProcessMutex = new InterProcessMutex(curatorClient, acquisitionResource) + + private val path2lock: Map[String, InterProcessMutex] = Map.empty + + def acquireResources(paths: Seq[String]): Unit = { + acquisitionLock.acquire() + path2lock.synchronized { + paths foreach { path => + val lock = path2lock.get(path) getOrElse { + val newLock = new InterProcessMutex(curatorClient, path) + path2lock += (path -> newLock) + newLock + } + lock.acquire() + } + } + acquisitionLock.release() + } + + def freeResources(paths: Seq[String]): Unit = path2lock.synchronized { + for { + path <- paths + lock <- path2lock.get(path) + } { + lock.release() + if(!lock.isAcquiredInThisProcess) + path2lock -= path + } + } + + } + + private def lockPath(entity: String)(resource: TransactionResource): String = { + s"/locks/$entity/${resource.id}" + } + + override def atomically[T]( + entity: String, + firstResource: TransactionResource, + resources: TransactionResource*)(block: => T): T = { + + val paths = (firstResource +: resources).map(lockPath(entity)) + AcquiredLocks.acquireResources(paths) + val res = try { + block + } finally { + AcquiredLocks.freeResources(paths) + } + res + } + + } + +} diff --git a/src/test/scala/com/stratio/common/utils/MultiJVMTestUtils.scala b/src/test/scala/com/stratio/common/utils/MultiJVMTestUtils.scala new file mode 100644 index 0000000..b145c2f --- /dev/null +++ b/src/test/scala/com/stratio/common/utils/MultiJVMTestUtils.scala @@ -0,0 +1,83 @@ + +/* + * Copyright (C) 2015 Stratio (http://stratio.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.stratio.common.utils + +import scala.reflect.ClassTag + +object MultiJVMTestUtils { + + import scala.sys.process._ + + def externalProcess[T <: App](app: T)(params: String*)(implicit ct: ClassTag[T]): ProcessBuilder = { + + val separator = System.getProperty("file.separator") + val javaPath = System.getProperty("java.home")::"bin"::"java"::Nil mkString separator + val classPath = System.getProperty("java.class.path") + val className = ct.runtimeClass.getCanonicalName.reverse.dropWhile(_ == '$').reverse + + javaPath :: "-cp" :: classPath :: className :: params.toList + + } + + class TestBatch private (private val batch: Seq[ProcessBuilder] = Seq.empty) { + + def addProcess(process: ProcessBuilder): TestBatch = new TestBatch(process +: batch) + + def launchAndWait(): Seq[String] = { + + val mergedOutputs = new collection.mutable.Queue[String]() + + val pLogger = ProcessLogger { line => + mergedOutputs.synchronized(mergedOutputs.enqueue(line)) + } + + val toWait = batch.reverse.map { process => + process.run(pLogger) + } + + toWait.foreach(_.exitValue()) + + mergedOutputs synchronized mergedOutputs + + } + + } + + object TestBatch { + def apply(): TestBatch = new TestBatch() + } + + /* USE EXAMPLE: + + import com.stratio.common.utils.integration.ZKTransactionTestClient + + val testBatch = TestBatch() addProcess { + externalProcess(ZKTransactionTestClient)("testclient1", "3", "a", "b", "c", "10", "300") + } addProcess { + externalProcess(ZKTransactionTestClient)("testclient2", "2", "c", "d", "10", "200") + } addProcess { + externalProcess(ZKTransactionTestClient)("testclient3", "2", "b", "c", "10", "200") + } addProcess { + externalProcess(ZKTransactionTestClient)("testclient4", "2", "c", "d", "10", "200") + } + + testBatch.launchAndWait().foreach(x => println(s"> $x")) + + */ + + +} diff --git a/src/test/scala/com/stratio/common/utils/components/dao/DaoComponentTest.scala b/src/test/scala/com/stratio/common/utils/components/dao/DaoComponentTest.scala index 47c0993..010dd68 100644 --- a/src/test/scala/com/stratio/common/utils/components/dao/DaoComponentTest.scala +++ b/src/test/scala/com/stratio/common/utils/components/dao/DaoComponentTest.scala @@ -1,3 +1,4 @@ + /* * Copyright (C) 2015 Stratio (http://stratio.com) * diff --git a/src/test/scala/com/stratio/common/utils/integration/ZKTransactionTestClient.scala b/src/test/scala/com/stratio/common/utils/integration/ZKTransactionTestClient.scala new file mode 100644 index 0000000..b49bf9a --- /dev/null +++ b/src/test/scala/com/stratio/common/utils/integration/ZKTransactionTestClient.scala @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2015 Stratio (http://stratio.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.stratio.common.utils.integration + +import com.stratio.common.utils.components.config.impl.TypesafeConfigComponent +import com.stratio.common.utils.components.logger.impl.Slf4jLoggerComponent +import com.stratio.common.utils.components.transaction_manager.TransactionResource +import com.stratio.common.utils.components.transaction_manager.impl.ZookeeperRepositoryWithTransactionsComponent + +object ZKTransactionTestClient extends App { + + case class OutputEntry(client: String, resources: Seq[String], segment: Int, part: Int) { + override def toString: String = + s"client=$client resources=[${resources.mkString(", ")}] segment=$segment part=$part" + } + + object OutputEntry { + + def apply(line: String): OutputEntry = { + //TODO: Be able to extract more than one resource id + val ExtractionRegex = + """^client=(\w+)\s+resources=\[\s*((?:\w+)?(?:,\s*\w+\s*)*)\s*]\s+segment=(\d+)\s+part=(\d+)\s*$""".r + line match { + case ExtractionRegex(client, resources, segmentStr, partStr) => + OutputEntry(client, resources.split(",").map(_.trim), segmentStr.toInt, partStr.toInt) + } + } + + } + + case class Resource(id: String) extends TransactionResource + + val usageMsg = + "Usage: ZKTransactionTestClient [resourceid] [ ]" + + require(args.size > 2, usageMsg) + + val Array( + label, + nResourcesStr, + remainingArgs @ _* + ) = args + + require(nResourcesStr.toInt <= remainingArgs.size) + + val (resourcesStr, segmentsStr) = remainingArgs.splitAt(nResourcesStr.toInt) + val resources = resourcesStr map Resource + + require((remainingArgs.size - resources.size) % 2 == 0, usageMsg) + + val segments = segmentsStr.grouped(2) + + + def mayBeProtected(block: => Unit): Unit = + if(resources.isEmpty) block + else transactionManager.repository.atomically("test", resources.head, resources.tail:_*)(block) + + val transactionManager = new ZookeeperRepositoryWithTransactionsComponent + with TypesafeConfigComponent with Slf4jLoggerComponent + + segments.zipWithIndex foreach { case (segment, iteration) => + val Seq(nParts, millis) = segment.map(_.toLong) + mayBeProtected { + (1L to nParts) foreach { part => + println(OutputEntry(label, resources.map(_.id), iteration, part.toInt)) + Thread.sleep(millis) + } + } + } + +} + diff --git a/src/test/scala/com/stratio/common/utils/integration/ZookeeperTest.scala b/src/test/scala/com/stratio/common/utils/integration/ZookeeperTest.scala index a55f5f6..1333a71 100644 --- a/src/test/scala/com/stratio/common/utils/integration/ZookeeperTest.scala +++ b/src/test/scala/com/stratio/common/utils/integration/ZookeeperTest.scala @@ -19,6 +19,7 @@ import com.stratio.common.utils.components.config.impl.TypesafeConfigComponent import com.stratio.common.utils.components.dao.GenericDAOComponent import com.stratio.common.utils.components.logger.impl.Slf4jLoggerComponent import com.stratio.common.utils.components.repository.impl.ZookeeperRepositoryComponent +import com.stratio.common.utils.integration.ZKTransactionTestClient.OutputEntry import org.apache.curator.test.TestingServer import org.apache.curator.utils.CloseableUtils import org.junit.runner.RunWith @@ -89,6 +90,75 @@ class ZookeeperIntegrationTest extends WordSpec dao.count() shouldBe a[Failure[_]] } } + + "A Transaction manager backed by Zookeeper" should { + + import com.stratio.common.utils.MultiJVMTestUtils._ + + def sequenceGroups(outputs: Seq[String]): Seq[Seq[Int]] = { + outputs.map(OutputEntry(_)).zipWithIndex groupBy { + case (OutputEntry(label, _, _, _), n) => label + } values + }.toSeq.map(_.map(_._2)) + + "avoid exclusion zone (over a given resource) process interleaving" in { + + val testBatch = (TestBatch() /: (1 to 5)) { (batchToUpdate, n) => + val p = externalProcess(ZKTransactionTestClient)(s"testclient$n", "1", "a", "5", (200+n*10).toString) + batchToUpdate addProcess p + } + + sequenceGroups(testBatch.launchAndWait().view) foreach { group => + group should contain theSameElementsInOrderAs (group.min until (group.min + group.size)) + } + + } + + "avoid exclusion zone (over overlapping resources sets) process inverleaving" in { + + val testBatch = TestBatch() addProcess { + externalProcess(ZKTransactionTestClient)("testclient1", "3", "a", "b", "c", "10", "300") + } addProcess { + externalProcess(ZKTransactionTestClient)("testclient2", "2", "c", "d", "10", "100") + } addProcess { + externalProcess(ZKTransactionTestClient)("testclient3", "2", "b", "c", "10", "150") + } + + sequenceGroups(testBatch.launchAndWait().view) foreach { group => + group should contain theSameElementsInOrderAs (group.min until (group.min + group.size)) + } + + } + + "avoid process interleaving for intersecting resources while allowing interleaving for non-intersecting" in { + + val testBatch = TestBatch() addProcess { + externalProcess(ZKTransactionTestClient)("testclient1", "2", "a", "b", "10", "100") + } addProcess { + externalProcess(ZKTransactionTestClient)("testclient2", "2", "b", "c", "10", "100") + } addProcess { + externalProcess(ZKTransactionTestClient)("testclient3", "2", "c", "d", "10", "100") + } + + val output = testBatch.launchAndWait() + + val outOneAndTwo = output.filterNot(line => OutputEntry(line).client == "testclient3") + val outTwoAndThree = output.filterNot(line => OutputEntry(line).client == "testclient1") + val outOneAndThree = output.filterNot(line => OutputEntry(line).client == "testclient2") + + sequenceGroups(outOneAndTwo) foreach { group => + group should contain theSameElementsInOrderAs (group.min until (group.min + group.size)) + } + + sequenceGroups(outTwoAndThree) foreach { group => + group should contain theSameElementsInOrderAs (group.min until (group.min + group.size)) + } + + } + + + } + } class DummyZookeeperDAOComponent extends GenericDAOComponent[Dummy]