From ff95842fb10d2d9630bbd1665d3d02430b447d9c Mon Sep 17 00:00:00 2001 From: David Porter Date: Mon, 19 Feb 2024 15:03:38 -0800 Subject: [PATCH] Emits a counter value for every unique view of the hashring (#5672) Context and problem The consistent hashring is an eventually consistent way to see the services' members for routing among shards. It's an eventually-consistent system (since actual strong consistency for membership is controlled by the database shard locking). However, it's possible that it can be very eventually consistent, particularly if the membership resolving system is serving stale data for some reason. This can cause problems because, particularly in the case of history, it breaks inter-service communication and routing. Solution A dumb way to determine if there's an inconsistency, therefore, is to just hash the complete membership set and emit it for each host, like a fingerprint. In the healthy case, this random identifier value will quickly converge across hosts. In the event their views are inconsistent, this will appear as different gauge values which remain persistently different, indicating that operationally some manual action must be taken. Querying the data Assuming m3, the trick will be to look for differences between the identifier values. The value itself is just a hash value and arbitrary. 
Therefore, selecting differences between the upper bound thusly: max = fetch service:cadence* operation:hashring region:region1 service:cadence-history | maxSeries; min = fetch service:cadence* operation:hashring region:region1 service:cadence-history | minSeries; -- select where they're different max | != (min) -- and emit -1 for the default case where the upper value and lower value are all the same | transformNull -1 --- cmd/server/cadence/server.go | 4 +- common/membership/hashring.go | 52 +++++++++++++++++-- common/membership/hashring_test.go | 80 ++++++++++++++++++++++++++---- common/membership/resolver.go | 6 +-- common/metrics/defs.go | 6 +++ common/metrics/tags.go | 12 +++++ 6 files changed, 141 insertions(+), 19 deletions(-) diff --git a/cmd/server/cadence/server.go b/cmd/server/cadence/server.go index 58c0a60e561..06331459a05 100644 --- a/cmd/server/cadence/server.go +++ b/cmd/server/cadence/server.go @@ -180,6 +180,8 @@ func (s *server) startService() common.Daemon { log.Fatalf("ringpop provider failed: %v", err) } + params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, params.Logger)) + params.MembershipResolver, err = membership.NewResolver( peerProvider, params.Logger, @@ -192,8 +194,6 @@ func (s *server) startService() common.Daemon { params.ClusterRedirectionPolicy = s.cfg.ClusterGroupMetadata.ClusterRedirectionPolicy - params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, params.Logger)) - params.ClusterMetadata = cluster.NewMetadata( clusterGroupMetadata.FailoverVersionIncrement, clusterGroupMetadata.PrimaryClusterName, diff --git a/common/membership/hashring.go b/common/membership/hashring.go index 7b2abe828a9..1b173b75d28 100644 --- a/common/membership/hashring.go +++ b/common/membership/hashring.go @@ -22,6 +22,8 @@ package membership import ( "fmt" + "sort" + "strings" "sync" "sync/atomic" "time" @@ -33,6 +35,7 @@ import ( 
"github.com/uber/cadence/common" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/types" ) @@ -64,6 +67,7 @@ type ring struct { refreshChan chan *ChangedEvent shutdownCh chan struct{} shutdownWG sync.WaitGroup + scope metrics.Scope logger log.Logger value atomic.Value // this stores the current hashring @@ -84,21 +88,23 @@ func newHashring( service string, provider PeerProvider, logger log.Logger, + scope metrics.Scope, ) *ring { - hashring := &ring{ + ring := &ring{ status: common.DaemonStatusInitialized, service: service, peerProvider: provider, shutdownCh: make(chan struct{}), logger: logger, refreshChan: make(chan *ChangedEvent), + scope: scope, } - hashring.members.keys = make(map[string]HostInfo) - hashring.subscribers.keys = make(map[string]chan<- *ChangedEvent) + ring.members.keys = make(map[string]HostInfo) + ring.subscribers.keys = make(map[string]chan<- *ChangedEvent) - hashring.value.Store(emptyHashring()) - return hashring + ring.value.Store(emptyHashring()) + return ring } func emptyHashring() *hashring.HashRing { @@ -264,6 +270,7 @@ func (r *ring) refreshRingWorker() { r.logger.Error("refreshing ring", tag.Error(err)) } case <-refreshTicker.C: // periodically refresh membership + r.emitHashIdentifier() if err := r.refresh(); err != nil { r.logger.Error("periodically refreshing ring", tag.Error(err)) } @@ -275,6 +282,41 @@ func (r *ring) ring() *hashring.HashRing { return r.value.Load().(*hashring.HashRing) } +func (r *ring) emitHashIdentifier() float64 { + members, err := r.peerProvider.GetMembers(r.service) + if err != nil { + r.logger.Error("Observed a problem getting peer members while emitting hash identifier metrics", tag.Error(err)) + return -1 + } + self, err := r.peerProvider.WhoAmI() + if err != nil { + r.logger.Error("Observed a problem looking up self from the membership provider while emitting hash identifier metrics", tag.Error(err)) + 
self = HostInfo{ + identity: "unknown", + } + } + + sort.Slice(members, func(i int, j int) bool { + return members[i].addr > members[j].addr + }) + var sb strings.Builder + for i := range members { + sb.WriteString(members[i].addr) + sb.WriteString("\n") + } + hashedView := farm.Hash32([]byte(sb.String())) + // Trimming the metric because collisions are unlikely and I didn't want to use the full Float64 + // in-case it overflowed something. The number itself is meaningless, so additional precision + // doesn't really give any advantage, besides reducing the risk of collision + trimmedForMetric := float64(hashedView % 1000) + r.logger.Debug("Hashring view", tag.Dynamic("hashring-view", sb.String()), tag.Dynamic("trimmed-hash-id", trimmedForMetric), tag.Service(r.service)) + r.scope.Tagged( + metrics.ServiceTag(r.service), + metrics.HostTag(self.identity), + ).UpdateGauge(metrics.HashringViewIdentifier, trimmedForMetric) + return trimmedForMetric +} + func (r *ring) compareMembers(members []HostInfo) (map[string]HostInfo, bool) { changed := false newMembersMap := make(map[string]HostInfo, len(members)) diff --git a/common/membership/hashring_test.go b/common/membership/hashring_test.go index 639777131a8..01be0b2e833 100644 --- a/common/membership/hashring_test.go +++ b/common/membership/hashring_test.go @@ -34,6 +34,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/metrics" ) var letters = []rune("abcdefghijklmnopqrstuvwxyz") @@ -103,7 +104,7 @@ func TestFailedLookupWillAskProvider(t *testing.T) { pp.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1) pp.EXPECT().GetMembers("test-service").Times(1) - hr := newHashring("test-service", pp, log.NewNoop()) + hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0)) hr.Start() _, err := hr.Lookup("a") @@ -117,7 +118,7 @@ func TestRefreshUpdatesRingOnlyWhenRingHasChanged(t *testing.T) { pp.EXPECT().Subscribe(gomock.Any(), 
gomock.Any()).Times(1) pp.EXPECT().GetMembers("test-service").Times(3) - hr := newHashring("test-service", pp, log.NewNoop()) + hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0)) hr.Start() hr.refresh() @@ -132,7 +133,7 @@ func TestSubscribeIgnoresDuplicates(t *testing.T) { ctrl := gomock.NewController(t) pp := NewMockPeerProvider(ctrl) - hr := newHashring("test-service", pp, log.NewNoop()) + hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0)) assert.NoError(t, hr.Subscribe("test-service", changeCh)) assert.Error(t, hr.Subscribe("test-service", changeCh)) @@ -143,7 +144,7 @@ func TestUnsubcribeIgnoresDeletionOnEmpty(t *testing.T) { ctrl := gomock.NewController(t) pp := NewMockPeerProvider(ctrl) - hr := newHashring("test-service", pp, log.NewNoop()) + hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0)) assert.Equal(t, 0, len(hr.subscribers.keys)) assert.NoError(t, hr.Unsubscribe("test-service")) assert.NoError(t, hr.Unsubscribe("test-service")) @@ -155,7 +156,7 @@ func TestUnsubcribeDeletes(t *testing.T) { pp := NewMockPeerProvider(ctrl) var changeCh = make(chan *ChangedEvent) - hr := newHashring("test-service", pp, log.NewNoop()) + hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0)) assert.Equal(t, 0, len(hr.subscribers.keys)) assert.NoError(t, hr.Subscribe("testservice1", changeCh)) @@ -171,7 +172,7 @@ func TestMemberCountReturnsNumber(t *testing.T) { ctrl := gomock.NewController(t) pp := NewMockPeerProvider(ctrl) - hr := newHashring("test-service", pp, log.NewNoop()) + hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0)) assert.Equal(t, 0, hr.MemberCount()) ring := emptyHashring() @@ -188,7 +189,7 @@ func TestErrorIsPropagatedWhenProviderFails(t *testing.T) { pp := NewMockPeerProvider(ctrl) pp.EXPECT().GetMembers(gomock.Any()).Return(nil, errors.New("error")) - hr := newHashring("test-service", pp, log.NewNoop()) + hr := newHashring("test-service", 
pp, log.NewNoop(), metrics.NoopScope(0)) assert.Error(t, hr.refresh()) } @@ -198,7 +199,7 @@ func TestStopWillStopProvider(t *testing.T) { pp.EXPECT().Stop().Times(1) - hr := newHashring("test-service", pp, log.NewNoop()) + hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0)) hr.status = common.DaemonStatusStarted hr.Stop() @@ -213,7 +214,7 @@ func TestLookupAndRefreshRaceCondition(t *testing.T) { pp.EXPECT().GetMembers("test-service").AnyTimes().DoAndReturn(func(service string) ([]HostInfo, error) { return randomHostInfo(5), nil }) - hr := newHashring("test-service", pp, log.NewNoop()) + hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0)) hr.Start() wg.Add(2) go func() { @@ -233,3 +234,64 @@ func TestLookupAndRefreshRaceCondition(t *testing.T) { wg.Wait() } + +func TestEmitHashringView(t *testing.T) { + + tests := map[string]struct { + hosts []HostInfo + lookuperr error + selfInfo HostInfo + selfErr error + expectedResult float64 + }{ + "example one - sorted set 1 - the output should be some random hashed value": { + hosts: []HostInfo{ + {addr: "10.0.0.1:1234", ip: "10.0.0.1", identity: "host1", portMap: nil}, + {addr: "10.0.0.2:1234", ip: "10.0.0.2", identity: "host2", portMap: nil}, + {addr: "10.0.0.3:1234", ip: "10.0.0.3", identity: "host3", portMap: nil}, + }, + selfInfo: HostInfo{identity: "host123"}, + expectedResult: 835.0, // the number here is meaningless + }, + "example one - unsorted set 1 - the order of the hosts should not matter": { + hosts: []HostInfo{ + {addr: "10.0.0.1:1234", ip: "10.0.0.1", identity: "host1", portMap: nil}, + {addr: "10.0.0.3:1234", ip: "10.0.0.3", identity: "host3", portMap: nil}, + {addr: "10.0.0.2:1234", ip: "10.0.0.2", identity: "host2", portMap: nil}, + }, + selfInfo: HostInfo{identity: "host123"}, + expectedResult: 835.0, // the test here is that it's the same as test 1 + }, + "example 2 - empty set": { + hosts: []HostInfo{}, + selfInfo: HostInfo{identity: "host123"}, + 
expectedResult: 242.0, // meaningless hash value + }, + "example 3 - nil set": { + hosts: nil, + selfInfo: HostInfo{identity: "host123"}, + expectedResult: 242.0, // meaningless hash value + }, + } + + for name, td := range tests { + + t.Run(name, func(t *testing.T) { + + ctrl := gomock.NewController(t) + pp := NewMockPeerProvider(ctrl) + + pp.EXPECT().GetMembers("test-service").DoAndReturn(func(service string) ([]HostInfo, error) { + return td.hosts, td.lookuperr + }) + + pp.EXPECT().WhoAmI().DoAndReturn(func() (HostInfo, error) { + return td.selfInfo, td.selfErr + }) + + hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0)) + + assert.Equal(t, td.expectedResult, hr.emitHashIdentifier()) + }) + } +} diff --git a/common/membership/resolver.go b/common/membership/resolver.go index 42c3e3c76be..f4d5906c0a1 100644 --- a/common/membership/resolver.go +++ b/common/membership/resolver.go @@ -103,17 +103,17 @@ func NewMultiringResolver( services []string, provider PeerProvider, logger log.Logger, - metrics metrics.Client, + metricsClient metrics.Client, ) *MultiringResolver { rpo := &MultiringResolver{ status: common.DaemonStatusInitialized, provider: provider, rings: make(map[string]*ring), - metrics: metrics, + metrics: metricsClient, } for _, s := range services { - rpo.rings[s] = newHashring(s, provider, logger) + rpo.rings[s] = newHashring(s, provider, logger, metricsClient.Scope(metrics.HashringScope)) } return rpo } diff --git a/common/metrics/defs.go b/common/metrics/defs.go index e0da0aebbc0..3825f424d47 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -308,6 +308,8 @@ const ( // ResolverHostNotFoundScope is a simple low level error indicating a lookup failed in the membership resolver ResolverHostNotFoundScope + // HashringScope is a metrics scope for emitting events for the service hashrhing + HashringScope // HistoryClientStartWorkflowExecutionScope tracks RPC calls to history service 
HistoryClientStartWorkflowExecutionScope // HistoryClientDescribeHistoryHostScope tracks RPC calls to history service @@ -1707,6 +1709,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ TaskValidatorScope: {operation: "TaskValidation"}, DomainReplicationQueueScope: {operation: "DomainReplicationQueue"}, ClusterMetadataScope: {operation: "ClusterMetadata"}, + HashringScope: {operation: "Hashring"}, }, // Frontend Scope Names Frontend: { @@ -2174,6 +2177,8 @@ const ( IsolationGroupStateHealthy ValidatedWorkflowCount + HashringViewIdentifier + NumCommonMetrics // Needs to be last on this list for iota numbering ) @@ -2812,6 +2817,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ IsolationGroupStateDrained: {metricName: "isolation_group_drained", metricType: Counter}, IsolationGroupStateHealthy: {metricName: "isolation_group_healthy", metricType: Counter}, ValidatedWorkflowCount: {metricName: "task_validator_count", metricType: Counter}, + HashringViewIdentifier: {metricName: "hashring_view_identifier", metricType: Counter}, }, History: { TaskRequests: {metricName: "task_requests", metricType: Counter}, diff --git a/common/metrics/tags.go b/common/metrics/tags.go index 651a37770ea..a1a5a11dedf 100644 --- a/common/metrics/tags.go +++ b/common/metrics/tags.go @@ -51,10 +51,12 @@ const ( kafkaPartition = "kafkaPartition" transport = "transport" caller = "caller" + service = "service" signalName = "signalName" workflowVersion = "workflow_version" shardID = "shard_id" matchingHost = "matching_host" + host = "host" pollerIsolationGroup = "poller_isolation_group" asyncWFRequestType = "async_wf_request_type" @@ -201,6 +203,16 @@ func CallerTag(value string) Tag { return simpleMetric{key: caller, value: value} } +// ServiceTag returns a new service tag. 
+func ServiceTag(value string) Tag { + return simpleMetric{key: service, value: value} +} + +// HostTag returns a tag carrying the host identifier +func HostTag(value string) Tag { + return metricWithUnknown(host, value) +} + // SignalNameTag returns a new SignalName tag func SignalNameTag(value string) Tag { return metricWithUnknown(signalName, value)