Skip to content

Commit

Permalink
Merge pull request sandreev#1 from RomezzBorisov/master
Browse files Browse the repository at this point in the history
Implemented nodes migration support
  • Loading branch information
sandreev committed Jun 5, 2012
2 parents 7801b5e + b210c1f commit 701c18c
Show file tree
Hide file tree
Showing 16 changed files with 670 additions and 92 deletions.
2 changes: 2 additions & 0 deletions project/ScalaRedisProject.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ object ScalaRedisProject extends Build
name := "RedisClient",

libraryDependencies ++= Seq("commons-pool" % "commons-pool" % "1.5.6",
"com.github.sgroschupf" % "zkclient" % "0.1",
"org.slf4j" % "slf4j-api" % "1.6.1",
"org.slf4j" % "slf4j-log4j12" % "1.6.1" % "provided",
"log4j" % "log4j" % "1.2.16" % "provided",
"junit" % "junit" % "4.8.1" % "test",
"org.scalatest" % "scalatest_2.9.1" % "1.6.1" % "test",
"org.mockito" % "mockito-all" % "1.8.4" % "test",
"com.twitter" % "util" % "1.11.4" % "test" intransitive(),
"com.twitter" % "finagle-core" % "1.9.0" % "test"),

Expand Down
17 changes: 17 additions & 0 deletions src/main/scala/com/redis/cluster/ClusterConfigDiff.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.redis.cluster

case class ClusterConfigDiff(newNodes: Map[String, NodeConfig],
updatedNodes: Map[String, NodeConfig],
deletedNodes: Map[String, NodeConfig])

object ClusterConfigDiff {
def apply(oldCluster: Map[String, NodeConfig], newCluster: Map[String, NodeConfig]): ClusterConfigDiff = {
val newNodes = newCluster -- oldCluster.keys
val removedNodes = oldCluster -- newCluster.keys
val changedNodes = newCluster.filter {
case (name, cfg) if oldCluster.contains(name) => cfg != oldCluster(name)
case _ => false
}
ClusterConfigDiff(newNodes, changedNodes, removedNodes)
}
}
7 changes: 7 additions & 0 deletions src/main/scala/com/redis/cluster/ClusterConfigListener.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.redis.cluster


trait ClusterConfigListener {
def configUpdated(newConfig: Map[String, NodeConfig])

}
9 changes: 9 additions & 0 deletions src/main/scala/com/redis/cluster/ConfigManager.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.redis.cluster


trait ConfigManager {
def readConfig: Map[String, NodeConfig]

def addListener(listener: ClusterConfigListener)

}
39 changes: 39 additions & 0 deletions src/main/scala/com/redis/cluster/CopyOnWriteHashRing.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.redis.cluster

import java.util.concurrent.atomic.AtomicReference


class CopyOnWriteHashRing[T](nodes: Map[String, T], replicas: Int) {
val ringRef = new AtomicReference[HashRing[T]](HashRing(nodes, replicas))

def addNode(nodeName: String, node: T) {
val ring = ringRef.get()
if (!ringRef.compareAndSet(ring, ring.addNode(nodeName, node)))
addNode(nodeName, node)
}

def udpateNode(nodeName: String, node: T): T = {
val ring = ringRef.get()
val oldNode = ring.cluster(nodeName)
val newRing = ring.removeNode(nodeName).addNode(nodeName, node)
if (!ringRef.compareAndSet(ring, newRing))
udpateNode(nodeName, node)
else
oldNode
}

// remove node from the ring
def removeNode(nodeName: String): T = {
val ring = ringRef.get()
val newRing = ring.removeNode(nodeName)
if (!ringRef.compareAndSet(ring, newRing))
removeNode(nodeName)
else
ring.cluster(nodeName)
}

// get node for the key
def getNode(key: Seq[Byte]): T = ringRef.get().getNode(key)

def cluster = ringRef.get().cluster
}
52 changes: 28 additions & 24 deletions src/main/scala/com/redis/cluster/HashRing.scala
Original file line number Diff line number Diff line change
@@ -1,46 +1,51 @@
package com.redis.cluster

import java.util.zip.CRC32
import scala.collection.immutable.TreeSet
import scala.collection.mutable.{ArrayBuffer, Map, ListBuffer}
import collection.immutable.SortedMap

case class HashRing[T](nodes: List[T], replicas: Int) {
var sortedKeys = new TreeSet[Long]
val cluster = new ArrayBuffer[T]
var ring = Map[Long, T]()
class HashRing[T](val cluster: Map[String, T], val ring: SortedMap[Long, T], replicas: Int) {

nodes.foreach(addNode(_))
import HashRing._

// adds a node to the hash ring (including a number of replicas)
def addNode(node: T) = {
cluster += node
(1 to replicas).foreach {replica =>
val key = calculateChecksum((node + ":" + replica).getBytes("UTF-8"))
ring += (key -> node)
sortedKeys = sortedKeys + key
def addNode(nodeName: String, node: T) = {
require(!cluster.contains(nodeName), "Cluster already contains node '" + nodeName + "'")
val pairs = (1 to replicas).map {
replica => (calculateChecksum((nodeName + ":" + replica).getBytes("UTF-8")), node)
}
new HashRing[T](cluster + (nodeName -> node), ring ++ pairs, replicas)
}

// remove node from the ring
def removeNode(node: T) {
cluster -= node
(1 to replicas).foreach {replica =>
val key = calculateChecksum((node + ":" + replica).getBytes("UTF-8"))
ring -= key
sortedKeys = sortedKeys - key
def removeNode(nodeName: String) = {
require(cluster.contains(nodeName), "Cluster does not contain node '" + nodeName + "'")
val keys = (1 to replicas).map {
replica => calculateChecksum((nodeName + ":" + replica).getBytes("UTF-8"))
}
new HashRing[T](cluster - nodeName, ring -- keys, replicas)
}

// get node for the key
def getNode(key: Seq[Byte]): T = {
val crc = calculateChecksum(key)
if (sortedKeys contains crc) ring(crc)
if (ring contains crc) ring(crc)
else {
if (crc < sortedKeys.firstKey) ring(sortedKeys.firstKey)
else if (crc > sortedKeys.lastKey) ring(sortedKeys.lastKey)
else ring(sortedKeys.rangeImpl(None, Some(crc)).lastKey)
if (crc < ring.firstKey) ring.head._2
else if (crc > ring.lastKey) ring.last._2
else ring.rangeImpl(None, Some(crc)).last._2
}
}
}

object HashRing {
def apply[T](nodes: Map[String, T], replicas: Int) = {
val pairs =
for ((nodeName, node) <- nodes.toSeq;
replica <- 1 to replicas) yield {
(calculateChecksum((nodeName + ":" + replica).getBytes("UTF-8")), node)
}
new HashRing[T](nodes, SortedMap(pairs: _*), replicas)
}

// Computes the CRC-32 of the given String
def calculateChecksum(value: Seq[Byte]): Long = {
Expand All @@ -49,4 +54,3 @@ case class HashRing[T](nodes: List[T], replicas: Int) {
checksum.getValue
}
}

5 changes: 5 additions & 0 deletions src/main/scala/com/redis/cluster/NodeConfig.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.redis.cluster

case class NodeConfig(host: String, port: Int) {
override def toString = host + ":" + port
}
Loading

0 comments on commit 701c18c

Please sign in to comment.