Skip to content

Commit

Permalink
Added multi-jvm tests... and not I am not using SBT
Browse files Browse the repository at this point in the history
Added license header.

Improved test set
  • Loading branch information
pfcoperez committed Mar 10, 2017
1 parent 23972dd commit c042805
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,39 @@
package com.stratio.common.utils.components.transaction_manager

import com.stratio.common.utils.components.repository.RepositoryComponent
import TransactionResource.Dao
import TransactionResource.WholeRepository

trait TransactionManagerComponent[K,V] {

self: RepositoryComponent[K, V] =>

trait TransactionalRepository extends Repository {

/**
* Code execution exclusion zone over a given set of cluster-wide resources.
*
* @param entity Entity prefix used for the content repository providing synchronization mechanisms
* @param firstResource First resource in the protected resource set
* @param resources Remaining resources in the protected resource set
* @param block Code to execute in the exclusion area over the resources
* @tparam T Return type of the exclusion area block
* @return Exclusion area result after its executions
*/
def atomically[T](entity: String,
firstResource: TransactionResource,
resources: TransactionResource*
)(block: => T): T

final def atomically[T](entity: String)(block: => T): T = atomically[T](entity, Dao)(block)
/**
* Code execution exclusion zone over a given repository content. The exlucsion area will be
* protected over all the resources managed by the `entity`, that is, the entity itself
*
* @param entity Entity prefix used for the content repository providing synchronization mechanisms
* @param block Code to execute in the exclusion area over the resources
* @tparam T Return type of the exclusion area block
* @return Exclusion area result after its executions
*/
final def atomically[T](entity: String)(block: => T): T = atomically[T](entity, WholeRepository)(block)
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ trait TransactionResource {

object TransactionResource {

object Dao extends TransactionResource {
override def id: String = "_dao"
object WholeRepository extends TransactionResource {
override def id: String = "_all"
}

}
75 changes: 55 additions & 20 deletions src/test/scala/com/stratio/common/utils/MultiJVMTestUtils.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@

/*
* Copyright (C) 2015 Stratio (http://stratio.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.stratio.common.utils

import java.io._

import com.stratio.common.utils.components.transaction_manager.TestClient

import scala.reflect.ClassTag
import scala.collection.mutable.SynchronizedQueue

object MultiJVMTestUtils extends App {
object MultiJVMTestUtils {

import scala.sys.process._

Expand All @@ -23,26 +33,51 @@ object MultiJVMTestUtils extends App {

}

class TestBatch private (private val batch: Seq[ProcessBuilder] = Seq.empty) {

def addProcess(process: ProcessBuilder): TestBatch = new TestBatch(process +: batch)

def launchAndWait(): Seq[String] = {

val mergedOutputs = new collection.mutable.Queue[String]()

val pLogger = ProcessLogger { line =>
mergedOutputs.synchronized(mergedOutputs.enqueue(line))
}

val toWait = batch.reverse.map { process =>
process.run(pLogger)
}

/*private val mergedOutputs = new collection.mutable.Queue[String]()
toWait.foreach(_.exitValue())

val pLogger = ProcessLogger { line =>
mergedOutputs.synchronized(mergedOutputs.enqueue(line))
mergedOutputs synchronized mergedOutputs

}

}

object TestBatch {
def apply(): TestBatch = new TestBatch()
}

/* USE EXAMPLE:
import com.stratio.common.utils.integration.ZKTransactionTestClient
val testBatch = TestBatch() addProcess {
externalProcess(ZKTransactionTestClient)("testclient1", "3", "a", "b", "c", "10", "300")
} addProcess {
externalProcess(ZKTransactionTestClient)("testclient2", "2", "c", "d", "10", "200")
} addProcess {
externalProcess(ZKTransactionTestClient)("testclient3", "2", "b", "c", "10", "200")
} addProcess {
externalProcess(ZKTransactionTestClient)("testclient4", "2", "c", "d", "10", "200")
}
val p1 = externalProcess(TestClient)("testclient1", "2", "a", "b", "10", "300").run(pLogger)
val p2 = externalProcess(TestClient)("testclient2", "2", "c", "d", "10", "200").run(pLogger)
val p3 = externalProcess(TestClient)("testclient3", "2", "b", "c", "10", "200").run(pLogger)
val p4 = externalProcess(TestClient)("testclient4", "2", "c", "d", "10", "200").run(pLogger)
testBatch.launchAndWait().foreach(x => println(s"> $x"))
p1.exitValue()
p2.exitValue()
p3.exitValue()
p4.exitValue()
*/

mergedOutputs.synchronized {
mergedOutputs.foreach(println)
}*/

}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

/*
* Copyright (C) 2015 Stratio (http://stratio.com)
*
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (C) 2015 Stratio (http://stratio.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.stratio.common.utils.integration

import com.stratio.common.utils.components.config.impl.TypesafeConfigComponent
import com.stratio.common.utils.components.logger.impl.Slf4jLoggerComponent
import com.stratio.common.utils.components.transaction_manager.TransactionResource
import com.stratio.common.utils.components.transaction_manager.impl.ZookeeperRepositoryWithTransactionsComponent

object ZKTransactionTestClient extends App {

case class OutputEntry(client: String, resources: Seq[String], segment: Int, part: Int) {
override def toString: String =
s"client=$client resources=[${resources.mkString(", ")}] segment=$segment part=$part"
}

object OutputEntry {

def apply(line: String): OutputEntry = {
//TODO: Be able to extract more than one resource id
val ExtractionRegex =
"""^client=(\w+)\s+resources=\[\s*((?:\w+)?(?:,\s*\w+\s*)*)\s*]\s+segment=(\d+)\s+part=(\d+)\s*$""".r
line match {
case ExtractionRegex(client, resources, segmentStr, partStr) =>
OutputEntry(client, resources.split(",").map(_.trim), segmentStr.toInt, partStr.toInt)
}
}

}

case class Resource(id: String) extends TransactionResource

val usageMsg =
"Usage: ZKTransactionTestClient <client_label> <nresources> [resourceid] [<no_segment_parts> <part_duration>]"

require(args.size > 2, usageMsg)

val Array(
label,
nResourcesStr,
remainingArgs @ _*
) = args

require(nResourcesStr.toInt <= remainingArgs.size)

val (resourcesStr, segmentsStr) = remainingArgs.splitAt(nResourcesStr.toInt)
val resources = resourcesStr map Resource

require((remainingArgs.size - resources.size) % 2 == 0, usageMsg)

val segments = segmentsStr.grouped(2)


def mayBeProtected(block: => Unit): Unit =
if(resources.isEmpty) block
else transactionManager.repository.atomically("test", resources.head, resources.tail:_*)(block)

val transactionManager = new ZookeeperRepositoryWithTransactionsComponent
with TypesafeConfigComponent with Slf4jLoggerComponent

segments.zipWithIndex foreach { case (segment, iteration) =>
val Seq(nParts, millis) = segment.map(_.toLong)
mayBeProtected {
(1L to nParts) foreach { part =>
println(OutputEntry(label, resources.map(_.id), iteration, part.toInt))
Thread.sleep(millis)
}
}
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.stratio.common.utils.components.config.impl.TypesafeConfigComponent
import com.stratio.common.utils.components.dao.GenericDAOComponent
import com.stratio.common.utils.components.logger.impl.Slf4jLoggerComponent
import com.stratio.common.utils.components.repository.impl.ZookeeperRepositoryComponent
import com.stratio.common.utils.integration.ZKTransactionTestClient.OutputEntry
import org.apache.curator.test.TestingServer
import org.apache.curator.utils.CloseableUtils
import org.junit.runner.RunWith
Expand Down Expand Up @@ -89,6 +90,79 @@ class ZookeeperIntegrationTest extends WordSpec
dao.count() shouldBe a[Failure[_]]
}
}

"A Transaction manager backed by Zookeeper" should {

import com.stratio.common.utils.MultiJVMTestUtils._

def sequenceGroups(outputs: Seq[String]): Seq[Seq[Int]] = {
outputs.map(OutputEntry(_)).zipWithIndex groupBy {
case (OutputEntry(label, _, _, _), n) => label
} values
}.toSeq.map(_.map(_._2))

"avoid exclusion zone (over a given resource) process interleaving" in {

val testBatch = (TestBatch() /: (1 to 5)) { (batchToUpdate, n) =>
val p = externalProcess(ZKTransactionTestClient)(s"testclient$n", "1", "a", "5", (200+n*10).toString)
batchToUpdate addProcess p
}

sequenceGroups(testBatch.launchAndWait().view) foreach { group =>
group should contain theSameElementsInOrderAs (group.min until (group.min + group.size))
}

}

"avoid exclusion zone (over overlapping resources sets) process inverleaving" in {

val testBatch = TestBatch() addProcess {
externalProcess(ZKTransactionTestClient)("testclient1", "3", "a", "b", "c", "10", "300")
} addProcess {
externalProcess(ZKTransactionTestClient)("testclient2", "2", "c", "d", "10", "100")
} addProcess {
externalProcess(ZKTransactionTestClient)("testclient3", "2", "b", "c", "10", "150")
}

sequenceGroups(testBatch.launchAndWait().view) foreach { group =>
group should contain theSameElementsInOrderAs (group.min until (group.min + group.size))
}

}

"avoid process interleaving for intersecting resources while allowing interleaving for non-intersecting" in {

val testBatch = TestBatch() addProcess {
externalProcess(ZKTransactionTestClient)("testclient1", "2", "a", "b", "10", "100")
} addProcess {
externalProcess(ZKTransactionTestClient)("testclient2", "2", "b", "c", "10", "100")
} addProcess {
externalProcess(ZKTransactionTestClient)("testclient3", "2", "c", "d", "10", "100")
}

val output = testBatch.launchAndWait()

val outOneAndTwo = output.filterNot(line => OutputEntry(line).client == "testclient3")
val outTwoAndThree = output.filterNot(line => OutputEntry(line).client == "testclient1")
val outOneAndThree = output.filterNot(line => OutputEntry(line).client == "testclient2")

sequenceGroups(outOneAndTwo) foreach { group =>
group should contain theSameElementsInOrderAs (group.min until (group.min + group.size))
}

sequenceGroups(outTwoAndThree) foreach { group =>
group should contain theSameElementsInOrderAs (group.min until (group.min + group.size))
}

sequenceGroups(outOneAndThree) foreach { group =>
group should not contain theSameElementsInOrderAs (group.min until (group.min + group.size))
}

}


}

}

class DummyZookeeperDAOComponent extends GenericDAOComponent[Dummy]
Expand Down

0 comments on commit c042805

Please sign in to comment.