Skip to content

Commit

Permalink
add metadata rt metrics (#2363)
Browse files Browse the repository at this point in the history
  • Loading branch information
FoghostCn authored Jul 26, 2023
1 parent 459d2ba commit d0c3564
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 77 deletions.
1 change: 1 addition & 0 deletions config/metric_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (mc *MetricConfig) ToReporterConfig() *metrics.ReporterConfig {
defaultMetricsReportConfig.Path = mc.Path
defaultMetricsReportConfig.PushGatewayAddress = mc.PushGatewayAddress
defaultMetricsReportConfig.SummaryMaxAge = mc.SummaryMaxAge
defaultMetricsReportConfig.Protocol = mc.Protocol
return defaultMetricsReportConfig
}

Expand Down
1 change: 0 additions & 1 deletion metadata/report/reporter_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ func (r *PubMetricEventReport) StoreProviderMetadata(i *identifier.MetadataIdent
err := r.MetadataReport.StoreProviderMetadata(i, s)
event.Succ = err == nil
event.End = time.Now()
event.Attachment = make(map[string]string)
event.Attachment[constant.InterfaceKey] = i.ServiceInterface
metrics.Publish(event)
return err
Expand Down
103 changes: 75 additions & 28 deletions metrics/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,42 @@

package metrics

import (
"sync"
)

import (
"dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate"
)

var registries = make(map[string]func(*ReporterConfig) MetricRegistry)
var collectors = make([]CollectorFunc, 0)
var registry MetricRegistry

// CollectorFunc used to extend more indicators
type CollectorFunc func(MetricRegistry, *ReporterConfig)

const defaultRegistry = "prometheus"

// Init Metrics module
func Init(config *ReporterConfig) {
regFunc, ok := registries[config.Protocol]
if !ok {
regFunc = registries[defaultRegistry] // default
if config.Enable {
// default protocol is already set in metricConfig
regFunc, ok := registries[config.Protocol]
if ok {
registry = regFunc(config)
for _, co := range collectors {
co(registry, config)
}
registry.Export()
}
}
registry = regFunc(config)
for _, co := range collectors {
co(registry, config)
}
registry.Export()
}

// SetRegistry registers the MetricRegistry factory v under the protocol name
// name, replacing any factory previously stored under that name. Init looks the
// factory up by ReporterConfig.Protocol.
func SetRegistry(name string, v func(*ReporterConfig) MetricRegistry) {
registries[name] = v
}

// AddCollector add more indicators, like metadata、sla、configcenter、metadata etc
// AddCollector appends fun to the list of collectors that Init invokes with the
// created registry and the reporter config. Collectors add further indicators,
// e.g. metadata, sla, configcenter.
//
// name is currently not used by this function; collectors are kept in a plain
// slice in registration order.
func AddCollector(name string, fun func(MetricRegistry, *ReporterConfig)) {
collectors = append(collectors, fun)
}
Expand Down Expand Up @@ -142,15 +150,13 @@ type StatesMetrics interface {
Inc(succ bool)
}

func NewStatesMetrics(total func() *MetricId, succ func() *MetricId, fail func() *MetricId, reg MetricRegistry) StatesMetrics {
func NewStatesMetrics(total *MetricId, succ *MetricId, fail *MetricId, reg MetricRegistry) StatesMetrics {
return &DefaultStatesMetric{total: total, succ: succ, fail: fail, r: reg}
}

type DefaultStatesMetric struct {
r MetricRegistry
total func() *MetricId
succ func() *MetricId
fail func() *MetricId
r MetricRegistry
total, succ, fail *MetricId
}

func (c DefaultStatesMetric) Inc(succ bool) {
Expand All @@ -161,32 +167,73 @@ func (c DefaultStatesMetric) Inc(succ bool) {
}
}
func (c DefaultStatesMetric) Success() {
c.r.Counter(c.total()).Inc()
c.r.Counter(c.succ()).Inc()
c.r.Counter(c.total).Inc()
c.r.Counter(c.succ).Inc()
}

func (c DefaultStatesMetric) AddSuccess(v float64) {
c.r.Counter(c.total()).Add(v)
c.r.Counter(c.succ()).Add(v)
c.r.Counter(c.total).Add(v)
c.r.Counter(c.succ).Add(v)
}

func (c DefaultStatesMetric) Fail() {
c.r.Counter(c.total()).Inc()
c.r.Counter(c.fail()).Inc()
c.r.Counter(c.total).Inc()
c.r.Counter(c.fail).Inc()
}

func (c DefaultStatesMetric) AddFailed(v float64) {
c.r.Counter(c.total()).Add(v)
c.r.Counter(c.fail()).Add(v)
c.r.Counter(c.total).Add(v)
c.r.Counter(c.fail).Add(v)
}

// TimeMetrics muliti metrics, include min(Gauge)、max(Gauge)、avg(Gauge)、sum(Gauge)、last(Gauge),call MetricRegistry to expose
// TimeMetric bundles multiple metrics, including min(Gauge)、max(Gauge)、avg(Gauge)、sum(Gauge)、last(Gauge), and calls MetricRegistry to expose them
// see dubbo-java org.apache.dubbo.metrics.aggregate.TimeWindowAggregator
type TimeMetrics interface {
type TimeMetric interface {
Record(float64)
}

// NewTimeMetrics init and write all data to registry
func NewTimeMetrics(name string, l MetricLevel) {
const (
	// defaultBucketNum is the first argument handed to
	// aggregate.NewTimeWindowAggregator (presumably the number of buckets the
	// window is divided into; confirm against the aggregate package).
	defaultBucketNum = 10
	// defaultTimeWindowSeconds is the second argument handed to
	// aggregate.NewTimeWindowAggregator (presumably the window length in
	// seconds; confirm against the aggregate package).
	defaultTimeWindowSeconds = 120
)

// NewTimeMetric builds a TimeMetric that writes its results to mr.
//
// min, max, avg, sum and last identify the gauges that receive the respective
// values each time Record is called. Every TimeMetric owns a fresh time window
// aggregator, so callers that want samples to accumulate must reuse the
// returned instance rather than creating a new one per sample.
func NewTimeMetric(min, max, avg, sum, last *MetricId, mr MetricRegistry) TimeMetric {
	return &DefaultTimeMetric{
		r:    mr,
		agg:  aggregate.NewTimeWindowAggregator(defaultBucketNum, defaultTimeWindowSeconds),
		min:  min,
		max:  max,
		avg:  avg,
		sum:  sum,
		last: last,
	}
}

// DefaultTimeMetric is the TimeMetric implementation returned by NewTimeMetric.
// It feeds every recorded value into a time window aggregator and mirrors the
// aggregated result into five gauges of the registry.
type DefaultTimeMetric struct {
	// r is the registry the gauges are written to.
	r MetricRegistry
	// agg accumulates the recorded values.
	agg *aggregate.TimeWindowAggregator
	// Identifiers of the gauges updated by Record.
	min  *MetricId
	max  *MetricId
	avg  *MetricId
	sum  *MetricId
	last *MetricId
}

// Record adds v to the aggregator and then refreshes the gauges: max, min, avg
// and sum take the aggregator's current result, while last is set to v itself.
func (m *DefaultTimeMetric) Record(v float64) {
	m.agg.Add(v)
	snapshot := m.agg.Result()

	reg := m.r
	reg.Gauge(m.max).Set(snapshot.Max)
	reg.Gauge(m.min).Set(snapshot.Min)
	reg.Gauge(m.avg).Set(snapshot.Avg)
	reg.Gauge(m.sum).Set(snapshot.Total)
	reg.Gauge(m.last).Set(v)
}

// metricsCache holds lazily created metric objects keyed by name, guarded by
// metricsCacheMutex. Caching is optional for stateless metrics, but a
// TimeMetric must be cached: it owns an aggregator, so every sample has to be
// recorded into the same instance.
var (
	metricsCache      = make(map[string]interface{})
	metricsCacheMutex sync.RWMutex
)

// ComputeIfAbsentCache returns the value cached under key. When the key is
// absent it calls supplier, stores the result under key and returns it.
//
// The fast path only takes the read lock. On a miss the key is checked again
// after the write lock is acquired, because another goroutine may have stored
// a value between the two locks; without that second check supplier would run
// again and replace the instance other callers already hold. As a result
// supplier runs at most once per key.
//
// supplier is invoked while the write lock is held, so it must not call
// ComputeIfAbsentCache itself.
func ComputeIfAbsentCache(key string, supplier func() interface{}) interface{} {
	metricsCacheMutex.RLock()
	v, ok := metricsCache[key]
	metricsCacheMutex.RUnlock()
	if ok {
		return v
	}

	metricsCacheMutex.Lock()
	defer metricsCacheMutex.Unlock()
	// Re-check: the key may have been filled while we were waiting for the lock.
	if v, ok = metricsCache[key]; ok {
		return v
	}
	v = supplier()
	metricsCache[key] = v
	return v
}
60 changes: 45 additions & 15 deletions metrics/metadata/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func (c *MetadataMetricCollector) start() {
c.handleMetadataPush(event)
case MetadataSub:
c.handleMetadataSub(event)
case SubscribeServiceRt:
c.handleSubscribeService(event)
default:
}
}
Expand All @@ -61,31 +63,59 @@ func (c *MetadataMetricCollector) start() {
}

func (c *MetadataMetricCollector) handleMetadataPush(event *MetadataMetricEvent) {
m := newStatesMetricFunc(metadataPushNum, metadataPushNumSucceed, metadataPushNumFailed, metrics.GetApplicationLevel(), c.r)
m := metrics.ComputeIfAbsentCache(dubboMetadataPush, func() interface{} {
return newStatesMetricFunc(metadataPushNum, metadataPushNumSucceed, metadataPushNumFailed, metrics.GetApplicationLevel(), c.r)
}).(metrics.StatesMetrics)
m.Inc(event.Succ)
// TODO add RT metric dubbo_push_rt_milliseconds
metric := metrics.ComputeIfAbsentCache(dubboPushRt, func() interface{} {
return newTimeMetrics(pushRtMin, pushRtMax, pushRtAvg, pushRtSum, pushRtLast, metrics.GetApplicationLevel(), c.r)
}).(metrics.TimeMetric)
metric.Record(event.CostMs())
}

func (c *MetadataMetricCollector) handleMetadataSub(event *MetadataMetricEvent) {
m := newStatesMetricFunc(metadataSubNum, metadataSubNumSucceed, metadataSubNumFailed, metrics.GetApplicationLevel(), c.r)
m := metrics.ComputeIfAbsentCache(dubboMetadataSubscribe, func() interface{} {
return newStatesMetricFunc(metadataSubNum, metadataSubNumSucceed, metadataSubNumFailed, metrics.GetApplicationLevel(), c.r)
}).(metrics.StatesMetrics)
m.Inc(event.Succ)
// TODO add RT metric dubbo_subscribe_rt_milliseconds
metric := metrics.ComputeIfAbsentCache(dubboSubscribeRt, func() interface{} {
return newTimeMetrics(subscribeRtMin, subscribeRtMax, subscribeRtAvg, subscribeRtSum, subscribeRtLast, metrics.GetApplicationLevel(), c.r)
}).(metrics.TimeMetric)
metric.Record(event.CostMs())
}

func (c *MetadataMetricCollector) handleStoreProvider(event *MetadataMetricEvent) {
level := metrics.NewServiceMetric(event.Attachment[constant.InterfaceKey])
m := newStatesMetricFunc(metadataStoreProvider, metadataStoreProviderSucceed, metadataStoreProviderFailed, level, c.r)
interfaceName := event.Attachment[constant.InterfaceKey]
m := metrics.ComputeIfAbsentCache(dubboMetadataStoreProvider+":"+interfaceName, func() interface{} {
return newStatesMetricFunc(metadataStoreProvider, metadataStoreProviderSucceed, metadataStoreProviderFailed,
metrics.NewServiceMetric(interfaceName), c.r)
}).(metrics.StatesMetrics)
m.Inc(event.Succ)
// TODO add RT metric dubbo_store_provider_interface_rt_milliseconds
metric := metrics.ComputeIfAbsentCache(dubboStoreProviderInterfaceRt+":"+interfaceName, func() interface{} {
return newTimeMetrics(storeProviderInterfaceRtMin, storeProviderInterfaceRtMax, storeProviderInterfaceRtAvg,
storeProviderInterfaceRtSum, storeProviderInterfaceRtLast, metrics.NewServiceMetric(interfaceName), c.r)
}).(metrics.TimeMetric)
metric.Record(event.CostMs())
}

func newStatesMetricFunc(total *metrics.MetricKey, succ *metrics.MetricKey, fail *metrics.MetricKey, level metrics.MetricLevel, reg metrics.MetricRegistry) metrics.StatesMetrics {
return metrics.NewStatesMetrics(
func() *metrics.MetricId { return metrics.NewMetricId(total, level) },
func() *metrics.MetricId { return metrics.NewMetricId(succ, level) },
func() *metrics.MetricId { return metrics.NewMetricId(fail, level) },
reg,
)
// handleSubscribeService records the cost of a subscribe-service event into the
// RT metric of the interface named in the event attachment. The metric is
// fetched from the shared cache, keyed by metric name and interface, and is
// created on first use.
func (c *MetadataMetricCollector) handleSubscribeService(event *MetadataMetricEvent) {
	iface := event.Attachment[constant.InterfaceKey]
	cacheKey := dubboSubscribeServiceRt + ":" + iface

	build := func() interface{} {
		return newTimeMetrics(subscribeServiceRtMin, subscribeServiceRtMax, subscribeServiceRtAvg, subscribeServiceRtSum,
			subscribeServiceRtLast, metrics.NewServiceMetric(iface), c.r)
	}

	rt := metrics.ComputeIfAbsentCache(cacheKey, build).(metrics.TimeMetric)
	rt.Record(event.CostMs())
}

// newStatesMetricFunc creates a StatesMetrics whose total, succeed and failed
// counters are identified by the given keys at the given level and are written
// to reg.
func newStatesMetricFunc(total *metrics.MetricKey, succ *metrics.MetricKey, fail *metrics.MetricKey,
	level metrics.MetricLevel, reg metrics.MetricRegistry) metrics.StatesMetrics {
	totalID := metrics.NewMetricId(total, level)
	succID := metrics.NewMetricId(succ, level)
	failID := metrics.NewMetricId(fail, level)
	return metrics.NewStatesMetrics(totalID, succID, failID, reg)
}

// newTimeMetrics creates a TimeMetric whose min, max, avg, sum and last gauges
// are identified by the given keys at the given level and are written to mr.
func newTimeMetrics(min, max, avg, sum, last *metrics.MetricKey, level metrics.MetricLevel, mr metrics.MetricRegistry) metrics.TimeMetric {
	minID := metrics.NewMetricId(min, level)
	maxID := metrics.NewMetricId(max, level)
	avgID := metrics.NewMetricId(avg, level)
	sumID := metrics.NewMetricId(sum, level)
	lastID := metrics.NewMetricId(last, level)
	return metrics.NewTimeMetric(minID, maxID, avgID, sumID, lastID, mr)
}

type MetadataMetricEvent struct {
Expand All @@ -105,5 +135,5 @@ func (e *MetadataMetricEvent) CostMs() float64 {
}

func NewMetadataMetricTimeEvent(n MetricName) *MetadataMetricEvent {
return &MetadataMetricEvent{Name: n, Start: time.Now()}
return &MetadataMetricEvent{Name: n, Start: time.Now(), Attachment: make(map[string]string)}
}
87 changes: 54 additions & 33 deletions metrics/metadata/metric_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,55 +33,76 @@ const (
SubscribeServiceRt
)

// Base names of the metadata metrics. Each one is combined with a suffix
// constant to form the full metric key name, and the collector handlers also
// use them (optionally followed by ":" and the interface name) as cache keys.
const (
// push / subscribe counters and their response-time gauges
dubboMetadataPush = "dubbo_metadata_push_num"
dubboPushRt = "dubbo_push_rt_milliseconds"
dubboMetadataSubscribe = "dubbo_metadata_subscribe_num"
dubboSubscribeRt = "dubbo_subscribe_rt_milliseconds"
// store-provider counter and response-time gauges
dubboMetadataStoreProvider = "dubbo_metadata_store_provider"
dubboStoreProviderInterfaceRt = "dubbo_store_provider_interface_rt_milliseconds"
// subscribe-service response-time gauges
dubboSubscribeServiceRt = "dubbo_subscribe_service_rt_milliseconds"
)

// Suffixes appended to a base metric name to form the full metric key name.
const (
// counter suffixes
totalSuffix = "_total"
succSuffix = "_succeed_total"
failedSuffix = "_failed_total"
// response-time gauge suffixes
sumSuffix = "_sum"
lastSuffix = "_last"
minSuffix = "_min"
maxSuffix = "_max"
avgSuffix = "_avg"
)

var (
// app level
metadataPushNum = metrics.NewMetricKey("dubbo_metadata_push_num_total", "Total Num")
metadataPushNumSucceed = metrics.NewMetricKey("dubbo_metadata_push_num_succeed_total", "Succeed Push Num")
metadataPushNumFailed = metrics.NewMetricKey("dubbo_metadata_push_num_failed_total", "Failed Push Num")
metadataPushNum = metrics.NewMetricKey(dubboMetadataPush+totalSuffix, "Total Num")
metadataPushNumSucceed = metrics.NewMetricKey(dubboMetadataPush+succSuffix, "Succeed Push Num")
metadataPushNumFailed = metrics.NewMetricKey(dubboMetadataPush+failedSuffix, "Failed Push Num")
// app level
metadataSubNum = metrics.NewMetricKey("dubbo_metadata_subscribe_num_total", "Total Metadata Subscribe Num")
metadataSubNumSucceed = metrics.NewMetricKey("dubbo_metadata_subscribe_num_succeed_total", "Succeed Metadata Subscribe Num")
metadataSubNumFailed = metrics.NewMetricKey("dubbo_metadata_subscribe_num_failed_total", "Failed Metadata Subscribe Num")
metadataSubNum = metrics.NewMetricKey(dubboMetadataSubscribe+totalSuffix, "Total Metadata Subscribe Num")
metadataSubNumSucceed = metrics.NewMetricKey(dubboMetadataSubscribe+succSuffix, "Succeed Metadata Subscribe Num")
metadataSubNumFailed = metrics.NewMetricKey(dubboMetadataSubscribe+failedSuffix, "Failed Metadata Subscribe Num")
// app level
pushRtSum = metrics.NewMetricKey("dubbo_push_rt_milliseconds_sum", "Sum Response Time")
pushRtLast = metrics.NewMetricKey("dubbo_push_rt_milliseconds_last", "Last Response Time")
pushRtMin = metrics.NewMetricKey("dubbo_push_rt_milliseconds_min", "Min Response Time")
pushRtMax = metrics.NewMetricKey("dubbo_push_rt_milliseconds_max", "Max Response Time")
pushRtAvg = metrics.NewMetricKey("dubbo_push_rt_milliseconds_avg", "Average Response Time")
pushRtSum = metrics.NewMetricKey(dubboPushRt+sumSuffix, "Sum Response Time")
pushRtLast = metrics.NewMetricKey(dubboPushRt+lastSuffix, "Last Response Time")
pushRtMin = metrics.NewMetricKey(dubboPushRt+minSuffix, "Min Response Time")
pushRtMax = metrics.NewMetricKey(dubboPushRt+maxSuffix, "Max Response Time")
pushRtAvg = metrics.NewMetricKey(dubboPushRt+avgSuffix, "Average Response Time")
// app level
subscribeRtSum = metrics.NewMetricKey("dubbo_subscribe_rt_milliseconds_sum", "Sum Response Time")
subscribeRtLast = metrics.NewMetricKey("dubbo_subscribe_rt_milliseconds_last", "Last Response Time")
subscribeRtMin = metrics.NewMetricKey("dubbo_subscribe_rt_milliseconds_min", "Min Response Time")
subscribeRtMax = metrics.NewMetricKey("dubbo_subscribe_rt_milliseconds_max", "Max Response Time")
subscribeRtAvg = metrics.NewMetricKey("dubbo_subscribe_rt_milliseconds_avg", "Average Response Time")
subscribeRtSum = metrics.NewMetricKey(dubboSubscribeRt+sumSuffix, "Sum Response Time")
subscribeRtLast = metrics.NewMetricKey(dubboSubscribeRt+lastSuffix, "Last Response Time")
subscribeRtMin = metrics.NewMetricKey(dubboSubscribeRt+minSuffix, "Min Response Time")
subscribeRtMax = metrics.NewMetricKey(dubboSubscribeRt+maxSuffix, "Max Response Time")
subscribeRtAvg = metrics.NewMetricKey(dubboSubscribeRt+avgSuffix, "Average Response Time")

/*
# HELP dubbo_metadata_store_provider_succeed_total Succeed Store Provider Metadata
# TYPE dubbo_metadata_store_provider_succeed_total gauge
dubbo_metadata_store_provider_succeed_total{application_name="metrics-provider",hostname="foghostdeMacBook-Pro.local",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService2",ip="10.252.156.213",} 1.0
dubbo_metadata_store_provider_succeed_total{application_name="metrics-provider",hostname="foghostdeMacBook-Pro.local",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="10.252.156.213",} 1.0
dubbo_metadata_store_provider_succeed_total{application_name="metrics-provider",hostname="localhost",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService2",ip="10.252.156.213",} 1.0
dubbo_metadata_store_provider_succeed_total{application_name="metrics-provider",hostname="localhost",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="10.252.156.213",} 1.0
*/
// service level
metadataStoreProviderFailed = metrics.NewMetricKey("dubbo_metadata_store_provider_failed_total", "Total Failed Provider Metadata Store")
metadataStoreProviderSucceed = metrics.NewMetricKey("dubbo_metadata_store_provider_succeed_total", "Total Succeed Provider Metadata Store")
metadataStoreProvider = metrics.NewMetricKey("dubbo_metadata_store_provider_total", "Total Provider Metadata Store")
metadataStoreProviderFailed = metrics.NewMetricKey(dubboMetadataStoreProvider+failedSuffix, "Total Failed Provider Metadata Store")
metadataStoreProviderSucceed = metrics.NewMetricKey(dubboMetadataStoreProvider+succSuffix, "Total Succeed Provider Metadata Store")
metadataStoreProvider = metrics.NewMetricKey(dubboMetadataStoreProvider+totalSuffix, "Total Provider Metadata Store")

/*
# HELP dubbo_store_provider_interface_rt_milliseconds_avg Average Response Time
# TYPE dubbo_store_provider_interface_rt_milliseconds_avg gauge
dubbo_store_provider_interface_rt_milliseconds_avg{application_name="metrics-provider",application_version="3.2.1",git_commit_id="20de8b22ffb2a23531f6d9494a4963fcabd52561",hostname="foghostdeMacBook-Pro.local",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="10.252.156.213",} 504.0
dubbo_store_provider_interface_rt_milliseconds_avg{application_name="metrics-provider",application_version="3.2.1",git_commit_id="20de8b22ffb2a23531f6d9494a4963fcabd52561",hostname="foghostdeMacBook-Pro.local",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService2",ip="10.252.156.213",} 10837.0
dubbo_store_provider_interface_rt_milliseconds_avg{application_name="metrics-provider",application_version="3.2.1",git_commit_id="20de8b22ffb2a23531f6d9494a4963fcabd52561",hostname="localhost",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="10.252.156.213",} 504.0
dubbo_store_provider_interface_rt_milliseconds_avg{application_name="metrics-provider",application_version="3.2.1",git_commit_id="20de8b22ffb2a23531f6d9494a4963fcabd52561",hostname="localhost",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService2",ip="10.252.156.213",} 10837.0
*/
// service level
storeProviderInterfaceRtAvg = metrics.NewMetricKey("dubbo_store_provider_interface_rt_milliseconds_avg", "Average Store Provider Interface Time")
storeProviderInterfaceRtLast = metrics.NewMetricKey("dubbo_store_provider_interface_rt_milliseconds_last", "Last Store Provider Interface Time")
storeProviderInterfaceRtMax = metrics.NewMetricKey("dubbo_store_provider_interface_rt_milliseconds_max", "Max Store Provider Interface Time")
storeProviderInterfaceRtMin = metrics.NewMetricKey("dubbo_store_provider_interface_rt_milliseconds_min", "Min Store Provider Interface Time")
storeProviderInterfaceRtSum = metrics.NewMetricKey("dubbo_store_provider_interface_rt_milliseconds_sum", "Sum Store Provider Interface Time")
storeProviderInterfaceRtAvg = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+avgSuffix, "Average Store Provider Interface Time")
storeProviderInterfaceRtLast = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+lastSuffix, "Last Store Provider Interface Time")
storeProviderInterfaceRtMax = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+maxSuffix, "Max Store Provider Interface Time")
storeProviderInterfaceRtMin = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+minSuffix, "Min Store Provider Interface Time")
storeProviderInterfaceRtSum = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+sumSuffix, "Sum Store Provider Interface Time")

subscribeServiceRtLast = metrics.NewMetricKey("dubbo_subscribe_service_rt_milliseconds_last", "Last Subscribe Service Time")
subscribeServiceRtMax = metrics.NewMetricKey("dubbo_subscribe_service_rt_milliseconds_max", "Max Subscribe Service Time")
subscribeServiceRtMin = metrics.NewMetricKey("dubbo_subscribe_service_rt_milliseconds_min", "Min Subscribe Service Time")
subscribeServiceRtSum = metrics.NewMetricKey("dubbo_subscribe_service_rt_milliseconds_sum", "Sum Subscribe Service Time")
subscribeServiceRtAvg = metrics.NewMetricKey("dubbo_subscribe_service_rt_milliseconds_avg", "Average Subscribe Service Time")
subscribeServiceRtLast = metrics.NewMetricKey(dubboSubscribeServiceRt+lastSuffix, "Last Subscribe Service Time")
subscribeServiceRtMax = metrics.NewMetricKey(dubboSubscribeServiceRt+maxSuffix, "Max Subscribe Service Time")
subscribeServiceRtMin = metrics.NewMetricKey(dubboSubscribeServiceRt+minSuffix, "Min Subscribe Service Time")
subscribeServiceRtSum = metrics.NewMetricKey(dubboSubscribeServiceRt+sumSuffix, "Sum Subscribe Service Time")
subscribeServiceRtAvg = metrics.NewMetricKey(dubboSubscribeServiceRt+avgSuffix, "Average Subscribe Service Time")
)
Loading

0 comments on commit d0c3564

Please sign in to comment.