Skip to content
This repository has been archived by the owner on Aug 26, 2024. It is now read-only.

Commit

Permalink
Revert "topology: replace token map with binary search (apache#1350)"
Browse files Browse the repository at this point in the history
This reverts commit 73f35ff.

Conflicts:
	policies.go
	policies_test.go
	topology.go
  • Loading branch information
martin-sucha committed Feb 13, 2020
1 parent 1fb1ac5 commit 7ba21be
Show file tree
Hide file tree
Showing 12 changed files with 175 additions and 246 deletions.
15 changes: 5 additions & 10 deletions common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,36 +224,31 @@ func staticAddressTranslator(newAddr net.IP, newPort int) AddressTranslator {
}

func assertTrue(t *testing.T, description string, value bool) {
t.Helper()
if !value {
t.Fatalf("expected %s to be true", description)
t.Errorf("expected %s to be true", description)
}
}

func assertEqual(t *testing.T, description string, expected, actual interface{}) {
t.Helper()
if expected != actual {
t.Fatalf("expected %s to be (%+v) but was (%+v) instead", description, expected, actual)
t.Errorf("expected %s to be (%+v) but was (%+v) instead", description, expected, actual)
}
}

func assertDeepEqual(t *testing.T, description string, expected, actual interface{}) {
t.Helper()
if !reflect.DeepEqual(expected, actual) {
t.Fatalf("expected %s to be (%+v) but was (%+v) instead", description, expected, actual)
t.Errorf("expected %s to be (%+v) but was (%+v) instead", description, expected, actual)
}
}

func assertNil(t *testing.T, description string, actual interface{}) {
t.Helper()
if actual != nil {
t.Fatalf("expected %s to be (nil) but was (%+v) instead", description, actual)
t.Errorf("expected %s to be (nil) but was (%+v) instead", description, actual)
}
}

func assertNotNil(t *testing.T, description string, actual interface{}) {
t.Helper()
if actual == nil {
t.Fatalf("expected %s not to be (nil)", description)
t.Errorf("expected %s not to be (nil)", description)
}
}
4 changes: 2 additions & 2 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func TestApprove(t *testing.T) {

func TestJoinHostPort(t *testing.T) {
tests := map[string]string{
"127.0.0.1:0": JoinHostPort("127.0.0.1", 0),
"127.0.0.1:1": JoinHostPort("127.0.0.1:1", 9142),
"127.0.0.1:0": JoinHostPort("127.0.0.1", 0),
"127.0.0.1:1": JoinHostPort("127.0.0.1:1", 9142),
"[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:0": JoinHostPort("2001:0db8:85a3:0000:0000:8a2e:0370:7334", 0),
"[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:1": JoinHostPort("[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:1", 9142),
}
Expand Down
12 changes: 6 additions & 6 deletions control.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,14 @@ func hostInfo(addr string, defaultPort int) ([]*HostInfo, error) {
}

func shuffleHosts(hosts []*HostInfo) []*HostInfo {
shuffled := make([]*HostInfo, len(hosts))
copy(shuffled, hosts)

mutRandr.Lock()
randr.Shuffle(len(hosts), func(i, j int) {
shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
})
perm := randr.Perm(len(hosts))
mutRandr.Unlock()
shuffled := make([]*HostInfo, len(hosts))

for i, host := range hosts {
shuffled[perm[i]] = host
}

return shuffled
}
Expand Down
4 changes: 2 additions & 2 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type HostInfo struct {
// TODO(zariel): reduce locking maybe, not all values will change, but to ensure
// that we are thread safe use a mutex to access all fields.
mu sync.RWMutex
hostname string
hostname string
peer net.IP
broadcastAddress net.IP
listenAddress net.IP
Expand All @@ -128,7 +128,7 @@ type HostInfo struct {
clusterName string
version cassVersion
state nodeState
schemaVersion string
schemaVersion string
tokens []string
}

Expand Down
16 changes: 8 additions & 8 deletions marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1301,15 +1301,15 @@ func unmarshalDate(info TypeInfo, data []byte, value interface{}) error {
*v = time.Unix(0, timestamp*int64(time.Millisecond)).In(time.UTC)
return nil
case *string:
if len(data) == 0 {
*v = ""
return nil
}
var origin uint32 = 1 << 31
var current uint32 = binary.BigEndian.Uint32(data)
timestamp := (int64(current) - int64(origin)) * 86400000
if len(data) == 0 {
*v = ""
return nil
}
var origin uint32 = 1 << 31
var current uint32 = binary.BigEndian.Uint32(data)
timestamp := (int64(current) - int64(origin)) * 86400000
*v = time.Unix(0, timestamp*int64(time.Millisecond)).In(time.UTC).Format("2006-01-02")
return nil
return nil
}
return unmarshalErrorf("can not unmarshal %s into %T", info, value)
}
Expand Down
56 changes: 28 additions & 28 deletions marshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1871,58 +1871,58 @@ func TestReadCollectionSize(t *testing.T) {
}

tests := []struct {
name string
info CollectionType
data []byte
isError bool
name string
info CollectionType
data []byte
isError bool
expectedSize int
}{
{
name: "short read 0 proto 2",
info: listV2,
data: []byte{},
name: "short read 0 proto 2",
info: listV2,
data: []byte{},
isError: true,
},
{
name: "short read 1 proto 2",
info: listV2,
data: []byte{0x01},
name: "short read 1 proto 2",
info: listV2,
data: []byte{0x01},
isError: true,
},
{
name: "good read proto 2",
info: listV2,
data: []byte{0x01, 0x38},
name: "good read proto 2",
info: listV2,
data: []byte{0x01, 0x38},
expectedSize: 0x0138,
},
{
name: "short read 0 proto 3",
info: listV3,
data: []byte{},
name: "short read 0 proto 3",
info: listV3,
data: []byte{},
isError: true,
},
{
name: "short read 1 proto 3",
info: listV3,
data: []byte{0x01},
name: "short read 1 proto 3",
info: listV3,
data: []byte{0x01},
isError: true,
},
{
name: "short read 2 proto 3",
info: listV3,
data: []byte{0x01, 0x38},
name: "short read 2 proto 3",
info: listV3,
data: []byte{0x01, 0x38},
isError: true,
},
{
name: "short read 3 proto 3",
info: listV3,
data: []byte{0x01, 0x38, 0x42},
name: "short read 3 proto 3",
info: listV3,
data: []byte{0x01, 0x38, 0x42},
isError: true,
},
{
name: "good read proto 3",
info: listV3,
data: []byte{0x01, 0x38, 0x42, 0x22},
name: "good read proto 3",
info: listV3,
data: []byte{0x01, 0x38, 0x42, 0x22},
expectedSize: 0x01384222,
},
}
Expand Down
55 changes: 28 additions & 27 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,14 +424,14 @@ func TokenAwareHostPolicy(fallback HostSelectionPolicy, opts ...func(*tokenAware
// and the pointer in clusterMeta updated to point to the new value.
type clusterMeta struct {
// replicas is map[keyspace]map[token]hosts
replicas map[string]tokenRingReplicas
replicas map[string]map[token][]*HostInfo
tokenRing *tokenRing
}

type tokenAwareHostPolicy struct {
fallback HostSelectionPolicy
fallback HostSelectionPolicy
getKeyspaceMetadata func(keyspace string) (*KeyspaceMetadata, error)
getKeyspaceName func() string
getKeyspaceName func() string

shuffleReplicas bool
nonLocalReplicasFallback bool
Expand All @@ -446,7 +446,7 @@ type tokenAwareHostPolicy struct {

func (t *tokenAwareHostPolicy) Init(s *Session) {
t.getKeyspaceMetadata = s.KeyspaceMetadata
t.getKeyspaceName = func() string { return s.cfg.Keyspace }
t.getKeyspaceName = func() string {return s.cfg.Keyspace}
}

func (t *tokenAwareHostPolicy) IsLocal(host *HostInfo) bool {
Expand All @@ -465,14 +465,15 @@ func (t *tokenAwareHostPolicy) KeyspaceChanged(update KeyspaceUpdateEvent) {
// It must be called with t.mu mutex locked.
// meta must not be nil and it's replicas field will be updated.
func (t *tokenAwareHostPolicy) updateReplicas(meta *clusterMeta, keyspace string) {
newReplicas := make(map[string]tokenRingReplicas, len(meta.replicas))
newReplicas := make(map[string]map[token][]*HostInfo, len(meta.replicas))

ks, err := t.getKeyspaceMetadata(keyspace)
if err == nil {
strat := getStrategy(ks)
if strat != nil {
if meta != nil && meta.tokenRing != nil {
newReplicas[keyspace] = strat.replicaMap(meta.tokenRing)
hosts := t.hosts.get()
newReplicas[keyspace] = strat.replicaMap(hosts, meta.tokenRing.tokens)
}
}
}
Expand Down Expand Up @@ -593,6 +594,14 @@ func (m *clusterMeta) resetTokenRing(partitioner string, hosts []*HostInfo) {
m.tokenRing = tokenRing
}

func (m *clusterMeta) getReplicas(keyspace string, token token) ([]*HostInfo, bool) {
if m.replicas == nil {
return nil, false
}
replicas, ok := m.replicas[keyspace][token]
return replicas, ok
}

func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
if qry == nil {
return t.fallback.Pick(qry)
Expand All @@ -610,23 +619,22 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
return t.fallback.Pick(qry)
}

token := meta.tokenRing.partitioner.Hash(routingKey)
ht := meta.replicas[qry.Keyspace()].replicasFor(token)
primaryEndpoint, token, endToken := meta.tokenRing.GetHostForPartitionKey(routingKey)
if primaryEndpoint == nil || endToken == nil {
return t.fallback.Pick(qry)
}

var replicas []*HostInfo
if ht == nil {
host, _ := meta.tokenRing.GetHostForToken(token)
replicas = []*HostInfo{host}
replicas, ok := meta.getReplicas(qry.Keyspace(), endToken)
if !ok {
replicas = []*HostInfo{primaryEndpoint}
} else if t.shuffleReplicas {
replicas = shuffleHosts(replicas)
} else {
replicas = ht.hosts
}

var (
fallbackIter NextHost
i, j int
remote []*HostInfo
i int
j int
)

used := make(map[*HostInfo]bool, len(replicas))
Expand All @@ -635,23 +643,18 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
h := replicas[i]
i++

if !t.fallback.IsLocal(h) {
remote = append(remote, h)
continue
}

if h.IsUp() {
if h.IsUp() && t.fallback.IsLocal(h) {
used[h] = true
return selectedHost{info: h, token: token}
}
}

if t.nonLocalReplicasFallback {
for j < len(remote) {
h := remote[j]
for j < len(replicas) {
h := replicas[j]
j++

if h.IsUp() {
if h.IsUp() && !t.fallback.IsLocal(h) {
used[h] = true
return selectedHost{info: h, token: token}
}
Expand All @@ -666,11 +669,9 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
// filter the token aware selected hosts from the fallback hosts
for fallbackHost := fallbackIter(); fallbackHost != nil; fallbackHost = fallbackIter() {
if !used[fallbackHost.Info()] {
used[fallbackHost.Info()] = true
return fallbackHost
}
}

return nil
}
}
Expand Down
Loading

0 comments on commit 7ba21be

Please sign in to comment.