Skip to content
This repository has been archived by the owner on Aug 26, 2024. It is now read-only.

Commit

Permalink
topology: replace token map with binary search (apache#1350)
Browse files Browse the repository at this point in the history
We are not going to have every token in a map, instead binary search to
find the closest host which owns the token. This remove the need to
first binary search the tokenRing to find an actual token to lookup in
the map.

Fix SimpleStrategy not placing replicas on unique hosts.

Dont return duplicate hosts from fallbacks from TokenAwarePolicy

go fmt ./...
  • Loading branch information
Zariel authored Sep 22, 2019
1 parent 16cf9ea commit 73f35ff
Show file tree
Hide file tree
Showing 12 changed files with 299 additions and 293 deletions.
15 changes: 10 additions & 5 deletions common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,31 +224,36 @@ func staticAddressTranslator(newAddr net.IP, newPort int) AddressTranslator {
}

func assertTrue(t *testing.T, description string, value bool) {
t.Helper()
if !value {
t.Errorf("expected %s to be true", description)
t.Fatalf("expected %s to be true", description)
}
}

func assertEqual(t *testing.T, description string, expected, actual interface{}) {
t.Helper()
if expected != actual {
t.Errorf("expected %s to be (%+v) but was (%+v) instead", description, expected, actual)
t.Fatalf("expected %s to be (%+v) but was (%+v) instead", description, expected, actual)
}
}

func assertDeepEqual(t *testing.T, description string, expected, actual interface{}) {
t.Helper()
if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected %s to be (%+v) but was (%+v) instead", description, expected, actual)
t.Fatalf("expected %s to be (%+v) but was (%+v) instead", description, expected, actual)
}
}

func assertNil(t *testing.T, description string, actual interface{}) {
t.Helper()
if actual != nil {
t.Errorf("expected %s to be (nil) but was (%+v) instead", description, actual)
t.Fatalf("expected %s to be (nil) but was (%+v) instead", description, actual)
}
}

func assertNotNil(t *testing.T, description string, actual interface{}) {
t.Helper()
if actual == nil {
t.Errorf("expected %s not to be (nil)", description)
t.Fatalf("expected %s not to be (nil)", description)
}
}
4 changes: 2 additions & 2 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func TestApprove(t *testing.T) {

func TestJoinHostPort(t *testing.T) {
tests := map[string]string{
"127.0.0.1:0": JoinHostPort("127.0.0.1", 0),
"127.0.0.1:1": JoinHostPort("127.0.0.1:1", 9142),
"127.0.0.1:0": JoinHostPort("127.0.0.1", 0),
"127.0.0.1:1": JoinHostPort("127.0.0.1:1", 9142),
"[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:0": JoinHostPort("2001:0db8:85a3:0000:0000:8a2e:0370:7334", 0),
"[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:1": JoinHostPort("[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:1", 9142),
}
Expand Down
12 changes: 6 additions & 6 deletions control.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,14 @@ func hostInfo(addr string, defaultPort int) ([]*HostInfo, error) {
}

func shuffleHosts(hosts []*HostInfo) []*HostInfo {
mutRandr.Lock()
perm := randr.Perm(len(hosts))
mutRandr.Unlock()
shuffled := make([]*HostInfo, len(hosts))
copy(shuffled, hosts)

for i, host := range hosts {
shuffled[perm[i]] = host
}
mutRandr.Lock()
randr.Shuffle(len(hosts), func(i, j int) {
shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
})
mutRandr.Unlock()

return shuffled
}
Expand Down
4 changes: 2 additions & 2 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type HostInfo struct {
// TODO(zariel): reduce locking maybe, not all values will change, but to ensure
// that we are thread safe use a mutex to access all fields.
mu sync.RWMutex
hostname string
hostname string
peer net.IP
broadcastAddress net.IP
listenAddress net.IP
Expand All @@ -128,7 +128,7 @@ type HostInfo struct {
clusterName string
version cassVersion
state nodeState
schemaVersion string
schemaVersion string
tokens []string
}

Expand Down
16 changes: 8 additions & 8 deletions marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1262,15 +1262,15 @@ func unmarshalDate(info TypeInfo, data []byte, value interface{}) error {
*v = time.Unix(0, timestamp*int64(time.Millisecond)).In(time.UTC)
return nil
case *string:
if len(data) == 0 {
*v = ""
return nil
}
var origin uint32 = 1 << 31
var current uint32 = binary.BigEndian.Uint32(data)
timestamp := (int64(current) - int64(origin)) * 86400000
if len(data) == 0 {
*v = ""
return nil
}
var origin uint32 = 1 << 31
var current uint32 = binary.BigEndian.Uint32(data)
timestamp := (int64(current) - int64(origin)) * 86400000
*v = time.Unix(0, timestamp*int64(time.Millisecond)).In(time.UTC).Format("2006-01-02")
return nil
return nil
}
return unmarshalErrorf("can not unmarshal %s into %T", info, value)
}
Expand Down
56 changes: 28 additions & 28 deletions marshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1554,58 +1554,58 @@ func TestReadCollectionSize(t *testing.T) {
}

tests := []struct {
name string
info CollectionType
data []byte
isError bool
name string
info CollectionType
data []byte
isError bool
expectedSize int
}{
{
name: "short read 0 proto 2",
info: listV2,
data: []byte{},
name: "short read 0 proto 2",
info: listV2,
data: []byte{},
isError: true,
},
{
name: "short read 1 proto 2",
info: listV2,
data: []byte{0x01},
name: "short read 1 proto 2",
info: listV2,
data: []byte{0x01},
isError: true,
},
{
name: "good read proto 2",
info: listV2,
data: []byte{0x01, 0x38},
name: "good read proto 2",
info: listV2,
data: []byte{0x01, 0x38},
expectedSize: 0x0138,
},
{
name: "short read 0 proto 3",
info: listV3,
data: []byte{},
name: "short read 0 proto 3",
info: listV3,
data: []byte{},
isError: true,
},
{
name: "short read 1 proto 3",
info: listV3,
data: []byte{0x01},
name: "short read 1 proto 3",
info: listV3,
data: []byte{0x01},
isError: true,
},
{
name: "short read 2 proto 3",
info: listV3,
data: []byte{0x01, 0x38},
name: "short read 2 proto 3",
info: listV3,
data: []byte{0x01, 0x38},
isError: true,
},
{
name: "short read 3 proto 3",
info: listV3,
data: []byte{0x01, 0x38, 0x42},
name: "short read 3 proto 3",
info: listV3,
data: []byte{0x01, 0x38, 0x42},
isError: true,
},
{
name: "good read proto 3",
info: listV3,
data: []byte{0x01, 0x38, 0x42, 0x22},
name: "good read proto 3",
info: listV3,
data: []byte{0x01, 0x38, 0x42, 0x22},
expectedSize: 0x01384222,
},
}
Expand Down
55 changes: 27 additions & 28 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,14 +416,14 @@ func TokenAwareHostPolicy(fallback HostSelectionPolicy, opts ...func(*tokenAware
// and the pointer in clusterMeta updated to point to the new value.
type clusterMeta struct {
// replicas is map[keyspace]map[token]hosts
replicas map[string]map[token][]*HostInfo
replicas map[string]tokenRingReplicas
tokenRing *tokenRing
}

type tokenAwareHostPolicy struct {
fallback HostSelectionPolicy
fallback HostSelectionPolicy
getKeyspaceMetadata func(keyspace string) (*KeyspaceMetadata, error)
getKeyspaceName func() string
getKeyspaceName func() string

shuffleReplicas bool
nonLocalReplicasFallback bool
Expand All @@ -438,7 +438,7 @@ type tokenAwareHostPolicy struct {

func (t *tokenAwareHostPolicy) Init(s *Session) {
t.getKeyspaceMetadata = s.KeyspaceMetadata
t.getKeyspaceName = func() string {return s.cfg.Keyspace}
t.getKeyspaceName = func() string { return s.cfg.Keyspace }
}

func (t *tokenAwareHostPolicy) IsLocal(host *HostInfo) bool {
Expand All @@ -457,15 +457,14 @@ func (t *tokenAwareHostPolicy) KeyspaceChanged(update KeyspaceUpdateEvent) {
// It must be called with t.mu mutex locked.
// meta must not be nil and it's replicas field will be updated.
func (t *tokenAwareHostPolicy) updateReplicas(meta *clusterMeta, keyspace string) {
newReplicas := make(map[string]map[token][]*HostInfo, len(meta.replicas))
newReplicas := make(map[string]tokenRingReplicas, len(meta.replicas))

ks, err := t.getKeyspaceMetadata(keyspace)
if err == nil {
strat := getStrategy(ks)
if strat != nil {
if meta != nil && meta.tokenRing != nil {
hosts := t.hosts.get()
newReplicas[keyspace] = strat.replicaMap(hosts, meta.tokenRing.tokens)
newReplicas[keyspace] = strat.replicaMap(meta.tokenRing)
}
}
}
Expand Down Expand Up @@ -567,14 +566,6 @@ func (m *clusterMeta) resetTokenRing(partitioner string, hosts []*HostInfo) {
m.tokenRing = tokenRing
}

func (m *clusterMeta) getReplicas(keyspace string, token token) ([]*HostInfo, bool) {
if m.replicas == nil {
return nil, false
}
replicas, ok := m.replicas[keyspace][token]
return replicas, ok
}

func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
if qry == nil {
return t.fallback.Pick(qry)
Expand All @@ -592,22 +583,23 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
return t.fallback.Pick(qry)
}

primaryEndpoint, token := meta.tokenRing.GetHostForPartitionKey(routingKey)
if primaryEndpoint == nil || token == nil {
return t.fallback.Pick(qry)
}
token := meta.tokenRing.partitioner.Hash(routingKey)
ht := meta.replicas[qry.Keyspace()].replicasFor(token)
var replicas []*HostInfo

replicas, ok := meta.getReplicas(qry.Keyspace(), token)
if !ok {
replicas = []*HostInfo{primaryEndpoint}
if ht == nil {
host, _ := meta.tokenRing.GetHostForToken(token)
replicas = []*HostInfo{host}
} else if t.shuffleReplicas {
replicas = shuffleHosts(replicas)
} else {
replicas = ht.hosts
}

var (
fallbackIter NextHost
i int
j int
i, j int
remote []*HostInfo
)

used := make(map[*HostInfo]bool, len(replicas))
Expand All @@ -616,18 +608,23 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
h := replicas[i]
i++

if h.IsUp() && t.fallback.IsLocal(h) {
if !t.fallback.IsLocal(h) {
remote = append(remote, h)
continue
}

if h.IsUp() {
used[h] = true
return (*selectedHost)(h)
}
}

if t.nonLocalReplicasFallback {
for j < len(replicas) {
h := replicas[j]
for j < len(remote) {
h := remote[j]
j++

if h.IsUp() && !t.fallback.IsLocal(h) {
if h.IsUp() {
used[h] = true
return (*selectedHost)(h)
}
Expand All @@ -642,9 +639,11 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
// filter the token aware selected hosts from the fallback hosts
for fallbackHost := fallbackIter(); fallbackHost != nil; fallbackHost = fallbackIter() {
if !used[fallbackHost.Info()] {
used[fallbackHost.Info()] = true
return fallbackHost
}
}

return nil
}
}
Expand Down
Loading

0 comments on commit 73f35ff

Please sign in to comment.