From c47eecfae9d81740c35eec2ecd90e2247903280d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Fco=2E=20P=C3=A9rez=20Hidalgo?= Date: Mon, 6 Mar 2017 10:35:10 +0100 Subject: [PATCH 01/10] Transaction manager component Stub Zookeeper repository with transactions manager. Changed `TransactionalRepository` interface so it requires a resource to lock. If not provided, it is the mixing DAO which should server as locking resource. Initial Zookeeper implementation of transaction manager Locally protected distributed lock map. Moved component package. --- .../impl/ZookeeperRepositoryComponent.scala | 2 +- .../TransactionManagerComponent.scala | 36 +++++++ .../TransactionResource.scala | 29 ++++++ ...rRepositoryWithTransactionsComponent.scala | 93 +++++++++++++++++++ 4 files changed, 159 insertions(+), 1 deletion(-) create mode 100644 src/main/scala/com/stratio/common/utils/components/transaction_manager/TransactionManagerComponent.scala create mode 100644 src/main/scala/com/stratio/common/utils/components/transaction_manager/TransactionResource.scala create mode 100644 src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala diff --git a/src/main/scala/com/stratio/common/utils/components/repository/impl/ZookeeperRepositoryComponent.scala b/src/main/scala/com/stratio/common/utils/components/repository/impl/ZookeeperRepositoryComponent.scala index 16995d0..59a5531 100644 --- a/src/main/scala/com/stratio/common/utils/components/repository/impl/ZookeeperRepositoryComponent.scala +++ b/src/main/scala/com/stratio/common/utils/components/repository/impl/ZookeeperRepositoryComponent.scala @@ -40,7 +40,7 @@ trait ZookeeperRepositoryComponent extends RepositoryComponent[String, Array[Byt class ZookeeperRepository(path: Option[String] = None) extends Repository { - private def curatorClient: CuratorFramework = + protected def curatorClient: CuratorFramework = ZookeeperRepository.getInstance(getZookeeperConfig) def get(entity: String, id: String): Try[Option[Array[Byte]]] = diff --git a/src/main/scala/com/stratio/common/utils/components/transaction_manager/TransactionManagerComponent.scala b/src/main/scala/com/stratio/common/utils/components/transaction_manager/TransactionManagerComponent.scala new file mode 100644 index 0000000..b49e885 --- /dev/null +++ b/src/main/scala/com/stratio/common/utils/components/transaction_manager/TransactionManagerComponent.scala @@ -0,0 +1,36 @@ + +/* + * Copyright (C) 2015 Stratio (http://stratio.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.stratio.common.utils.components.transaction_manager + +import com.stratio.common.utils.components.repository.RepositoryComponent +import TransactionResource.Dao + +trait TransactionManagerComponent[K,V] { + + self: RepositoryComponent[K, V] => + + trait TransactionalRepository extends Repository { + + def atomically[T](entity: String, + firstResource: TransactionResource, + resources: TransactionResource* + )(block: => T): T + + final def atomically[T](entity: String)(block: => T): T = atomically[T](entity, Dao)(block) + } + +} diff --git a/src/main/scala/com/stratio/common/utils/components/transaction_manager/TransactionResource.scala b/src/main/scala/com/stratio/common/utils/components/transaction_manager/TransactionResource.scala new file mode 100644 index 0000000..95e1241 --- /dev/null +++ b/src/main/scala/com/stratio/common/utils/components/transaction_manager/TransactionResource.scala @@ -0,0 +1,29 @@ + +/* + * Copyright (C) 2015 Stratio (http://stratio.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.stratio.common.utils.components.transaction_manager + +trait TransactionResource { + def id: String +} + +object TransactionResource { + + object Dao extends TransactionResource { + override def id: String = "_dao" + } + +} \ No newline at end of file diff --git a/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala b/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala new file mode 100644 index 0000000..c2ee6b0 --- /dev/null +++ b/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala @@ -0,0 +1,93 @@ + +/* + * Copyright (C) 2015 Stratio (http://stratio.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.stratio.common.utils.components.transaction_manager.impl + +import com.stratio.common.utils.components.config.ConfigComponent +import com.stratio.common.utils.components.logger.LoggerComponent +import com.stratio.common.utils.components.repository.impl.ZookeeperRepositoryComponent +import com.stratio.common.utils.components.transaction_manager.{TransactionManagerComponent, TransactionResource} +import org.apache.curator.framework.recipes.locks.InterProcessMutex + +trait ZookeeperRepositoryWithTransactionsComponent extends ZookeeperRepositoryComponent + with TransactionManagerComponent[String, Array[Byte]] { + + self: ConfigComponent with LoggerComponent => + + class ZookeeperRepositoryWithTransactions(path: Option[String] = None) extends ZookeeperRepository(path) + with TransactionalRepository { + + //TODO: Improve path option usage + private def acquisitionResource: String = s"${path.getOrElse("")}/locks" + + private object AcquiredLocks { + + import collection.mutable.Map + + private val acquisitionLock: InterProcessMutex = new InterProcessMutex(curatorClient, acquisitionResource) + + private val path2lock: Map[String, InterProcessMutex] = Map.empty + + def acquireResources(paths: Seq[String]): Unit = { + acquisitionLock.acquire() + path2lock.synchronized { + paths foreach { path => + val lock = path2lock.get(path) getOrElse { + val newLock = new InterProcessMutex(curatorClient, path) + path2lock += (path -> newLock) + newLock + } + lock.acquire() + } + } + acquisitionLock.release() + } + + def freeResources(paths: Seq[String]): Unit = path2lock.synchronized { + for { + path <- paths + lock <- path2lock.get(path) + } { + lock.release() + if(!lock.isAcquiredInThisProcess) + path2lock -= path + } + } + + } + + private def lockPath(entity: String)(resource: TransactionResource): String = { + s"$entity/locks" + } + + override def atomically[T]( + entity: String, + firstResource: TransactionResource, + resources: TransactionResource*)(block: => T): T = { + + val paths = (firstResource +: resources).map(lockPath(entity)) + AcquiredLocks.acquireResources(paths) + val res = try { + block + } finally { + AcquiredLocks.freeResources(paths) + } + res + } + + } + +} From 21e4b6fe069712f320faf98df017b25e7e346cb8 Mon Sep 17 00:00:00 2001 From: pmadrigal Date: Tue, 7 Mar 2017 13:10:07 +0100 Subject: [PATCH 02/10] fix read config from path. We were mixing config path with zookeeper path fix lockpath and add repository visibility --- .../repository/impl/ZookeeperRepositoryComponent.scala | 2 +- .../impl/ZookeeperRepositoryWithTransactionsComponent.scala | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/stratio/common/utils/components/repository/impl/ZookeeperRepositoryComponent.scala b/src/main/scala/com/stratio/common/utils/components/repository/impl/ZookeeperRepositoryComponent.scala index 59a5531..1332d44 100644 --- a/src/main/scala/com/stratio/common/utils/components/repository/impl/ZookeeperRepositoryComponent.scala +++ b/src/main/scala/com/stratio/common/utils/components/repository/impl/ZookeeperRepositoryComponent.scala @@ -111,7 +111,7 @@ trait ZookeeperRepositoryComponent extends RepositoryComponent[String, Array[Byt ) def getZookeeperConfig: Config = { - config.getConfig(path.getOrElse(ConfigZookeeper)) + config.getConfig(ConfigZookeeper) .getOrElse(throw new ZookeeperRepositoryException(s"Zookeeper config not found")) } diff --git a/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala b/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala index c2ee6b0..13fa6f8 100644 --- a/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala +++ b/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala @@ -27,6 +27,8 @@ trait ZookeeperRepositoryWithTransactionsComponent extends ZookeeperRepositoryCo self: ConfigComponent with LoggerComponent => + override val repository: ZookeeperRepositoryWithTransactions = new ZookeeperRepositoryWithTransactions(None) + class ZookeeperRepositoryWithTransactions(path: Option[String] = None) extends ZookeeperRepository(path) with TransactionalRepository { @@ -70,7 +72,7 @@ trait ZookeeperRepositoryWithTransactionsComponent extends ZookeeperRepositoryCo } private def lockPath(entity: String)(resource: TransactionResource): String = { - s"$entity/locks" + s"$entity/locks/${resource.id}" } override def atomically[T]( From 23972dd825fafe57b9ac796e8f641ee4d0581bf6 Mon Sep 17 00:00:00 2001 From: pfcoperez Date: Wed, 8 Mar 2017 12:25:42 +0100 Subject: [PATCH 03/10] TestClient application for multi JVM tests. Added testing service application for multijvm test. TestClient App: user provided lock resources --- ...rRepositoryWithTransactionsComponent.scala | 2 +- .../common/utils/MultiJVMTestUtils.scala | 48 ++++++++++++++++++ .../transaction_manager/TestClient.scala | 50 +++++++++++++++++++ 3 files changed, 99 insertions(+), 1 deletion(-) create mode 100644 src/test/scala/com/stratio/common/utils/MultiJVMTestUtils.scala create mode 100644 src/test/scala/com/stratio/common/utils/components/transaction_manager/TestClient.scala diff --git a/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala b/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala index 13fa6f8..d545fd0 100644 --- a/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala +++ b/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala @@ -72,7 +72,7 @@ trait ZookeeperRepositoryWithTransactionsComponent extends ZookeeperRepositoryCo } private def lockPath(entity: String)(resource: TransactionResource): String = { - s"$entity/locks/${resource.id}" + s"/$entity/locks/${resource.id}" } override def atomically[T]( diff --git a/src/test/scala/com/stratio/common/utils/MultiJVMTestUtils.scala b/src/test/scala/com/stratio/common/utils/MultiJVMTestUtils.scala new file mode 100644 index 0000000..c01980e --- /dev/null +++ b/src/test/scala/com/stratio/common/utils/MultiJVMTestUtils.scala @@ -0,0 +1,48 @@ + +package com.stratio.common.utils + +import java.io._ + +import com.stratio.common.utils.components.transaction_manager.TestClient + +import scala.reflect.ClassTag +import scala.collection.mutable.SynchronizedQueue + +object MultiJVMTestUtils extends App { + + import scala.sys.process._ + + def externalProcess[T <: App](app: T)(params: String*)(implicit ct: ClassTag[T]): ProcessBuilder = { + + val separator = System.getProperty("file.separator") + val javaPath = System.getProperty("java.home")::"bin"::"java"::Nil mkString separator + val classPath = System.getProperty("java.class.path") + val className = ct.runtimeClass.getCanonicalName.reverse.dropWhile(_ == '$').reverse + + javaPath :: "-cp" :: classPath :: className :: params.toList + + } + + + /*private val mergedOutputs = new collection.mutable.Queue[String]() + + val pLogger = ProcessLogger { line => + mergedOutputs.synchronized(mergedOutputs.enqueue(line)) + } + + + val p1 = externalProcess(TestClient)("testclient1", "2", "a", "b", "10", "300").run(pLogger) + val p2 = externalProcess(TestClient)("testclient2", "2", "c", "d", "10", "200").run(pLogger) + val p3 = externalProcess(TestClient)("testclient3", "2", "b", "c", "10", "200").run(pLogger) + val p4 = externalProcess(TestClient)("testclient4", "2", "c", "d", "10", "200").run(pLogger) + + p1.exitValue() + p2.exitValue() + p3.exitValue() + p4.exitValue() + + mergedOutputs.synchronized { + mergedOutputs.foreach(println) + }*/ + +} diff --git a/src/test/scala/com/stratio/common/utils/components/transaction_manager/TestClient.scala b/src/test/scala/com/stratio/common/utils/components/transaction_manager/TestClient.scala new file mode 100644 index 0000000..ec4fd53 --- /dev/null +++ b/src/test/scala/com/stratio/common/utils/components/transaction_manager/TestClient.scala @@ -0,0 +1,50 @@ + +package com.stratio.common.utils.components.transaction_manager + +import com.stratio.common.utils.components.config.impl.TypesafeConfigComponent +import com.stratio.common.utils.components.logger.impl.Slf4jLoggerComponent +import com.stratio.common.utils.components.transaction_manager.impl.ZookeeperRepositoryWithTransactionsComponent + +object TestClient extends App { + + case class Resource(id: String) extends TransactionResource + + val usageMsg = "Usage: TestClient [resourceid] [ ]" + + require(args.size > 2, usageMsg) + + val Array( + label, + nResourcesStr, + remainingArgs @ _* + ) = args + + require(nResourcesStr.toInt <= remainingArgs.size) + + val (resourcesStr, segmentsStr) = remainingArgs.splitAt(nResourcesStr.toInt) + val resources = resourcesStr map Resource + + require((remainingArgs.size - resources.size) % 2 == 0, usageMsg) + + val segments = segmentsStr.grouped(2) + + + def mayBeProtected(block: => Unit): Unit = + if(resources.isEmpty) block + else transactionManager.repository.atomically("test", resources.head, resources.tail:_*)(block) + + val transactionManager = new ZookeeperRepositoryWithTransactionsComponent + with TypesafeConfigComponent with Slf4jLoggerComponent + + segments.zipWithIndex foreach { case (segment, iteration) => + val Seq(nParts, millis) = segment.map(_.toLong) + mayBeProtected { + (1L to nParts) foreach { part => + println(s"client=$label resources=[${resources.map(_.id).mkString(", ")}] segment=$iteration part=$part") + Thread.sleep(millis) + } + } + } + +} + From c042805f03a93b5d944ed738071dad5801ed5a9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Fco=2E=20P=C3=A9rez=20Hidalgo?= Date: Wed, 8 Mar 2017 18:49:09 +0100 Subject: [PATCH 04/10] Added multi-jvm tests... and not I am not using SBT Added license header. Improved test set --- .../TransactionManagerComponent.scala | 25 +++++- .../TransactionResource.scala | 4 +- .../common/utils/MultiJVMTestUtils.scala | 75 +++++++++++----- .../components/dao/DaoComponentTest.scala | 1 + .../transaction_manager/TestClient.scala | 50 ----------- .../integration/ZKTransactionTestClient.scala | 86 +++++++++++++++++++ .../utils/integration/ZookeeperTest.scala | 74 ++++++++++++++++ 7 files changed, 240 insertions(+), 75 deletions(-) delete mode 100644 src/test/scala/com/stratio/common/utils/components/transaction_manager/TestClient.scala create mode 100644 src/test/scala/com/stratio/common/utils/integration/ZKTransactionTestClient.scala diff --git a/src/main/scala/com/stratio/common/utils/components/transaction_manager/TransactionManagerComponent.scala b/src/main/scala/com/stratio/common/utils/components/transaction_manager/TransactionManagerComponent.scala index b49e885..848ca02 100644 --- a/src/main/scala/com/stratio/common/utils/components/transaction_manager/TransactionManagerComponent.scala +++ b/src/main/scala/com/stratio/common/utils/components/transaction_manager/TransactionManagerComponent.scala @@ -17,7 +17,7 @@ package com.stratio.common.utils.components.transaction_manager import com.stratio.common.utils.components.repository.RepositoryComponent -import TransactionResource.Dao +import TransactionResource.WholeRepository trait TransactionManagerComponent[K,V] { @@ -25,12 +25,31 @@ trait TransactionManagerComponent[K,V] { trait TransactionalRepository extends Repository { + /** + * Code execution exclusion zone over a given set of cluster-wide resources. + * + * @param entity Entity prefix used for the content repository providing synchronization mechanisms + * @param firstResource First resource in the protected resource set + * @param resources Remaining resources in the protected resource set + * @param block Code to execute in the exclusion area over the resources + * @tparam T Return type of the exclusion area block + * @return Exclusion area result after its executions + */ def atomically[T](entity: String, firstResource: TransactionResource, resources: TransactionResource* )(block: => T): T - final def atomically[T](entity: String)(block: => T): T = atomically[T](entity, Dao)(block) + /** + * Code execution exclusion zone over a given repository content. The exlucsion area will be + * protected over all the resources managed by the `entity`, that is, the entity itself + * + * @param entity Entity prefix used for the content repository providing synchronization mechanisms + * @param block Code to execute in the exclusion area over the resources + * @tparam T Return type of the exclusion area block + * @return Exclusion area result after its executions + */ + final def atomically[T](entity: String)(block: => T): T = atomically[T](entity, WholeRepository)(block) } -} +} \ No newline at end of file diff --git a/src/main/scala/com/stratio/common/utils/components/transaction_manager/TransactionResource.scala b/src/main/scala/com/stratio/common/utils/components/transaction_manager/TransactionResource.scala index 95e1241..4102219 100644 --- a/src/main/scala/com/stratio/common/utils/components/transaction_manager/TransactionResource.scala +++ b/src/main/scala/com/stratio/common/utils/components/transaction_manager/TransactionResource.scala @@ -22,8 +22,8 @@ trait TransactionResource { object TransactionResource { - object Dao extends TransactionResource { - override def id: String = "_dao" + object WholeRepository extends TransactionResource { + override def id: String = "_all" } } \ No newline at end of file diff --git a/src/test/scala/com/stratio/common/utils/MultiJVMTestUtils.scala b/src/test/scala/com/stratio/common/utils/MultiJVMTestUtils.scala index c01980e..b145c2f 100644 --- a/src/test/scala/com/stratio/common/utils/MultiJVMTestUtils.scala +++ b/src/test/scala/com/stratio/common/utils/MultiJVMTestUtils.scala @@ -1,14 +1,24 @@ +/* + * Copyright (C) 2015 Stratio (http://stratio.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.stratio.common.utils -import java.io._ - -import com.stratio.common.utils.components.transaction_manager.TestClient - import scala.reflect.ClassTag -import scala.collection.mutable.SynchronizedQueue -object MultiJVMTestUtils extends App { +object MultiJVMTestUtils { import scala.sys.process._ @@ -23,26 +33,51 @@ object MultiJVMTestUtils extends App { } + class TestBatch private (private val batch: Seq[ProcessBuilder] = Seq.empty) { + + def addProcess(process: ProcessBuilder): TestBatch = new TestBatch(process +: batch) + + def launchAndWait(): Seq[String] = { + + val mergedOutputs = new collection.mutable.Queue[String]() + + val pLogger = ProcessLogger { line => + mergedOutputs.synchronized(mergedOutputs.enqueue(line)) + } + + val toWait = batch.reverse.map { process => + process.run(pLogger) + } - /*private val mergedOutputs = new collection.mutable.Queue[String]() + toWait.foreach(_.exitValue()) - val pLogger = ProcessLogger { line => - mergedOutputs.synchronized(mergedOutputs.enqueue(line)) + mergedOutputs synchronized mergedOutputs + + } + + } + + object TestBatch { + def apply(): TestBatch = new TestBatch() } + /* USE EXAMPLE: + + import com.stratio.common.utils.integration.ZKTransactionTestClient + + val testBatch = TestBatch() addProcess { + externalProcess(ZKTransactionTestClient)("testclient1", "3", "a", "b", "c", "10", "300") + } addProcess { + externalProcess(ZKTransactionTestClient)("testclient2", "2", "c", "d", "10", "200") + } addProcess { + externalProcess(ZKTransactionTestClient)("testclient3", "2", "b", "c", "10", "200") + } addProcess { + externalProcess(ZKTransactionTestClient)("testclient4", "2", "c", "d", "10", "200") + } - val p1 = externalProcess(TestClient)("testclient1", "2", "a", "b", "10", "300").run(pLogger) - val p2 = externalProcess(TestClient)("testclient2", "2", "c", "d", "10", "200").run(pLogger) - val p3 = externalProcess(TestClient)("testclient3", "2", "b", "c", "10", "200").run(pLogger) - val p4 = externalProcess(TestClient)("testclient4", "2", "c", "d", "10", "200").run(pLogger) + testBatch.launchAndWait().foreach(x => println(s"> $x")) - p1.exitValue() - p2.exitValue() - p3.exitValue() - p4.exitValue() + */ - mergedOutputs.synchronized { - mergedOutputs.foreach(println) - }*/ } diff --git a/src/test/scala/com/stratio/common/utils/components/dao/DaoComponentTest.scala b/src/test/scala/com/stratio/common/utils/components/dao/DaoComponentTest.scala index 47c0993..010dd68 100644 --- a/src/test/scala/com/stratio/common/utils/components/dao/DaoComponentTest.scala +++ b/src/test/scala/com/stratio/common/utils/components/dao/DaoComponentTest.scala @@ -1,3 +1,4 @@ + /* * Copyright (C) 2015 Stratio (http://stratio.com) * diff --git a/src/test/scala/com/stratio/common/utils/components/transaction_manager/TestClient.scala b/src/test/scala/com/stratio/common/utils/components/transaction_manager/TestClient.scala deleted file mode 100644 index ec4fd53..0000000 --- a/src/test/scala/com/stratio/common/utils/components/transaction_manager/TestClient.scala +++ /dev/null @@ -1,50 +0,0 @@ - -package com.stratio.common.utils.components.transaction_manager - -import com.stratio.common.utils.components.config.impl.TypesafeConfigComponent -import com.stratio.common.utils.components.logger.impl.Slf4jLoggerComponent -import com.stratio.common.utils.components.transaction_manager.impl.ZookeeperRepositoryWithTransactionsComponent - -object TestClient extends App { - - case class Resource(id: String) extends TransactionResource - - val usageMsg = "Usage: TestClient [resourceid] [ ]" - - require(args.size > 2, usageMsg) - - val Array( - label, - nResourcesStr, - remainingArgs @ _* - ) = args - - require(nResourcesStr.toInt <= remainingArgs.size) - - val (resourcesStr, segmentsStr) = remainingArgs.splitAt(nResourcesStr.toInt) - val resources = resourcesStr map Resource - - require((remainingArgs.size - resources.size) % 2 == 0, usageMsg) - - val segments = segmentsStr.grouped(2) - - - def mayBeProtected(block: => Unit): Unit = - if(resources.isEmpty) block - else transactionManager.repository.atomically("test", resources.head, resources.tail:_*)(block) - - val transactionManager = new ZookeeperRepositoryWithTransactionsComponent - with TypesafeConfigComponent with Slf4jLoggerComponent - - segments.zipWithIndex foreach { case (segment, iteration) => - val Seq(nParts, millis) = segment.map(_.toLong) - mayBeProtected { - (1L to nParts) foreach { part => - println(s"client=$label resources=[${resources.map(_.id).mkString(", ")}] segment=$iteration part=$part") - Thread.sleep(millis) - } - } - } - -} - diff --git a/src/test/scala/com/stratio/common/utils/integration/ZKTransactionTestClient.scala b/src/test/scala/com/stratio/common/utils/integration/ZKTransactionTestClient.scala new file mode 100644 index 0000000..b49bf9a --- /dev/null +++ b/src/test/scala/com/stratio/common/utils/integration/ZKTransactionTestClient.scala @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2015 Stratio (http://stratio.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.stratio.common.utils.integration + +import com.stratio.common.utils.components.config.impl.TypesafeConfigComponent +import com.stratio.common.utils.components.logger.impl.Slf4jLoggerComponent +import com.stratio.common.utils.components.transaction_manager.TransactionResource +import com.stratio.common.utils.components.transaction_manager.impl.ZookeeperRepositoryWithTransactionsComponent + +object ZKTransactionTestClient extends App { + + case class OutputEntry(client: String, resources: Seq[String], segment: Int, part: Int) { + override def toString: String = + s"client=$client resources=[${resources.mkString(", ")}] segment=$segment part=$part" + } + + object OutputEntry { + + def apply(line: String): OutputEntry = { + //TODO: Be able to extract more than one resource id + val ExtractionRegex = + """^client=(\w+)\s+resources=\[\s*((?:\w+)?(?:,\s*\w+\s*)*)\s*]\s+segment=(\d+)\s+part=(\d+)\s*$""".r + line match { + case ExtractionRegex(client, resources, segmentStr, partStr) => + OutputEntry(client, resources.split(",").map(_.trim), segmentStr.toInt, partStr.toInt) + } + } + + } + + case class Resource(id: String) extends TransactionResource + + val usageMsg = + "Usage: ZKTransactionTestClient [resourceid] [ ]" + + require(args.size > 2, usageMsg) + + val Array( + label, + nResourcesStr, + remainingArgs @ _* + ) = args + + require(nResourcesStr.toInt <= remainingArgs.size) + + val (resourcesStr, segmentsStr) = remainingArgs.splitAt(nResourcesStr.toInt) + val resources = resourcesStr map Resource + + require((remainingArgs.size - resources.size) % 2 == 0, usageMsg) + + val segments = segmentsStr.grouped(2) + + + def mayBeProtected(block: => Unit): Unit = + if(resources.isEmpty) block + else transactionManager.repository.atomically("test", resources.head, resources.tail:_*)(block) + + val transactionManager = new ZookeeperRepositoryWithTransactionsComponent + with TypesafeConfigComponent with Slf4jLoggerComponent + + segments.zipWithIndex foreach { case (segment, iteration) => + val Seq(nParts, millis) = segment.map(_.toLong) + mayBeProtected { + (1L to nParts) foreach { part => + println(OutputEntry(label, resources.map(_.id), iteration, part.toInt)) + Thread.sleep(millis) + } + } + } + +} + diff --git a/src/test/scala/com/stratio/common/utils/integration/ZookeeperTest.scala b/src/test/scala/com/stratio/common/utils/integration/ZookeeperTest.scala index a55f5f6..476b375 100644 --- a/src/test/scala/com/stratio/common/utils/integration/ZookeeperTest.scala +++ b/src/test/scala/com/stratio/common/utils/integration/ZookeeperTest.scala @@ -19,6 +19,7 @@ import com.stratio.common.utils.components.config.impl.TypesafeConfigComponent import com.stratio.common.utils.components.dao.GenericDAOComponent import com.stratio.common.utils.components.logger.impl.Slf4jLoggerComponent import com.stratio.common.utils.components.repository.impl.ZookeeperRepositoryComponent +import com.stratio.common.utils.integration.ZKTransactionTestClient.OutputEntry import org.apache.curator.test.TestingServer import org.apache.curator.utils.CloseableUtils import org.junit.runner.RunWith @@ -89,6 +90,79 @@ class ZookeeperIntegrationTest extends WordSpec dao.count() shouldBe a[Failure[_]] } } + + "A Transaction manager backed by Zookeeper" should { + + import com.stratio.common.utils.MultiJVMTestUtils._ + + def sequenceGroups(outputs: Seq[String]): Seq[Seq[Int]] = { + outputs.map(OutputEntry(_)).zipWithIndex groupBy { + case (OutputEntry(label, _, _, _), n) => label + } values + }.toSeq.map(_.map(_._2)) + + "avoid exclusion zone (over a given resource) process interleaving" in { + + val testBatch = (TestBatch() /: (1 to 5)) { (batchToUpdate, n) => + val p = externalProcess(ZKTransactionTestClient)(s"testclient$n", "1", "a", "5", (200+n*10).toString) + batchToUpdate addProcess p + } + + sequenceGroups(testBatch.launchAndWait().view) foreach { group => + group should contain theSameElementsInOrderAs (group.min until (group.min + group.size)) + } + + } + + "avoid exclusion zone (over overlapping resources sets) process inverleaving" in { + + val testBatch = TestBatch() addProcess { + externalProcess(ZKTransactionTestClient)("testclient1", "3", "a", "b", "c", "10", "300") + } addProcess { + externalProcess(ZKTransactionTestClient)("testclient2", "2", "c", "d", "10", "100") + } addProcess { + externalProcess(ZKTransactionTestClient)("testclient3", "2", "b", "c", "10", "150") + } + + sequenceGroups(testBatch.launchAndWait().view) foreach { group => + group should contain theSameElementsInOrderAs (group.min until (group.min + group.size)) + } + + } + + "avoid process interleaving for intersecting resources while allowing interleaving for non-intersecting" in { + + val testBatch = TestBatch() addProcess { + externalProcess(ZKTransactionTestClient)("testclient1", "2", "a", "b", "10", "100") + } addProcess { + externalProcess(ZKTransactionTestClient)("testclient2", "2", "b", "c", "10", "100") + } addProcess { + externalProcess(ZKTransactionTestClient)("testclient3", "2", "c", "d", "10", "100") + } + + val output = testBatch.launchAndWait() + + val outOneAndTwo = output.filterNot(line => OutputEntry(line).client == "testclient3") + val outTwoAndThree = output.filterNot(line => OutputEntry(line).client == "testclient1") + val outOneAndThree = output.filterNot(line => OutputEntry(line).client == "testclient2") + + sequenceGroups(outOneAndTwo) foreach { group => + group should contain theSameElementsInOrderAs (group.min until (group.min + group.size)) + } + + sequenceGroups(outTwoAndThree) foreach { group => + group should contain theSameElementsInOrderAs (group.min until (group.min + group.size)) + } + + sequenceGroups(outOneAndThree) foreach { group => + group should not contain theSameElementsInOrderAs (group.min until (group.min + group.size)) + } + + } + + + } + } class DummyZookeeperDAOComponent extends GenericDAOComponent[Dummy] From e5fbee213795da9fd201fc0baebb87c9be810544 Mon Sep 17 00:00:00 2001 From: pmadrigal Date: Wed, 8 Mar 2017 13:13:48 +0100 Subject: [PATCH 05/10] adquisionResource with correct slash --- .../impl/ZookeeperRepositoryWithTransactionsComponent.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala b/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala index d545fd0..5573c4f 100644 --- a/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala +++ b/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala @@ -33,7 +33,10 @@ trait ZookeeperRepositoryWithTransactionsComponent extends ZookeeperRepositoryCo with TransactionalRepository { //TODO: Improve path option usage - private def acquisitionResource: String = s"${path.getOrElse("")}/locks" + private def acquisitionResource: String = path match { + case Some(pathValue) => s"/$pathValue/locks" + case None => "/locks" + } private object AcquiredLocks { From c922f2e0291e9c210f611ba97cc747312ad1967b Mon Sep 17 00:00:00 2001 From: pmadrigal Date: Thu, 9 Mar 2017 10:40:03 +0100 Subject: [PATCH 06/10] update with review --- .../impl/ZookeeperRepositoryWithTransactionsComponent.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala b/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala index 5573c4f..20a974b 100644 --- a/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala +++ b/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala @@ -33,10 +33,7 @@ trait ZookeeperRepositoryWithTransactionsComponent extends ZookeeperRepositoryCo with TransactionalRepository { //TODO: Improve path option usage - private def acquisitionResource: String = path match { - case Some(pathValue) => s"/$pathValue/locks" - case None => "/locks" - } + private def acquisitionResource: String = "/" + path.map(_ + "/").getOrElse("") + "locks" private object AcquiredLocks { From 43ef04d7070afc7fb44216b7d664dd61a5fe6697 Mon Sep 17 00:00:00 2001 From: JesusMtnez Date: Thu, 9 Mar 2017 11:48:06 +0100 Subject: [PATCH 07/10] Move locks to global directory Leaving the locks in a directory under entity's directory caused an error when retrieving all directories for an entity, because locks appeared as an empty entity. TODO: Improve locks directory under stratio directory, not globally. --- .../impl/ZookeeperRepositoryWithTransactionsComponent.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala b/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala index 20a974b..c8d5e53 100644 --- a/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala +++ b/src/main/scala/com/stratio/common/utils/components/transaction_manager/impl/ZookeeperRepositoryWithTransactionsComponent.scala @@ -29,11 +29,12 @@ trait ZookeeperRepositoryWithTransactionsComponent extends ZookeeperRepositoryCo override val repository: ZookeeperRepositoryWithTransactions = new ZookeeperRepositoryWithTransactions(None) + //TODO Improve paths and locksPath behaviour class ZookeeperRepositoryWithTransactions(path: Option[String] = None) extends ZookeeperRepository(path) with TransactionalRepository { //TODO: Improve path option usage - private def acquisitionResource: String = "/" + path.map(_ + "/").getOrElse("") + "locks" + private def acquisitionResource: String = "/locks" + path.map("/" + _).getOrElse("") private object AcquiredLocks { @@ -72,7 +73,7 @@ trait ZookeeperRepositoryWithTransactionsComponent extends ZookeeperRepositoryCo } private def lockPath(entity: String)(resource: TransactionResource): String = { - s"/$entity/locks/${resource.id}" + s"/locks/$entity/${resource.id}" } override def atomically[T]( From 8562ddaab83dadfc45c2fe0537b715f8153f5208 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Fco=2E=20P=C3=A9rez=20Hidalgo?= Date: Thu, 9 Mar 2017 11:56:19 +0100 Subject: [PATCH 08/10] Removed flaky test --- .../stratio/common/utils/integration/ZookeeperTest.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/test/scala/com/stratio/common/utils/integration/ZookeeperTest.scala b/src/test/scala/com/stratio/common/utils/integration/ZookeeperTest.scala index 476b375..1333a71 100644 --- a/src/test/scala/com/stratio/common/utils/integration/ZookeeperTest.scala +++ b/src/test/scala/com/stratio/common/utils/integration/ZookeeperTest.scala @@ -153,11 +153,7 @@ class ZookeeperIntegrationTest extends WordSpec sequenceGroups(outTwoAndThree) foreach { group => group should contain theSameElementsInOrderAs (group.min until (group.min + group.size)) } - - sequenceGroups(outOneAndThree) foreach { group => - group should not contain theSameElementsInOrderAs (group.min until (group.min + group.size)) - } - + } From ee5a7c8fe2f14ffd02ab5faccc79726a13d927cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Fco=2E=20P=C3=A9rez=20Hidalgo?= Date: Thu, 9 Mar 2017 14:42:31 +0100 Subject: [PATCH 09/10] Added transaction manager documentation to README. --- README.md | 70 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 69 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 1df327e..9a45898 100644 --- a/README.md +++ b/README.md @@ -11,8 +11,9 @@ Now is possible to create some generic modules: - Repositories - Logger - Configuration +- Transaction managers -Appart from these modules, a functional package has been added in order to provide some extended functionality +Apart from these modules, a functional package has been added in order to provide some extended functionality regarding scala collections and functional utilities. Repositories @@ -291,4 +292,71 @@ To set a monitor and subscribe it to our silly actor heartbeats you can just do `testActor` will receive `HeartbeatLost(fallenId)` when the by-that-id identified service has stopped its heartbeat. +Transaction managers +==================== + +A public component interface aimed to abstract mutual cluster-wide exclusion code areas over a protected set of +resources. + +The idea is to apply the principles of JVM's object monitor locks. However its implementors should guarantee that +these exclusion areas are cluster-wide. That is, this interface should serve as a way of seamlessly provide cluster +wide `syncrhonize` operation. + +```scala +transactionMgr.atomically("test", DogsTable) { + ... + //The code here will won't run at the same time that any other + //code protected with the resource DogsTable at any JVM in the same or other machine + ... +} +``` + +Protected resources should not be shared, instead, they should serve as unique lock identifiers: + +``` + +-------------------+ + | | + +-----------> | Shared Resource | <-----------------------+ + | | | | + | +---------+---------+ | + | | | ++------------------------+ | +----------------------+ +| Node (A) | | | | Node (B) | | +| | | | | | | +| | | v | | | +| + | | + | +| atomically(Shared Resorce) { +--------+ | atomically(Shared Resorce) { +| | | | | +| //Code @ A, never runs at the | id | | //Code @ B, never runs at the +| //same time than code @ B | | | //same time than code @ A +| | +---+----+ | | +| } | | | } | +| | | | | +| | | | | +| | | | | +| | | | | ++------------------------+ v +----------------------+ + + XXXXXXX + X X + X X + +--X-----X--+ + | +-+ | + | | | | + | | | | + | +-+ | + +-----------+ + + Cluster Lock + +``` + +Zookeeper +------------------------------------------------ + +An Zookeeper based implementor is also provided by `ZookeeperRepositoryWithTransactionsComponent` in the form of a +mixed component: Repository + Transaction Manager + +A good use example can be found at `ZookeeperIntegrationTest` integration test where several +`ZKTransactionTestClient` application processes compete for the same shared resource: A common output stream. From 829d094a955c2555f0254ab4025ac8fac01ac095 Mon Sep 17 00:00:00 2001 From: JesusMtnez Date: Fri, 10 Mar 2017 08:50:24 +0100 Subject: [PATCH 10/10] Bump version to 0.9.0 --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 6ac4239..44ab9d6 100644 --- a/pom.xml +++ b/pom.xml @@ -18,13 +18,13 @@ - + 4.0.0 com.stratio parent - 0.8.2 + 0.9.0 com.stratio.common