From b210c1f44dfd4be20411db2943da0ec082dc04cb Mon Sep 17 00:00:00 2001 From: roman Date: Wed, 6 Jun 2012 00:49:52 +0400 Subject: [PATCH] Implement nodes migration support in cluster --- project/ScalaRedisProject.scala | 2 + .../com/redis/cluster/ClusterConfigDiff.scala | 17 ++ .../redis/cluster/ClusterConfigListener.scala | 7 + .../com/redis/cluster/ConfigManager.scala | 9 + .../redis/cluster/CopyOnWriteHashRing.scala | 39 +++ .../scala/com/redis/cluster/HashRing.scala | 52 ++-- .../scala/com/redis/cluster/NodeConfig.scala | 5 + .../com/redis/cluster/RedisCluster.scala | 266 ++++++++++++++---- .../cluster/config/ZKStringSerializer.scala | 18 ++ .../com/redis/cluster/config/ZkConfig.scala | 28 ++ .../config/ZookeperConfigManager.scala | 80 ++++++ .../redis/cluster/ClusterConfigDiffSpec.scala | 64 +++++ .../com/redis/cluster/HashRingSpec.scala | 28 ++ .../{ => cluster}/RedisClusterSpec.scala | 29 +- .../cluster/RedisClusterUpdateSpec.scala | 51 ++++ .../config/ZookeeperConfigManagerSpec.scala | 67 +++++ 16 files changed, 670 insertions(+), 92 deletions(-) create mode 100644 src/main/scala/com/redis/cluster/ClusterConfigDiff.scala create mode 100644 src/main/scala/com/redis/cluster/ClusterConfigListener.scala create mode 100644 src/main/scala/com/redis/cluster/ConfigManager.scala create mode 100644 src/main/scala/com/redis/cluster/CopyOnWriteHashRing.scala create mode 100644 src/main/scala/com/redis/cluster/NodeConfig.scala create mode 100644 src/main/scala/com/redis/cluster/config/ZKStringSerializer.scala create mode 100644 src/main/scala/com/redis/cluster/config/ZkConfig.scala create mode 100644 src/main/scala/com/redis/cluster/config/ZookeperConfigManager.scala create mode 100644 src/test/scala/com/redis/cluster/ClusterConfigDiffSpec.scala create mode 100644 src/test/scala/com/redis/cluster/HashRingSpec.scala rename src/test/scala/com/redis/{ => cluster}/RedisClusterSpec.scala (81%) create mode 100644 src/test/scala/com/redis/cluster/RedisClusterUpdateSpec.scala create mode 100644 src/test/scala/com/redis/cluster/config/ZookeeperConfigManagerSpec.scala diff --git a/project/ScalaRedisProject.scala b/project/ScalaRedisProject.scala index 764ad7aa..d13436dd 100644 --- a/project/ScalaRedisProject.scala +++ b/project/ScalaRedisProject.scala @@ -18,11 +18,13 @@ object ScalaRedisProject extends Build name := "RedisClient", libraryDependencies ++= Seq("commons-pool" % "commons-pool" % "1.5.6", + "com.github.sgroschupf" % "zkclient" % "0.1", "org.slf4j" % "slf4j-api" % "1.6.1", "org.slf4j" % "slf4j-log4j12" % "1.6.1" % "provided", "log4j" % "log4j" % "1.2.16" % "provided", "junit" % "junit" % "4.8.1" % "test", "org.scalatest" % "scalatest_2.9.1" % "1.6.1" % "test", + "org.mockito" % "mockito-all" % "1.8.4" % "test", "com.twitter" % "util" % "1.11.4" % "test" intransitive(), "com.twitter" % "finagle-core" % "1.9.0" % "test"), diff --git a/src/main/scala/com/redis/cluster/ClusterConfigDiff.scala b/src/main/scala/com/redis/cluster/ClusterConfigDiff.scala new file mode 100644 index 00000000..69dcb1c4 --- /dev/null +++ b/src/main/scala/com/redis/cluster/ClusterConfigDiff.scala @@ -0,0 +1,17 @@ +package com.redis.cluster + +case class ClusterConfigDiff(newNodes: Map[String, NodeConfig], + updatedNodes: Map[String, NodeConfig], + deletedNodes: Map[String, NodeConfig]) + +object ClusterConfigDiff { + def apply(oldCluster: Map[String, NodeConfig], newCluster: Map[String, NodeConfig]): ClusterConfigDiff = { + val newNodes = newCluster -- oldCluster.keys + val removedNodes = oldCluster -- newCluster.keys + 
val changedNodes = newCluster.filter { + case (name, cfg) if oldCluster.contains(name) => cfg != oldCluster(name) + case _ => false + } + ClusterConfigDiff(newNodes, changedNodes, removedNodes) + } +} diff --git a/src/main/scala/com/redis/cluster/ClusterConfigListener.scala b/src/main/scala/com/redis/cluster/ClusterConfigListener.scala new file mode 100644 index 00000000..f6664f47 --- /dev/null +++ b/src/main/scala/com/redis/cluster/ClusterConfigListener.scala @@ -0,0 +1,7 @@ +package com.redis.cluster + + +trait ClusterConfigListener { + def configUpdated(newConfig: Map[String, NodeConfig]) + +} diff --git a/src/main/scala/com/redis/cluster/ConfigManager.scala b/src/main/scala/com/redis/cluster/ConfigManager.scala new file mode 100644 index 00000000..956292f0 --- /dev/null +++ b/src/main/scala/com/redis/cluster/ConfigManager.scala @@ -0,0 +1,9 @@ +package com.redis.cluster + + +trait ConfigManager { + def readConfig: Map[String, NodeConfig] + + def addListener(listener: ClusterConfigListener) + +} diff --git a/src/main/scala/com/redis/cluster/CopyOnWriteHashRing.scala b/src/main/scala/com/redis/cluster/CopyOnWriteHashRing.scala new file mode 100644 index 00000000..f1ccfb43 --- /dev/null +++ b/src/main/scala/com/redis/cluster/CopyOnWriteHashRing.scala @@ -0,0 +1,39 @@ +package com.redis.cluster + +import java.util.concurrent.atomic.AtomicReference + + +class CopyOnWriteHashRing[T](nodes: Map[String, T], replicas: Int) { + val ringRef = new AtomicReference[HashRing[T]](HashRing(nodes, replicas)) + + def addNode(nodeName: String, node: T) { + val ring = ringRef.get() + if (!ringRef.compareAndSet(ring, ring.addNode(nodeName, node))) + addNode(nodeName, node) + } + + def udpateNode(nodeName: String, node: T): T = { + val ring = ringRef.get() + val oldNode = ring.cluster(nodeName) + val newRing = ring.removeNode(nodeName).addNode(nodeName, node) + if (!ringRef.compareAndSet(ring, newRing)) + udpateNode(nodeName, node) + else + oldNode + } + + // remove node from the ring + def removeNode(nodeName: String): T = { + val ring = ringRef.get() + val newRing = ring.removeNode(nodeName) + if (!ringRef.compareAndSet(ring, newRing)) + removeNode(nodeName) + else + ring.cluster(nodeName) + } + + // get node for the key + def getNode(key: Seq[Byte]): T = ringRef.get().getNode(key) + + def cluster = ringRef.get().cluster +} diff --git a/src/main/scala/com/redis/cluster/HashRing.scala b/src/main/scala/com/redis/cluster/HashRing.scala index 029db7ce..49bed999 100644 --- a/src/main/scala/com/redis/cluster/HashRing.scala +++ b/src/main/scala/com/redis/cluster/HashRing.scala @@ -1,46 +1,51 @@ package com.redis.cluster import java.util.zip.CRC32 -import scala.collection.immutable.TreeSet -import scala.collection.mutable.{ArrayBuffer, Map, ListBuffer} +import collection.immutable.SortedMap -case class HashRing[T](nodes: List[T], replicas: Int) { - var sortedKeys = new TreeSet[Long] - val cluster = new ArrayBuffer[T] - var ring = Map[Long, T]() +class HashRing[T](val cluster: Map[String, T], val ring: SortedMap[Long, T], replicas: Int) { - nodes.foreach(addNode(_)) + import HashRing._ // adds a node to the hash ring (including a number of replicas) - def addNode(node: T) = { - cluster += node - (1 to replicas).foreach {replica => - val key = calculateChecksum((node + ":" + replica).getBytes("UTF-8")) - ring += (key -> node) - sortedKeys = sortedKeys + key + def addNode(nodeName: String, node: T) = { + require(!cluster.contains(nodeName), "Cluster already contains node '" + nodeName + "'") + val pairs = (1 to 
replicas).map { + replica => (calculateChecksum((nodeName + ":" + replica).getBytes("UTF-8")), node) } + new HashRing[T](cluster + (nodeName -> node), ring ++ pairs, replicas) } // remove node from the ring - def removeNode(node: T) { - cluster -= node - (1 to replicas).foreach {replica => - val key = calculateChecksum((node + ":" + replica).getBytes("UTF-8")) - ring -= key - sortedKeys = sortedKeys - key + def removeNode(nodeName: String) = { + require(cluster.contains(nodeName), "Cluster does not contain node '" + nodeName + "'") + val keys = (1 to replicas).map { + replica => calculateChecksum((nodeName + ":" + replica).getBytes("UTF-8")) } + new HashRing[T](cluster - nodeName, ring -- keys, replicas) } // get node for the key def getNode(key: Seq[Byte]): T = { val crc = calculateChecksum(key) - if (sortedKeys contains crc) ring(crc) + if (ring contains crc) ring(crc) else { - if (crc < sortedKeys.firstKey) ring(sortedKeys.firstKey) - else if (crc > sortedKeys.lastKey) ring(sortedKeys.lastKey) - else ring(sortedKeys.rangeImpl(None, Some(crc)).lastKey) + if (crc < ring.firstKey) ring.head._2 + else if (crc > ring.lastKey) ring.last._2 + else ring.rangeImpl(None, Some(crc)).last._2 } } +} + +object HashRing { + def apply[T](nodes: Map[String, T], replicas: Int) = { + val pairs = + for ((nodeName, node) <- nodes.toSeq; + replica <- 1 to replicas) yield { + (calculateChecksum((nodeName + ":" + replica).getBytes("UTF-8")), node) + } + new HashRing[T](nodes, SortedMap(pairs: _*), replicas) + } // Computes the CRC-32 of the given String def calculateChecksum(value: Seq[Byte]): Long = { @@ -49,4 +54,3 @@ case class HashRing[T](nodes: List[T], replicas: Int) { checksum.getValue } } - diff --git a/src/main/scala/com/redis/cluster/NodeConfig.scala b/src/main/scala/com/redis/cluster/NodeConfig.scala new file mode 100644 index 00000000..6d9a7b5a --- /dev/null +++ b/src/main/scala/com/redis/cluster/NodeConfig.scala @@ -0,0 +1,5 @@ +package com.redis.cluster + +case class NodeConfig(host: String, port: Int) { + override def toString = host + ":" + port +} diff --git a/src/main/scala/com/redis/cluster/RedisCluster.scala b/src/main/scala/com/redis/cluster/RedisCluster.scala index 4fcc4fe2..49eee5fd 100644 --- a/src/main/scala/com/redis/cluster/RedisCluster.scala +++ b/src/main/scala/com/redis/cluster/RedisCluster.scala @@ -1,12 +1,13 @@ package com.redis.cluster -import java.util.zip.CRC32 -import scala.collection.immutable.TreeSet -import scala.collection.mutable.{ArrayBuffer, ListBuffer} +import config._ import com.redis._ import serialization._ +import java.util.concurrent.{TimeUnit, ConcurrentLinkedQueue, Executors} +import org.slf4j.LoggerFactory +import java.util.concurrent.atomic.AtomicInteger /** * Consistent hashing distributes keys across multiple servers. 
But there are situations @@ -37,7 +38,7 @@ trait KeyTag { def tag(key: Seq[Byte]): Option[Seq[Byte]] } -import scala.util.matching.Regex + object RegexKeyTag extends KeyTag { val tagStart = '{'.toByte @@ -47,7 +48,7 @@ object RegexKeyTag extends KeyTag { val start = key.indexOf(tagStart) + 1 if (start > 0) { val end = key.indexOf(tagEnd, start) - if (end > -1) Some(key.slice(start,end)) else None + if (end > -1) Some(key.slice(start, end)) else None } else None } } @@ -56,7 +57,13 @@ object NoOpKeyTag extends KeyTag { def tag(key: Seq[Byte]) = Some(key) } -abstract class RedisCluster(hosts: String*) extends RedisClient { +abstract class RedisCluster(configManager: ConfigManager) extends RedisCommand with PubSub { + val port = 0 + val host = null + + + val log = LoggerFactory.getLogger(getClass) + // not needed at cluster level // override val host = null @@ -69,64 +76,147 @@ abstract class RedisCluster(hosts: String*) extends RedisClient { val POINTS_PER_SERVER = 160 // default in libmemcached // instantiating a cluster will automatically connect participating nodes to the server - val clients = hosts.toList.map {h => - val hp = h.split(":") - new RedisClientPool(hp(0), hp(1).toInt) - } + val initialConfig = configManager.readConfig + + val connectCount = new AtomicInteger() // the hash ring will instantiate with the nodes up and added - val hr = HashRing[RedisClientPool](clients, POINTS_PER_SERVER) + val hr = new CopyOnWriteHashRing[RedisClientPool]( + initialConfig.map {case (name, hostPort) => + log.info(connectCount.incrementAndGet().toString + ": Connecting to redis node " + name + "(" + hostPort + ")") + (name, new RedisClientPool(hostPort.host, hostPort.port)) + }, + POINTS_PER_SERVER) + + + object DisconnectManager { + val DisconnectTimeoutMs = 120000 + val queue = new ConcurrentLinkedQueue[(Long, RedisClientPool)] + val disconnectService = Executors.newSingleThreadScheduledExecutor() + disconnectService.scheduleAtFixedRate( + new Runnable { + def run() { + val curTime = System.currentTimeMillis() + for(i <- 1 to queue.size()) { + queue.remove() match { + case (time, client) if (curTime - time >= DisconnectTimeoutMs) => + log.info("Closing redis client " + client) + client.close + case v => queue.add(v) + } + } + } + }, + 30,30, TimeUnit.SECONDS) + + def submit(client: RedisClientPool) { + queue.add((System.currentTimeMillis(), client)) + } + } + + configManager.addListener(new ClusterConfigListener { + var prevConfig = initialConfig + + def configUpdated(newConfig: Map[String, NodeConfig]) { + val diff = ClusterConfigDiff(prevConfig, newConfig) + + diff.updatedNodes.foreach { + case (name, nodeConf) => { + log.info(connectCount.incrementAndGet().toString + ": Changing node endpoint: " + name + " moved to " + nodeConf) + val oldNode = hr.udpateNode(name, new RedisClientPool(nodeConf.host, nodeConf.port)) + DisconnectManager.submit(oldNode) + } + } + + diff.newNodes.foreach { + case (name, nodeConf) => log.error("Attempt to add a node to cluster: " + name + ", " + nodeConf) + } + + diff.deletedNodes.foreach { + case (name, nodeConf) => log.error("Attempt to remove a node from cluster: " + name + ", " + nodeConf) + } + + } + }) // get node for the key def nodeForKey(key: Any)(implicit format: Format) = { val bKey = format(key) - hr.getNode(keyTag.flatMap(_.tag(bKey)).getOrElse(bKey)).withClient { client => client } + hr.getNode(keyTag.flatMap(_.tag(bKey)).getOrElse(bKey)).withClient { + client => client + } } // add a server - def addServer(server: String) = { + def addServer(name: 
String, server: String) = { val hp = server.split(":") - hr addNode new RedisClientPool(hp(0), hp(1).toInt) + hr.addNode(name, new RedisClientPool(hp(0), hp(1).toInt)) } /** * Operations */ override def keys[A](pattern: Any = "*")(implicit format: Format, parse: Parse[A]) = - Some(hr.cluster.toList.map(_.withClient(_.keys[A](pattern))).flatten.flatten) + Some[List[Option[A]]]( + hr.cluster.values.toList.map { + _.withClient(_.keys[A](pattern)) + }.flatten.flatten + ) - def onAllConns[T](body: RedisClient => T) = - hr.cluster.map(p => p.withClient { client => body(client) }) // .forall(_ == true) + def onAllConns[T](body: RedisClient => T) = + hr.cluster.values.map(p => p.withClient { + client => body(client) + }) // .forall(_ == true) - override def flushdb = onAllConns(_.flushdb) forall(_ == true) - override def flushall = onAllConns(_.flushall) forall(_ == true) - override def quit = onAllConns(_.quit) forall(_ == true) - def close = hr.cluster.map(_.close) + override def flushdb = onAllConns(_.flushdb) forall (_ == true) + + override def flushall = onAllConns(_.flushall) forall (_ == true) + + override def quit = onAllConns(_.quit) forall (_ == true) + + def close = hr.cluster.values.foreach(_.close) override def rename(oldkey: Any, newkey: Any)(implicit format: Format): Boolean = nodeForKey(oldkey).rename(oldkey, newkey) + override def renamenx(oldkey: Any, newkey: Any)(implicit format: Format): Boolean = nodeForKey(oldkey).renamenx(oldkey, newkey) + override def dbsize: Option[Int] = - Some(onAllConns(_.dbsize).foldLeft(0)((a, b) => b.map(a+).getOrElse(a))) + Some(onAllConns(_.dbsize).foldLeft(0)((a, b) => b.map(a +).getOrElse(a))) + override def exists(key: Any)(implicit format: Format): Boolean = nodeForKey(key).exists(key) + override def del(key: Any, keys: Any*)(implicit format: Format): Option[Int] = - Some((key :: keys.toList).groupBy(nodeForKey).foldLeft(0) { case (t,(n,ks)) => n.del(ks.head,ks.tail:_*).map(t+).getOrElse(t) }) + Some((key :: keys.toList).groupBy(nodeForKey).foldLeft(0) { + case (t, (n, ks)) => n.del(ks.head, ks.tail: _*).map(t +).getOrElse(t) + }) + override def getType(key: Any)(implicit format: Format) = nodeForKey(key).getType(key) + override def expire(key: Any, expiry: Int)(implicit format: Format) = nodeForKey(key).expire(key, expiry) + override def select(index: Int) = throw new UnsupportedOperationException("not supported on a cluster") /** * NodeOperations */ - override def save = onAllConns(_.save) forall(_ == true) - override def bgsave = onAllConns(_.bgsave) forall(_ == true) - override def shutdown = onAllConns(_.shutdown) forall(_ == true) - override def bgrewriteaof = onAllConns(_.bgrewriteaof) forall(_ == true) + override def save = onAllConns(_.save) forall (_ == true) + + override def bgsave = onAllConns(_.bgsave) forall (_ == true) + + override def shutdown = onAllConns(_.shutdown) forall (_ == true) + + override def bgrewriteaof = onAllConns(_.bgrewriteaof) forall (_ == true) override def lastsave = throw new UnsupportedOperationException("not supported on a cluster") + override def monitor = throw new UnsupportedOperationException("not supported on a cluster") + override def info = throw new UnsupportedOperationException("not supported on a cluster") + override def slaveof(options: Any) = throw new UnsupportedOperationException("not supported on a cluster") + override def move(key: Any, db: Int)(implicit format: Format) = throw new UnsupportedOperationException("not supported on a cluster") + override def auth(secret: Any)(implicit format: 
Format) = throw new UnsupportedOperationException("not supported on a cluster") @@ -134,12 +224,19 @@ abstract class RedisCluster(hosts: String*) extends RedisClient { * StringOperations */ override def set(key: Any, value: Any)(implicit format: Format) = nodeForKey(key).set(key, value) + override def get[A](key: Any)(implicit format: Format, parse: Parse[A]) = nodeForKey(key).get(key) + override def getset[A](key: Any, value: Any)(implicit format: Format, parse: Parse[A]) = nodeForKey(key).getset(key, value) + override def setnx(key: Any, value: Any)(implicit format: Format) = nodeForKey(key).setnx(key, value) + override def incr(key: Any)(implicit format: Format) = nodeForKey(key).incr(key) + override def incrby(key: Any, increment: Int)(implicit format: Format) = nodeForKey(key).incrby(key, increment) + override def decr(key: Any)(implicit format: Format) = nodeForKey(key).decr(key) + override def decrby(key: Any, increment: Int)(implicit format: Format) = nodeForKey(key).decrby(key, increment) override def mget[A](key: Any, keys: Any*)(implicit format: Format, parse: Parse[A]): Option[List[Option[A]]] = { @@ -152,30 +249,47 @@ abstract class RedisCluster(hosts: String*) extends RedisClient { Some(keylist.map(kvs)) } - override def mset(kvs: (Any, Any)*)(implicit format: Format) = kvs.toList.map{ case (k, v) => set(k, v) }.forall(_ == true) - override def msetnx(kvs: (Any, Any)*)(implicit format: Format) = kvs.toList.map{ case (k, v) => setnx(k, v) }.forall(_ == true) + override def mset(kvs: (Any, Any)*)(implicit format: Format) = kvs.toList.map { + case (k, v) => set(k, v) + }.forall(_ == true) + + override def msetnx(kvs: (Any, Any)*)(implicit format: Format) = kvs.toList.map { + case (k, v) => setnx(k, v) + }.forall(_ == true) /** * ListOperations */ - override def lpush(key: Any, value: Any, values: Any*)(implicit format: Format) = nodeForKey(key).lpush(key, value, values:_*) - override def rpush(key: Any, value: Any, values: Any*)(implicit format: Format) = nodeForKey(key).lpush(key, value, values:_*) + override def lpush(key: Any, value: Any, values: Any*)(implicit format: Format) = nodeForKey(key).lpush(key, value, values: _*) + + override def rpush(key: Any, value: Any, values: Any*)(implicit format: Format) = nodeForKey(key).lpush(key, value, values: _*) + override def llen(key: Any)(implicit format: Format) = nodeForKey(key).llen(key) + override def lrange[A](key: Any, start: Int, end: Int)(implicit format: Format, parse: Parse[A]) = nodeForKey(key).lrange[A](key, start, end) + override def ltrim(key: Any, start: Int, end: Int)(implicit format: Format) = nodeForKey(key).ltrim(key, start, end) + override def lindex[A](key: Any, index: Int)(implicit format: Format, parse: Parse[A]) = nodeForKey(key).lindex(key, index) + override def lset(key: Any, index: Int, value: Any)(implicit format: Format) = nodeForKey(key).lset(key, index, value) + override def lrem(key: Any, count: Int, value: Any)(implicit format: Format) = nodeForKey(key).lrem(key, count, value) + override def lpop[A](key: Any)(implicit format: Format, parse: Parse[A]) = nodeForKey(key).lpop[A](key) + override def rpop[A](key: Any)(implicit format: Format, parse: Parse[A]) = nodeForKey(key).rpop[A](key) - override def rpoplpush[A](srcKey: Any, dstKey: Any)(implicit format: Format, parse: Parse[A]) = - inSameNode(srcKey, dstKey) {n => n.rpoplpush[A](srcKey, dstKey)} + + override def rpoplpush[A](srcKey: Any, dstKey: Any)(implicit format: Format, parse: Parse[A]) = + inSameNode(srcKey, dstKey) { + n => 
n.rpoplpush[A](srcKey, dstKey) + } private def inSameNode[T](keys: Any*)(body: RedisClient => T)(implicit format: Format): T = { val nodes = keys.toList.map(nodeForKey(_)) nodes.forall(_ == nodes.head) match { - case true => body(nodes.head) // all nodes equal - case _ => + case true => body(nodes.head) // all nodes equal + case _ => throw new UnsupportedOperationException("can only occur if both keys map to same node") } } @@ -183,58 +297,82 @@ abstract class RedisCluster(hosts: String*) extends RedisClient { /** * SetOperations */ - override def sadd(key: Any, value: Any, values: Any*)(implicit format: Format): Option[Int] = nodeForKey(key).sadd(key, value, values:_*) - override def srem(key: Any, value: Any, values: Any*)(implicit format: Format): Option[Int] = nodeForKey(key).srem(key, value, values:_*) + override def sadd(key: Any, value: Any, values: Any*)(implicit format: Format): Option[Int] = nodeForKey(key).sadd(key, value, values: _*) + + override def srem(key: Any, value: Any, values: Any*)(implicit format: Format): Option[Int] = nodeForKey(key).srem(key, value, values: _*) + override def spop[A](key: Any)(implicit format: Format, parse: Parse[A]) = nodeForKey(key).spop[A](key) - override def smove(sourceKey: Any, destKey: Any, value: Any)(implicit format: Format) = - inSameNode(sourceKey, destKey) {n => n.smove(sourceKey, destKey, value)} + override def smove(sourceKey: Any, destKey: Any, value: Any)(implicit format: Format) = + inSameNode(sourceKey, destKey) { + n => n.smove(sourceKey, destKey, value) + } override def scard(key: Any)(implicit format: Format) = nodeForKey(key).scard(key) + override def sismember(key: Any, value: Any)(implicit format: Format) = nodeForKey(key).sismember(key, value) - override def sinter[A](key: Any, keys: Any*)(implicit format: Format, parse: Parse[A]) = - inSameNode((key :: keys.toList): _*) {n => n.sinter[A](key, keys: _*)} + override def sinter[A](key: Any, keys: Any*)(implicit format: Format, parse: Parse[A]) = + inSameNode((key :: keys.toList): _*) { + n => n.sinter[A](key, keys: _*) + } - override def sinterstore(key: Any, keys: Any*)(implicit format: Format) = - inSameNode((key :: keys.toList): _*) {n => n.sinterstore(key, keys: _*)} + override def sinterstore(key: Any, keys: Any*)(implicit format: Format) = + inSameNode((key :: keys.toList): _*) { + n => n.sinterstore(key, keys: _*) + } - override def sunion[A](key: Any, keys: Any*)(implicit format: Format, parse: Parse[A]) = - inSameNode((key :: keys.toList): _*) {n => n.sunion[A](key, keys: _*)} + override def sunion[A](key: Any, keys: Any*)(implicit format: Format, parse: Parse[A]) = + inSameNode((key :: keys.toList): _*) { + n => n.sunion[A](key, keys: _*) + } - override def sunionstore(key: Any, keys: Any*)(implicit format: Format) = - inSameNode((key :: keys.toList): _*) {n => n.sunionstore(key, keys: _*)} + override def sunionstore(key: Any, keys: Any*)(implicit format: Format) = + inSameNode((key :: keys.toList): _*) { + n => n.sunionstore(key, keys: _*) + } - override def sdiff[A](key: Any, keys: Any*)(implicit format: Format, parse: Parse[A]) = - inSameNode((key :: keys.toList): _*) {n => n.sdiff[A](key, keys: _*)} + override def sdiff[A](key: Any, keys: Any*)(implicit format: Format, parse: Parse[A]) = + inSameNode((key :: keys.toList): _*) { + n => n.sdiff[A](key, keys: _*) + } - override def sdiffstore(key: Any, keys: Any*)(implicit format: Format) = - inSameNode((key :: keys.toList): _*) {n => n.sdiffstore(key, keys: _*)} + override def sdiffstore(key: Any, keys: 
Any*)(implicit format: Format) = + inSameNode((key :: keys.toList): _*) { + n => n.sdiffstore(key, keys: _*) + } override def smembers[A](key: Any)(implicit format: Format, parse: Parse[A]) = nodeForKey(key).smembers(key) + override def srandmember[A](key: Any)(implicit format: Format, parse: Parse[A]) = nodeForKey(key).srandmember(key) - import Commands._ import RedisClient._ /** * SortedSetOperations */ - override def zadd(key: Any, score: Double, member: Any, scoreVals: (Double, Any)*)(implicit format: Format) = - nodeForKey(key).zadd(key, score, member, scoreVals:_*) - override def zrem(key: Any, member: Any, members: Any*)(implicit format: Format): Option[Int] = + override def zadd(key: Any, score: Double, member: Any, scoreVals: (Double, Any)*)(implicit format: Format) = + nodeForKey(key).zadd(key, score, member, scoreVals: _*) + + override def zrem(key: Any, member: Any, members: Any*)(implicit format: Format): Option[Int] = nodeForKey(key).zrem(key, member, members) + override def zincrby(key: Any, incr: Double, member: Any)(implicit format: Format) = nodeForKey(key).zincrby(key, incr, member) + override def zcard(key: Any)(implicit format: Format) = nodeForKey(key).zcard(key) + override def zscore(key: Any, element: Any)(implicit format: Format) = nodeForKey(key).zscore(key, element) - override def zrange[A](key: Any, start: Int = 0, end: Int = -1, sortAs: SortOrder )(implicit format: Format, parse: Parse[A]) = + + override def zrange[A](key: Any, start: Int = 0, end: Int = -1, sortAs: SortOrder)(implicit format: Format, parse: Parse[A]) = nodeForKey(key).zrange[A](key, start, end, sortAs) + override def zrangeWithScore[A](key: Any, start: Int = 0, end: Int = -1, sortAs: SortOrder = ASC)(implicit format: Format, parse: Parse[A]) = nodeForKey(key).zrangeWithScore[A](key, start, end, sortAs) override def zrangebyscore[A](key: Any, min: Double = Double.NegativeInfinity, minInclusive: Boolean = true, max: Double = Double.PositiveInfinity, maxInclusive: Boolean = true, limit: Option[(Int, Int)], sortAs: SortOrder = ASC)(implicit format: Format, parse: Parse[A]) = nodeForKey(key).zrangebyscore[A](key, min, minInclusive, max, maxInclusive, limit, sortAs) + override def zcount(key: Any, min: Double = Double.NegativeInfinity, max: Double = Double.PositiveInfinity, minInclusive: Boolean = true, maxInclusive: Boolean = true)(implicit format: Format): Option[Int] = nodeForKey(key).zcount(key, min, max, minInclusive, maxInclusive) @@ -242,14 +380,24 @@ abstract class RedisCluster(hosts: String*) extends RedisClient { * HashOperations */ override def hset(key: Any, field: Any, value: Any)(implicit format: Format) = nodeForKey(key).hset(key, field, value) + override def hget[A](key: Any, field: Any)(implicit format: Format, parse: Parse[A]) = nodeForKey(key).hget[A](key, field) + override def hmset(key: Any, map: Iterable[Product2[Any, Any]])(implicit format: Format) = nodeForKey(key).hmset(key, map) - override def hmget[K,V](key: Any, fields: K*)(implicit format: Format, parseV: Parse[V]) = nodeForKey(key).hmget[K,V](key, fields:_*) + + override def hmget[K, V](key: Any, fields: K*)(implicit format: Format, parseV: Parse[V]) = nodeForKey(key).hmget[K, V](key, fields: _*) + override def hincrby(key: Any, field: Any, value: Int)(implicit format: Format) = nodeForKey(key).hincrby(key, field, value) + override def hexists(key: Any, field: Any)(implicit format: Format) = nodeForKey(key).hexists(key, field) - override def hdel(key: Any, field: Any, fields: Any*)(implicit format: Format): Option[Int] 
= nodeForKey(key).hdel(key, field, fields:_*) + + override def hdel(key: Any, field: Any, fields: Any*)(implicit format: Format): Option[Int] = nodeForKey(key).hdel(key, field, fields: _*) + override def hlen(key: Any)(implicit format: Format) = nodeForKey(key).hlen(key) + override def hkeys[A](key: Any)(implicit format: Format, parse: Parse[A]) = nodeForKey(key).hkeys[A](key) + override def hvals[A](key: Any)(implicit format: Format, parse: Parse[A]) = nodeForKey(key).hvals[A](key) - override def hgetall[K,V](key: Any)(implicit format: Format, parseK: Parse[K], parseV: Parse[V]) = nodeForKey(key).hgetall[K,V](key) + + override def hgetall[K, V](key: Any)(implicit format: Format, parseK: Parse[K], parseV: Parse[V]) = nodeForKey(key).hgetall[K, V](key) } diff --git a/src/main/scala/com/redis/cluster/config/ZKStringSerializer.scala b/src/main/scala/com/redis/cluster/config/ZKStringSerializer.scala new file mode 100644 index 00000000..f16b29ce --- /dev/null +++ b/src/main/scala/com/redis/cluster/config/ZKStringSerializer.scala @@ -0,0 +1,18 @@ +package com.redis.cluster.config + +import org.I0Itec.zkclient.serialize.ZkSerializer +import org.I0Itec.zkclient.exception.ZkMarshallingError + +object ZKStringSerializer extends ZkSerializer { + + @throws(classOf[ZkMarshallingError]) + def serialize(data: Object): Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8") + + @throws(classOf[ZkMarshallingError]) + def deserialize(bytes: Array[Byte]): Object = { + if (bytes == null) + null + else + new String(bytes, "UTF-8") + } +} diff --git a/src/main/scala/com/redis/cluster/config/ZkConfig.scala b/src/main/scala/com/redis/cluster/config/ZkConfig.scala new file mode 100644 index 00000000..e2640233 --- /dev/null +++ b/src/main/scala/com/redis/cluster/config/ZkConfig.scala @@ -0,0 +1,28 @@ +package com.redis.cluster.config + +import java.util.Properties + +class ZkConfig(props: Properties) { + import ZkConfig._ + + val zkConnect = getString(props, "zk.connect") + val zkSessionTimeoutMs = getInt(props, "zk.sessiontimeout.ms", 6000) + val zkConnectionTimeoutMs = getInt(props, "zk.connectiontimeout.ms", zkSessionTimeoutMs) + val zkNodesPath = getString(props, "zk.rediscluster.nodes", "/rediscluster/nodes") +} + +object ZkConfig { + private def get[T](props: Properties, name: String, default: T)(f: String => T) = Option(props.getProperty(name)) match { + case Some(v) => f(v) + case None => default + } + + private def get[T](props: Properties, name: String)(f: String => T) = Option(props.getProperty(name)) match { + case Some(v) => f(v) + case None => throw new IllegalArgumentException("Missing mandatory property " + name) + } + + def getString(props: Properties, name: String) = get(props, name){v => v} + def getString(props: Properties, name: String, default: String) = get(props, name, default){v => v} + def getInt(props: Properties, name: String, default: Int) = get(props,name,default){_.toInt} +} diff --git a/src/main/scala/com/redis/cluster/config/ZookeperConfigManager.scala b/src/main/scala/com/redis/cluster/config/ZookeperConfigManager.scala new file mode 100644 index 00000000..f1fc6450 --- /dev/null +++ b/src/main/scala/com/redis/cluster/config/ZookeperConfigManager.scala @@ -0,0 +1,80 @@ +package com.redis.cluster.config + +import org.I0Itec.zkclient.serialize.ZkSerializer +import org.I0Itec.zkclient.exception.ZkMarshallingError +import org.I0Itec.zkclient.{IZkDataListener, ZkClient} +import java.util.concurrent.{Callable, Executors, CopyOnWriteArrayList} +import org.slf4j.LoggerFactory +import 
java.util.StringTokenizer +import com.redis.cluster.{ConfigManager, NodeConfig, ClusterConfigListener} + +class ZookeperConfigManager(cfg: ZkConfig) extends ConfigManager { + + import cfg._ + import collection.JavaConversions.asScalaIterator + + val logger = LoggerFactory.getLogger(getClass) + + val zkClient: ZkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, zkConnectionTimeoutMs, ZKStringSerializer) + zkClient.subscribeDataChanges(zkNodesPath, ListenerComposite) + + val service = Executors.newSingleThreadExecutor() + + private def parseCofig(data: String): Map[String, NodeConfig] = { + + def parseEntry(entry: String): (String, NodeConfig) = { + val nameHostPort = entry.split(":") + try { + (nameHostPort(0), NodeConfig(nameHostPort(1), nameHostPort(2).toInt)) + } catch { + case e: IndexOutOfBoundsException => + throw new IllegalArgumentException("Invalid node config: " + entry) + } + } + + def parse(t: StringTokenizer, res: Map[String, NodeConfig]): Map[String, NodeConfig] = t.hasMoreTokens match { + case true => parse(t, res + parseEntry(t.nextToken())) + case false => res + } + + parse(new StringTokenizer(data, "|"), Map.empty[String, NodeConfig]) + } + + + object ListenerComposite extends IZkDataListener { + val listeners = new CopyOnWriteArrayList[ClusterConfigListener]() + + def handleDataChange(dataPath: String, data: Any) { + service.submit(new Runnable { + def run() { + val conf = parseCofig(data.toString) + listeners.iterator.foreach { + _.configUpdated(conf) + } + } + }) + } + + def handleDataDeleted(dataPath: String) { + logger.error("Attempt to delete cluster configuration!!!") + } + } + + def readConfig = service.submit(new Callable[Map[String, NodeConfig]] { + def call() = parseCofig(zkClient.readData(zkNodesPath)) + }).get + + + def addListener(listener: ClusterConfigListener) { + service.submit(new Runnable { + def run() { + ListenerComposite.listeners.add(listener) + val conf = parseCofig(zkClient.readData(zkNodesPath)) + listener.configUpdated(conf) + } + }) + } + + +} + diff --git a/src/test/scala/com/redis/cluster/ClusterConfigDiffSpec.scala b/src/test/scala/com/redis/cluster/ClusterConfigDiffSpec.scala new file mode 100644 index 00000000..218af1e8 --- /dev/null +++ b/src/test/scala/com/redis/cluster/ClusterConfigDiffSpec.scala @@ -0,0 +1,64 @@ +package com.redis.cluster + +import org.scalatest.WordSpec +import org.scalatest.matchers.ShouldMatchers + +class ClusterConfigDiffSpec extends WordSpec with ShouldMatchers { + + "ClusterConfigDiff" should { + "find new nodes" in { + val oldConf = Map("1" -> NodeConfig("1", 1), "2" -> NodeConfig("2", 1)) + val newConf = oldConf + ("3" -> NodeConfig("3", 1)) + ClusterConfigDiff(oldConf, newConf).newNodes should be(newConf -- oldConf.keys) + } + + "find removed nodes" in { + val oldConf = Map("1" -> NodeConfig("1", 1), "2" -> NodeConfig("2", 1)) + val newConf = oldConf - "2" + ClusterConfigDiff(oldConf, newConf).deletedNodes should be(oldConf -- newConf.keys) + } + + "find updated nodes" in { + val oldConf = Map("1" -> NodeConfig("1", 1), "2" -> NodeConfig("2", 1)) + val newConf = Map("1" -> NodeConfig("1", 1), "2" -> NodeConfig("2", 2)) + ClusterConfigDiff(oldConf, newConf).updatedNodes should be(Map("2" -> NodeConfig("2", 2))) + } + + "detect no changes" in { + val oldConf = Map("1" -> NodeConfig("1", 1), "2" -> NodeConfig("2", 1)) + val newConf = Map("1" -> NodeConfig("1", 1), "2" -> NodeConfig("2", 1)) + + val diff = ClusterConfigDiff(oldConf, newConf) + diff.deletedNodes.isEmpty should be(true) + 
diff.updatedNodes.isEmpty should be(true) + diff.newNodes.isEmpty should be(true) + } + + "detect multiple changes" in { + val oldConf = Map( + "1" -> NodeConfig("1", 1), + "2" -> NodeConfig("2", 1), + "3" -> NodeConfig("3", 1), + "4" -> NodeConfig("4", 1), + "5" -> NodeConfig("5", 1), + "6" -> NodeConfig("6", 1)) + + val newConf = Map( + "7" -> NodeConfig("7", 1), + "8" -> NodeConfig("8", 1), + "3" -> NodeConfig("3", 2), + "4" -> NodeConfig("4", 2), + "5" -> NodeConfig("5", 1), + "6" -> NodeConfig("6", 1)) + + val diff = ClusterConfigDiff(oldConf, newConf) + diff.deletedNodes should be(Map("1" -> NodeConfig("1", 1), "2" -> NodeConfig("2", 1))) + diff.updatedNodes should be(Map("3" -> NodeConfig("3", 2), "4" -> NodeConfig("4", 2))) + diff.newNodes should be(Map("7" -> NodeConfig("7", 1), "8" -> NodeConfig("8", 1))) + } + + + + } + +} diff --git a/src/test/scala/com/redis/cluster/HashRingSpec.scala b/src/test/scala/com/redis/cluster/HashRingSpec.scala new file mode 100644 index 00000000..3391d705 --- /dev/null +++ b/src/test/scala/com/redis/cluster/HashRingSpec.scala @@ -0,0 +1,28 @@ +package com.redis.cluster + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers + +class HashRingSpec extends WordSpec with MustMatchers { + + "constructor" should { + "create correct ring" in { + val nodes = Map("1" -> "node1", "2" -> "node2", "3" -> "node3") + val ring = HashRing(nodes, 3) + ring.cluster must be(nodes) + ring.ring.size must be(nodes.size * 3) + } + } + + "addNode" should { + "prohibit duplicated name" in { + val nodes = Map("1" -> "node1", "2" -> "node2", "3" -> "node3") + val ring = HashRing(nodes, 3) + + intercept[IllegalArgumentException] { + ring.addNode("1", "node11") + } + } + } + +} diff --git a/src/test/scala/com/redis/RedisClusterSpec.scala b/src/test/scala/com/redis/cluster/RedisClusterSpec.scala similarity index 81% rename from src/test/scala/com/redis/RedisClusterSpec.scala rename to src/test/scala/com/redis/cluster/RedisClusterSpec.scala index fd797505..c2138c50 100644 --- a/src/test/scala/com/redis/RedisClusterSpec.scala +++ b/src/test/scala/com/redis/cluster/RedisClusterSpec.scala @@ -6,15 +6,26 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.matchers.ShouldMatchers import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith +import org.scalatest.mock.MockitoSugar @RunWith(classOf[JUnitRunner]) -class RedisClusterSpec extends Spec - with ShouldMatchers - with BeforeAndAfterEach - with BeforeAndAfterAll { +class RedisClusterSpec extends Spec +with ShouldMatchers +with BeforeAndAfterEach +with BeforeAndAfterAll +with MockitoSugar { - val r = new RedisCluster("localhost:6379", "localhost:6380", "localhost:6381") { + + import org.mockito.Mockito._ + + val conf = mock[ConfigManager] + when(conf.readConfig).thenReturn( + Map("1" -> NodeConfig("192.168.56.101", 6379), + "2" -> NodeConfig("192.168.56.101", 6380), + "3" -> NodeConfig("192.168.56.101", 6381))) + + val r = new RedisCluster(conf) { val keyTag = Some(RegexKeyTag) } @@ -29,7 +40,7 @@ class RedisClusterSpec extends Spec val l = List("debasish", "maulindu", "ramanendu", "nilanjan", "tarun", "tarun", "tarun") // last 3 should map to the same node - l.map(r.nodeForKey(_)).reverse.slice(0, 3).forall(_.toString == "localhost:6381") should equal(true) + l.map(r.nodeForKey(_)).reverse.slice(0, 3).toSet.size should equal(1) // set l foreach (s => r.nodeForKey(s).set(s, "working in anshin") should equal(true)) @@ -42,7 +53,7 @@ class RedisClusterSpec extends Spec val l = 
List("debasish", "maulindu", "ramanendu", "nilanjan", "tarun", "tarun", "tarun") // set - l foreach (s => r.nodeForKey(s).set(s, s + " is working in anshin") should equal(true)) + l foreach (s => r.nodeForKey(s).set(s, s + " is working in anshin") should equal(true)) r.get("debasish").get should equal("debasish is working in anshin") r.get("maulindu").get should equal("maulindu is working in anshin") @@ -75,7 +86,7 @@ class RedisClusterSpec extends Spec r.mget(l.head, l.tail: _*).get.map(_.get.split(" ")(0)) should equal(l) } - it("list operations should work on the cluster"){ + it("list operations should work on the cluster") { r.lpush("java-virtual-machine-langs", "java") should equal(Some(1)) r.lpush("java-virtual-machine-langs", "jruby") should equal(Some(2)) r.lpush("java-virtual-machine-langs", "groovy") should equal(Some(3)) @@ -83,7 +94,7 @@ class RedisClusterSpec extends Spec r.llen("java-virtual-machine-langs") should equal(Some(4)) } - it("keytags should ensure mapping to the same server"){ + it("keytags should ensure mapping to the same server") { r.lpush("java-virtual-machine-{langs}", "java") should equal(Some(1)) r.lpush("java-virtual-machine-{langs}", "jruby") should equal(Some(2)) r.lpush("java-virtual-machine-{langs}", "groovy") should equal(Some(3)) diff --git a/src/test/scala/com/redis/cluster/RedisClusterUpdateSpec.scala b/src/test/scala/com/redis/cluster/RedisClusterUpdateSpec.scala new file mode 100644 index 00000000..a143eb3e --- /dev/null +++ b/src/test/scala/com/redis/cluster/RedisClusterUpdateSpec.scala @@ -0,0 +1,51 @@ +package com.redis.cluster + +import config.{ZookeperConfigManager, ZKStringSerializer, ZkConfig} +import java.util.Properties +import org.I0Itec.zkclient.ZkClient +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, WordSpec} + +class RedisClusterUpdateSpec extends WordSpec with ShouldMatchers with BeforeAndAfterAll { + + val props = new Properties() + props.setProperty("zk.connect","localhost:2181") + + val zkConfig = new ZkConfig(props) + val zkClient = new ZkClient(zkConfig.zkConnect,zkConfig.zkSessionTimeoutMs, zkConfig.zkConnectionTimeoutMs, ZKStringSerializer) + val configManager = new ZookeperConfigManager(zkConfig) + + def setConfig(cfg: Map[String, NodeConfig]) { + zkClient.createPersistent(zkConfig.zkNodesPath, true) + zkClient.writeData(zkConfig.zkNodesPath, cfg.toSeq.map{ case (name, hostPort) => name + ":" + hostPort.host + ":" + hostPort.port} + .mkString("|") + ) + } + + override protected def beforeAll() { + setConfig(Map("1" -> NodeConfig("192.168.56.101", 6379), + "2" -> NodeConfig("192.168.56.101", 6380), + "3" -> NodeConfig("192.168.56.101", 6381))) + } + + "cluster" should { + "not fail on node update" in { + val cluster = new RedisCluster(configManager) { + val keyTag = Some(RegexKeyTag) + } + + setConfig(Map("1" -> NodeConfig("192.168.56.101", 6379), + "2" -> NodeConfig("192.168.56.101", 6380), + "3" -> NodeConfig("192.168.56.101", 6382))) + + Thread.sleep(3000) + + var clients = Set.empty[String] + cluster.onAllConns( clients += _.toString ) + + clients should be(Set("192.168.56.101:6379", "192.168.56.101:6380", "192.168.56.101:6382")) + } + } + + +} diff --git a/src/test/scala/com/redis/cluster/config/ZookeeperConfigManagerSpec.scala b/src/test/scala/com/redis/cluster/config/ZookeeperConfigManagerSpec.scala new file mode 100644 index 00000000..ce23c07e --- /dev/null +++ b/src/test/scala/com/redis/cluster/config/ZookeeperConfigManagerSpec.scala @@ -0,0 +1,67 @@ +package 
com.redis.cluster.config + +import org.scalatest.WordSpec +import org.I0Itec.zkclient.ZkClient +import java.util.Properties +import org.scalatest.matchers.ShouldMatchers +import java.util.concurrent.ConcurrentLinkedQueue +import com.redis.cluster.{ClusterConfigListener, NodeConfig} + +class ZookeeperConfigManagerSpec extends WordSpec with ShouldMatchers { + + val props = new Properties() + props.setProperty("zk.connect","localhost:2181") + + val zkConfig = new ZkConfig(props) + val zkClient = new ZkClient(zkConfig.zkConnect,zkConfig.zkSessionTimeoutMs, zkConfig.zkConnectionTimeoutMs, ZKStringSerializer) + val configManager = new ZookeperConfigManager(zkConfig) + + def setConfig(cfg: Map[String, NodeConfig]) { + zkClient.createPersistent(zkConfig.zkNodesPath, true) + zkClient.writeData(zkConfig.zkNodesPath, cfg.toSeq.map{ case (name, hostPort) => name + ":" + hostPort.host + ":" + hostPort.port} + .mkString("|") + ) + } + + "readConfig" should { + "read data correctly" in { + val conf = Map("1" -> NodeConfig("1", 1), + "2" -> NodeConfig("1", 2), + "3" -> NodeConfig("2", 1) ) + + setConfig(conf) + + configManager.readConfig should be(conf) + } + } + + "listeners" should { + "get data update" in { + val conf = Map("1" -> NodeConfig("1", 1), + "2" -> NodeConfig("1", 2), + "3" -> NodeConfig("2", 1) ) + + setConfig(conf) + + val newConf = Map("1" -> NodeConfig("1", 1), + "2" -> NodeConfig("3", 1), + "3" -> NodeConfig("2", 1) ) + + val queue = new ConcurrentLinkedQueue[Map[String, NodeConfig]]() + + configManager.addListener(new ClusterConfigListener { + def configUpdated(newConfig: Map[String, NodeConfig]) { + queue.add(newConfig) + } + }) + setConfig(newConf) + Thread.sleep(1000) + + queue.peek() should be(newConf) + + } + } + + + +}
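
The cluster layout introduced by this patch lives in a single ZooKeeper znode (zk.rediscluster.nodes, /rediscluster/nodes by default) as a "|"-separated list of name:host:port entries, which ZookeperConfigManager parses into a Map[String, NodeConfig]. The sketch below mirrors the setConfig helper used in RedisClusterUpdateSpec and ZookeeperConfigManagerSpec for seeding or updating that znode; publishConfig is a hypothetical helper name, and the ZooKeeper address and node endpoints are placeholders.

    import java.util.Properties
    import org.I0Itec.zkclient.ZkClient
    import com.redis.cluster.NodeConfig
    import com.redis.cluster.config.{ZKStringSerializer, ZkConfig}

    object SeedClusterConfig extends App {
      val props = new Properties()
      props.setProperty("zk.connect", "localhost:2181")   // placeholder ZooKeeper address

      val zkCfg = new ZkConfig(props)
      val zk = new ZkClient(zkCfg.zkConnect, zkCfg.zkSessionTimeoutMs,
                            zkCfg.zkConnectionTimeoutMs, ZKStringSerializer)

      // Hypothetical helper: serialize the layout into the "name:host:port|..." string
      // the config manager expects and write it to the configured znode.
      def publishConfig(nodes: Map[String, NodeConfig]) {
        zk.createPersistent(zkCfg.zkNodesPath, true)   // create the path (and parents) if missing
        zk.writeData(zkCfg.zkNodesPath,
          nodes.map { case (name, nc) => name + ":" + nc.host + ":" + nc.port }.mkString("|"))
      }

      publishConfig(Map(
        "1" -> NodeConfig("192.168.56.101", 6379),
        "2" -> NodeConfig("192.168.56.101", 6380),
        "3" -> NodeConfig("192.168.56.101", 6381)))
    }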
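
On the client side, RedisCluster now takes a ConfigManager instead of a host list: ZookeperConfigManager supplies the initial node map, and on later updates the cluster swaps the affected RedisClientPool inside its CopyOnWriteHashRing (the replaced pool is closed by DisconnectManager after its 120-second grace period). Note that only endpoint changes of existing nodes are applied; node additions and removals are only logged as errors. A minimal wiring sketch along the lines of RedisClusterSpec and RedisClusterUpdateSpec, with a placeholder ZooKeeper address:

    import java.util.Properties
    import com.redis.cluster.{RedisCluster, RegexKeyTag}
    import com.redis.cluster.config.{ZkConfig, ZookeperConfigManager}

    object ClusterClientExample extends App {
      val props = new Properties()
      props.setProperty("zk.connect", "localhost:2181")   // placeholder ZooKeeper address

      val configManager = new ZookeperConfigManager(new ZkConfig(props))

      // RedisCluster is abstract; the only member left to define is the key-tag strategy.
      val cluster = new RedisCluster(configManager) {
        val keyTag = Some(RegexKeyTag)   // the "{...}" part of a key decides the node
      }

      cluster.set("user:1", "roman")            // routed by consistent hashing
      cluster.lpush("jvm-{langs}", "scala")     // same key tag => same node
      cluster.lpush("jvm-{langs}", "java")
      println(cluster.llen("jvm-{langs}"))      // Some(2)

      cluster.close
    }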