Skip to content

Commit

Permalink
migrate old RT metric impl to new RT impl
Browse files Browse the repository at this point in the history
  • Loading branch information
FoghostCn committed Aug 21, 2023
1 parent c9c5a46 commit a24153b
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 254 deletions.
111 changes: 7 additions & 104 deletions metrics/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,6 @@

package metrics

import (
"sync"
)

import (
"dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate"
)

var (
registries = make(map[string]func(*ReporterConfig) MetricRegistry)
collectors = make([]CollectorFunc, 0)
Expand Down Expand Up @@ -145,104 +137,15 @@ type ObservableMetric interface {
Observe(float64)
}

// StatesMetrics multi metrics,include total,success num, fail num,call MetricsRegistry save data
type StatesMetrics interface {
Success()
AddSuccess(float64)
Fail()
AddFailed(float64)
Inc(succ bool)
type BaseCollector struct {
R MetricRegistry
}

func NewStatesMetrics(total *MetricId, succ *MetricId, fail *MetricId, reg MetricRegistry) StatesMetrics {
return &DefaultStatesMetric{total: total, succ: succ, fail: fail, r: reg}
}

type DefaultStatesMetric struct {
r MetricRegistry
total, succ, fail *MetricId
}

func (c DefaultStatesMetric) Inc(succ bool) {
if succ {
c.Success()
func (c *BaseCollector) StateCount(total, succ, fail *MetricKey, level MetricLevel, succed bool) {
c.R.Counter(NewMetricId(total, level)).Inc()
if succed {
c.R.Counter(NewMetricId(succ, level)).Inc()
} else {
c.Fail()
}
}
func (c DefaultStatesMetric) Success() {
c.r.Counter(c.total).Inc()
c.r.Counter(c.succ).Inc()
}

func (c DefaultStatesMetric) AddSuccess(v float64) {
c.r.Counter(c.total).Add(v)
c.r.Counter(c.succ).Add(v)
}

func (c DefaultStatesMetric) Fail() {
c.r.Counter(c.total).Inc()
c.r.Counter(c.fail).Inc()
}

func (c DefaultStatesMetric) AddFailed(v float64) {
c.r.Counter(c.total).Add(v)
c.r.Counter(c.fail).Add(v)
}

// TimeMetric muliti metrics, include min(Gauge)、max(Gauge)、avg(Gauge)、sum(Gauge)、last(Gauge),call MetricRegistry to expose
// see dubbo-java org.apache.dubbo.metrics.aggregate.TimeWindowAggregator
type TimeMetric interface {
Record(float64)
}

const (
defaultBucketNum = 10
defaultTimeWindowSeconds = 120
)

// NewTimeMetric init and write all data to registry
func NewTimeMetric(min, max, avg, sum, last *MetricId, mr MetricRegistry) TimeMetric {
return &DefaultTimeMetric{r: mr, min: min, max: max, avg: avg, sum: sum, last: last,
agg: aggregate.NewTimeWindowAggregator(defaultBucketNum, defaultTimeWindowSeconds)}
}

type DefaultTimeMetric struct {
r MetricRegistry
agg *aggregate.TimeWindowAggregator
min, max, avg, sum, last *MetricId
}

func (m *DefaultTimeMetric) Record(v float64) {
m.agg.Add(v)
result := m.agg.Result()
m.r.Gauge(m.max).Set(result.Max)
m.r.Gauge(m.min).Set(result.Min)
m.r.Gauge(m.avg).Set(result.Avg)
m.r.Gauge(m.sum).Set(result.Total)
m.r.Gauge(m.last).Set(v)
}

// cache if needed, TimeMetrics must cached
var metricsCache map[string]interface{} = make(map[string]interface{})
var metricsCacheMutex sync.RWMutex

func ComputeIfAbsentCache(key string, supplier func() interface{}) interface{} {
metricsCacheMutex.RLock()
v, ok := metricsCache[key]
metricsCacheMutex.RUnlock()
if ok {
return v
} else {
metricsCacheMutex.Lock()
defer metricsCacheMutex.Unlock()
v, ok = metricsCache[key] // double check,avoid overwriting
if ok {
return v
} else {
n := supplier()
metricsCache[key] = n
return n
}
c.R.Counter(NewMetricId(fail, level)).Inc()
}
}
2 changes: 1 addition & 1 deletion metrics/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (m *ApplicationMetricLevel) Tags() map[string]string {
tags := make(map[string]string)
tags[constant.IpKey] = m.Ip
tags[constant.HostnameKey] = m.HostName
tags[constant.ApplicationKey] = m.ApplicationName
tags[constant.ApplicationNameKey] = m.ApplicationName
tags[constant.ApplicationVersionKey] = m.Version
tags[constant.GitCommitIdKey] = m.GitCommitId
return tags
Expand Down
59 changes: 13 additions & 46 deletions metrics/metadata/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ var ch = make(chan metrics.MetricsEvent, 10)

func init() {
metrics.AddCollector("metadata", func(mr metrics.MetricRegistry, rc *metrics.ReporterConfig) {
l := &MetadataMetricCollector{r: mr}
l := &MetadataMetricCollector{metrics.BaseCollector{R: mr}}
l.start()
})
}

type MetadataMetricCollector struct {
r metrics.MetricRegistry
metrics.BaseCollector
}

func (c *MetadataMetricCollector) start() {
Expand All @@ -63,59 +63,26 @@ func (c *MetadataMetricCollector) start() {
}

func (c *MetadataMetricCollector) handleMetadataPush(event *MetadataMetricEvent) {
m := metrics.ComputeIfAbsentCache(dubboMetadataPush, func() interface{} {
return newStatesMetricFunc(metadataPushNum, metadataPushNumSucceed, metadataPushNumFailed, metrics.GetApplicationLevel(), c.r)
}).(metrics.StatesMetrics)
m.Inc(event.Succ)
metric := metrics.ComputeIfAbsentCache(dubboPushRt, func() interface{} {
return newTimeMetrics(pushRtMin, pushRtMax, pushRtAvg, pushRtSum, pushRtLast, metrics.GetApplicationLevel(), c.r)
}).(metrics.TimeMetric)
metric.Record(event.CostMs())
level := metrics.GetApplicationLevel()
c.StateCount(metadataPushNum, metadataPushSucceed, metadataPushFailed, level, event.Succ)
c.R.Rt(metrics.NewMetricId(pushRt, level), &metrics.RtOpts{}).Observe(event.CostMs())
}

func (c *MetadataMetricCollector) handleMetadataSub(event *MetadataMetricEvent) {
m := metrics.ComputeIfAbsentCache(dubboMetadataSubscribe, func() interface{} {
return newStatesMetricFunc(metadataSubNum, metadataSubNumSucceed, metadataSubNumFailed, metrics.GetApplicationLevel(), c.r)
}).(metrics.StatesMetrics)
m.Inc(event.Succ)
metric := metrics.ComputeIfAbsentCache(dubboSubscribeRt, func() interface{} {
return newTimeMetrics(subscribeRtMin, subscribeRtMax, subscribeRtAvg, subscribeRtSum, subscribeRtLast, metrics.GetApplicationLevel(), c.r)
}).(metrics.TimeMetric)
metric.Record(event.CostMs())
level := metrics.GetApplicationLevel()
c.StateCount(metadataSubNum, metadataSubSucceed, metadataSubFailed, level, event.Succ)
c.R.Rt(metrics.NewMetricId(subscribeRt, level), &metrics.RtOpts{}).Observe(event.CostMs())
}

func (c *MetadataMetricCollector) handleStoreProvider(event *MetadataMetricEvent) {
interfaceName := event.Attachment[constant.InterfaceKey]
m := metrics.ComputeIfAbsentCache(dubboMetadataStoreProvider+":"+interfaceName, func() interface{} {
return newStatesMetricFunc(metadataStoreProvider, metadataStoreProviderSucceed, metadataStoreProviderFailed,
metrics.NewServiceMetric(interfaceName), c.r)
}).(metrics.StatesMetrics)
m.Inc(event.Succ)
metric := metrics.ComputeIfAbsentCache(dubboStoreProviderInterfaceRt+":"+interfaceName, func() interface{} {
return newTimeMetrics(storeProviderInterfaceRtMin, storeProviderInterfaceRtMax, storeProviderInterfaceRtAvg,
storeProviderInterfaceRtSum, storeProviderInterfaceRtLast, metrics.NewServiceMetric(interfaceName), c.r)
}).(metrics.TimeMetric)
metric.Record(event.CostMs())
level := metrics.NewServiceMetric(event.Attachment[constant.InterfaceKey])
c.StateCount(metadataStoreProviderNum, metadataStoreProviderSucceed, metadataStoreProviderFailed, level, event.Succ)
c.R.Rt(metrics.NewMetricId(storeProviderInterfaceRt, level), &metrics.RtOpts{}).Observe(event.CostMs())
}

func (c *MetadataMetricCollector) handleSubscribeService(event *MetadataMetricEvent) {
interfaceName := event.Attachment[constant.InterfaceKey]
metric := metrics.ComputeIfAbsentCache(dubboSubscribeServiceRt+":"+interfaceName, func() interface{} {
return newTimeMetrics(subscribeServiceRtMin, subscribeServiceRtMax, subscribeServiceRtAvg, subscribeServiceRtSum,
subscribeServiceRtLast, metrics.NewServiceMetric(interfaceName), c.r)
}).(metrics.TimeMetric)
metric.Record(event.CostMs())
}

func newStatesMetricFunc(total *metrics.MetricKey, succ *metrics.MetricKey, fail *metrics.MetricKey,
level metrics.MetricLevel, reg metrics.MetricRegistry) metrics.StatesMetrics {
return metrics.NewStatesMetrics(metrics.NewMetricId(total, level), metrics.NewMetricId(succ, level),
metrics.NewMetricId(fail, level), reg)
}

func newTimeMetrics(min, max, avg, sum, last *metrics.MetricKey, level metrics.MetricLevel, mr metrics.MetricRegistry) metrics.TimeMetric {
return metrics.NewTimeMetric(metrics.NewMetricId(min, level), metrics.NewMetricId(max, level), metrics.NewMetricId(avg, level),
metrics.NewMetricId(sum, level), metrics.NewMetricId(last, level), mr)
level := metrics.NewServiceMetric(event.Attachment[constant.InterfaceKey])
c.R.Rt(metrics.NewMetricId(subscribeServiceRt, level), &metrics.RtOpts{}).Observe(event.CostMs())
}

type MetadataMetricEvent struct {
Expand Down
43 changes: 11 additions & 32 deletions metrics/metadata/metric_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,34 +47,21 @@ const (
totalSuffix = "_total"
succSuffix = "_succeed_total"
failedSuffix = "_failed_total"
sumSuffix = "_sum"
lastSuffix = "_last"
minSuffix = "_min"
maxSuffix = "_max"
avgSuffix = "_avg"
)

var (
// app level
metadataPushNum = metrics.NewMetricKey(dubboMetadataPush+totalSuffix, "Total Num")
metadataPushNumSucceed = metrics.NewMetricKey(dubboMetadataPush+succSuffix, "Succeed Push Num")
metadataPushNumFailed = metrics.NewMetricKey(dubboMetadataPush+failedSuffix, "Failed Push Num")
metadataPushNum = metrics.NewMetricKey(dubboMetadataPush+totalSuffix, "Total Num")
metadataPushSucceed = metrics.NewMetricKey(dubboMetadataPush+succSuffix, "Succeed Push Num")
metadataPushFailed = metrics.NewMetricKey(dubboMetadataPush+failedSuffix, "Failed Push Num")
// app level
metadataSubNum = metrics.NewMetricKey(dubboMetadataSubscribe+totalSuffix, "Total Metadata Subscribe Num")
metadataSubNumSucceed = metrics.NewMetricKey(dubboMetadataSubscribe+succSuffix, "Succeed Metadata Subscribe Num")
metadataSubNumFailed = metrics.NewMetricKey(dubboMetadataSubscribe+failedSuffix, "Failed Metadata Subscribe Num")
metadataSubNum = metrics.NewMetricKey(dubboMetadataSubscribe+totalSuffix, "Total Metadata Subscribe Num")
metadataSubSucceed = metrics.NewMetricKey(dubboMetadataSubscribe+succSuffix, "Succeed Metadata Subscribe Num")
metadataSubFailed = metrics.NewMetricKey(dubboMetadataSubscribe+failedSuffix, "Failed Metadata Subscribe Num")
// app level
pushRtSum = metrics.NewMetricKey(dubboPushRt+sumSuffix, "Sum Response Time")
pushRtLast = metrics.NewMetricKey(dubboPushRt+lastSuffix, "Last Response Time")
pushRtMin = metrics.NewMetricKey(dubboPushRt+minSuffix, "Min Response Time")
pushRtMax = metrics.NewMetricKey(dubboPushRt+maxSuffix, "Max Response Time")
pushRtAvg = metrics.NewMetricKey(dubboPushRt+avgSuffix, "Average Response Time")
pushRt = metrics.NewMetricKey(dubboPushRt, "Response Time")
// app level
subscribeRtSum = metrics.NewMetricKey(dubboSubscribeRt+sumSuffix, "Sum Response Time")
subscribeRtLast = metrics.NewMetricKey(dubboSubscribeRt+lastSuffix, "Last Response Time")
subscribeRtMin = metrics.NewMetricKey(dubboSubscribeRt+minSuffix, "Min Response Time")
subscribeRtMax = metrics.NewMetricKey(dubboSubscribeRt+maxSuffix, "Max Response Time")
subscribeRtAvg = metrics.NewMetricKey(dubboSubscribeRt+avgSuffix, "Average Response Time")
subscribeRt = metrics.NewMetricKey(dubboSubscribeRt, "Response Time")

/*
# HELP dubbo_metadata_store_provider_succeed_total Succeed Store Provider Metadata
Expand All @@ -85,7 +72,7 @@ var (
// service level
metadataStoreProviderFailed = metrics.NewMetricKey(dubboMetadataStoreProvider+failedSuffix, "Total Failed Provider Metadata Store")
metadataStoreProviderSucceed = metrics.NewMetricKey(dubboMetadataStoreProvider+succSuffix, "Total Succeed Provider Metadata Store")
metadataStoreProvider = metrics.NewMetricKey(dubboMetadataStoreProvider+totalSuffix, "Total Provider Metadata Store")
metadataStoreProviderNum = metrics.NewMetricKey(dubboMetadataStoreProvider+totalSuffix, "Total Provider Metadata Store")

/*
# HELP dubbo_store_provider_interface_rt_milliseconds_avg Average Response Time
Expand All @@ -94,15 +81,7 @@ var (
dubbo_store_provider_interface_rt_milliseconds_avg{application_name="metrics-provider",application_version="3.2.1",git_commit_id="20de8b22ffb2a23531f6d9494a4963fcabd52561",hostname="localhost",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService2",ip="10.252.156.213",} 10837.0
*/
// service level
storeProviderInterfaceRtAvg = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+avgSuffix, "Average Store Provider Interface Time")
storeProviderInterfaceRtLast = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+lastSuffix, "Last Store Provider Interface Time")
storeProviderInterfaceRtMax = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+maxSuffix, "Max Store Provider Interface Time")
storeProviderInterfaceRtMin = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+minSuffix, "Min Store Provider Interface Time")
storeProviderInterfaceRtSum = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+sumSuffix, "Sum Store Provider Interface Time")
storeProviderInterfaceRt = metrics.NewMetricKey(dubboStoreProviderInterfaceRt, "Store Provider Interface Time")

subscribeServiceRtLast = metrics.NewMetricKey(dubboSubscribeServiceRt+lastSuffix, "Last Subscribe Service Time")
subscribeServiceRtMax = metrics.NewMetricKey(dubboSubscribeServiceRt+maxSuffix, "Max Subscribe Service Time")
subscribeServiceRtMin = metrics.NewMetricKey(dubboSubscribeServiceRt+minSuffix, "Min Subscribe Service Time")
subscribeServiceRtSum = metrics.NewMetricKey(dubboSubscribeServiceRt+sumSuffix, "Sum Subscribe Service Time")
subscribeServiceRtAvg = metrics.NewMetricKey(dubboSubscribeServiceRt+avgSuffix, "Average Subscribe Service Time")
subscribeServiceRt = metrics.NewMetricKey(dubboSubscribeServiceRt, "Subscribe Service Time")
)
Loading

0 comments on commit a24153b

Please sign in to comment.