From 89e0870e1012ce03afffa9b62173e57c43c1c654 Mon Sep 17 00:00:00 2001 From: taylan isikdemir Date: Wed, 6 Nov 2024 16:29:09 -0800 Subject: [PATCH] rpc factory should listen membership changes for matching & history --- common/rpc/direct_peer_chooser.go | 7 +++++- common/rpc/factory.go | 34 ++++++++++++++++----------- common/rpc/factory_test.go | 39 +++++++++++++++++++------------ common/rpc/outbounds.go | 12 +++++----- common/rpc/peer_chooser.go | 4 ++-- 5 files changed, 58 insertions(+), 38 deletions(-) diff --git a/common/rpc/direct_peer_chooser.go b/common/rpc/direct_peer_chooser.go index f1cf039bc40..9ad5f40d004 100644 --- a/common/rpc/direct_peer_chooser.go +++ b/common/rpc/direct_peer_chooser.go @@ -159,7 +159,12 @@ func (g *directPeerChooser) Choose(ctx context.Context, req *transport.Request) // UpdatePeers removes peers that are not in the members list. // Do not create actual yarpc peers for the members. They are created lazily when a request comes in (Choose is called). -func (g *directPeerChooser) UpdatePeers(members []membership.HostInfo) { +func (g *directPeerChooser) UpdatePeers(serviceName string, members []membership.HostInfo) { + if g.serviceName != serviceName { + // This is not the service this chooser is created for. Ignore such updates. + return + } + g.logger.Debug("direct peer chooser got a membership update", tag.Counter(len(members))) // If the chooser is not started, do not act on membership changes. diff --git a/common/rpc/factory.go b/common/rpc/factory.go index cd1a0b76ed0..b495bce5403 100644 --- a/common/rpc/factory.go +++ b/common/rpc/factory.go @@ -37,6 +37,7 @@ import ( "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/membership" + "github.com/uber/cadence/common/service" ) const ( @@ -176,22 +177,27 @@ func (d *FactoryImpl) GetMaxMessageSize() int { } func (d *FactoryImpl) Start(peerLister PeerLister) error { - // subscribe to membership changes and notify outbounds builder for peer updates d.peerLister = peerLister - ch := make(chan *membership.ChangedEvent, 1) - if err := d.peerLister.Subscribe(d.serviceName, factoryComponentName, ch); err != nil { - return fmt.Errorf("rpc factory failed to subscribe to membership updates: %v", err) + // subscribe to membership changes for history and matching. This is needed to update the peers for rpc + for _, svc := range []string{service.History, service.Matching} { + ch := make(chan *membership.ChangedEvent, 1) + if err := d.peerLister.Subscribe(svc, factoryComponentName, ch); err != nil { + return fmt.Errorf("rpc factory failed to subscribe to membership updates for svc: %v, err: %v", svc, err) + } + d.wg.Add(1) + go d.listenMembershipChanges(svc, ch) } - d.wg.Add(1) - go d.listenMembershipChanges(ch) return nil } func (d *FactoryImpl) Stop() error { d.logger.Info("stopping rpc factory") - if err := d.peerLister.Unsubscribe(d.serviceName, factoryComponentName); err != nil { - d.logger.Error("rpc factory failed to unsubscribe from membership updates", tag.Error(err)) + + for _, svc := range []string{service.History, service.Matching} { + if err := d.peerLister.Unsubscribe(svc, factoryComponentName); err != nil { + d.logger.Error("rpc factory failed to unsubscribe from membership updates", tag.Error(err), tag.Service(svc)) + } } d.cancelFn() @@ -201,22 +207,22 @@ func (d *FactoryImpl) Stop() error { return nil } -func (d *FactoryImpl) listenMembershipChanges(ch chan *membership.ChangedEvent) { +func (d *FactoryImpl) listenMembershipChanges(svc string, ch chan *membership.ChangedEvent) { defer d.wg.Done() for { select { case <-ch: - d.logger.Debug("rpc factory received membership changed event") - members, err := d.peerLister.Members(d.serviceName) + d.logger.Debug("rpc factory received membership changed event", tag.Service(svc)) + members, err := d.peerLister.Members(svc) if err != nil { - d.logger.Error("rpc factory failed to get members from membership resolver", tag.Error(err)) + d.logger.Error("rpc factory failed to get members from membership resolver", tag.Error(err), tag.Service(svc)) continue } - d.outbounds.UpdatePeers(members) + d.outbounds.UpdatePeers(svc, members) case <-d.ctx.Done(): - d.logger.Info("rpc factory stopped so listenMembershipChanges returning") + d.logger.Info("rpc factory stopped so listenMembershipChanges returning", tag.Service(svc)) return } } diff --git a/common/rpc/factory_test.go b/common/rpc/factory_test.go index 6b039cc8116..75c5d60de82 100644 --- a/common/rpc/factory_test.go +++ b/common/rpc/factory_test.go @@ -31,6 +31,7 @@ import ( "github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/membership" + "github.com/uber/cadence/common/service" ) func TestNewFactory(t *testing.T) { @@ -68,12 +69,12 @@ func TestStartStop(t *testing.T) { serviceName := "service" ob := NewMockOutboundsBuilder(ctrl) var mu sync.Mutex - var gotMembers []membership.HostInfo + gotMembers := make(map[string][]membership.HostInfo) outbounds := &Outbounds{ - onUpdatePeers: func(members []membership.HostInfo) { + onUpdatePeers: func(svc string, members []membership.HostInfo) { mu.Lock() defer mu.Unlock() - gotMembers = members + gotMembers[svc] = members }, } ob.EXPECT().Build(gomock.Any(), gomock.Any()).Return(outbounds, nil).Times(1) @@ -89,19 +90,27 @@ func TestStartStop(t *testing.T) { OutboundsBuilder: ob, }) - members := []membership.HostInfo{ - membership.NewHostInfo("localhost:9191"), - membership.NewHostInfo("localhost:9192"), + membersBySvc := map[string][]membership.HostInfo{ + service.Matching: { + membership.NewHostInfo("localhost:9191"), + membership.NewHostInfo("localhost:9192"), + }, + service.History: { + membership.NewHostInfo("localhost:8585"), + }, } + peerLister := membership.NewMockResolver(ctrl) - peerLister.EXPECT().Subscribe(serviceName, factoryComponentName, gomock.Any()). - DoAndReturn(func(service, name string, notifyChannel chan<- *membership.ChangedEvent) error { - // Notify the channel once to validate listening logic is working - notifyChannel <- &membership.ChangedEvent{} - return nil - }).Times(1) - peerLister.EXPECT().Unsubscribe(serviceName, factoryComponentName).Return(nil).Times(1) - peerLister.EXPECT().Members(serviceName).Return(members, nil).Times(1) + for _, svc := range []string{service.Matching, service.History} { + peerLister.EXPECT().Subscribe(svc, factoryComponentName, gomock.Any()). + DoAndReturn(func(service, name string, notifyChannel chan<- *membership.ChangedEvent) error { + // Notify the channel once to validate listening logic is working + notifyChannel <- &membership.ChangedEvent{} + return nil + }).Times(1) + peerLister.EXPECT().Unsubscribe(svc, factoryComponentName).Return(nil).Times(1) + peerLister.EXPECT().Members(svc).Return(membersBySvc[svc], nil).Times(1) + } if err := f.Start(peerLister); err != nil { t.Fatalf("Factory.Start() returned error: %v", err) @@ -110,7 +119,7 @@ func TestStartStop(t *testing.T) { // Wait for membership changes to be processed time.Sleep(100 * time.Millisecond) mu.Lock() - assert.Equal(t, members, gotMembers, "UpdatePeers not called with expected members") + assert.Equal(t, membersBySvc, gotMembers, "UpdatePeers not called with expected members") mu.Unlock() if err := f.Stop(); err != nil { diff --git a/common/rpc/outbounds.go b/common/rpc/outbounds.go index 8c17122744b..ccd02a6d17a 100644 --- a/common/rpc/outbounds.go +++ b/common/rpc/outbounds.go @@ -55,12 +55,12 @@ type OutboundsBuilder interface { type Outbounds struct { yarpc.Outbounds - onUpdatePeers func([]membership.HostInfo) + onUpdatePeers func(serviceName string, members []membership.HostInfo) } -func (o *Outbounds) UpdatePeers(peers []membership.HostInfo) { +func (o *Outbounds) UpdatePeers(serviceName string, peers []membership.HostInfo) { if o.onUpdatePeers != nil { - o.onUpdatePeers(peers) + o.onUpdatePeers(serviceName, peers) } } @@ -76,7 +76,7 @@ func CombineOutbounds(builders ...OutboundsBuilder) OutboundsBuilder { func (b multiOutboundsBuilder) Build(grpc *grpc.Transport, tchannel *tchannel.Transport) (*Outbounds, error) { outbounds := yarpc.Outbounds{} var errs error - var callbacks []func([]membership.HostInfo) + var callbacks []func(string, []membership.HostInfo) for _, builder := range b.builders { builderOutbounds, err := builder.Build(grpc, tchannel) if err != nil { @@ -99,9 +99,9 @@ func (b multiOutboundsBuilder) Build(grpc *grpc.Transport, tchannel *tchannel.Tr return &Outbounds{ Outbounds: outbounds, - onUpdatePeers: func(peers []membership.HostInfo) { + onUpdatePeers: func(serviceName string, members []membership.HostInfo) { for _, callback := range callbacks { - callback(peers) + callback(serviceName, members) } }, }, errs diff --git a/common/rpc/peer_chooser.go b/common/rpc/peer_chooser.go index aea6185ad64..6de39a9c972 100644 --- a/common/rpc/peer_chooser.go +++ b/common/rpc/peer_chooser.go @@ -57,7 +57,7 @@ type ( peer.Chooser // UpdatePeers updates the list of peers if needed. - UpdatePeers([]membership.HostInfo) + UpdatePeers(serviceName string, members []membership.HostInfo) } dnsPeerChooserFactory struct { @@ -78,7 +78,7 @@ type defaultPeerChooser struct { } // UpdatePeers is a no-op for defaultPeerChooser. It is added to satisfy the PeerChooser interface. -func (d *defaultPeerChooser) UpdatePeers(peers []membership.HostInfo) {} +func (d *defaultPeerChooser) UpdatePeers(string, []membership.HostInfo) {} func NewDNSPeerChooserFactory(interval time.Duration, logger log.Logger) PeerChooserFactory { if interval <= 0 {