Skip to content

Commit

Permalink
final touches
Browse files Browse the repository at this point in the history
  • Loading branch information
taylanisikdemir committed Nov 7, 2024
1 parent 108787b commit 16710bb
Showing 1 changed file with 10 additions and 13 deletions.
23 changes: 10 additions & 13 deletions common/rpc/direct_peer_chooser.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,20 @@ func newDirectChooser(
}

// Start statisfies the peer.Chooser interface.
func (g *directPeerChooser) Start() error {
func (g *directPeerChooser) Start() (err error) {
if !atomic.CompareAndSwapInt32(&g.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
return nil
}

defer func() { g.logger.Info("direct peer chooser started") }()
defer func() { g.logger.Info("direct peer chooser started", tag.Error(err)) }()

if g.enableConnRetainMode != nil && !g.enableConnRetainMode() {
c, ok := g.getLegacyChooser()
if ok {
return c.Start()
}

return fmt.Errorf("failed to start direct peer chooser because direct peer chooser initialization failed, err: %v", g.legacyChooserErr)

Check warning on line 93 in common/rpc/direct_peer_chooser.go

View check run for this annotation

Codecov / codecov/patch

common/rpc/direct_peer_chooser.go#L93

Added line #L93 was not covered by tests
}

return nil
Expand Down Expand Up @@ -162,13 +164,11 @@ func (g *directPeerChooser) Choose(ctx context.Context, req *transport.Request)
// Do not create actual yarpc peers for the members. They are created lazily when a request comes in (Choose is called).
func (g *directPeerChooser) UpdatePeers(serviceName string, members []membership.HostInfo) {
if g.serviceName != serviceName {
// TODO: convert to debug log
g.logger.Info("This is not the service this chooser is created for. Ignore such updates.", tag.Dynamic("members-service", serviceName))
g.logger.Debug("This is not the service chooser is created for. Ignore such updates.", tag.Dynamic("members-service", serviceName))
return
}

// TODO: convert to debug log
g.logger.Info("direct peer chooser got a membership update", tag.Counter(len(members)))
g.logger.Debug("direct peer chooser got a membership update", tag.Counter(len(members)))

// If the chooser is not started, do not act on membership changes.
// If membership updates arrive after chooser is stopped, ignore them.
Expand Down Expand Up @@ -201,8 +201,7 @@ func (g *directPeerChooser) updatePeersInternal(members []membership.HostInfo) {
}
g.mu.RUnlock()

// TODO: remove this log after verifying the behavior
g.logger.Info(fmt.Sprintf("valid peers: %v, current peers: %v", validPeerAddresses, peers))
g.logger.Debugf("valid peers: %v, current peers: %v", validPeerAddresses, peers)

for addr := range peers {
if !validPeerAddresses[addr] {
Expand All @@ -220,7 +219,6 @@ func (g *directPeerChooser) removePeer(addr string) {
defer g.mu.Unlock()

delete(g.peers, addr)
// TODO: change to debug level
g.logger.Info("removed peer from direct peer chooser", tag.Address(addr))
g.scope.IncCounter(metrics.P2PPeerRemoved)
g.scope.UpdateGauge(metrics.P2PPeersCount, float64(len(g.peers)))
Expand All @@ -238,7 +236,6 @@ func (g *directPeerChooser) addPeer(addr string) (peer.Peer, error) {
return nil, err
}

Check warning on line 237 in common/rpc/direct_peer_chooser.go

View check run for this annotation

Codecov / codecov/patch

common/rpc/direct_peer_chooser.go#L236-L237

Added lines #L236 - L237 were not covered by tests
g.peers[addr] = p
// TODO: change to debug level
g.logger.Info("added peer to direct peer chooser", tag.Address(addr))
g.scope.IncCounter(metrics.P2PPeerAdded)
g.scope.UpdateGauge(metrics.P2PPeersCount, float64(len(g.peers)))
Expand All @@ -248,7 +245,7 @@ func (g *directPeerChooser) addPeer(addr string) (peer.Peer, error) {
func (g *directPeerChooser) chooseFromLegacyDirectPeerChooser(ctx context.Context, req *transport.Request) (peer.Peer, func(error), error) {
c, ok := g.getLegacyChooser()
if !ok {
return nil, nil, yarpcerrors.InternalErrorf("failed to get legacy direct peer chooser")
return nil, nil, yarpcerrors.InternalErrorf("failed to get legacy direct peer chooser, err: %v", g.legacyChooserErr)

Check warning on line 248 in common/rpc/direct_peer_chooser.go

View check run for this annotation

Codecov / codecov/patch

common/rpc/direct_peer_chooser.go#L248

Added line #L248 was not covered by tests
}

return c.Choose(ctx, req)
Expand Down Expand Up @@ -281,7 +278,7 @@ func (g *directPeerChooser) getLegacyChooser() (peer.Chooser, bool) {
}

if atomic.LoadInt32(&g.status) == common.DaemonStatusStarted {
// Start the legacy chooser if the current chooser is started
// Start the legacy chooser if the current chooser is already started
if err := g.legacyChooser.Start(); err != nil {
g.logger.Error("failed to start legacy direct peer chooser", tag.Error(err))
return nil, false
Expand All @@ -291,7 +288,7 @@ func (g *directPeerChooser) getLegacyChooser() (peer.Chooser, bool) {
return g.legacyChooser, true
}

// noOpSubscriber is a peer.Subscriber that does nothing.
// noOpSubscriber is a no-op implementation of peer.Subscriber
type noOpSubscriber struct{}

func (*noOpSubscriber) NotifyStatusChanged(peer.Identifier) {}

0 comments on commit 16710bb

Please sign in to comment.