From 5f98ac798424a7a457f4e8c1baaef1634b61f0f7 Mon Sep 17 00:00:00 2001 From: Wang Guan Date: Wed, 19 Jul 2023 15:55:51 +0800 Subject: [PATCH 1/3] feat: add QPS metrics --- metrics/prometheus/constant.go | 1 + metrics/prometheus/metric_set.go | 3 +++ metrics/prometheus/model.go | 26 ++++++++++++++++++++++++++ metrics/prometheus/reporter.go | 11 ++++++++++- 4 files changed, 40 insertions(+), 1 deletion(-) diff --git a/metrics/prometheus/constant.go b/metrics/prometheus/constant.go index 275fe25ca4..10647999e3 100644 --- a/metrics/prometheus/constant.go +++ b/metrics/prometheus/constant.go @@ -36,6 +36,7 @@ const ( providerField = "provider" consumerField = "consumer" + qpsField = "qps" requestsField = "requests" rtField = "rt" diff --git a/metrics/prometheus/metric_set.go b/metrics/prometheus/metric_set.go index 6c661c7a9a..f2f0ec633a 100644 --- a/metrics/prometheus/metric_set.go +++ b/metrics/prometheus/metric_set.go @@ -43,6 +43,7 @@ func (ms *metricSet) init(reporterConfig *metrics.ReporterConfig) { } type rpcCommonMetrics struct { + qpsTotal *qpsGaugeVec requestsTotal *prometheus.CounterVec requestsProcessingTotal *prometheus.GaugeVec requestsSucceedTotal *prometheus.CounterVec @@ -59,6 +60,7 @@ type providerMetrics struct { } func (pm *providerMetrics) init(reporterConfig *metrics.ReporterConfig) { + pm.qpsTotal = newQpsGaugeVec(buildMetricsName(providerField, qpsField, totalField), reporterConfig.Namespace, labelNames) pm.requestsTotal = newAutoCounterVec(buildMetricsName(providerField, requestsField, totalField), reporterConfig.Namespace, labelNames) pm.requestsProcessingTotal = newAutoGaugeVec(buildMetricsName(providerField, requestsField, processingField, totalField), reporterConfig.Namespace, labelNames) pm.requestsSucceedTotal = newAutoCounterVec(buildMetricsName(providerField, requestsField, succeedField, totalField), reporterConfig.Namespace, labelNames) @@ -75,6 +77,7 @@ type consumerMetrics struct { } func (cm *consumerMetrics) init(reporterConfig *metrics.ReporterConfig) { + cm.qpsTotal = newQpsGaugeVec(buildMetricsName(consumerField, qpsField, totalField), reporterConfig.Namespace, labelNames) cm.requestsTotal = newAutoCounterVec(buildMetricsName(consumerField, requestsField, totalField), reporterConfig.Namespace, labelNames) cm.requestsProcessingTotal = newAutoGaugeVec(buildMetricsName(consumerField, requestsField, processingField, totalField), reporterConfig.Namespace, labelNames) cm.requestsSucceedTotal = newAutoCounterVec(buildMetricsName(consumerField, requestsField, succeedField, totalField), reporterConfig.Namespace, labelNames) diff --git a/metrics/prometheus/model.go b/metrics/prometheus/model.go index 7ee94f27e9..8e5de8593a 100644 --- a/metrics/prometheus/model.go +++ b/metrics/prometheus/model.go @@ -298,3 +298,29 @@ func (gv *quantileGaugeVec) updateQuantile(labels *prometheus.Labels, curValue i updateFunc(cur) } } + +type qpsGaugeVec struct { + gaugeVec *prometheus.GaugeVec + syncMap *sync.Map // key: labels string, value: TimeWindowCounter +} + +func newQpsGaugeVec(name, namespace string, labels []string) *qpsGaugeVec { + return &qpsGaugeVec{ + gaugeVec: newAutoGaugeVec(name, namespace, labels), + syncMap: &sync.Map{}, + } +} + +func (cv *qpsGaugeVec) updateQps(labels *prometheus.Labels) { + key := convertLabelsToMapKey(*labels) + cur := aggregate.NewTimeWindowCounter(10, 120) + cur.Inc() + + if actual, loaded := cv.syncMap.LoadOrStore(key, cur); loaded { + store := actual.(*aggregate.TimeWindowCounter) + store.Inc() + cv.gaugeVec.With(*labels).Set(store.Count() / float64(store.LivedSeconds())) + } else { + cv.gaugeVec.With(*labels).Set(cur.Count() / float64(cur.LivedSeconds())) + } +} diff --git a/metrics/prometheus/reporter.go b/metrics/prometheus/reporter.go index c123df33d1..54e45035fb 100644 --- a/metrics/prometheus/reporter.go +++ b/metrics/prometheus/reporter.go @@ -116,7 +116,7 @@ func (reporter *PrometheusReporter) ReportBeforeInvocation(ctx context.Context, return } labels := buildLabels(url) - + reporter.incQpsTotal(role, &labels) reporter.incRequestsProcessingTotal(role, &labels) } @@ -142,6 +142,15 @@ func (reporter *PrometheusReporter) ReportAfterInvocation(ctx context.Context, i } } +func (reporter *PrometheusReporter) incQpsTotal(role string, labels *prometheus.Labels) { + switch role { + case providerField: + reporter.provider.qpsTotal.updateQps(labels) + case consumerField: + reporter.consumer.qpsTotal.updateQps(labels) + } +} + func (reporter *PrometheusReporter) incRequestsTotal(role string, labels *prometheus.Labels) { switch role { case providerField: From debb08af8f48e8913318347aa55385208271ed65 Mon Sep 17 00:00:00 2001 From: Wang Guan Date: Wed, 19 Jul 2023 19:25:48 +0800 Subject: [PATCH 2/3] feat: add rt min, max, avg aggregate metrics --- metrics/prometheus/constant.go | 1 + metrics/prometheus/metric_set.go | 15 +++++++++++ metrics/prometheus/model.go | 44 +++++++++++++++++++++++++++++--- metrics/prometheus/reporter.go | 2 ++ 4 files changed, 58 insertions(+), 4 deletions(-) diff --git a/metrics/prometheus/constant.go b/metrics/prometheus/constant.go index 10647999e3..41904dba3e 100644 --- a/metrics/prometheus/constant.go +++ b/metrics/prometheus/constant.go @@ -49,6 +49,7 @@ const ( lastField = "last" totalField = "total" + aggregateField = "aggregate" processingField = "processing" succeedField = "succeed" ) diff --git a/metrics/prometheus/metric_set.go b/metrics/prometheus/metric_set.go index f2f0ec633a..488fce1319 100644 --- a/metrics/prometheus/metric_set.go +++ b/metrics/prometheus/metric_set.go @@ -53,6 +53,7 @@ type rpcCommonMetrics struct { rtMillisecondsAvg *GaugeVecWithSyncMap rtMillisecondsLast *prometheus.GaugeVec rtMillisecondsQuantiles *quantileGaugeVec + rtMillisecondsAggregate *aggregatorGaugeVec } type providerMetrics struct { @@ -70,6 +71,13 @@ func (pm *providerMetrics) init(reporterConfig *metrics.ReporterConfig) { pm.rtMillisecondsAvg = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, avgField), reporterConfig.Namespace, labelNames) pm.rtMillisecondsLast = newAutoGaugeVec(buildMetricsName(providerField, rtField, milliSecondsField, lastField), reporterConfig.Namespace, labelNames) pm.rtMillisecondsQuantiles = newQuantileGaugeVec(buildRTQuantilesMetricsNames(providerField, quantiles), reporterConfig.Namespace, labelNames, quantiles) + pm.rtMillisecondsAggregate = newAggregatorGaugeVec( + buildMetricsName(providerField, rtField, minField, milliSecondsField, aggregateField), + buildMetricsName(providerField, rtField, maxField, milliSecondsField, aggregateField), + buildMetricsName(providerField, rtField, avgField, milliSecondsField, aggregateField), + reporterConfig.Namespace, + labelNames, + ) } type consumerMetrics struct { @@ -87,6 +95,13 @@ func (cm *consumerMetrics) init(reporterConfig *metrics.ReporterConfig) { cm.rtMillisecondsAvg = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, avgField), reporterConfig.Namespace, labelNames) cm.rtMillisecondsLast = newAutoGaugeVec(buildMetricsName(consumerField, rtField, milliSecondsField, lastField), reporterConfig.Namespace, labelNames) cm.rtMillisecondsQuantiles = newQuantileGaugeVec(buildRTQuantilesMetricsNames(consumerField, quantiles), reporterConfig.Namespace, labelNames, quantiles) + cm.rtMillisecondsAggregate = newAggregatorGaugeVec( + buildMetricsName(consumerField, rtField, minField, milliSecondsField, aggregateField), + buildMetricsName(consumerField, rtField, maxField, milliSecondsField, aggregateField), + buildMetricsName(consumerField, rtField, avgField, milliSecondsField, aggregateField), + reporterConfig.Namespace, + labelNames, + ) } // buildMetricsName builds metrics name split by "_". diff --git a/metrics/prometheus/model.go b/metrics/prometheus/model.go index 8e5de8593a..bda08a8315 100644 --- a/metrics/prometheus/model.go +++ b/metrics/prometheus/model.go @@ -311,16 +311,52 @@ func newQpsGaugeVec(name, namespace string, labels []string) *qpsGaugeVec { } } -func (cv *qpsGaugeVec) updateQps(labels *prometheus.Labels) { +func (gv *qpsGaugeVec) updateQps(labels *prometheus.Labels) { key := convertLabelsToMapKey(*labels) cur := aggregate.NewTimeWindowCounter(10, 120) cur.Inc() - if actual, loaded := cv.syncMap.LoadOrStore(key, cur); loaded { + if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded { store := actual.(*aggregate.TimeWindowCounter) store.Inc() - cv.gaugeVec.With(*labels).Set(store.Count() / float64(store.LivedSeconds())) + gv.gaugeVec.With(*labels).Set(store.Count() / float64(store.LivedSeconds())) } else { - cv.gaugeVec.With(*labels).Set(cur.Count() / float64(cur.LivedSeconds())) + gv.gaugeVec.With(*labels).Set(cur.Count() / float64(cur.LivedSeconds())) + } +} + +type aggregatorGaugeVec struct { + min *prometheus.GaugeVec + max *prometheus.GaugeVec + avg *prometheus.GaugeVec + syncMap *sync.Map // key: labels string, value: TimeWindowAggregator +} + +func newAggregatorGaugeVec(minName, maxName, avgName, namespace string, labels []string) *aggregatorGaugeVec { + return &aggregatorGaugeVec{ + min: newAutoGaugeVec(minName, namespace, labels), + max: newAutoGaugeVec(maxName, namespace, labels), + avg: newAutoGaugeVec(avgName, namespace, labels), + syncMap: &sync.Map{}, + } +} + +func (gv *aggregatorGaugeVec) update(labels *prometheus.Labels, curValue int64) { + key := convertLabelsToMapKey(*labels) + cur := aggregate.NewTimeWindowAggregator(10, 120) + cur.Add(float64(curValue)) + + updateFunc := func(aggregator *aggregate.TimeWindowAggregator) { + gv.min.With(*labels).Set(aggregator.Result().Min) + gv.max.With(*labels).Set(aggregator.Result().Max) + gv.avg.With(*labels).Set(aggregator.Result().Avg) + } + + if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded { + store := actual.(*aggregate.TimeWindowAggregator) + store.Add(float64(curValue)) + updateFunc(store) + } else { + updateFunc(cur) } } diff --git a/metrics/prometheus/reporter.go b/metrics/prometheus/reporter.go index 54e45035fb..db458504b7 100644 --- a/metrics/prometheus/reporter.go +++ b/metrics/prometheus/reporter.go @@ -196,6 +196,7 @@ func (reporter *PrometheusReporter) reportRTMilliseconds(role string, labels *pr go reporter.provider.rtMillisecondsMax.updateMax(labels, costMs) go reporter.provider.rtMillisecondsAvg.updateAvg(labels, costMs) go reporter.provider.rtMillisecondsQuantiles.updateQuantile(labels, costMs) + go reporter.provider.rtMillisecondsAggregate.update(labels, costMs) case consumerField: go reporter.consumer.rtMillisecondsLast.With(*labels).Set(float64(costMs)) go reporter.consumer.rtMillisecondsSum.With(*labels).Add(float64(costMs)) @@ -203,5 +204,6 @@ func (reporter *PrometheusReporter) reportRTMilliseconds(role string, labels *pr go reporter.consumer.rtMillisecondsMax.updateMax(labels, costMs) go reporter.consumer.rtMillisecondsAvg.updateAvg(labels, costMs) go reporter.consumer.rtMillisecondsQuantiles.updateQuantile(labels, costMs) + go reporter.consumer.rtMillisecondsAggregate.update(labels, costMs) } } From 0d4b7c77a1a66417acb01c9c0caefa3067dec34d Mon Sep 17 00:00:00 2001 From: Wang Guan Date: Thu, 20 Jul 2023 10:24:48 +0800 Subject: [PATCH 3/3] feat: add requests total and request succeed total aggregate metrics --- metrics/prometheus/metric_set.go | 32 ++++++++++++++++++------------ metrics/prometheus/model.go | 34 ++++++++++++++++++++++++++++---- metrics/prometheus/reporter.go | 4 ++++ 3 files changed, 53 insertions(+), 17 deletions(-) diff --git a/metrics/prometheus/metric_set.go b/metrics/prometheus/metric_set.go index 488fce1319..5d3caf2736 100644 --- a/metrics/prometheus/metric_set.go +++ b/metrics/prometheus/metric_set.go @@ -43,17 +43,19 @@ func (ms *metricSet) init(reporterConfig *metrics.ReporterConfig) { } type rpcCommonMetrics struct { - qpsTotal *qpsGaugeVec - requestsTotal *prometheus.CounterVec - requestsProcessingTotal *prometheus.GaugeVec - requestsSucceedTotal *prometheus.CounterVec - rtMillisecondsMin *GaugeVecWithSyncMap - rtMillisecondsMax *GaugeVecWithSyncMap - rtMillisecondsSum *prometheus.CounterVec - rtMillisecondsAvg *GaugeVecWithSyncMap - rtMillisecondsLast *prometheus.GaugeVec - rtMillisecondsQuantiles *quantileGaugeVec - rtMillisecondsAggregate *aggregatorGaugeVec + qpsTotal *qpsGaugeVec + requestsTotal *prometheus.CounterVec + requestsTotalAggregate *aggregateCounterGaugeVec + requestsProcessingTotal *prometheus.GaugeVec + requestsSucceedTotal *prometheus.CounterVec + requestsSucceedTotalAggregate *aggregateCounterGaugeVec + rtMillisecondsMin *GaugeVecWithSyncMap + rtMillisecondsMax *GaugeVecWithSyncMap + rtMillisecondsSum *prometheus.CounterVec + rtMillisecondsAvg *GaugeVecWithSyncMap + rtMillisecondsLast *prometheus.GaugeVec + rtMillisecondsQuantiles *quantileGaugeVec + rtMillisecondsAggregate *aggregateFunctionsGaugeVec } type providerMetrics struct { @@ -63,15 +65,17 @@ type providerMetrics struct { func (pm *providerMetrics) init(reporterConfig *metrics.ReporterConfig) { pm.qpsTotal = newQpsGaugeVec(buildMetricsName(providerField, qpsField, totalField), reporterConfig.Namespace, labelNames) pm.requestsTotal = newAutoCounterVec(buildMetricsName(providerField, requestsField, totalField), reporterConfig.Namespace, labelNames) + pm.requestsTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(providerField, requestsField, totalField, aggregateField), reporterConfig.Namespace, labelNames) pm.requestsProcessingTotal = newAutoGaugeVec(buildMetricsName(providerField, requestsField, processingField, totalField), reporterConfig.Namespace, labelNames) pm.requestsSucceedTotal = newAutoCounterVec(buildMetricsName(providerField, requestsField, succeedField, totalField), reporterConfig.Namespace, labelNames) + pm.requestsSucceedTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(providerField, requestsField, succeedField, totalField, aggregateField), reporterConfig.Namespace, labelNames) pm.rtMillisecondsMin = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, minField), reporterConfig.Namespace, labelNames) pm.rtMillisecondsMax = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, maxField), reporterConfig.Namespace, labelNames) pm.rtMillisecondsSum = newAutoCounterVec(buildMetricsName(providerField, rtField, milliSecondsField, sumField), reporterConfig.Namespace, labelNames) pm.rtMillisecondsAvg = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, avgField), reporterConfig.Namespace, labelNames) pm.rtMillisecondsLast = newAutoGaugeVec(buildMetricsName(providerField, rtField, milliSecondsField, lastField), reporterConfig.Namespace, labelNames) pm.rtMillisecondsQuantiles = newQuantileGaugeVec(buildRTQuantilesMetricsNames(providerField, quantiles), reporterConfig.Namespace, labelNames, quantiles) - pm.rtMillisecondsAggregate = newAggregatorGaugeVec( + pm.rtMillisecondsAggregate = newAggregateFunctionsGaugeVec( buildMetricsName(providerField, rtField, minField, milliSecondsField, aggregateField), buildMetricsName(providerField, rtField, maxField, milliSecondsField, aggregateField), buildMetricsName(providerField, rtField, avgField, milliSecondsField, aggregateField), @@ -87,15 +91,17 @@ type consumerMetrics struct { func (cm *consumerMetrics) init(reporterConfig *metrics.ReporterConfig) { cm.qpsTotal = newQpsGaugeVec(buildMetricsName(consumerField, qpsField, totalField), reporterConfig.Namespace, labelNames) cm.requestsTotal = newAutoCounterVec(buildMetricsName(consumerField, requestsField, totalField), reporterConfig.Namespace, labelNames) + cm.requestsTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(consumerField, requestsField, totalField, aggregateField), reporterConfig.Namespace, labelNames) cm.requestsProcessingTotal = newAutoGaugeVec(buildMetricsName(consumerField, requestsField, processingField, totalField), reporterConfig.Namespace, labelNames) cm.requestsSucceedTotal = newAutoCounterVec(buildMetricsName(consumerField, requestsField, succeedField, totalField), reporterConfig.Namespace, labelNames) + cm.requestsSucceedTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(consumerField, requestsField, succeedField, totalField, aggregateField), reporterConfig.Namespace, labelNames) cm.rtMillisecondsMin = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, minField), reporterConfig.Namespace, labelNames) cm.rtMillisecondsMax = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, maxField), reporterConfig.Namespace, labelNames) cm.rtMillisecondsSum = newAutoCounterVec(buildMetricsName(consumerField, rtField, milliSecondsField, sumField), reporterConfig.Namespace, labelNames) cm.rtMillisecondsAvg = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, avgField), reporterConfig.Namespace, labelNames) cm.rtMillisecondsLast = newAutoGaugeVec(buildMetricsName(consumerField, rtField, milliSecondsField, lastField), reporterConfig.Namespace, labelNames) cm.rtMillisecondsQuantiles = newQuantileGaugeVec(buildRTQuantilesMetricsNames(consumerField, quantiles), reporterConfig.Namespace, labelNames, quantiles) - cm.rtMillisecondsAggregate = newAggregatorGaugeVec( + cm.rtMillisecondsAggregate = newAggregateFunctionsGaugeVec( buildMetricsName(consumerField, rtField, minField, milliSecondsField, aggregateField), buildMetricsName(consumerField, rtField, maxField, milliSecondsField, aggregateField), buildMetricsName(consumerField, rtField, avgField, milliSecondsField, aggregateField), diff --git a/metrics/prometheus/model.go b/metrics/prometheus/model.go index bda08a8315..fbfeb7a94f 100644 --- a/metrics/prometheus/model.go +++ b/metrics/prometheus/model.go @@ -325,15 +325,41 @@ func (gv *qpsGaugeVec) updateQps(labels *prometheus.Labels) { } } -type aggregatorGaugeVec struct { +type aggregateCounterGaugeVec struct { + gaugeVec *prometheus.GaugeVec + syncMap *sync.Map // key: labels string, value: TimeWindowCounter +} + +func newAggregateCounterGaugeVec(name, namespace string, labels []string) *aggregateCounterGaugeVec { + return &aggregateCounterGaugeVec{ + gaugeVec: newAutoGaugeVec(name, namespace, labels), + syncMap: &sync.Map{}, + } +} + +func (gv *aggregateCounterGaugeVec) inc(labels *prometheus.Labels) { + key := convertLabelsToMapKey(*labels) + cur := aggregate.NewTimeWindowCounter(10, 120) + cur.Inc() + + if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded { + store := actual.(*aggregate.TimeWindowCounter) + store.Inc() + gv.gaugeVec.With(*labels).Set(store.Count()) + } else { + gv.gaugeVec.With(*labels).Set(cur.Count()) + } +} + +type aggregateFunctionsGaugeVec struct { min *prometheus.GaugeVec max *prometheus.GaugeVec avg *prometheus.GaugeVec syncMap *sync.Map // key: labels string, value: TimeWindowAggregator } -func newAggregatorGaugeVec(minName, maxName, avgName, namespace string, labels []string) *aggregatorGaugeVec { - return &aggregatorGaugeVec{ +func newAggregateFunctionsGaugeVec(minName, maxName, avgName, namespace string, labels []string) *aggregateFunctionsGaugeVec { + return &aggregateFunctionsGaugeVec{ min: newAutoGaugeVec(minName, namespace, labels), max: newAutoGaugeVec(maxName, namespace, labels), avg: newAutoGaugeVec(avgName, namespace, labels), @@ -341,7 +367,7 @@ func newAggregatorGaugeVec(minName, maxName, avgName, namespace string, labels [ } } -func (gv *aggregatorGaugeVec) update(labels *prometheus.Labels, curValue int64) { +func (gv *aggregateFunctionsGaugeVec) update(labels *prometheus.Labels, curValue int64) { key := convertLabelsToMapKey(*labels) cur := aggregate.NewTimeWindowAggregator(10, 120) cur.Add(float64(curValue)) diff --git a/metrics/prometheus/reporter.go b/metrics/prometheus/reporter.go index db458504b7..715fd97ca8 100644 --- a/metrics/prometheus/reporter.go +++ b/metrics/prometheus/reporter.go @@ -155,8 +155,10 @@ func (reporter *PrometheusReporter) incRequestsTotal(role string, labels *promet switch role { case providerField: reporter.provider.requestsTotal.With(*labels).Inc() + reporter.provider.requestsTotalAggregate.inc(labels) case consumerField: reporter.consumer.requestsTotal.With(*labels).Inc() + reporter.consumer.requestsTotalAggregate.inc(labels) } } @@ -182,8 +184,10 @@ func (reporter *PrometheusReporter) incRequestsSucceedTotal(role string, labels switch role { case providerField: reporter.provider.requestsSucceedTotal.With(*labels).Inc() + reporter.provider.requestsSucceedTotalAggregate.inc(labels) case consumerField: reporter.consumer.requestsSucceedTotal.With(*labels).Inc() + reporter.consumer.requestsSucceedTotalAggregate.inc(labels) } }