Skip to content

Commit

Permalink
rpc factory should listen to membership changes for matching & history
Browse files Browse the repository at this point in the history
  • Loading branch information
taylanisikdemir committed Nov 7, 2024
1 parent 054cd89 commit 89e0870
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 38 deletions.
7 changes: 6 additions & 1 deletion common/rpc/direct_peer_chooser.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,12 @@ func (g *directPeerChooser) Choose(ctx context.Context, req *transport.Request)

// UpdatePeers removes peers that are not in the members list.
// Do not create actual yarpc peers for the members. They are created lazily when a request comes in (Choose is called).
func (g *directPeerChooser) UpdatePeers(members []membership.HostInfo) {
func (g *directPeerChooser) UpdatePeers(serviceName string, members []membership.HostInfo) {
if g.serviceName != serviceName {
// This is not the service this chooser is created for. Ignore such updates.
return
}

g.logger.Debug("direct peer chooser got a membership update", tag.Counter(len(members)))

// If the chooser is not started, do not act on membership changes.
Expand Down
34 changes: 20 additions & 14 deletions common/rpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/service"
)

const (
Expand Down Expand Up @@ -176,22 +177,27 @@ func (d *FactoryImpl) GetMaxMessageSize() int {
}

// Start records peerLister on the factory and subscribes to membership change
// events under factoryComponentName. For each subscription it launches a
// listenMembershipChanges goroutine that is tracked by d.wg.
// It returns an error as soon as a Subscribe call fails.
func (d *FactoryImpl) Start(peerLister PeerLister) error {
// subscribe to membership changes and notify outbounds builder for peer updates
d.peerLister = peerLister
ch := make(chan *membership.ChangedEvent, 1)
if err := d.peerLister.Subscribe(d.serviceName, factoryComponentName, ch); err != nil {
return fmt.Errorf("rpc factory failed to subscribe to membership updates: %v", err)
// Subscribe to membership changes for history and matching. This is needed to keep the RPC peers up to date.
for _, svc := range []string{service.History, service.Matching} {
ch := make(chan *membership.ChangedEvent, 1)
if err := d.peerLister.Subscribe(svc, factoryComponentName, ch); err != nil {
return fmt.Errorf("rpc factory failed to subscribe to membership updates for svc: %v, err: %v", svc, err)
}
// Register the listener with the wait group before starting it.
d.wg.Add(1)
go d.listenMembershipChanges(svc, ch)
}
d.wg.Add(1)
go d.listenMembershipChanges(ch)

return nil
}

func (d *FactoryImpl) Stop() error {
d.logger.Info("stopping rpc factory")
if err := d.peerLister.Unsubscribe(d.serviceName, factoryComponentName); err != nil {
d.logger.Error("rpc factory failed to unsubscribe from membership updates", tag.Error(err))

for _, svc := range []string{service.History, service.Matching} {
if err := d.peerLister.Unsubscribe(svc, factoryComponentName); err != nil {
d.logger.Error("rpc factory failed to unsubscribe from membership updates", tag.Error(err), tag.Service(svc))
}
}

d.cancelFn()
Expand All @@ -201,22 +207,22 @@ func (d *FactoryImpl) Stop() error {
return nil
}

func (d *FactoryImpl) listenMembershipChanges(ch chan *membership.ChangedEvent) {
func (d *FactoryImpl) listenMembershipChanges(svc string, ch chan *membership.ChangedEvent) {
defer d.wg.Done()

for {
select {
case <-ch:
d.logger.Debug("rpc factory received membership changed event")
members, err := d.peerLister.Members(d.serviceName)
d.logger.Debug("rpc factory received membership changed event", tag.Service(svc))
members, err := d.peerLister.Members(svc)
if err != nil {
d.logger.Error("rpc factory failed to get members from membership resolver", tag.Error(err))
d.logger.Error("rpc factory failed to get members from membership resolver", tag.Error(err), tag.Service(svc))
continue
}

d.outbounds.UpdatePeers(members)
d.outbounds.UpdatePeers(svc, members)
case <-d.ctx.Done():
d.logger.Info("rpc factory stopped so listenMembershipChanges returning")
d.logger.Info("rpc factory stopped so listenMembershipChanges returning", tag.Service(svc))
return
}
}
Expand Down
39 changes: 24 additions & 15 deletions common/rpc/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/service"
)

func TestNewFactory(t *testing.T) {
Expand Down Expand Up @@ -68,12 +69,12 @@ func TestStartStop(t *testing.T) {
serviceName := "service"
ob := NewMockOutboundsBuilder(ctrl)
var mu sync.Mutex
var gotMembers []membership.HostInfo
gotMembers := make(map[string][]membership.HostInfo)
outbounds := &Outbounds{
onUpdatePeers: func(members []membership.HostInfo) {
onUpdatePeers: func(svc string, members []membership.HostInfo) {
mu.Lock()
defer mu.Unlock()
gotMembers = members
gotMembers[svc] = members
},
}
ob.EXPECT().Build(gomock.Any(), gomock.Any()).Return(outbounds, nil).Times(1)
Expand All @@ -89,19 +90,27 @@ func TestStartStop(t *testing.T) {
OutboundsBuilder: ob,
})

members := []membership.HostInfo{
membership.NewHostInfo("localhost:9191"),
membership.NewHostInfo("localhost:9192"),
membersBySvc := map[string][]membership.HostInfo{
service.Matching: {
membership.NewHostInfo("localhost:9191"),
membership.NewHostInfo("localhost:9192"),
},
service.History: {
membership.NewHostInfo("localhost:8585"),
},
}

peerLister := membership.NewMockResolver(ctrl)
peerLister.EXPECT().Subscribe(serviceName, factoryComponentName, gomock.Any()).
DoAndReturn(func(service, name string, notifyChannel chan<- *membership.ChangedEvent) error {
// Notify the channel once to validate listening logic is working
notifyChannel <- &membership.ChangedEvent{}
return nil
}).Times(1)
peerLister.EXPECT().Unsubscribe(serviceName, factoryComponentName).Return(nil).Times(1)
peerLister.EXPECT().Members(serviceName).Return(members, nil).Times(1)
for _, svc := range []string{service.Matching, service.History} {
peerLister.EXPECT().Subscribe(svc, factoryComponentName, gomock.Any()).
DoAndReturn(func(service, name string, notifyChannel chan<- *membership.ChangedEvent) error {
// Notify the channel once to validate listening logic is working
notifyChannel <- &membership.ChangedEvent{}
return nil
}).Times(1)
peerLister.EXPECT().Unsubscribe(svc, factoryComponentName).Return(nil).Times(1)
peerLister.EXPECT().Members(svc).Return(membersBySvc[svc], nil).Times(1)
}

if err := f.Start(peerLister); err != nil {
t.Fatalf("Factory.Start() returned error: %v", err)
Expand All @@ -110,7 +119,7 @@ func TestStartStop(t *testing.T) {
// Wait for membership changes to be processed
time.Sleep(100 * time.Millisecond)
mu.Lock()
assert.Equal(t, members, gotMembers, "UpdatePeers not called with expected members")
assert.Equal(t, membersBySvc, gotMembers, "UpdatePeers not called with expected members")
mu.Unlock()

if err := f.Stop(); err != nil {
Expand Down
12 changes: 6 additions & 6 deletions common/rpc/outbounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ type OutboundsBuilder interface {

// Outbounds embeds yarpc.Outbounds and carries an optional callback that
// UpdatePeers invokes when it is set.
type Outbounds struct {
yarpc.Outbounds
// onUpdatePeers may be nil, in which case UpdatePeers does nothing.
onUpdatePeers func([]membership.HostInfo)
onUpdatePeers func(serviceName string, members []membership.HostInfo)
}

// UpdatePeers forwards its arguments to the onUpdatePeers callback.
// It is a no-op when no callback has been set.
func (o *Outbounds) UpdatePeers(peers []membership.HostInfo) {
func (o *Outbounds) UpdatePeers(serviceName string, peers []membership.HostInfo) {
if o.onUpdatePeers != nil {
o.onUpdatePeers(peers)
o.onUpdatePeers(serviceName, peers)
}
}

Expand All @@ -76,7 +76,7 @@ func CombineOutbounds(builders ...OutboundsBuilder) OutboundsBuilder {
func (b multiOutboundsBuilder) Build(grpc *grpc.Transport, tchannel *tchannel.Transport) (*Outbounds, error) {
outbounds := yarpc.Outbounds{}
var errs error
var callbacks []func([]membership.HostInfo)
var callbacks []func(string, []membership.HostInfo)
for _, builder := range b.builders {
builderOutbounds, err := builder.Build(grpc, tchannel)
if err != nil {
Expand All @@ -99,9 +99,9 @@ func (b multiOutboundsBuilder) Build(grpc *grpc.Transport, tchannel *tchannel.Tr

return &Outbounds{
Outbounds: outbounds,
onUpdatePeers: func(peers []membership.HostInfo) {
onUpdatePeers: func(serviceName string, members []membership.HostInfo) {
for _, callback := range callbacks {
callback(peers)
callback(serviceName, members)
}
},
}, errs
Expand Down
4 changes: 2 additions & 2 deletions common/rpc/peer_chooser.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type (
peer.Chooser

// UpdatePeers updates the list of peers if needed.
UpdatePeers([]membership.HostInfo)
UpdatePeers(serviceName string, members []membership.HostInfo)
}

dnsPeerChooserFactory struct {
Expand All @@ -78,7 +78,7 @@ type defaultPeerChooser struct {
}

// UpdatePeers is a no-op for defaultPeerChooser; its arguments are ignored.
// It exists only to satisfy the PeerChooser interface.
func (d *defaultPeerChooser) UpdatePeers(peers []membership.HostInfo) {}
func (d *defaultPeerChooser) UpdatePeers(string, []membership.HostInfo) {}

func NewDNSPeerChooserFactory(interval time.Duration, logger log.Logger) PeerChooserFactory {
if interval <= 0 {
Expand Down

0 comments on commit 89e0870

Please sign in to comment.