Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
taylanisikdemir committed Nov 12, 2024
1 parent b8cc197 commit 03318db
Showing 1 changed file with 19 additions and 5 deletions.
24 changes: 19 additions & 5 deletions common/rpc/direct_peer_chooser.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,20 @@ func newDirectChooser(
metricsCl metrics.Client,
enableConnRetainMode dynamicconfig.BoolPropertyFn,
) *directPeerChooser {
return &directPeerChooser{
dpc := &directPeerChooser{
serviceName: serviceName,
logger: logger.WithTags(tag.DestService(serviceName)),
scope: metricsCl.Scope(metrics.P2PRPCPeerChooserScope).Tagged(metrics.DestServiceTag(serviceName)),
t: t,
enableConnRetainMode: enableConnRetainMode,
peers: make(map[string]peer.Peer),
}

if dpc.enableConnRetainMode == nil {
dpc.enableConnRetainMode = func(opts ...dynamicconfig.FilterOption) bool { return false }

Check warning on line 79 in common/rpc/direct_peer_chooser.go

View check run for this annotation

Codecov / codecov/patch

common/rpc/direct_peer_chooser.go#L79

Added line #L79 was not covered by tests
}

return dpc
}

// Start statisfies the peer.Chooser interface.
Expand All @@ -82,9 +88,15 @@ func (g *directPeerChooser) Start() (err error) {
return nil
}

defer func() { g.logger.Info("direct peer chooser started", tag.Error(err)) }()
defer func() {
if err != nil {
g.logger.Error("direct peer chooser failed to start", tag.Error(err))
return
}

Check warning on line 95 in common/rpc/direct_peer_chooser.go

View check run for this annotation

Codecov / codecov/patch

common/rpc/direct_peer_chooser.go#L93-L95

Added lines #L93 - L95 were not covered by tests
g.logger.Info("direct peer chooser started")
}()

if g.enableConnRetainMode != nil && !g.enableConnRetainMode() {
if !g.enableConnRetainMode() {
c, ok := g.getLegacyChooser()
if ok {
return c.Start()
Expand Down Expand Up @@ -121,7 +133,7 @@ func (g *directPeerChooser) IsRunning() bool {
return false
}

if g.enableConnRetainMode != nil && !g.enableConnRetainMode() {
if !g.enableConnRetainMode() {
c, ok := g.getLegacyChooser()
if ok {
return c.IsRunning()
Expand All @@ -135,7 +147,7 @@ func (g *directPeerChooser) IsRunning() bool {
// Choose returns an existing peer for the shard key.
// ShardKey is {host}:{port} of the peer. It could be tchannel or grpc address.
func (g *directPeerChooser) Choose(ctx context.Context, req *transport.Request) (peer peer.Peer, onFinish func(error), err error) {
if g.enableConnRetainMode != nil && !g.enableConnRetainMode() {
if !g.enableConnRetainMode() {
return g.chooseFromLegacyDirectPeerChooser(ctx, req)
}

Expand Down Expand Up @@ -211,9 +223,11 @@ func (g *directPeerChooser) updatePeersInternal(members []membership.HostInfo) {
}

func (g *directPeerChooser) removePeer(addr string) {
g.mu.RLock()
if err := g.t.ReleasePeer(g.peers[addr], noOpSubscriberInstance); err != nil {
g.logger.Error("failed to release peer", tag.Error(err), tag.Address(addr))
}

Check warning on line 229 in common/rpc/direct_peer_chooser.go

View check run for this annotation

Codecov / codecov/patch

common/rpc/direct_peer_chooser.go#L228-L229

Added lines #L228 - L229 were not covered by tests
g.mu.RUnlock()

g.mu.Lock()
defer g.mu.Unlock()
Expand Down

0 comments on commit 03318db

Please sign in to comment.