Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ crossScalaVersions := Seq("2.12.7")

conflictManager := ConflictManager.strict

val JunitVersion = "4.12"
val ScalaTestVersion = "3.0.5"
val LogbackVersion = "1.2.3"
val CuratorVersion = "4.2.0"

val customScalacOptions = Seq(
"-unchecked",
"-deprecation",
Expand All @@ -23,8 +28,8 @@ val customScalacOptions = Seq(
)

val customDependencies = Seq(
"junit" % "junit" % "4.12" % Test,
"org.scalatest" %% "scalatest" % "3.0.5" % Test
"junit" % "junit" % JunitVersion % Test,
"org.scalatest" %% "scalatest" % ScalaTestVersion % Test
)

lazy val root = (project in file("."))
Expand All @@ -43,9 +48,11 @@ lazy val zookeeper = (project in file("zookeeper"))
name := "cockpit-zookeeper",
scalacOptions ++= customScalacOptions,
libraryDependencies ++= customDependencies ++ Seq(
"org.apache.curator" % "curator-framework" % "4.2.0"
"ch.qos.logback" % "logback-classic" % LogbackVersion,
"org.apache.curator" % "curator-framework" % CuratorVersion,
"org.apache.curator" % "curator-test" % CuratorVersion % Test,
)
).dependsOn(core % "compile->compile,test->test")
).dependsOn(core % "compile->compile;test->test")

lazy val akkaHttp = (project in file("akka-http"))
.settings(
Expand All @@ -54,6 +61,5 @@ lazy val akkaHttp = (project in file("akka-http"))
libraryDependencies ++= customDependencies
)

//coverageEnabled := true

// TODO coverageEnabled := true
// TODO scalastyle
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ abstract class HoldersDesk[F[_]](ft: ControlTypes,
init: String): Control[V]

protected def updateInner(name: String)
(current2next: String => String): F[Unit]
(updater: String => String): F[Unit]
}

object HoldersDesk {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ class InMemoryDesk(ft: ControlTypes,
}

override protected def updateInner(name: String)
(current2next: String => String): Future[Unit] =
(updater: String => String): Future[Unit] =
synchronized { Future.fromTry(Try {
val old = featureMap(name)
val n = current2next(old)
val n = updater(old)
featureMap.update(name, n)
})}
}
2 changes: 2 additions & 0 deletions core/src/main/scala/ru/alesavin/cockpit/model/Control.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ trait Control[V] {

override def hashCode: Int =
name.hashCode

override def toString: String = s"Control($name)"
}

object Control {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ trait DeskSpecBase

d.register(name, 122)
d.delete(name).futureValue shouldBe true
d.list.futureValue.isEmpty shouldBe true
d.list.futureValue shouldBe empty
}
"return false on calling delete for nonexistent feature" in {
val d = desk()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package ru.alesavin.cockpit.impl

import com.google.common.cache.CacheBuilder

import scala.concurrent.duration.FiniteDuration
import scala.util.{Success, Try}

/**
* Caching mix-in for [[Storage]]
*
* @author alesavin
*/
trait CachedStorage extends Storage {

def duration: FiniteDuration

private val Cache: com.google.common.cache.Cache[String, String] =
CacheBuilder.newBuilder()
.expireAfterWrite(duration.length, duration.unit)
.build()

abstract override def get(key: String): Try[Option[String]] =
for {
cached <- Try(Option(Cache.getIfPresent(key)))
r <- cached match {
case s@Some(_) => Success(s)
case _ => super.get(key).map { v =>
v.foreach(Cache.put(key, _))
v
}
}
} yield r


abstract override def set(key: String, value: String): Try[Unit] =
for {
_ <- Try(Cache.invalidate(key))
r <- super.set(key, value)
} yield r
}
17 changes: 17 additions & 0 deletions zookeeper/src/main/scala/ru/alesavin/cockpit/impl/Storage.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package ru.alesavin.cockpit.impl

import scala.util.Try

/**
* Provide simple interface to storage
* TODO move to core
*
* @author alesavin
*/
trait Storage {

def keys: Try[Iterable[String]] // TODO Curator Async => Future
def get(key: String): Try[Option[String]]
def set(key: String, value: String): Try[Unit]
def remove(key: String): Try[Boolean]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package ru.alesavin.cockpit.impl

import java.util.NoSuchElementException

import org.apache.curator.framework.CuratorFramework
import ru.alesavin.cockpit.impl.HoldersDesk.HolderControlType
import ru.alesavin.cockpit.impl.ZookeeperDesk.registrationLock
import ru.alesavin.cockpit.model.{Control, ControlType, ControlTypes, Holder}

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

/**
* Impl of [[ru.alesavin.cockpit.model.Desk]] over Zookeeper
*
* @author alesavin
*/
class ZookeeperDesk(client: CuratorFramework,
baseZkPath: String,
ft: ControlTypes,
hType: ControlType[Holder] = HolderControlType,
cacheDuration: FiniteDuration = 30.seconds)
extends HoldersDesk[Future](ft, hType) {

private val zkStorage =
new ZookeeperStorage(client, baseZkPath)
with CachedStorage {
override def duration: FiniteDuration = cacheDuration
}

override def list: Future[Seq[Control[Holder]]] =
Future.fromTry(
for {
keys <- zkStorage.keys
result <- keys.foldLeft(Success(Seq.empty) : Try[Seq[Control[Holder]]]) {
case (f@Failure(_), _) => f
case (Success(s), k) =>
val nc = for {
optV <- zkStorage.get(k)
v <- Try(optV.getOrElse(throw new NoSuchElementException(s"No $k")))
} yield Control(k, _ => hType.from(v).get)
nc.map(ch => s :+ ch)
}
} yield result
)

override def delete(name: String): Future[Boolean] =
Future.fromTry(zkStorage.remove(name))

override protected def registerInner[V](name: String,
init: String): Control[V] =
registrationLock.synchronized { // do registrations sequential due of zk performance
(for {
exist <- zkStorage.get(name)
_ <- exist match {
case None => zkStorage.set(name, init)
case _ => Success(())
}
} yield Control[V](name, n => decode[V](zkStorage.get(n).get.get))) // TODO
.get
}

protected def updateInner(name: String)
(updater: String => String): Future[Unit] =
Future.fromTry {
for {
optExist <- zkStorage.get(name)
exist <- Try(optExist.getOrElse(throw new NoSuchElementException(s"No $name")))
_ <- zkStorage.set(name, updater(exist))
} yield ()
}
}

object ZookeeperDesk {

private val registrationLock = new Object
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package ru.alesavin.cockpit.impl

import org.apache.curator.framework.CuratorFramework
import org.apache.curator.utils.ZKPaths
import org.apache.zookeeper.{CreateMode, KeeperException}
import ru.alesavin.cockpit.impl.ZookeeperStorage._

import scala.util.Try
import scala.collection.JavaConverters._


/**
* Impl of [[Storage]] over Zookeeper
*
* @author alesavin
*/
class ZookeeperStorage(client: CuratorFramework,
baseZkPath: String) extends Storage {

require(baseZkPath.nonEmpty, "Empty zookeeper path")

override def keys: Try[Iterable[String]] =
Try(client.getChildren.forPath(baseZkPath).asScala)

override def get(key: String): Try[Option[String]] =
Try {
Option(client.getData.forPath(ZKPaths.makePath(baseZkPath, key))).map(to)
}.recover {
case _: KeeperException.NoNodeException => None
}

override def set(key: String, value: String): Try[Unit] =
Try {
client
.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(ZKPaths.makePath(baseZkPath, key), from(value))
()
}.recover {
case _: KeeperException.NodeExistsException =>
client
.setData()
.forPath(ZKPaths.makePath(baseZkPath, key), from(value))
()
}

override def remove(key: String): Try[Boolean] =
Try {
client.delete().forPath(ZKPaths.makePath(baseZkPath, key))
true
}.recover {
case _: KeeperException.NoNodeException => false
}
}

object ZookeeperStorage {

def to(data: Array[Byte]): String = new String(data, "UTF-8")
def from(data: String): Array[Byte] = data.getBytes("UTF-8")
}
12 changes: 12 additions & 0 deletions zookeeper/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<root level="ERROR">
<appender-ref ref="CONSOLE" />
</root>
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package ru.alesavin.cockpit.impl

import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.test.TestingCluster
import ru.alesavin.cockpit.model.{ControlTypes, Desk}

import scala.concurrent.Future

/**
* Specd on [[ZookeeperDesk]]
*
* @author alesavin
*/
class ZookeeperDeskSpec
extends DeskSpecBase {

def desk(ft: ControlTypes): Desk[Future] = {
val zkCluster = new TestingCluster(3)
zkCluster.start()

val Curator: CuratorFramework = {
val curatorFramework = CuratorFrameworkFactory.newClient(
zkCluster.getConnectString,
new ExponentialBackoffRetry(100, 3))
curatorFramework.start()
curatorFramework
}
val BasePath = "/test"
Curator.create().creatingParentsIfNeeded().forPath(BasePath)

new ZookeeperDesk(Curator, BasePath, ft)
}
}
Loading