Skip to content

Commit

Permalink
Connection retaining mode for p2p peer chooser (uber#6471)
Browse files Browse the repository at this point in the history
  • Loading branch information
taylanisikdemir authored Nov 12, 2024
1 parent 9d328c7 commit d2d1d47
Show file tree
Hide file tree
Showing 18 changed files with 564 additions and 146 deletions.
5 changes: 2 additions & 3 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,9 @@ func (s *server) startService() common.Daemon {
)

params.MetricScope = svcCfg.Metrics.NewScope(params.Logger, params.Name)
params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, params.Logger))

rpcParams, err := rpc.NewParams(params.Name, s.cfg, dc, params.Logger)
rpcParams, err := rpc.NewParams(params.Name, s.cfg, dc, params.Logger, params.MetricsClient)
if err != nil {
log.Fatalf("error creating rpc factory params: %v", err)
}
Expand All @@ -182,8 +183,6 @@ func (s *server) startService() common.Daemon {
log.Fatalf("ringpop provider failed: %v", err)
}

params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, params.Logger))

params.MembershipResolver, err = membership.NewResolver(
peerProvider,
params.Logger,
Expand Down
5 changes: 5 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,11 @@ func Service(sv string) Tag {
return newStringTag("service", sv)
}

// DestService returns tag for destination service
func DestService(sv string) Tag {
return newStringTag("dest-service", sv)
}

// Addresses returns tag for Addresses
func Addresses(ads []string) Tag {
return newObjectTag("addresses", ads)
Expand Down
14 changes: 14 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,9 @@ const (
// GlobalRatelimiterAggregator is the metrics scope for aggregator-side common/quotas/global behavior
GlobalRatelimiterAggregator

// P2PRPCPeerChooserScope is the metrics scope for P2P RPC peer chooser
P2PRPCPeerChooserScope

NumCommonScopes
)

Expand Down Expand Up @@ -1741,6 +1744,8 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
// currently used by both frontend and history, but may grow to other limiting-host-services.
GlobalRatelimiter: {operation: "GlobalRatelimiter"},
GlobalRatelimiterAggregator: {operation: "GlobalRatelimiterAggregator"},

P2PRPCPeerChooserScope: {operation: "P2PRPCPeerChooser"},
},
// Frontend Scope Names
Frontend: {
Expand Down Expand Up @@ -2243,6 +2248,11 @@ const (
GlobalRatelimiterRemovedLimits
GlobalRatelimiterRemovedHostLimits

// p2p rpc metrics
P2PPeersCount
P2PPeerAdded
P2PPeerRemoved

NumCommonMetrics // Needs to be last on this list for iota numbering
)

Expand Down Expand Up @@ -2936,6 +2946,10 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
GlobalRatelimiterHostLimitsQueried: {metricName: "global_ratelimiter_host_limits_queried", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
GlobalRatelimiterRemovedLimits: {metricName: "global_ratelimiter_removed_limits", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
GlobalRatelimiterRemovedHostLimits: {metricName: "global_ratelimiter_removed_host_limits", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},

P2PPeersCount: {metricName: "peers_count", metricType: Gauge},
P2PPeerAdded: {metricName: "peer_added", metricType: Counter},
P2PPeerRemoved: {metricName: "peer_removed", metricType: Counter},
},
History: {
TaskRequests: {metricName: "task_requests", metricType: Counter},
Expand Down
8 changes: 7 additions & 1 deletion common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ const (
transport = "transport"
caller = "caller"
service = "service"
destService = "dest_service"
signalName = "signalName"
workflowVersion = "workflow_version"
shardID = "shard_id"
Expand Down Expand Up @@ -219,11 +220,16 @@ func CallerTag(value string) Tag {
return simpleMetric{key: caller, value: value}
}

// CallerTag returns a new RPC Caller type tag.
// ServiceTag returns a new service tag.
func ServiceTag(value string) Tag {
return simpleMetric{key: service, value: value}
}

// DestServiceTag returns a new destination service tag.
func DestServiceTag(value string) Tag {
return simpleMetric{key: destService, value: value}
}

// Hosttag emits the host identifier
func HostTag(value string) Tag {
return metricWithUnknown(host, value)
Expand Down
4 changes: 3 additions & 1 deletion common/resource/resource_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,9 @@ func (h *Impl) Start() {
h.logger.WithTags(tag.Error(err)).Fatal("fail to start PProf")
}

h.rpcFactory.Start(h.membershipResolver)
if err := h.rpcFactory.Start(h.membershipResolver); err != nil {
h.logger.WithTags(tag.Error(err)).Fatal("fail to start RPC factory")
}

if err := h.dispatcher.Start(); err != nil {
h.logger.WithTags(tag.Error(err)).Fatal("fail to start dispatcher")
Expand Down
2 changes: 1 addition & 1 deletion common/resource/resource_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestStartStop(t *testing.T) {
"primary-cluster": {InitialFailoverVersion: 1, Enabled: true, RPCTransport: "tchannel", RPCAddress: "localhost:0"},
"secondary-cluster": {InitialFailoverVersion: 1, Enabled: true, RPCTransport: "tchannel", RPCAddress: "localhost:0"},
}, nil, metricsCl, logger)
directOutboundPCF := rpc.NewDirectPeerChooserFactory(serviceName, logger)
directOutboundPCF := rpc.NewDirectPeerChooserFactory(serviceName, logger, metricsCl)
directConnRetainFn := func(opts ...dynamicconfig.FilterOption) bool { return false }
pcf := rpc.NewMockPeerChooserFactory(ctrl)
peerChooser := rpc.NewMockPeerChooser(ctrl)
Expand Down
Loading

0 comments on commit d2d1d47

Please sign in to comment.