Skip to content

Commit

Permalink
Merge 0d4b7c7 into 10336a5
Browse files Browse the repository at this point in the history
  • Loading branch information
ev1lQuark committed Jul 20, 2023
2 parents 10336a5 + 0d4b7c7 commit f59f68b
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 10 deletions.
2 changes: 2 additions & 0 deletions metrics/prometheus/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
providerField = "provider"
consumerField = "consumer"

qpsField = "qps"
requestsField = "requests"
rtField = "rt"

Expand All @@ -48,6 +49,7 @@ const (
lastField = "last"

totalField = "total"
aggregateField = "aggregate"
processingField = "processing"
succeedField = "succeed"
)
Expand Down
42 changes: 33 additions & 9 deletions metrics/prometheus/metric_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,47 +43,71 @@ func (ms *metricSet) init(reporterConfig *metrics.ReporterConfig) {
}

type rpcCommonMetrics struct {
requestsTotal *prometheus.CounterVec
requestsProcessingTotal *prometheus.GaugeVec
requestsSucceedTotal *prometheus.CounterVec
rtMillisecondsMin *GaugeVecWithSyncMap
rtMillisecondsMax *GaugeVecWithSyncMap
rtMillisecondsSum *prometheus.CounterVec
rtMillisecondsAvg *GaugeVecWithSyncMap
rtMillisecondsLast *prometheus.GaugeVec
rtMillisecondsQuantiles *quantileGaugeVec
qpsTotal *qpsGaugeVec
requestsTotal *prometheus.CounterVec
requestsTotalAggregate *aggregateCounterGaugeVec
requestsProcessingTotal *prometheus.GaugeVec
requestsSucceedTotal *prometheus.CounterVec
requestsSucceedTotalAggregate *aggregateCounterGaugeVec
rtMillisecondsMin *GaugeVecWithSyncMap
rtMillisecondsMax *GaugeVecWithSyncMap
rtMillisecondsSum *prometheus.CounterVec
rtMillisecondsAvg *GaugeVecWithSyncMap
rtMillisecondsLast *prometheus.GaugeVec
rtMillisecondsQuantiles *quantileGaugeVec
rtMillisecondsAggregate *aggregateFunctionsGaugeVec
}

type providerMetrics struct {
rpcCommonMetrics
}

func (pm *providerMetrics) init(reporterConfig *metrics.ReporterConfig) {
pm.qpsTotal = newQpsGaugeVec(buildMetricsName(providerField, qpsField, totalField), reporterConfig.Namespace, labelNames)
pm.requestsTotal = newAutoCounterVec(buildMetricsName(providerField, requestsField, totalField), reporterConfig.Namespace, labelNames)
pm.requestsTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(providerField, requestsField, totalField, aggregateField), reporterConfig.Namespace, labelNames)
pm.requestsProcessingTotal = newAutoGaugeVec(buildMetricsName(providerField, requestsField, processingField, totalField), reporterConfig.Namespace, labelNames)
pm.requestsSucceedTotal = newAutoCounterVec(buildMetricsName(providerField, requestsField, succeedField, totalField), reporterConfig.Namespace, labelNames)
pm.requestsSucceedTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(providerField, requestsField, succeedField, totalField, aggregateField), reporterConfig.Namespace, labelNames)
pm.rtMillisecondsMin = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, minField), reporterConfig.Namespace, labelNames)
pm.rtMillisecondsMax = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, maxField), reporterConfig.Namespace, labelNames)
pm.rtMillisecondsSum = newAutoCounterVec(buildMetricsName(providerField, rtField, milliSecondsField, sumField), reporterConfig.Namespace, labelNames)
pm.rtMillisecondsAvg = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, avgField), reporterConfig.Namespace, labelNames)
pm.rtMillisecondsLast = newAutoGaugeVec(buildMetricsName(providerField, rtField, milliSecondsField, lastField), reporterConfig.Namespace, labelNames)
pm.rtMillisecondsQuantiles = newQuantileGaugeVec(buildRTQuantilesMetricsNames(providerField, quantiles), reporterConfig.Namespace, labelNames, quantiles)
pm.rtMillisecondsAggregate = newAggregateFunctionsGaugeVec(
buildMetricsName(providerField, rtField, minField, milliSecondsField, aggregateField),
buildMetricsName(providerField, rtField, maxField, milliSecondsField, aggregateField),
buildMetricsName(providerField, rtField, avgField, milliSecondsField, aggregateField),
reporterConfig.Namespace,
labelNames,
)
}

type consumerMetrics struct {
rpcCommonMetrics
}

func (cm *consumerMetrics) init(reporterConfig *metrics.ReporterConfig) {
cm.qpsTotal = newQpsGaugeVec(buildMetricsName(consumerField, qpsField, totalField), reporterConfig.Namespace, labelNames)
cm.requestsTotal = newAutoCounterVec(buildMetricsName(consumerField, requestsField, totalField), reporterConfig.Namespace, labelNames)
cm.requestsTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(consumerField, requestsField, totalField, aggregateField), reporterConfig.Namespace, labelNames)
cm.requestsProcessingTotal = newAutoGaugeVec(buildMetricsName(consumerField, requestsField, processingField, totalField), reporterConfig.Namespace, labelNames)
cm.requestsSucceedTotal = newAutoCounterVec(buildMetricsName(consumerField, requestsField, succeedField, totalField), reporterConfig.Namespace, labelNames)
cm.requestsSucceedTotalAggregate = newAggregateCounterGaugeVec(buildMetricsName(consumerField, requestsField, succeedField, totalField, aggregateField), reporterConfig.Namespace, labelNames)
cm.rtMillisecondsMin = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, minField), reporterConfig.Namespace, labelNames)
cm.rtMillisecondsMax = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, maxField), reporterConfig.Namespace, labelNames)
cm.rtMillisecondsSum = newAutoCounterVec(buildMetricsName(consumerField, rtField, milliSecondsField, sumField), reporterConfig.Namespace, labelNames)
cm.rtMillisecondsAvg = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, avgField), reporterConfig.Namespace, labelNames)
cm.rtMillisecondsLast = newAutoGaugeVec(buildMetricsName(consumerField, rtField, milliSecondsField, lastField), reporterConfig.Namespace, labelNames)
cm.rtMillisecondsQuantiles = newQuantileGaugeVec(buildRTQuantilesMetricsNames(consumerField, quantiles), reporterConfig.Namespace, labelNames, quantiles)
cm.rtMillisecondsAggregate = newAggregateFunctionsGaugeVec(
buildMetricsName(consumerField, rtField, minField, milliSecondsField, aggregateField),
buildMetricsName(consumerField, rtField, maxField, milliSecondsField, aggregateField),
buildMetricsName(consumerField, rtField, avgField, milliSecondsField, aggregateField),
reporterConfig.Namespace,
labelNames,
)
}

// buildMetricsName builds metrics name split by "_".
Expand Down
88 changes: 88 additions & 0 deletions metrics/prometheus/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,91 @@ func (gv *quantileGaugeVec) updateQuantile(labels *prometheus.Labels, curValue i
updateFunc(cur)
}
}

type qpsGaugeVec struct {
gaugeVec *prometheus.GaugeVec
syncMap *sync.Map // key: labels string, value: TimeWindowCounter
}

func newQpsGaugeVec(name, namespace string, labels []string) *qpsGaugeVec {
return &qpsGaugeVec{
gaugeVec: newAutoGaugeVec(name, namespace, labels),
syncMap: &sync.Map{},
}
}

func (gv *qpsGaugeVec) updateQps(labels *prometheus.Labels) {
key := convertLabelsToMapKey(*labels)
cur := aggregate.NewTimeWindowCounter(10, 120)
cur.Inc()

if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded {
store := actual.(*aggregate.TimeWindowCounter)
store.Inc()
gv.gaugeVec.With(*labels).Set(store.Count() / float64(store.LivedSeconds()))
} else {
gv.gaugeVec.With(*labels).Set(cur.Count() / float64(cur.LivedSeconds()))
}
}

type aggregateCounterGaugeVec struct {
gaugeVec *prometheus.GaugeVec
syncMap *sync.Map // key: labels string, value: TimeWindowCounter
}

func newAggregateCounterGaugeVec(name, namespace string, labels []string) *aggregateCounterGaugeVec {
return &aggregateCounterGaugeVec{
gaugeVec: newAutoGaugeVec(name, namespace, labels),
syncMap: &sync.Map{},
}
}

func (gv *aggregateCounterGaugeVec) inc(labels *prometheus.Labels) {
key := convertLabelsToMapKey(*labels)
cur := aggregate.NewTimeWindowCounter(10, 120)
cur.Inc()

if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded {
store := actual.(*aggregate.TimeWindowCounter)
store.Inc()
gv.gaugeVec.With(*labels).Set(store.Count())
} else {
gv.gaugeVec.With(*labels).Set(cur.Count())
}
}

type aggregateFunctionsGaugeVec struct {
min *prometheus.GaugeVec
max *prometheus.GaugeVec
avg *prometheus.GaugeVec
syncMap *sync.Map // key: labels string, value: TimeWindowAggregator
}

func newAggregateFunctionsGaugeVec(minName, maxName, avgName, namespace string, labels []string) *aggregateFunctionsGaugeVec {
return &aggregateFunctionsGaugeVec{
min: newAutoGaugeVec(minName, namespace, labels),
max: newAutoGaugeVec(maxName, namespace, labels),
avg: newAutoGaugeVec(avgName, namespace, labels),
syncMap: &sync.Map{},
}
}

func (gv *aggregateFunctionsGaugeVec) update(labels *prometheus.Labels, curValue int64) {
key := convertLabelsToMapKey(*labels)
cur := aggregate.NewTimeWindowAggregator(10, 120)
cur.Add(float64(curValue))

updateFunc := func(aggregator *aggregate.TimeWindowAggregator) {
gv.min.With(*labels).Set(aggregator.Result().Min)
gv.max.With(*labels).Set(aggregator.Result().Max)
gv.avg.With(*labels).Set(aggregator.Result().Avg)
}

if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded {
store := actual.(*aggregate.TimeWindowAggregator)
store.Add(float64(curValue))
updateFunc(store)
} else {
updateFunc(cur)
}
}
17 changes: 16 additions & 1 deletion metrics/prometheus/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (reporter *PrometheusReporter) ReportBeforeInvocation(ctx context.Context,
return
}
labels := buildLabels(url)

reporter.incQpsTotal(role, &labels)
reporter.incRequestsProcessingTotal(role, &labels)
}

Expand All @@ -142,12 +142,23 @@ func (reporter *PrometheusReporter) ReportAfterInvocation(ctx context.Context, i
}
}

func (reporter *PrometheusReporter) incQpsTotal(role string, labels *prometheus.Labels) {
switch role {
case providerField:
reporter.provider.qpsTotal.updateQps(labels)
case consumerField:
reporter.consumer.qpsTotal.updateQps(labels)
}
}

func (reporter *PrometheusReporter) incRequestsTotal(role string, labels *prometheus.Labels) {
switch role {
case providerField:
reporter.provider.requestsTotal.With(*labels).Inc()
reporter.provider.requestsTotalAggregate.inc(labels)
case consumerField:
reporter.consumer.requestsTotal.With(*labels).Inc()
reporter.consumer.requestsTotalAggregate.inc(labels)
}
}

Expand All @@ -173,8 +184,10 @@ func (reporter *PrometheusReporter) incRequestsSucceedTotal(role string, labels
switch role {
case providerField:
reporter.provider.requestsSucceedTotal.With(*labels).Inc()
reporter.provider.requestsSucceedTotalAggregate.inc(labels)
case consumerField:
reporter.consumer.requestsSucceedTotal.With(*labels).Inc()
reporter.consumer.requestsSucceedTotalAggregate.inc(labels)
}
}

Expand All @@ -187,12 +200,14 @@ func (reporter *PrometheusReporter) reportRTMilliseconds(role string, labels *pr
go reporter.provider.rtMillisecondsMax.updateMax(labels, costMs)
go reporter.provider.rtMillisecondsAvg.updateAvg(labels, costMs)
go reporter.provider.rtMillisecondsQuantiles.updateQuantile(labels, costMs)
go reporter.provider.rtMillisecondsAggregate.update(labels, costMs)
case consumerField:
go reporter.consumer.rtMillisecondsLast.With(*labels).Set(float64(costMs))
go reporter.consumer.rtMillisecondsSum.With(*labels).Add(float64(costMs))
go reporter.consumer.rtMillisecondsMin.updateMin(labels, costMs)
go reporter.consumer.rtMillisecondsMax.updateMax(labels, costMs)
go reporter.consumer.rtMillisecondsAvg.updateAvg(labels, costMs)
go reporter.consumer.rtMillisecondsQuantiles.updateQuantile(labels, costMs)
go reporter.consumer.rtMillisecondsAggregate.update(labels, costMs)
}
}

0 comments on commit f59f68b

Please sign in to comment.