Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add metadata rt metrics #2363

Merged
merged 1 commit into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/metric_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (mc *MetricConfig) ToReporterConfig() *metrics.ReporterConfig {
defaultMetricsReportConfig.Path = mc.Path
defaultMetricsReportConfig.PushGatewayAddress = mc.PushGatewayAddress
defaultMetricsReportConfig.SummaryMaxAge = mc.SummaryMaxAge
defaultMetricsReportConfig.Protocol = mc.Protocol
return defaultMetricsReportConfig
}

Expand Down
1 change: 0 additions & 1 deletion metadata/report/reporter_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ func (r *PubMetricEventReport) StoreProviderMetadata(i *identifier.MetadataIdent
err := r.MetadataReport.StoreProviderMetadata(i, s)
event.Succ = err == nil
event.End = time.Now()
event.Attachment = make(map[string]string)
event.Attachment[constant.InterfaceKey] = i.ServiceInterface
metrics.Publish(event)
return err
Expand Down
103 changes: 75 additions & 28 deletions metrics/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,42 @@

package metrics

import (
"sync"
)

import (
"dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate"
)

var registries = make(map[string]func(*ReporterConfig) MetricRegistry)
var collectors = make([]CollectorFunc, 0)
var registry MetricRegistry

// CollectorFunc used to extend more indicators
type CollectorFunc func(MetricRegistry, *ReporterConfig)

const defaultRegistry = "prometheus"

// Init Metrics module
func Init(config *ReporterConfig) {
regFunc, ok := registries[config.Protocol]
if !ok {
regFunc = registries[defaultRegistry] // default
if config.Enable {
// defalut protocol is already set in metricConfig
regFunc, ok := registries[config.Protocol]
if ok {
registry = regFunc(config)
for _, co := range collectors {
co(registry, config)
}
registry.Export()
}
}
registry = regFunc(config)
for _, co := range collectors {
co(registry, config)
}
registry.Export()
}

// SetRegistry extend more MetricRegistry, default PrometheusRegistry
func SetRegistry(name string, v func(*ReporterConfig) MetricRegistry) {
registries[name] = v
}

// AddCollector add more indicators, like metadata、sla、configcenter、metadata etc
// AddCollector add more indicators, like metadata、sla、configcenter etc
func AddCollector(name string, fun func(MetricRegistry, *ReporterConfig)) {
collectors = append(collectors, fun)
}
Expand Down Expand Up @@ -142,15 +150,13 @@ type StatesMetrics interface {
Inc(succ bool)
}

func NewStatesMetrics(total func() *MetricId, succ func() *MetricId, fail func() *MetricId, reg MetricRegistry) StatesMetrics {
func NewStatesMetrics(total *MetricId, succ *MetricId, fail *MetricId, reg MetricRegistry) StatesMetrics {
return &DefaultStatesMetric{total: total, succ: succ, fail: fail, r: reg}
}

type DefaultStatesMetric struct {
r MetricRegistry
total func() *MetricId
succ func() *MetricId
fail func() *MetricId
r MetricRegistry
total, succ, fail *MetricId
}

func (c DefaultStatesMetric) Inc(succ bool) {
Expand All @@ -161,32 +167,73 @@ func (c DefaultStatesMetric) Inc(succ bool) {
}
}
func (c DefaultStatesMetric) Success() {
c.r.Counter(c.total()).Inc()
c.r.Counter(c.succ()).Inc()
c.r.Counter(c.total).Inc()
c.r.Counter(c.succ).Inc()
}

func (c DefaultStatesMetric) AddSuccess(v float64) {
c.r.Counter(c.total()).Add(v)
c.r.Counter(c.succ()).Add(v)
c.r.Counter(c.total).Add(v)
c.r.Counter(c.succ).Add(v)
}

func (c DefaultStatesMetric) Fail() {
c.r.Counter(c.total()).Inc()
c.r.Counter(c.fail()).Inc()
c.r.Counter(c.total).Inc()
c.r.Counter(c.fail).Inc()
}

func (c DefaultStatesMetric) AddFailed(v float64) {
c.r.Counter(c.total()).Add(v)
c.r.Counter(c.fail()).Add(v)
c.r.Counter(c.total).Add(v)
c.r.Counter(c.fail).Add(v)
}

// TimeMetrics muliti metrics, include min(Gauge)、max(Gauge)、avg(Gauge)、sum(Gauge)、last(Gauge),call MetricRegistry to expose
// TimeMetric muliti metrics, include min(Gauge)、max(Gauge)、avg(Gauge)、sum(Gauge)、last(Gauge),call MetricRegistry to expose
// see dubbo-java org.apache.dubbo.metrics.aggregate.TimeWindowAggregator
type TimeMetrics interface {
type TimeMetric interface {
Record(float64)
}

// NewTimeMetrics init and write all data to registry
func NewTimeMetrics(name string, l MetricLevel) {
const (
defaultBucketNum = 10
defalutTimeWindowSeconds = 120
)

// NewTimeMetric init and write all data to registry
func NewTimeMetric(min, max, avg, sum, last *MetricId, mr MetricRegistry) TimeMetric {
return &DefaultTimeMetric{r: mr, min: min, max: max, avg: avg, sum: sum, last: last,
agg: aggregate.NewTimeWindowAggregator(defaultBucketNum, defalutTimeWindowSeconds)}
}

type DefaultTimeMetric struct {
r MetricRegistry
agg *aggregate.TimeWindowAggregator
min, max, avg, sum, last *MetricId
}

func (m *DefaultTimeMetric) Record(v float64) {
m.agg.Add(v)
result := m.agg.Result()
m.r.Gauge(m.max).Set(result.Max)
m.r.Gauge(m.min).Set(result.Min)
m.r.Gauge(m.avg).Set(result.Avg)
m.r.Gauge(m.sum).Set(result.Total)
m.r.Gauge(m.last).Set(v)
}

// cache if needed, TimeMetrics must cached
var metricsCache map[string]interface{} = make(map[string]interface{})
var metricsCacheMutex sync.RWMutex

func ComputeIfAbsentCache(key string, supplier func() interface{}) interface{} {
metricsCacheMutex.RLock()
v, ok := metricsCache[key]
metricsCacheMutex.RUnlock()
if ok {
return v
} else {
metricsCacheMutex.Lock()
defer metricsCacheMutex.Unlock()
n := supplier()
metricsCache[key] = n
return n
}
}
60 changes: 45 additions & 15 deletions metrics/metadata/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func (c *MetadataMetricCollector) start() {
c.handleMetadataPush(event)
case MetadataSub:
c.handleMetadataSub(event)
case SubscribeServiceRt:
c.handleSubscribeService(event)
default:
}
}
Expand All @@ -61,31 +63,59 @@ func (c *MetadataMetricCollector) start() {
}

func (c *MetadataMetricCollector) handleMetadataPush(event *MetadataMetricEvent) {
m := newStatesMetricFunc(metadataPushNum, metadataPushNumSucceed, metadataPushNumFailed, metrics.GetApplicationLevel(), c.r)
m := metrics.ComputeIfAbsentCache(dubboMetadataPush, func() interface{} {
return newStatesMetricFunc(metadataPushNum, metadataPushNumSucceed, metadataPushNumFailed, metrics.GetApplicationLevel(), c.r)
}).(metrics.StatesMetrics)
m.Inc(event.Succ)
// TODO add RT metric dubbo_push_rt_milliseconds
metric := metrics.ComputeIfAbsentCache(dubboPushRt, func() interface{} {
return newTimeMetrics(pushRtMin, pushRtMax, pushRtAvg, pushRtSum, pushRtLast, metrics.GetApplicationLevel(), c.r)
}).(metrics.TimeMetric)
metric.Record(event.CostMs())
}

func (c *MetadataMetricCollector) handleMetadataSub(event *MetadataMetricEvent) {
m := newStatesMetricFunc(metadataSubNum, metadataSubNumSucceed, metadataSubNumFailed, metrics.GetApplicationLevel(), c.r)
m := metrics.ComputeIfAbsentCache(dubboMetadataSubscribe, func() interface{} {
return newStatesMetricFunc(metadataSubNum, metadataSubNumSucceed, metadataSubNumFailed, metrics.GetApplicationLevel(), c.r)
}).(metrics.StatesMetrics)
m.Inc(event.Succ)
// TODO add RT metric dubbo_subscribe_rt_milliseconds
metric := metrics.ComputeIfAbsentCache(dubboSubscribeRt, func() interface{} {
return newTimeMetrics(subscribeRtMin, subscribeRtMax, subscribeRtAvg, subscribeRtSum, subscribeRtLast, metrics.GetApplicationLevel(), c.r)
}).(metrics.TimeMetric)
metric.Record(event.CostMs())
}

func (c *MetadataMetricCollector) handleStoreProvider(event *MetadataMetricEvent) {
level := metrics.NewServiceMetric(event.Attachment[constant.InterfaceKey])
m := newStatesMetricFunc(metadataStoreProvider, metadataStoreProviderSucceed, metadataStoreProviderFailed, level, c.r)
interfaceName := event.Attachment[constant.InterfaceKey]
m := metrics.ComputeIfAbsentCache(dubboMetadataStoreProvider+":"+interfaceName, func() interface{} {
return newStatesMetricFunc(metadataStoreProvider, metadataStoreProviderSucceed, metadataStoreProviderFailed,
metrics.NewServiceMetric(interfaceName), c.r)
}).(metrics.StatesMetrics)
m.Inc(event.Succ)
// TODO add RT metric dubbo_store_provider_interface_rt_milliseconds
metric := metrics.ComputeIfAbsentCache(dubboStoreProviderInterfaceRt+":"+interfaceName, func() interface{} {
return newTimeMetrics(storeProviderInterfaceRtMin, storeProviderInterfaceRtMax, storeProviderInterfaceRtAvg,
storeProviderInterfaceRtSum, storeProviderInterfaceRtLast, metrics.NewServiceMetric(interfaceName), c.r)
}).(metrics.TimeMetric)
metric.Record(event.CostMs())
}

func newStatesMetricFunc(total *metrics.MetricKey, succ *metrics.MetricKey, fail *metrics.MetricKey, level metrics.MetricLevel, reg metrics.MetricRegistry) metrics.StatesMetrics {
return metrics.NewStatesMetrics(
func() *metrics.MetricId { return metrics.NewMetricId(total, level) },
func() *metrics.MetricId { return metrics.NewMetricId(succ, level) },
func() *metrics.MetricId { return metrics.NewMetricId(fail, level) },
reg,
)
func (c *MetadataMetricCollector) handleSubscribeService(event *MetadataMetricEvent) {
interfaceName := event.Attachment[constant.InterfaceKey]
metric := metrics.ComputeIfAbsentCache(dubboSubscribeServiceRt+":"+interfaceName, func() interface{} {
return newTimeMetrics(subscribeServiceRtMin, subscribeServiceRtMax, subscribeServiceRtAvg, subscribeServiceRtSum,
subscribeServiceRtLast, metrics.NewServiceMetric(interfaceName), c.r)
}).(metrics.TimeMetric)
metric.Record(event.CostMs())
}

func newStatesMetricFunc(total *metrics.MetricKey, succ *metrics.MetricKey, fail *metrics.MetricKey,
level metrics.MetricLevel, reg metrics.MetricRegistry) metrics.StatesMetrics {
return metrics.NewStatesMetrics(metrics.NewMetricId(total, level), metrics.NewMetricId(succ, level),
metrics.NewMetricId(fail, level), reg)
}

func newTimeMetrics(min, max, avg, sum, last *metrics.MetricKey, level metrics.MetricLevel, mr metrics.MetricRegistry) metrics.TimeMetric {
return metrics.NewTimeMetric(metrics.NewMetricId(min, level), metrics.NewMetricId(max, level), metrics.NewMetricId(avg, level),
metrics.NewMetricId(sum, level), metrics.NewMetricId(last, level), mr)
}

type MetadataMetricEvent struct {
Expand All @@ -105,5 +135,5 @@ func (e *MetadataMetricEvent) CostMs() float64 {
}

func NewMetadataMetricTimeEvent(n MetricName) *MetadataMetricEvent {
return &MetadataMetricEvent{Name: n, Start: time.Now()}
return &MetadataMetricEvent{Name: n, Start: time.Now(), Attachment: make(map[string]string)}
}
87 changes: 54 additions & 33 deletions metrics/metadata/metric_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,55 +33,76 @@ const (
SubscribeServiceRt
)

const (
dubboMetadataPush = "dubbo_metadata_push_num"
dubboPushRt = "dubbo_push_rt_milliseconds"
dubboMetadataSubscribe = "dubbo_metadata_subscribe_num"
dubboSubscribeRt = "dubbo_subscribe_rt_milliseconds"
dubboMetadataStoreProvider = "dubbo_metadata_store_provider"
dubboStoreProviderInterfaceRt = "dubbo_store_provider_interface_rt_milliseconds"
dubboSubscribeServiceRt = "dubbo_subscribe_service_rt_milliseconds"
)

const (
totalSuffix = "_total"
succSuffix = "_succeed_total"
failedSuffix = "_failed_total"
sumSuffix = "_sum"
lastSuffix = "_last"
minSuffix = "_min"
maxSuffix = "_max"
avgSuffix = "_avg"
)

var (
// app level
metadataPushNum = metrics.NewMetricKey("dubbo_metadata_push_num_total", "Total Num")
metadataPushNumSucceed = metrics.NewMetricKey("dubbo_metadata_push_num_succeed_total", "Succeed Push Num")
metadataPushNumFailed = metrics.NewMetricKey("dubbo_metadata_push_num_failed_total", "Failed Push Num")
metadataPushNum = metrics.NewMetricKey(dubboMetadataPush+totalSuffix, "Total Num")
metadataPushNumSucceed = metrics.NewMetricKey(dubboMetadataPush+succSuffix, "Succeed Push Num")
metadataPushNumFailed = metrics.NewMetricKey(dubboMetadataPush+failedSuffix, "Failed Push Num")
// app level
metadataSubNum = metrics.NewMetricKey("dubbo_metadata_subscribe_num_total", "Total Metadata Subscribe Num")
metadataSubNumSucceed = metrics.NewMetricKey("dubbo_metadata_subscribe_num_succeed_total", "Succeed Metadata Subscribe Num")
metadataSubNumFailed = metrics.NewMetricKey("dubbo_metadata_subscribe_num_failed_total", "Failed Metadata Subscribe Num")
metadataSubNum = metrics.NewMetricKey(dubboMetadataSubscribe+totalSuffix, "Total Metadata Subscribe Num")
metadataSubNumSucceed = metrics.NewMetricKey(dubboMetadataSubscribe+succSuffix, "Succeed Metadata Subscribe Num")
metadataSubNumFailed = metrics.NewMetricKey(dubboMetadataSubscribe+failedSuffix, "Failed Metadata Subscribe Num")
// app level
pushRtSum = metrics.NewMetricKey("dubbo_push_rt_milliseconds_sum", "Sum Response Time")
pushRtLast = metrics.NewMetricKey("dubbo_push_rt_milliseconds_last", "Last Response Time")
pushRtMin = metrics.NewMetricKey("dubbo_push_rt_milliseconds_min", "Min Response Time")
pushRtMax = metrics.NewMetricKey("dubbo_push_rt_milliseconds_max", "Max Response Time")
pushRtAvg = metrics.NewMetricKey("dubbo_push_rt_milliseconds_avg", "Average Response Time")
pushRtSum = metrics.NewMetricKey(dubboPushRt+sumSuffix, "Sum Response Time")
pushRtLast = metrics.NewMetricKey(dubboPushRt+lastSuffix, "Last Response Time")
pushRtMin = metrics.NewMetricKey(dubboPushRt+minSuffix, "Min Response Time")
pushRtMax = metrics.NewMetricKey(dubboPushRt+maxSuffix, "Max Response Time")
pushRtAvg = metrics.NewMetricKey(dubboPushRt+avgSuffix, "Average Response Time")
// app level
subscribeRtSum = metrics.NewMetricKey("dubbo_subscribe_rt_milliseconds_sum", "Sum Response Time")
subscribeRtLast = metrics.NewMetricKey("dubbo_subscribe_rt_milliseconds_last", "Last Response Time")
subscribeRtMin = metrics.NewMetricKey("dubbo_subscribe_rt_milliseconds_min", "Min Response Time")
subscribeRtMax = metrics.NewMetricKey("dubbo_subscribe_rt_milliseconds_max", "Max Response Time")
subscribeRtAvg = metrics.NewMetricKey("dubbo_subscribe_rt_milliseconds_avg", "Average Response Time")
subscribeRtSum = metrics.NewMetricKey(dubboSubscribeRt+sumSuffix, "Sum Response Time")
subscribeRtLast = metrics.NewMetricKey(dubboSubscribeRt+lastSuffix, "Last Response Time")
subscribeRtMin = metrics.NewMetricKey(dubboSubscribeRt+minSuffix, "Min Response Time")
subscribeRtMax = metrics.NewMetricKey(dubboSubscribeRt+maxSuffix, "Max Response Time")
subscribeRtAvg = metrics.NewMetricKey(dubboSubscribeRt+avgSuffix, "Average Response Time")

/*
# HELP dubbo_metadata_store_provider_succeed_total Succeed Store Provider Metadata
# TYPE dubbo_metadata_store_provider_succeed_total gauge
dubbo_metadata_store_provider_succeed_total{application_name="metrics-provider",hostname="foghostdeMacBook-Pro.local",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService2",ip="10.252.156.213",} 1.0
dubbo_metadata_store_provider_succeed_total{application_name="metrics-provider",hostname="foghostdeMacBook-Pro.local",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="10.252.156.213",} 1.0
dubbo_metadata_store_provider_succeed_total{application_name="metrics-provider",hostname="localhost",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService2",ip="10.252.156.213",} 1.0
dubbo_metadata_store_provider_succeed_total{application_name="metrics-provider",hostname="localhost",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="10.252.156.213",} 1.0
*/
// service level
metadataStoreProviderFailed = metrics.NewMetricKey("dubbo_metadata_store_provider_failed_total", "Total Failed Provider Metadata Store")
metadataStoreProviderSucceed = metrics.NewMetricKey("dubbo_metadata_store_provider_succeed_total", "Total Succeed Provider Metadata Store")
metadataStoreProvider = metrics.NewMetricKey("dubbo_metadata_store_provider_total", "Total Provider Metadata Store")
metadataStoreProviderFailed = metrics.NewMetricKey(dubboMetadataStoreProvider+failedSuffix, "Total Failed Provider Metadata Store")
metadataStoreProviderSucceed = metrics.NewMetricKey(dubboMetadataStoreProvider+succSuffix, "Total Succeed Provider Metadata Store")
metadataStoreProvider = metrics.NewMetricKey(dubboMetadataStoreProvider+totalSuffix, "Total Provider Metadata Store")

/*
# HELP dubbo_store_provider_interface_rt_milliseconds_avg Average Response Time
# TYPE dubbo_store_provider_interface_rt_milliseconds_avg gauge
dubbo_store_provider_interface_rt_milliseconds_avg{application_name="metrics-provider",application_version="3.2.1",git_commit_id="20de8b22ffb2a23531f6d9494a4963fcabd52561",hostname="foghostdeMacBook-Pro.local",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="10.252.156.213",} 504.0
dubbo_store_provider_interface_rt_milliseconds_avg{application_name="metrics-provider",application_version="3.2.1",git_commit_id="20de8b22ffb2a23531f6d9494a4963fcabd52561",hostname="foghostdeMacBook-Pro.local",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService2",ip="10.252.156.213",} 10837.0
dubbo_store_provider_interface_rt_milliseconds_avg{application_name="metrics-provider",application_version="3.2.1",git_commit_id="20de8b22ffb2a23531f6d9494a4963fcabd52561",hostname="localhost",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService",ip="10.252.156.213",} 504.0
dubbo_store_provider_interface_rt_milliseconds_avg{application_name="metrics-provider",application_version="3.2.1",git_commit_id="20de8b22ffb2a23531f6d9494a4963fcabd52561",hostname="localhost",interface="org.apache.dubbo.samples.metrics.prometheus.api.DemoService2",ip="10.252.156.213",} 10837.0
*/
// service level
storeProviderInterfaceRtAvg = metrics.NewMetricKey("dubbo_store_provider_interface_rt_milliseconds_avg", "Average Store Provider Interface Time")
storeProviderInterfaceRtLast = metrics.NewMetricKey("dubbo_store_provider_interface_rt_milliseconds_last", "Last Store Provider Interface Time")
storeProviderInterfaceRtMax = metrics.NewMetricKey("dubbo_store_provider_interface_rt_milliseconds_max", "Max Store Provider Interface Time")
storeProviderInterfaceRtMin = metrics.NewMetricKey("dubbo_store_provider_interface_rt_milliseconds_min", "Min Store Provider Interface Time")
storeProviderInterfaceRtSum = metrics.NewMetricKey("dubbo_store_provider_interface_rt_milliseconds_sum", "Sum Store Provider Interface Time")
storeProviderInterfaceRtAvg = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+avgSuffix, "Average Store Provider Interface Time")
storeProviderInterfaceRtLast = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+lastSuffix, "Last Store Provider Interface Time")
storeProviderInterfaceRtMax = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+maxSuffix, "Max Store Provider Interface Time")
storeProviderInterfaceRtMin = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+minSuffix, "Min Store Provider Interface Time")
storeProviderInterfaceRtSum = metrics.NewMetricKey(dubboStoreProviderInterfaceRt+sumSuffix, "Sum Store Provider Interface Time")

subscribeServiceRtLast = metrics.NewMetricKey("dubbo_subscribe_service_rt_milliseconds_last", "Last Subscribe Service Time")
subscribeServiceRtMax = metrics.NewMetricKey("dubbo_subscribe_service_rt_milliseconds_max", "Max Subscribe Service Time")
subscribeServiceRtMin = metrics.NewMetricKey("dubbo_subscribe_service_rt_milliseconds_min", "Min Subscribe Service Time")
subscribeServiceRtSum = metrics.NewMetricKey("dubbo_subscribe_service_rt_milliseconds_sum", "Sum Subscribe Service Time")
subscribeServiceRtAvg = metrics.NewMetricKey("dubbo_subscribe_service_rt_milliseconds_avg", "Average Subscribe Service Time")
subscribeServiceRtLast = metrics.NewMetricKey(dubboSubscribeServiceRt+lastSuffix, "Last Subscribe Service Time")
subscribeServiceRtMax = metrics.NewMetricKey(dubboSubscribeServiceRt+maxSuffix, "Max Subscribe Service Time")
subscribeServiceRtMin = metrics.NewMetricKey(dubboSubscribeServiceRt+minSuffix, "Min Subscribe Service Time")
subscribeServiceRtSum = metrics.NewMetricKey(dubboSubscribeServiceRt+sumSuffix, "Sum Subscribe Service Time")
subscribeServiceRtAvg = metrics.NewMetricKey(dubboSubscribeServiceRt+avgSuffix, "Average Subscribe Service Time")
)
Loading
Loading