Skip to content

Commit

Permalink
added signature to ring hash
Browse files Browse the repository at this point in the history
  • Loading branch information
or-else committed Sep 28, 2017
1 parent 7938c86 commit d8008f4
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 22 deletions.
18 changes: 14 additions & 4 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,14 @@ type ClusterSess struct {

// Proxy to Master request message
type ClusterReq struct {
// Name of the node which sent the request
// Name of the node sending this request
Node string

// Ring hash signature of the node sending this request
// Signature must match the signature of the receiver, otherwise the
// Cluster is desynchronized.
Signature string

Msg *ClientComMessage
// Expanded (routable) topic name
RcptTo string
Expand Down Expand Up @@ -276,9 +281,10 @@ func (c *Cluster) routeToTopic(msg *ClientComMessage, topic string, sess *Sessio

return n.forward(
&ClusterReq{
Node: c.thisNodeName,
Msg: msg,
RcptTo: topic,
Node: c.thisNodeName,
Signature: c.ring.Signature(),
Msg: msg,
RcptTo: topic,
Sess: &ClusterSess{
Uid: sess.uid,
RemoteAddr: sess.remoteAddr,
Expand Down Expand Up @@ -442,3 +448,7 @@ func (c *Cluster) shutdown() {

log.Println("cluster shut down")
}

func (c *Cluster) rehash() {

}
49 changes: 40 additions & 9 deletions server/ringhash/ringhash.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// Package ringhash implementations a consistent ring hash:
// Implementation of a consistent ring hash:
// https://en.wikipedia.org/wiki/Consistent_hashing
package ringhash

import (
"hash/crc32"
"encoding/ascii85"
"hash/fnv"
"log"
"sort"
"strconv"
Expand All @@ -13,7 +14,7 @@ type Hash func(data []byte) uint32

type elem struct {
key string
hash int
hash uint32
}

type sortable []elem
Expand All @@ -34,8 +35,9 @@ func (k sortable) Less(i, j int) bool {
type Ring struct {
keys []elem // Sorted list of keys.

replicas int
hashfunc Hash
signature string
replicas int
hashfunc Hash
}

func New(replicas int, fn Hash) *Ring {
Expand All @@ -44,7 +46,11 @@ func New(replicas int, fn Hash) *Ring {
hashfunc: fn,
}
if ring.hashfunc == nil {
ring.hashfunc = crc32.ChecksumIEEE
ring.hashfunc = func(data []byte) uint32 {
hash := fnv.New32a()
hash.Write(data)
return hash.Sum32()
}
}
return ring
}
Expand All @@ -59,11 +65,29 @@ func (ring *Ring) Add(keys ...string) {
for _, key := range keys {
for i := 0; i < ring.replicas; i++ {
ring.keys = append(ring.keys, elem{
hash: int(ring.hashfunc([]byte(strconv.Itoa(i) + key))),
hash: ring.hashfunc([]byte(strconv.Itoa(i) + key)),
key: key})
}
}
sort.Sort(sortable(ring.keys))

// Calculate signature
hash := fnv.New128a()
b := make([]byte, 4)
for _, key := range ring.keys {
b[0] = byte(key.hash)
b[1] = byte(key.hash >> 8)
b[2] = byte(key.hash >> 16)
b[3] = byte(key.hash >> 24)
hash.Write(b)
hash.Write([]byte(key.key))
}

b = []byte{}
b = hash.Sum(b)
dst := make([]byte, ascii85.MaxEncodedLen(len(b)))
ascii85.Encode(dst, b)
ring.signature = string(dst)
}

// Get returns the closest item in the ring to the provided key.
Expand All @@ -73,13 +97,12 @@ func (ring *Ring) Get(key string) string {
return ""
}

hash := int(ring.hashfunc([]byte(key)))
hash := ring.hashfunc([]byte(key))

// Binary search for appropriate replica.
idx := sort.Search(len(ring.keys), func(i int) bool {
el := ring.keys[i]
return (el.hash > hash) || (el.hash == hash && el.key >= key)
//return (ring.keys[i].hash > hash) || (ring.keys[i].hash == hash && ring.keys[i].key >= key)
})

// Means we have cycled back to the first replica.
Expand All @@ -90,6 +113,14 @@ func (ring *Ring) Get(key string) string {
return ring.keys[idx].key
}

// Get a ring hash signature. Two identical ring hashes
// will have the same signature. Two hashes with different
// number of keys or replicas or hash functions will have different
// signatures.
func (ring *Ring) Signature() string {
return ring.signature
}

func (ring *Ring) dump() {
for _, e := range ring.keys {
log.Printf("key: '%s', hash=%d", e.key, e.hash)
Expand Down
59 changes: 51 additions & 8 deletions server/ringhash/ringhash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package ringhash

import (
"fmt"
"hash/crc32"
"testing"
)

func TestHashing(t *testing.T) {

// Ring with 3 elements hashed by crc32.ChecksumIEEE
ring := New(3, nil)
ring := New(3, crc32.ChecksumIEEE)
ring.Add("A", "B", "C")

// The ring contains:
Expand Down Expand Up @@ -64,8 +65,8 @@ func TestHashing(t *testing.T) {
}

func TestConsistency(t *testing.T) {
ring1 := New(1, nil)
ring2 := New(1, nil)
ring1 := New(3, nil)
ring2 := New(3, nil)

ring1.Add("owl", "crow", "sparrow")
ring2.Add("sparrow", "owl", "crow")
Expand All @@ -88,8 +89,8 @@ func TestConsistency(t *testing.T) {
// 0VXGN 0BGABAK
// 0VXGI 0BGABAL

ring1 = New(1, nil)
ring2 = New(1, nil)
ring1 = New(1, crc32.ChecksumIEEE)
ring2 = New(1, crc32.ChecksumIEEE)

ring1.Add("VXGD", "BGABAA", "VXGG", "BGABAB", "VXGF", "BGABAC")
ring2.Add("BGABAA", "VXGD", "BGABAB", "VXGG", "BGABAC", "VXGF")
Expand All @@ -107,17 +108,59 @@ func TestConsistency(t *testing.T) {
}
}

func TestSignature(t *testing.T) {
ring1 := New(4, nil)
ring2 := New(4, nil)

ring1.Add("owl", "crow", "sparrow")
ring2.Add("sparrow", "owl", "crow")

if ring1.Signature() != ring2.Signature() {
t.Errorf("Signatures must be identical")
}

ring1 = New(4, nil)
ring2 = New(5, nil)

ring1.Add("owl", "crow", "sparrow")
ring2.Add("owl", "crow", "sparrow")

if ring1.Signature() == ring2.Signature() {
t.Errorf("Signatures must be different - different count of replicas")
}

ring1 = New(4, nil)
ring2 = New(4, nil)

ring1.Add("owl", "crow", "sparrow")
ring2.Add("owl", "crow", "sparrow", "crane")

if ring1.Signature() == ring2.Signature() {
t.Errorf("Signatures must be different - different keys")
}

ring1 = New(4, nil)
ring2 = New(4, crc32.ChecksumIEEE)

ring1.Add("owl", "crow", "sparrow")
ring2.Add("owl", "crow", "sparrow")

if ring1.Signature() == ring2.Signature() {
t.Errorf("Signatures must be different - different hash functions")
}
}

func BenchmarkGet8(b *testing.B) { benchmarkGet(b, 8) }
func BenchmarkGet32(b *testing.B) { benchmarkGet(b, 32) }
func BenchmarkGet128(b *testing.B) { benchmarkGet(b, 128) }
func BenchmarkGet512(b *testing.B) { benchmarkGet(b, 512) }

func benchmarkGet(b *testing.B, buckets int) {
func benchmarkGet(b *testing.B, keycount int) {

ring := New(53, nil)

var ids []string
for i := 0; i < buckets; i++ {
for i := 0; i < keycount; i++ {
ids = append(ids, fmt.Sprintf("id=%d", i))
}

Expand All @@ -126,6 +169,6 @@ func benchmarkGet(b *testing.B, buckets int) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
ring.Get(ids[i&(buckets-1)])
ring.Get(ids[i&(keycount-1)])
}
}
2 changes: 1 addition & 1 deletion server/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ func (t *Topic) run(hub *Hub) {
}

case meta := <-t.meta:
log.Printf("topic[%s]: got meta message '%#+v' %x", t.name, meta, meta.what)
// log.Printf("topic[%s]: got meta message '%#+v' %x", t.name, meta, meta.what)

// Request to get/set topic metadata
if meta.pkt.Get != nil {
Expand Down

0 comments on commit d8008f4

Please sign in to comment.