Skip to content

Commit

Permalink
refactor: split metrics module into multiple files (#2310)
Browse files Browse the repository at this point in the history
  • Loading branch information
ev1lQuark authored May 30, 2023
1 parent 9cf1b9f commit 66a152c
Show file tree
Hide file tree
Showing 14 changed files with 542 additions and 374 deletions.
3 changes: 3 additions & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ const (

const (
ApplicationKey = "application"
ApplicationNameKey = "application_name"
HostnameKey = "hostname"
IpKey = "ip"
OrganizationKey = "organization"
NameKey = "name"
ModuleKey = "module"
Expand Down
11 changes: 3 additions & 8 deletions common/extension/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,10 @@ import (
"context"
"testing"
"time"
)

import (
"github.com/stretchr/testify/assert"
)

import (
"dubbo.apache.org/dubbo-go/v3/metrics"
"dubbo.apache.org/dubbo-go/v3/protocol"
"github.com/stretchr/testify/assert"
)

func TestGetMetricReporter(t *testing.T) {
Expand All @@ -44,6 +39,6 @@ func TestGetMetricReporter(t *testing.T) {

type mockReporter struct{}

// Report method for feature expansion
func (m mockReporter) Report(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
// implement the interface of Reporter
func (m mockReporter) ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
}
9 changes: 3 additions & 6 deletions config/metric_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,16 @@ import (

"github.com/dubbogo/gost/log/logger"

"github.com/pkg/errors"
)

import (
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/metrics"
"github.com/pkg/errors"
)

// MetricConfig This is the config struct for all metrics implementation
type MetricConfig struct {
Mode string `default:"pull" yaml:"mode" json:"mode,omitempty" property:"mode"` // push or pull,
Namespace string `default:"dubbo" yaml:"namespace" json:"namespace,omitempty" property:"namespace"`
Enable bool `default:"true" yaml:"enable" json:"enable,omitempty" property:"enable"`
Enable *bool `default:"true" yaml:"enable" json:"enable,omitempty" property:"enable"`
Port string `default:"9090" yaml:"port" json:"port,omitempty" property:"port"`
Path string `default:"/metrics" yaml:"path" json:"path,omitempty" property:"path"`
PushGatewayAddress string `default:"" yaml:"push-gateway-address" json:"push-gateway-address,omitempty" property:"push-gateway-address"`
Expand All @@ -50,7 +47,7 @@ func (mc *MetricConfig) ToReporterConfig() *metrics.ReporterConfig {
defaultMetricsReportConfig.Namespace = mc.Namespace
}

defaultMetricsReportConfig.Enable = mc.Enable
defaultMetricsReportConfig.Enable = *mc.Enable
defaultMetricsReportConfig.Port = mc.Port
defaultMetricsReportConfig.Path = mc.Path
defaultMetricsReportConfig.PushGatewayAddress = mc.PushGatewayAddress
Expand Down
4 changes: 1 addition & 3 deletions filter/metrics/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ package metrics
import (
"context"
"time"
)

import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/filter"
Expand Down Expand Up @@ -52,7 +50,7 @@ func (p *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocatio
duration := end.Sub(start)
go func() {
for _, reporter := range p.reporters {
reporter.Report(ctx, invoker, invocation, duration, res)
reporter.ReportAfterInvocation(ctx, invoker, invocation, duration, res)
}
}()
return res
Expand Down
12 changes: 4 additions & 8 deletions filter/metrics/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,13 @@ import (
"sync"
"testing"
"time"
)

import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/metrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

_ "dubbo.apache.org/dubbo-go/v3/metrics/prometheus"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
Expand Down Expand Up @@ -76,7 +72,7 @@ type mockReporter struct {
wg sync.WaitGroup
}

func (m *mockReporter) Report(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
func (m *mockReporter) ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
m.Called(ctx, invoker, invocation)
m.wg.Done()
}
78 changes: 78 additions & 0 deletions metrics/prometheus/after_invocation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package prometheus

import (
"context"
"time"

"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/protocol"
"github.com/dubbogo/gost/log/logger"
"github.com/prometheus/client_golang/prometheus"
)

func (reporter *PrometheusReporter) ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
if !reporter.reporterConfig.Enable {
return
}

url := invoker.GetURL()

var role string // provider or consumer
if isProvider(url) {
role = providerField
} else if isConsumer(url) {
role = consumerField
} else {
logger.Warnf("The url belongs neither the consumer nor the provider, "+
"so the invocation will be ignored. url: %s", url.String())
return
}
labels := prometheus.Labels{
applicationNameKey: url.GetParam(constant.ApplicationKey, ""),
groupKey: url.Group(),
hostnameKey: "",
interfaceKey: url.Service(),
ipKey: common.GetLocalIp(),
versionKey: url.GetParam(constant.AppVersionKey, ""),
methodKey: invocation.MethodName(),
}

reporter.reportRTSummaryVec(role, &labels, cost.Milliseconds())
reporter.reportRequestTotalCounterVec(role, &labels)
}

func (r *PrometheusReporter) reportRTSummaryVec(role string, labels *prometheus.Labels, costMs int64) {
switch role {
case providerField:
r.providerRTSummaryVec.With(*labels).Observe(float64(costMs))
case consumerField:
r.consumerRTSummaryVec.With(*labels).Observe(float64(costMs))
}
}

func (r *PrometheusReporter) reportRequestTotalCounterVec(role string, labels *prometheus.Labels) {
switch role {
case providerField:
r.providerRequestTotalCounterVec.With(*labels).Inc()
case consumerField:
r.consumerRequestTotalCounterVec.With(*labels).Inc()
}
}
199 changes: 199 additions & 0 deletions metrics/prometheus/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package prometheus

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
)

type syncMaps struct {
userGauge sync.Map
userSummary sync.Map
userCounter sync.Map
userCounterVec sync.Map
userGaugeVec sync.Map
userSummaryVec sync.Map
}

// setGauge set gauge to target value with given label, if label is not empty, set gauge vec
// if target gauge/gaugevec not exist, just create new gauge and set the value
func (reporter *PrometheusReporter) setGauge(gaugeName string, toSetValue float64, labelMap prometheus.Labels) {
if len(labelMap) == 0 {
// gauge
if val, exist := reporter.userGauge.Load(gaugeName); !exist {
gauge := newGauge(gaugeName, reporter.namespace)
err := prometheus.DefaultRegisterer.Register(gauge)
if err == nil {
reporter.userGauge.Store(gaugeName, gauge)
gauge.Set(toSetValue)
} else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
// A gauge for that metric has been registered before.
// Use the old gauge from now on.
are.ExistingCollector.(prometheus.Gauge).Set(toSetValue)
}

} else {
val.(prometheus.Gauge).Set(toSetValue)
}
return
}

// gauge vec
if val, exist := reporter.userGaugeVec.Load(gaugeName); !exist {
keyList := make([]string, 0)
for k := range labelMap {
keyList = append(keyList, k)
}
gaugeVec := newGaugeVec(gaugeName, reporter.namespace, keyList)
err := prometheus.DefaultRegisterer.Register(gaugeVec)
if err == nil {
reporter.userGaugeVec.Store(gaugeName, gaugeVec)
gaugeVec.With(labelMap).Set(toSetValue)
} else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
// A gauge for that metric has been registered before.
// Use the old gauge from now on.
are.ExistingCollector.(*prometheus.GaugeVec).With(labelMap).Set(toSetValue)
}
} else {
val.(*prometheus.GaugeVec).With(labelMap).Set(toSetValue)
}
}

// incCounter inc counter to inc if label is not empty, set counter vec
// if target counter/counterVec not exist, just create new counter and inc the value
func (reporter *PrometheusReporter) incCounter(counterName string, labelMap prometheus.Labels) {
if len(labelMap) == 0 {
// counter
if val, exist := reporter.userCounter.Load(counterName); !exist {
counter := newCounter(counterName, reporter.namespace)
err := prometheus.DefaultRegisterer.Register(counter)
if err == nil {
reporter.userCounter.Store(counterName, counter)
counter.Inc()
} else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
// A counter for that metric has been registered before.
// Use the old counter from now on.
are.ExistingCollector.(prometheus.Counter).Inc()
}
} else {
val.(prometheus.Counter).Inc()
}
return
}

// counter vec inc
if val, exist := reporter.userCounterVec.Load(counterName); !exist {
keyList := make([]string, 0)
for k := range labelMap {
keyList = append(keyList, k)
}
counterVec := newCounterVec(counterName, reporter.namespace, keyList)
err := prometheus.DefaultRegisterer.Register(counterVec)
if err == nil {
reporter.userCounterVec.Store(counterName, counterVec)
counterVec.With(labelMap).Inc()
} else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
// A counter for that metric has been registered before.
// Use the old counter from now on.
are.ExistingCollector.(*prometheus.CounterVec).With(labelMap).Inc()
}
} else {
val.(*prometheus.CounterVec).With(labelMap).Inc()
}
}

// incSummary inc summary to target value with given label, if label is not empty, set summary vec
// if target summary/summaryVec not exist, just create new summary and set the value
func (reporter *PrometheusReporter) incSummary(summaryName string, toSetValue float64, labelMap prometheus.Labels) {
if len(labelMap) == 0 {
// summary
if val, exist := reporter.userSummary.Load(summaryName); !exist {
summary := newSummary(summaryName, reporter.namespace)
err := prometheus.DefaultRegisterer.Register(summary)
if err == nil {
reporter.userSummary.Store(summaryName, summary)
summary.Observe(toSetValue)
} else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
// A summary for that metric has been registered before.
// Use the old summary from now on.
are.ExistingCollector.(prometheus.Summary).Observe(toSetValue)
}
} else {
val.(prometheus.Summary).Observe(toSetValue)
}
return
}

// summary vec
if val, exist := reporter.userSummaryVec.Load(summaryName); !exist {
keyList := make([]string, 0)
for k := range labelMap {
keyList = append(keyList, k)
}
summaryVec := newSummaryVec(summaryName, reporter.namespace, keyList, reporter.reporterConfig.SummaryMaxAge)
err := prometheus.DefaultRegisterer.Register(summaryVec)
if err == nil {
reporter.userSummaryVec.Store(summaryName, summaryVec)
summaryVec.With(labelMap).Observe(toSetValue)
} else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
// A summary for that metric has been registered before.
// Use the old summary from now on.
are.ExistingCollector.(*prometheus.SummaryVec).With(labelMap).Observe(toSetValue)
}
} else {
val.(*prometheus.SummaryVec).With(labelMap).Observe(toSetValue)
}
}

func SetGaugeWithLabel(gaugeName string, val float64, label prometheus.Labels) {
if reporterInstance.reporterConfig.Enable {
reporterInstance.setGauge(gaugeName, val, label)
}
}

func SetGauge(gaugeName string, val float64) {
if reporterInstance.reporterConfig.Enable {
reporterInstance.setGauge(gaugeName, val, make(prometheus.Labels))
}
}

func IncCounterWithLabel(counterName string, label prometheus.Labels) {
if reporterInstance.reporterConfig.Enable {
reporterInstance.incCounter(counterName, label)
}
}

func IncCounter(summaryName string) {
if reporterInstance.reporterConfig.Enable {
reporterInstance.incCounter(summaryName, make(prometheus.Labels))
}
}

func IncSummaryWithLabel(counterName string, val float64, label prometheus.Labels) {
if reporterInstance.reporterConfig.Enable {
reporterInstance.incSummary(counterName, val, label)
}
}

func IncSummary(summaryName string, val float64) {
if reporterInstance.reporterConfig.Enable {
reporterInstance.incSummary(summaryName, val, make(prometheus.Labels))
}
}
Loading

0 comments on commit 66a152c

Please sign in to comment.