Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: split metrics module into multiple files #2310

Merged
merged 2 commits into from
May 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ const (

const (
ApplicationKey = "application"
ApplicationNameKey = "application_name"
HostnameKey = "hostname"
IpKey = "ip"
OrganizationKey = "organization"
NameKey = "name"
ModuleKey = "module"
Expand Down
11 changes: 3 additions & 8 deletions common/extension/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,10 @@ import (
"context"
"testing"
"time"
)

import (
"github.com/stretchr/testify/assert"
)

import (
"dubbo.apache.org/dubbo-go/v3/metrics"
"dubbo.apache.org/dubbo-go/v3/protocol"
"github.com/stretchr/testify/assert"
)

func TestGetMetricReporter(t *testing.T) {
Expand All @@ -44,6 +39,6 @@ func TestGetMetricReporter(t *testing.T) {

type mockReporter struct{}

// Report method for feature expansion
func (m mockReporter) Report(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
// implement the interface of Reporter
func (m mockReporter) ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
}
9 changes: 3 additions & 6 deletions config/metric_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,16 @@ import (

"github.com/dubbogo/gost/log/logger"

"github.com/pkg/errors"
)

import (
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/metrics"
"github.com/pkg/errors"
)

// MetricConfig This is the config struct for all metrics implementation
type MetricConfig struct {
Mode string `default:"pull" yaml:"mode" json:"mode,omitempty" property:"mode"` // push or pull,
Namespace string `default:"dubbo" yaml:"namespace" json:"namespace,omitempty" property:"namespace"`
Enable bool `default:"true" yaml:"enable" json:"enable,omitempty" property:"enable"`
Enable *bool `default:"true" yaml:"enable" json:"enable,omitempty" property:"enable"`
Port string `default:"9090" yaml:"port" json:"port,omitempty" property:"port"`
Path string `default:"/metrics" yaml:"path" json:"path,omitempty" property:"path"`
PushGatewayAddress string `default:"" yaml:"push-gateway-address" json:"push-gateway-address,omitempty" property:"push-gateway-address"`
Expand All @@ -50,7 +47,7 @@ func (mc *MetricConfig) ToReporterConfig() *metrics.ReporterConfig {
defaultMetricsReportConfig.Namespace = mc.Namespace
}

defaultMetricsReportConfig.Enable = mc.Enable
defaultMetricsReportConfig.Enable = *mc.Enable
defaultMetricsReportConfig.Port = mc.Port
defaultMetricsReportConfig.Path = mc.Path
defaultMetricsReportConfig.PushGatewayAddress = mc.PushGatewayAddress
Expand Down
4 changes: 1 addition & 3 deletions filter/metrics/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ package metrics
import (
"context"
"time"
)

import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/filter"
Expand Down Expand Up @@ -52,7 +50,7 @@ func (p *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocatio
duration := end.Sub(start)
go func() {
for _, reporter := range p.reporters {
reporter.Report(ctx, invoker, invocation, duration, res)
reporter.ReportAfterInvocation(ctx, invoker, invocation, duration, res)
}
}()
return res
Expand Down
12 changes: 4 additions & 8 deletions filter/metrics/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,13 @@ import (
"sync"
"testing"
"time"
)

import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/metrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

_ "dubbo.apache.org/dubbo-go/v3/metrics/prometheus"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
Expand Down Expand Up @@ -76,7 +72,7 @@ type mockReporter struct {
wg sync.WaitGroup
}

func (m *mockReporter) Report(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
func (m *mockReporter) ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
m.Called(ctx, invoker, invocation)
m.wg.Done()
}
78 changes: 78 additions & 0 deletions metrics/prometheus/after_invocation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package prometheus

import (
"context"
"time"

"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/protocol"
"github.com/dubbogo/gost/log/logger"
"github.com/prometheus/client_golang/prometheus"
)

func (reporter *PrometheusReporter) ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
if !reporter.reporterConfig.Enable {
return
}

url := invoker.GetURL()

var role string // provider or consumer
if isProvider(url) {
role = providerField
} else if isConsumer(url) {
role = consumerField
} else {
logger.Warnf("The url belongs neither the consumer nor the provider, "+
"so the invocation will be ignored. url: %s", url.String())
return
}
labels := prometheus.Labels{
applicationNameKey: url.GetParam(constant.ApplicationKey, ""),
groupKey: url.Group(),
hostnameKey: "",
interfaceKey: url.Service(),
ipKey: common.GetLocalIp(),
versionKey: url.GetParam(constant.AppVersionKey, ""),
methodKey: invocation.MethodName(),
}

reporter.reportRTSummaryVec(role, &labels, cost.Milliseconds())
reporter.reportRequestTotalCounterVec(role, &labels)
}

func (r *PrometheusReporter) reportRTSummaryVec(role string, labels *prometheus.Labels, costMs int64) {
switch role {
case providerField:
r.providerRTSummaryVec.With(*labels).Observe(float64(costMs))
case consumerField:
r.consumerRTSummaryVec.With(*labels).Observe(float64(costMs))
}
}

func (r *PrometheusReporter) reportRequestTotalCounterVec(role string, labels *prometheus.Labels) {
switch role {
case providerField:
r.providerRequestTotalCounterVec.With(*labels).Inc()
case consumerField:
r.consumerRequestTotalCounterVec.With(*labels).Inc()
}
}
199 changes: 199 additions & 0 deletions metrics/prometheus/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package prometheus

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
)

type syncMaps struct {
userGauge sync.Map
userSummary sync.Map
userCounter sync.Map
userCounterVec sync.Map
userGaugeVec sync.Map
userSummaryVec sync.Map
}

// setGauge set gauge to target value with given label, if label is not empty, set gauge vec
// if target gauge/gaugevec not exist, just create new gauge and set the value
func (reporter *PrometheusReporter) setGauge(gaugeName string, toSetValue float64, labelMap prometheus.Labels) {
if len(labelMap) == 0 {
// gauge
if val, exist := reporter.userGauge.Load(gaugeName); !exist {
gauge := newGauge(gaugeName, reporter.namespace)
err := prometheus.DefaultRegisterer.Register(gauge)
if err == nil {
reporter.userGauge.Store(gaugeName, gauge)
gauge.Set(toSetValue)
} else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
// A gauge for that metric has been registered before.
// Use the old gauge from now on.
are.ExistingCollector.(prometheus.Gauge).Set(toSetValue)
}

} else {
val.(prometheus.Gauge).Set(toSetValue)
}
return
}

// gauge vec
if val, exist := reporter.userGaugeVec.Load(gaugeName); !exist {
keyList := make([]string, 0)
for k := range labelMap {
keyList = append(keyList, k)
}
gaugeVec := newGaugeVec(gaugeName, reporter.namespace, keyList)
err := prometheus.DefaultRegisterer.Register(gaugeVec)
if err == nil {
reporter.userGaugeVec.Store(gaugeName, gaugeVec)
gaugeVec.With(labelMap).Set(toSetValue)
} else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
// A gauge for that metric has been registered before.
// Use the old gauge from now on.
are.ExistingCollector.(*prometheus.GaugeVec).With(labelMap).Set(toSetValue)
}
} else {
val.(*prometheus.GaugeVec).With(labelMap).Set(toSetValue)
}
}

// incCounter inc counter to inc if label is not empty, set counter vec
// if target counter/counterVec not exist, just create new counter and inc the value
func (reporter *PrometheusReporter) incCounter(counterName string, labelMap prometheus.Labels) {
if len(labelMap) == 0 {
// counter
if val, exist := reporter.userCounter.Load(counterName); !exist {
counter := newCounter(counterName, reporter.namespace)
err := prometheus.DefaultRegisterer.Register(counter)
if err == nil {
reporter.userCounter.Store(counterName, counter)
counter.Inc()
} else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
// A counter for that metric has been registered before.
// Use the old counter from now on.
are.ExistingCollector.(prometheus.Counter).Inc()
}
} else {
val.(prometheus.Counter).Inc()
}
return
}

// counter vec inc
if val, exist := reporter.userCounterVec.Load(counterName); !exist {
keyList := make([]string, 0)
for k := range labelMap {
keyList = append(keyList, k)
}
counterVec := newCounterVec(counterName, reporter.namespace, keyList)
err := prometheus.DefaultRegisterer.Register(counterVec)
if err == nil {
reporter.userCounterVec.Store(counterName, counterVec)
counterVec.With(labelMap).Inc()
} else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
// A counter for that metric has been registered before.
// Use the old counter from now on.
are.ExistingCollector.(*prometheus.CounterVec).With(labelMap).Inc()
}
} else {
val.(*prometheus.CounterVec).With(labelMap).Inc()
}
}

// incSummary inc summary to target value with given label, if label is not empty, set summary vec
// if target summary/summaryVec not exist, just create new summary and set the value
func (reporter *PrometheusReporter) incSummary(summaryName string, toSetValue float64, labelMap prometheus.Labels) {
if len(labelMap) == 0 {
// summary
if val, exist := reporter.userSummary.Load(summaryName); !exist {
summary := newSummary(summaryName, reporter.namespace)
err := prometheus.DefaultRegisterer.Register(summary)
if err == nil {
reporter.userSummary.Store(summaryName, summary)
summary.Observe(toSetValue)
} else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
// A summary for that metric has been registered before.
// Use the old summary from now on.
are.ExistingCollector.(prometheus.Summary).Observe(toSetValue)
}
} else {
val.(prometheus.Summary).Observe(toSetValue)
}
return
}

// summary vec
if val, exist := reporter.userSummaryVec.Load(summaryName); !exist {
keyList := make([]string, 0)
for k := range labelMap {
keyList = append(keyList, k)
}
summaryVec := newSummaryVec(summaryName, reporter.namespace, keyList, reporter.reporterConfig.SummaryMaxAge)
err := prometheus.DefaultRegisterer.Register(summaryVec)
if err == nil {
reporter.userSummaryVec.Store(summaryName, summaryVec)
summaryVec.With(labelMap).Observe(toSetValue)
} else if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
// A summary for that metric has been registered before.
// Use the old summary from now on.
are.ExistingCollector.(*prometheus.SummaryVec).With(labelMap).Observe(toSetValue)
}
} else {
val.(*prometheus.SummaryVec).With(labelMap).Observe(toSetValue)
}
}

func SetGaugeWithLabel(gaugeName string, val float64, label prometheus.Labels) {
if reporterInstance.reporterConfig.Enable {
reporterInstance.setGauge(gaugeName, val, label)
}
}

func SetGauge(gaugeName string, val float64) {
if reporterInstance.reporterConfig.Enable {
reporterInstance.setGauge(gaugeName, val, make(prometheus.Labels))
}
}

func IncCounterWithLabel(counterName string, label prometheus.Labels) {
if reporterInstance.reporterConfig.Enable {
reporterInstance.incCounter(counterName, label)
}
}

func IncCounter(summaryName string) {
if reporterInstance.reporterConfig.Enable {
reporterInstance.incCounter(summaryName, make(prometheus.Labels))
}
}

func IncSummaryWithLabel(counterName string, val float64, label prometheus.Labels) {
if reporterInstance.reporterConfig.Enable {
reporterInstance.incSummary(counterName, val, label)
}
}

func IncSummary(summaryName string, val float64) {
if reporterInstance.reporterConfig.Enable {
reporterInstance.incSummary(summaryName, val, make(prometheus.Labels))
}
}
Loading