Emits a counter value for every unique view of the hashring (#5672)
Context and problem
The consistent hashring provides an eventually consistent view of the service's members, used for routing among shards (actual strong consistency for membership is enforced by the database shard locking).

However, it can become very eventually consistent, particularly if the membership-resolving system is serving stale data for some reason. This causes problems, especially for the history service, because it breaks inter-service communication and routing.

Solution
A simple way to detect an inconsistency, therefore, is to hash the complete membership set and emit it for each host, like a fingerprint. In the healthy case, this arbitrary identifier value quickly converges across hosts. If hosts' views are inconsistent, the gauge values will differ and remain persistently different, indicating that some manual operational action must be taken.
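As a minimal standalone sketch of the idea (using a plain []string in place of the Cadence HostInfo type), the fingerprint is just a hash of the sorted member addresses, trimmed to a small range:

package main

import (
	"fmt"
	"sort"
	"strings"

	farm "github.com/dgryski/go-farm"
)

// fingerprint reduces a membership view to a small, order-independent
// identifier: hosts with identical views emit identical values, and
// diverging views almost certainly emit different ones.
func fingerprint(addrs []string) float64 {
	sorted := append([]string(nil), addrs...)
	sort.Strings(sorted) // any consistent order works; it just has to match on every host
	var sb strings.Builder
	for _, a := range sorted {
		sb.WriteString(a)
		sb.WriteString("\n")
	}
	// The value itself is meaningless, so trim it; extra precision only
	// lowers the (already unlikely) chance of two different views colliding.
	return float64(farm.Hash32([]byte(sb.String())) % 1000)
}

func main() {
	view := []string{"10.0.0.2:1234", "10.0.0.1:1234", "10.0.0.3:1234"}
	fmt.Println(fingerprint(view)) // same value regardless of input order
}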

Querying the data
Assuming M3, the trick is to look for differences between the identifier values. The value itself is just an arbitrary hash, so select the cases where the upper and lower bounds differ:

max = fetch service:cadence* operation:hashring region:region1 service:cadence-history | maxSeries;
min = fetch service:cadence* operation:hashring region:region1 service:cadence-history | minSeries;

-- select where they're different
max | != (min)

-- and emit -1 for the default case where the upper value and lower value are all the same
| transformNull -1
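
For comparison only: in a Prometheus-compatible system, assuming the metric is exported as hashring_view_identifier with a service tag (the exported name depends on the reporter configuration), an equivalent check might be:

# returns a series only when hosts within a service disagree on the hashring view
max by (service) (hashring_view_identifier) != min by (service) (hashring_view_identifier)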
davidporter-id-au authored Feb 19, 2024
1 parent f0097a6 commit ff95842
Showing 6 changed files with 141 additions and 19 deletions.
4 changes: 2 additions & 2 deletions cmd/server/cadence/server.go
@@ -180,6 +180,8 @@ func (s *server) startService() common.Daemon {
log.Fatalf("ringpop provider failed: %v", err)
}

params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, params.Logger))

params.MembershipResolver, err = membership.NewResolver(
peerProvider,
params.Logger,
@@ -192,8 +194,6 @@ func (s *server) startService() common.Daemon {

params.ClusterRedirectionPolicy = s.cfg.ClusterGroupMetadata.ClusterRedirectionPolicy

params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, params.Logger))

params.ClusterMetadata = cluster.NewMetadata(
clusterGroupMetadata.FailoverVersionIncrement,
clusterGroupMetadata.PrimaryClusterName,
52 changes: 47 additions & 5 deletions common/membership/hashring.go
@@ -22,6 +22,8 @@ package membership

import (
"fmt"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
@@ -33,6 +35,7 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/types"
)

@@ -64,6 +67,7 @@ type ring struct {
refreshChan chan *ChangedEvent
shutdownCh chan struct{}
shutdownWG sync.WaitGroup
scope metrics.Scope
logger log.Logger

value atomic.Value // this stores the current hashring
@@ -84,21 +88,23 @@ func newHashring(
service string,
provider PeerProvider,
logger log.Logger,
scope metrics.Scope,
) *ring {
hashring := &ring{
ring := &ring{
status: common.DaemonStatusInitialized,
service: service,
peerProvider: provider,
shutdownCh: make(chan struct{}),
logger: logger,
refreshChan: make(chan *ChangedEvent),
scope: scope,
}

hashring.members.keys = make(map[string]HostInfo)
hashring.subscribers.keys = make(map[string]chan<- *ChangedEvent)
ring.members.keys = make(map[string]HostInfo)
ring.subscribers.keys = make(map[string]chan<- *ChangedEvent)

hashring.value.Store(emptyHashring())
return hashring
ring.value.Store(emptyHashring())
return ring
}

func emptyHashring() *hashring.HashRing {
@@ -264,6 +270,7 @@ func (r *ring) refreshRingWorker() {
r.logger.Error("refreshing ring", tag.Error(err))
}
case <-refreshTicker.C: // periodically refresh membership
r.emitHashIdentifier()
if err := r.refresh(); err != nil {
r.logger.Error("periodically refreshing ring", tag.Error(err))
}
@@ -275,6 +282,41 @@ func (r *ring) ring() *hashring.HashRing {
return r.value.Load().(*hashring.HashRing)
}

func (r *ring) emitHashIdentifier() float64 {
members, err := r.peerProvider.GetMembers(r.service)
if err != nil {
r.logger.Error("Observed a problem getting peer members while emitting hash identifier metrics", tag.Error(err))
return -1
}
self, err := r.peerProvider.WhoAmI()
if err != nil {
r.logger.Error("Observed a problem looking up self from the membership provider while emitting hash identifier metrics", tag.Error(err))
self = HostInfo{
identity: "unknown",
}
}

sort.Slice(members, func(i int, j int) bool {
return members[i].addr > members[j].addr
})
var sb strings.Builder
for i := range members {
sb.WriteString(members[i].addr)
sb.WriteString("\n")
}
hashedView := farm.Hash32([]byte(sb.String()))
// Trimming the metric because collisions are unlikely and I didn't want to use the full float64
// in case it overflowed something. The number itself is meaningless, so additional precision
// doesn't really give any advantage besides reducing the risk of collision
trimmedForMetric := float64(hashedView % 1000)
r.logger.Debug("Hashring view", tag.Dynamic("hashring-view", sb.String()), tag.Dynamic("trimmed-hash-id", trimmedForMetric), tag.Service(r.service))
r.scope.Tagged(
metrics.ServiceTag(r.service),
metrics.HostTag(self.identity),
).UpdateGauge(metrics.HashringViewIdentifier, trimmedForMetric)
return trimmedForMetric
}

func (r *ring) compareMembers(members []HostInfo) (map[string]HostInfo, bool) {
changed := false
newMembersMap := make(map[string]HostInfo, len(members))
80 changes: 71 additions & 9 deletions common/membership/hashring_test.go
@@ -34,6 +34,7 @@ import (

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/metrics"
)

var letters = []rune("abcdefghijklmnopqrstuvwxyz")
@@ -103,7 +104,7 @@ func TestFailedLookupWillAskProvider(t *testing.T) {
pp.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1)
pp.EXPECT().GetMembers("test-service").Times(1)

hr := newHashring("test-service", pp, log.NewNoop())
hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
hr.Start()
_, err := hr.Lookup("a")

@@ -117,7 +118,7 @@ func TestRefreshUpdatesRingOnlyWhenRingHasChanged(t *testing.T) {
pp.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1)
pp.EXPECT().GetMembers("test-service").Times(3)

hr := newHashring("test-service", pp, log.NewNoop())
hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
hr.Start()

hr.refresh()
@@ -132,7 +133,7 @@ func TestSubscribeIgnoresDuplicates(t *testing.T) {
ctrl := gomock.NewController(t)
pp := NewMockPeerProvider(ctrl)

hr := newHashring("test-service", pp, log.NewNoop())
hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))

assert.NoError(t, hr.Subscribe("test-service", changeCh))
assert.Error(t, hr.Subscribe("test-service", changeCh))
@@ -143,7 +144,7 @@ func TestUnsubcribeIgnoresDeletionOnEmpty(t *testing.T) {
ctrl := gomock.NewController(t)
pp := NewMockPeerProvider(ctrl)

hr := newHashring("test-service", pp, log.NewNoop())
hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
assert.Equal(t, 0, len(hr.subscribers.keys))
assert.NoError(t, hr.Unsubscribe("test-service"))
assert.NoError(t, hr.Unsubscribe("test-service"))
@@ -155,7 +156,7 @@ func TestUnsubcribeDeletes(t *testing.T) {
pp := NewMockPeerProvider(ctrl)
var changeCh = make(chan *ChangedEvent)

hr := newHashring("test-service", pp, log.NewNoop())
hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))

assert.Equal(t, 0, len(hr.subscribers.keys))
assert.NoError(t, hr.Subscribe("testservice1", changeCh))
@@ -171,7 +172,7 @@ func TestMemberCountReturnsNumber(t *testing.T) {
ctrl := gomock.NewController(t)
pp := NewMockPeerProvider(ctrl)

hr := newHashring("test-service", pp, log.NewNoop())
hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
assert.Equal(t, 0, hr.MemberCount())

ring := emptyHashring()
@@ -188,7 +189,7 @@ func TestErrorIsPropagatedWhenProviderFails(t *testing.T) {
pp := NewMockPeerProvider(ctrl)
pp.EXPECT().GetMembers(gomock.Any()).Return(nil, errors.New("error"))

hr := newHashring("test-service", pp, log.NewNoop())
hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
assert.Error(t, hr.refresh())
}

@@ -198,7 +199,7 @@ func TestStopWillStopProvider(t *testing.T) {

pp.EXPECT().Stop().Times(1)

hr := newHashring("test-service", pp, log.NewNoop())
hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
hr.status = common.DaemonStatusStarted
hr.Stop()

@@ -213,7 +214,7 @@ func TestLookupAndRefreshRaceCondition(t *testing.T) {
pp.EXPECT().GetMembers("test-service").AnyTimes().DoAndReturn(func(service string) ([]HostInfo, error) {
return randomHostInfo(5), nil
})
hr := newHashring("test-service", pp, log.NewNoop())
hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
hr.Start()
wg.Add(2)
go func() {
@@ -233,3 +234,64 @@ func TestLookupAndRefreshRaceCondition(t *testing.T) {

wg.Wait()
}

func TestEmitHashringView(t *testing.T) {

tests := map[string]struct {
hosts []HostInfo
lookuperr error
selfInfo HostInfo
selfErr error
expectedResult float64
}{
"example one - sorted set 1 - the output should be some random hashed value": {
hosts: []HostInfo{
{addr: "10.0.0.1:1234", ip: "10.0.0.1", identity: "host1", portMap: nil},
{addr: "10.0.0.2:1234", ip: "10.0.0.2", identity: "host2", portMap: nil},
{addr: "10.0.0.3:1234", ip: "10.0.0.3", identity: "host3", portMap: nil},
},
selfInfo: HostInfo{identity: "host123"},
expectedResult: 835.0, // the number here is meaningless
},
"example one - unsorted set 1 - the order of the hosts should not matter": {
hosts: []HostInfo{
{addr: "10.0.0.1:1234", ip: "10.0.0.1", identity: "host1", portMap: nil},
{addr: "10.0.0.3:1234", ip: "10.0.0.3", identity: "host3", portMap: nil},
{addr: "10.0.0.2:1234", ip: "10.0.0.2", identity: "host2", portMap: nil},
},
selfInfo: HostInfo{identity: "host123"},
expectedResult: 835.0, // the test here is that it's the same as test 1
},
"example 2 - empty set": {
hosts: []HostInfo{},
selfInfo: HostInfo{identity: "host123"},
expectedResult: 242.0, // meaningless hash value
},
"example 3 - nil set": {
hosts: nil,
selfInfo: HostInfo{identity: "host123"},
expectedResult: 242.0, // meaningless hash value
},
}

for name, td := range tests {

t.Run(name, func(t *testing.T) {

ctrl := gomock.NewController(t)
pp := NewMockPeerProvider(ctrl)

pp.EXPECT().GetMembers("test-service").DoAndReturn(func(service string) ([]HostInfo, error) {
return td.hosts, td.lookuperr
})

pp.EXPECT().WhoAmI().DoAndReturn(func() (HostInfo, error) {
return td.selfInfo, td.selfErr
})

hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))

assert.Equal(t, td.expectedResult, hr.emitHashIdentifier())
})
}
}
6 changes: 3 additions & 3 deletions common/membership/resolver.go
@@ -103,17 +103,17 @@ func NewMultiringResolver(
services []string,
provider PeerProvider,
logger log.Logger,
metrics metrics.Client,
metricsClient metrics.Client,
) *MultiringResolver {
rpo := &MultiringResolver{
status: common.DaemonStatusInitialized,
provider: provider,
rings: make(map[string]*ring),
metrics: metrics,
metrics: metricsClient,
}

for _, s := range services {
rpo.rings[s] = newHashring(s, provider, logger)
rpo.rings[s] = newHashring(s, provider, logger, metricsClient.Scope(metrics.HashringScope))
}
return rpo
}
6 changes: 6 additions & 0 deletions common/metrics/defs.go
@@ -308,6 +308,8 @@ const (

// ResolverHostNotFoundScope is a simple low level error indicating a lookup failed in the membership resolver
ResolverHostNotFoundScope
// HashringScope is a metrics scope for emitting events for the service hashring
HashringScope
// HistoryClientStartWorkflowExecutionScope tracks RPC calls to history service
HistoryClientStartWorkflowExecutionScope
// HistoryClientDescribeHistoryHostScope tracks RPC calls to history service
@@ -1707,6 +1709,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
TaskValidatorScope: {operation: "TaskValidation"},
DomainReplicationQueueScope: {operation: "DomainReplicationQueue"},
ClusterMetadataScope: {operation: "ClusterMetadata"},
HashringScope: {operation: "Hashring"},
},
// Frontend Scope Names
Frontend: {
@@ -2174,6 +2177,8 @@ const (
IsolationGroupStateHealthy
ValidatedWorkflowCount

HashringViewIdentifier

NumCommonMetrics // Needs to be last on this list for iota numbering
)

@@ -2812,6 +2817,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
IsolationGroupStateDrained: {metricName: "isolation_group_drained", metricType: Counter},
IsolationGroupStateHealthy: {metricName: "isolation_group_healthy", metricType: Counter},
ValidatedWorkflowCount: {metricName: "task_validator_count", metricType: Counter},
HashringViewIdentifier: {metricName: "hashring_view_identifier", metricType: Counter},
},
History: {
TaskRequests: {metricName: "task_requests", metricType: Counter},
12 changes: 12 additions & 0 deletions common/metrics/tags.go
@@ -51,10 +51,12 @@ const (
kafkaPartition = "kafkaPartition"
transport = "transport"
caller = "caller"
service = "service"
signalName = "signalName"
workflowVersion = "workflow_version"
shardID = "shard_id"
matchingHost = "matching_host"
host = "host"
pollerIsolationGroup = "poller_isolation_group"
asyncWFRequestType = "async_wf_request_type"

@@ -201,6 +203,16 @@ func CallerTag(value string) Tag {
return simpleMetric{key: caller, value: value}
}

// ServiceTag returns a new service tag
func ServiceTag(value string) Tag {
return simpleMetric{key: service, value: value}
}

// HostTag returns a tag for the host identifier
func HostTag(value string) Tag {
return metricWithUnknown(host, value)
}

// SignalNameTag returns a new SignalName tag
func SignalNameTag(value string) Tag {
return metricWithUnknown(signalName, value)