Skip to content
This repository has been archived by the owner on Aug 26, 2024. It is now read-only.

Commit

Permalink
Revert "topology: replace token map with binary search (apache#1350)"
Browse files Browse the repository at this point in the history
This reverts commit 73f35ff.
  • Loading branch information
martin-sucha committed Feb 14, 2020
1 parent 8106ec6 commit 3098227
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 200 deletions.
53 changes: 27 additions & 26 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func TokenAwareHostPolicy(fallback HostSelectionPolicy, opts ...func(*tokenAware
// and the pointer in clusterMeta updated to point to the new value.
type clusterMeta struct {
// replicas is map[keyspace]map[token]hosts
replicas map[string]tokenRingReplicas
replicas map[string]map[token][]*HostInfo
tokenRing *tokenRing
}

Expand Down Expand Up @@ -465,14 +465,15 @@ func (t *tokenAwareHostPolicy) KeyspaceChanged(update KeyspaceUpdateEvent) {
// It must be called with t.mu mutex locked.
// meta must not be nil and it's replicas field will be updated.
func (t *tokenAwareHostPolicy) updateReplicas(meta *clusterMeta, keyspace string) {
newReplicas := make(map[string]tokenRingReplicas, len(meta.replicas))
newReplicas := make(map[string]map[token][]*HostInfo, len(meta.replicas))

ks, err := t.getKeyspaceMetadata(keyspace)
if err == nil {
strat := getStrategy(ks)
if strat != nil {
if meta != nil && meta.tokenRing != nil {
newReplicas[keyspace] = strat.replicaMap(meta.tokenRing)
hosts := t.hosts.get()
newReplicas[keyspace] = strat.replicaMap(hosts, meta.tokenRing.tokens)
}
}
}
Expand Down Expand Up @@ -593,6 +594,14 @@ func (m *clusterMeta) resetTokenRing(partitioner string, hosts []*HostInfo) {
m.tokenRing = tokenRing
}

func (m *clusterMeta) getReplicas(keyspace string, token token) ([]*HostInfo, bool) {
if m.replicas == nil {
return nil, false
}
replicas, ok := m.replicas[keyspace][token]
return replicas, ok
}

func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
if qry == nil {
return t.fallback.Pick(qry)
Expand All @@ -610,24 +619,22 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
return t.fallback.Pick(qry)
}

token := meta.tokenRing.partitioner.Hash(routingKey)
ht := meta.replicas[qry.Keyspace()].replicasFor(token)
primaryEndpoint, token, endToken := meta.tokenRing.GetHostForPartitionKey(routingKey)
if primaryEndpoint == nil || endToken == nil {
return t.fallback.Pick(qry)
}

var replicas []*HostInfo
if ht == nil {
host, _ := meta.tokenRing.GetHostForToken(token)
replicas = []*HostInfo{host}
} else {
replicas = ht.hosts
if t.shuffleReplicas {
replicas = shuffleHosts(replicas)
}
replicas, ok := meta.getReplicas(qry.Keyspace(), endToken)
if !ok {
replicas = []*HostInfo{primaryEndpoint}
} else if t.shuffleReplicas {
replicas = shuffleHosts(replicas)
}

var (
fallbackIter NextHost
i, j int
remote []*HostInfo
i int
j int
)

used := make(map[*HostInfo]bool, len(replicas))
Expand All @@ -636,23 +643,18 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
h := replicas[i]
i++

if !t.fallback.IsLocal(h) {
remote = append(remote, h)
continue
}

if h.IsUp() {
if h.IsUp() && t.fallback.IsLocal(h) {
used[h] = true
return selectedHost{info: h, token: token}
}
}

if t.nonLocalReplicasFallback {
for j < len(remote) {
h := remote[j]
for j < len(replicas) {
h := replicas[j]
j++

if h.IsUp() {
if h.IsUp() && !t.fallback.IsLocal(h) {
used[h] = true
return selectedHost{info: h, token: token}
}
Expand All @@ -671,7 +673,6 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
return fallbackHost
}
}

return nil
}
}
Expand Down
126 changes: 72 additions & 54 deletions policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,20 @@ func TestHostPolicy_TokenAware_SimpleStrategy(t *testing.T) {

// The SimpleStrategy above should generate the following replicas.
// It's handy to have as reference here.
assertDeepEqual(t, "replicas", map[string]tokenRingReplicas{
assertDeepEqual(t, "replicas", map[string]map[token][]*HostInfo{
"myKeyspace": {
{orderedToken("00"), []*HostInfo{hosts[0], hosts[1]}},
{orderedToken("25"), []*HostInfo{hosts[1], hosts[2]}},
{orderedToken("50"), []*HostInfo{hosts[2], hosts[3]}},
{orderedToken("75"), []*HostInfo{hosts[3], hosts[0]}},
orderedToken("00"): {hosts[0], hosts[1]},
orderedToken("25"): {hosts[1], hosts[2]},
orderedToken("50"): {hosts[2], hosts[3]},
orderedToken("75"): {hosts[3], hosts[0]},
},
}, policyInternal.getMetadataReadOnly().replicas)

// now the token ring is configured
query.RoutingKey([]byte("20"))
iter = policy.Pick(query)
iterCheck(t, iter, "0")
iterCheck(t, iter, "1")
iterCheck(t, iter, "2")
}

// Tests of the host pool host selection policy implementation
Expand Down Expand Up @@ -501,20 +501,20 @@ func TestHostPolicy_TokenAware(t *testing.T) {

// The NetworkTopologyStrategy above should generate the following replicas.
// It's handy to have as reference here.
assertDeepEqual(t, "replicas", map[string]tokenRingReplicas{
assertDeepEqual(t, "replicas", map[string]map[token][]*HostInfo{
"myKeyspace": {
{orderedToken("05"), []*HostInfo{hosts[0], hosts[1], hosts[2]}},
{orderedToken("10"), []*HostInfo{hosts[1], hosts[2], hosts[3]}},
{orderedToken("15"), []*HostInfo{hosts[2], hosts[3], hosts[4]}},
{orderedToken("20"), []*HostInfo{hosts[3], hosts[4], hosts[5]}},
{orderedToken("25"), []*HostInfo{hosts[4], hosts[5], hosts[6]}},
{orderedToken("30"), []*HostInfo{hosts[5], hosts[6], hosts[7]}},
{orderedToken("35"), []*HostInfo{hosts[6], hosts[7], hosts[8]}},
{orderedToken("40"), []*HostInfo{hosts[7], hosts[8], hosts[9]}},
{orderedToken("45"), []*HostInfo{hosts[8], hosts[9], hosts[10]}},
{orderedToken("50"), []*HostInfo{hosts[9], hosts[10], hosts[11]}},
{orderedToken("55"), []*HostInfo{hosts[10], hosts[11], hosts[0]}},
{orderedToken("60"), []*HostInfo{hosts[11], hosts[0], hosts[1]}},
orderedToken("05"): {hosts[0], hosts[1], hosts[2]},
orderedToken("10"): {hosts[1], hosts[2], hosts[3]},
orderedToken("15"): {hosts[2], hosts[3], hosts[4]},
orderedToken("20"): {hosts[3], hosts[4], hosts[5]},
orderedToken("25"): {hosts[4], hosts[5], hosts[6]},
orderedToken("30"): {hosts[5], hosts[6], hosts[7]},
orderedToken("35"): {hosts[6], hosts[7], hosts[8]},
orderedToken("40"): {hosts[7], hosts[8], hosts[9]},
orderedToken("45"): {hosts[8], hosts[9], hosts[10]},
orderedToken("50"): {hosts[9], hosts[10], hosts[11]},
orderedToken("55"): {hosts[10], hosts[11], hosts[0]},
orderedToken("60"): {hosts[11], hosts[0], hosts[1]},
},
}, policyInternal.getMetadataReadOnly().replicas)

Expand Down Expand Up @@ -590,25 +590,25 @@ func TestHostPolicy_TokenAware_NetworkStrategy(t *testing.T) {

// The NetworkTopologyStrategy above should generate the following replicas.
// It's handy to have as reference here.
assertDeepEqual(t, "replicas", map[string]tokenRingReplicas{
keyspace: {
{orderedToken("05"), []*HostInfo{hosts[0], hosts[1], hosts[2], hosts[3], hosts[4], hosts[5]}},
{orderedToken("10"), []*HostInfo{hosts[1], hosts[2], hosts[3], hosts[4], hosts[5], hosts[6]}},
{orderedToken("15"), []*HostInfo{hosts[2], hosts[3], hosts[4], hosts[5], hosts[6], hosts[7]}},
{orderedToken("20"), []*HostInfo{hosts[3], hosts[4], hosts[5], hosts[6], hosts[7], hosts[8]}},
{orderedToken("25"), []*HostInfo{hosts[4], hosts[5], hosts[6], hosts[7], hosts[8], hosts[9]}},
{orderedToken("30"), []*HostInfo{hosts[5], hosts[6], hosts[7], hosts[8], hosts[9], hosts[10]}},
{orderedToken("35"), []*HostInfo{hosts[6], hosts[7], hosts[8], hosts[9], hosts[10], hosts[11]}},
{orderedToken("40"), []*HostInfo{hosts[7], hosts[8], hosts[9], hosts[10], hosts[11], hosts[0]}},
{orderedToken("45"), []*HostInfo{hosts[8], hosts[9], hosts[10], hosts[11], hosts[0], hosts[1]}},
{orderedToken("50"), []*HostInfo{hosts[9], hosts[10], hosts[11], hosts[0], hosts[1], hosts[2]}},
{orderedToken("55"), []*HostInfo{hosts[10], hosts[11], hosts[0], hosts[1], hosts[2], hosts[3]}},
{orderedToken("60"), []*HostInfo{hosts[11], hosts[0], hosts[1], hosts[2], hosts[3], hosts[4]}},
assertDeepEqual(t, "replicas", map[string]map[token][]*HostInfo{
"myKeyspace": {
orderedToken("05"): {hosts[0], hosts[1], hosts[2], hosts[3], hosts[4], hosts[5]},
orderedToken("10"): {hosts[1], hosts[2], hosts[3], hosts[4], hosts[5], hosts[6]},
orderedToken("15"): {hosts[2], hosts[3], hosts[4], hosts[5], hosts[6], hosts[7]},
orderedToken("20"): {hosts[3], hosts[4], hosts[5], hosts[6], hosts[7], hosts[8]},
orderedToken("25"): {hosts[4], hosts[5], hosts[6], hosts[7], hosts[8], hosts[9]},
orderedToken("30"): {hosts[5], hosts[6], hosts[7], hosts[8], hosts[9], hosts[10]},
orderedToken("35"): {hosts[6], hosts[7], hosts[8], hosts[9], hosts[10], hosts[11]},
orderedToken("40"): {hosts[7], hosts[8], hosts[9], hosts[10], hosts[11], hosts[0]},
orderedToken("45"): {hosts[8], hosts[9], hosts[10], hosts[11], hosts[0], hosts[1]},
orderedToken("50"): {hosts[9], hosts[10], hosts[11], hosts[0], hosts[1], hosts[2]},
orderedToken("55"): {hosts[10], hosts[11], hosts[0], hosts[1], hosts[2], hosts[3]},
orderedToken("60"): {hosts[11], hosts[0], hosts[1], hosts[2], hosts[3], hosts[4]},
},
}, policyInternal.getMetadataReadOnly().replicas)

// now the token ring is configured
query.RoutingKey([]byte("23"))
query.RoutingKey([]byte("18"))
iter = policy.Pick(query)
// first should be hosts with matching token from the local DC
iterCheck(t, iter, "4")
Expand All @@ -620,9 +620,10 @@ func TestHostPolicy_TokenAware_NetworkStrategy(t *testing.T) {
iterCheck(t, iter, "8")
}


func TestHostPolicy_TokenAware_Issue1274(t *testing.T) {
policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"))
// Tests of the token-aware host selection policy implementation with a
// DC aware round-robin host selection policy fallback with NonLocalReplicasFallback option enabled.
func TestHostPolicy_TokenAware_DCAwareRR_NonLocalFallback(t *testing.T) {
policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"), NonLocalReplicasFallback())
policyInternal := policy.(*tokenAwareHostPolicy)
policyInternal.getKeyspaceName = func() string {return "myKeyspace"}
policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
Expand Down Expand Up @@ -656,6 +657,9 @@ func TestHostPolicy_TokenAware_Issue1274(t *testing.T) {
{hostId: "10", connectAddress: net.IPv4(10, 0, 0, 11), tokens: []string{"55"}, dataCenter: "local"},
{hostId: "11", connectAddress: net.IPv4(10, 0, 0, 12), tokens: []string{"60"}, dataCenter: "remote2"},
}
for _, host := range hosts {
policy.AddHost(host)
}

policy.SetPartitioner("OrderedPartitioner")

Expand All @@ -676,24 +680,38 @@ func TestHostPolicy_TokenAware_Issue1274(t *testing.T) {
}
policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: "myKeyspace"})

cancel := make(chan struct{})
// The NetworkTopologyStrategy above should generate the following replicas.
// It's handy to have as reference here.
assertDeepEqual(t, "replicas", map[string]map[token][]*HostInfo{
"myKeyspace": {
orderedToken("05"): {hosts[0], hosts[1], hosts[2]},
orderedToken("10"): {hosts[1], hosts[2], hosts[3]},
orderedToken("15"): {hosts[2], hosts[3], hosts[4]},
orderedToken("20"): {hosts[3], hosts[4], hosts[5]},
orderedToken("25"): {hosts[4], hosts[5], hosts[6]},
orderedToken("30"): {hosts[5], hosts[6], hosts[7]},
orderedToken("35"): {hosts[6], hosts[7], hosts[8]},
orderedToken("40"): {hosts[7], hosts[8], hosts[9]},
orderedToken("45"): {hosts[8], hosts[9], hosts[10]},
orderedToken("50"): {hosts[9], hosts[10], hosts[11]},
orderedToken("55"): {hosts[10], hosts[11], hosts[0]},
orderedToken("60"): {hosts[11], hosts[0], hosts[1]},
},
}, policyInternal.getMetadataReadOnly().replicas)

// now the token ring is configured
for _, host := range hosts {
host := host
go func() {
for {
select {
case <-cancel:
return
default:
policy.AddHost(host)
policy.RemoveHost(host)
}
}
}()
query.RoutingKey([]byte("18"))
iter = policy.Pick(query)
// first should be host with matching token from the local DC
if actual := iter(); actual.Info().HostID() != "4" {
t.Errorf("Expected peer 4 but was %s", actual.Info().HostID())
}

time.Sleep(100 * time.Millisecond)
close(cancel)
// rest should be hosts with matching token from remote DCs
if actual := iter(); actual.Info().HostID() != "3" {
t.Errorf("Expected peer 3 but was %s", actual.Info().HostID())
}
if actual := iter(); actual.Info().HostID() != "5" {
t.Errorf("Expected peer 5 but was %s", actual.Info().HostID())
}
// rest depend on fallback strategy
}
13 changes: 5 additions & 8 deletions token.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,10 @@ func (ht hostToken) String() string {
type tokenRing struct {
partitioner partitioner
tokens []hostToken
hosts []*HostInfo
}

func newTokenRing(partitioner string, hosts []*HostInfo) (*tokenRing, error) {
tokenRing := &tokenRing{
hosts: hosts,
}
tokenRing := &tokenRing{}

if strings.HasSuffix(partitioner, "Murmur3Partitioner") {
tokenRing.partitioner = murmur3Partitioner{}
Expand Down Expand Up @@ -215,15 +212,15 @@ func (t *tokenRing) GetHostForToken(token token) (host *HostInfo, endToken token
}

// find the primary replica
p := sort.Search(len(t.tokens), func(i int) bool {
ringIndex := sort.Search(len(t.tokens), func(i int) bool {
return !t.tokens[i].token.Less(token)
})

if p == len(t.tokens) {
if ringIndex == len(t.tokens) {
// wrap around to the first in the ring
p = 0
ringIndex = 0
}

v := t.tokens[p]
v := t.tokens[ringIndex]
return v.host, v.token
}
Loading

0 comments on commit 3098227

Please sign in to comment.