Skip to content

Commit

Permalink
Merge 0c7ddbe into 95b463b
Browse files Browse the repository at this point in the history
  • Loading branch information
ev1lQuark authored Jun 10, 2023
2 parents 95b463b + 0c7ddbe commit ad55d34
Show file tree
Hide file tree
Showing 11 changed files with 146 additions and 54 deletions.
5 changes: 4 additions & 1 deletion common/extension/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,8 @@ func TestGetMetricReporter(t *testing.T) {
type mockReporter struct{}

// implement the interface of Reporter
func (m mockReporter) ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
func (m *mockReporter) ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
}

func (m *mockReporter) ReportBeforeInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) {
}
5 changes: 5 additions & 0 deletions filter/metrics/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ type Filter struct {

// Invoke collect the duration of invocation and then report the duration by using goroutine
func (p *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
go func() {
for _, reporter := range p.reporters {
reporter.ReportBeforeInvocation(ctx, invoker, invocation)
}
}()
start := time.Now()
res := invoker.Invoke(ctx, invocation)
end := time.Now()
Expand Down
5 changes: 5 additions & 0 deletions filter/metrics/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,8 @@ func (m *mockReporter) ReportAfterInvocation(ctx context.Context, invoker protoc
m.Called(ctx, invoker, invocation)
m.wg.Done()
}

func (m *mockReporter) ReportBeforeInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) {
m.Called(ctx, invoker, invocation)
m.wg.Done()
}
50 changes: 8 additions & 42 deletions metrics/prometheus/after_invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,61 +23,27 @@ import (
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

import (
"github.com/dubbogo/gost/log/logger"
"github.com/prometheus/client_golang/prometheus"
)

func (reporter *PrometheusReporter) ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
if !reporter.reporterConfig.Enable {
return
}

url := invoker.GetURL()

var role string // provider or consumer
if isProvider(url) {
role = providerField
} else if isConsumer(url) {
role = consumerField
} else {
logger.Warnf("The url belongs neither the consumer nor the provider, "+
"so the invocation will be ignored. url: %s", url.String())
role := getRole(url)
if role == "" {
return
}
labels := prometheus.Labels{
applicationNameKey: url.GetParam(constant.ApplicationKey, ""),
groupKey: url.Group(),
hostnameKey: "",
interfaceKey: url.Service(),
ipKey: common.GetLocalIp(),
versionKey: url.GetParam(constant.AppVersionKey, ""),
methodKey: invocation.MethodName(),
}
labels := buildLabels(url)

reporter.reportRTSummaryVec(role, &labels, cost.Milliseconds())
reporter.reportRequestTotalCounterVec(role, &labels)
}

func (r *PrometheusReporter) reportRTSummaryVec(role string, labels *prometheus.Labels, costMs int64) {
switch role {
case providerField:
r.providerRTSummaryVec.With(*labels).Observe(float64(costMs))
case consumerField:
r.consumerRTSummaryVec.With(*labels).Observe(float64(costMs))
}
}
reporter.reportRequestsTotalCounterVec(role, &labels)
reporter.decRequestsProcessingTotalGaugeVec(role, &labels)

func (r *PrometheusReporter) reportRequestTotalCounterVec(role string, labels *prometheus.Labels) {
switch role {
case providerField:
r.providerRequestTotalCounterVec.With(*labels).Inc()
case consumerField:
r.consumerRequestTotalCounterVec.With(*labels).Inc()
if res != nil && res.Error() == nil {
// succeed
reporter.incRequestsSucceedTotalCounterVec(role, &labels)
}
}
22 changes: 22 additions & 0 deletions metrics/prometheus/before_invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,25 @@
*/

package prometheus

import (
"context"
)
import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)

func (reporter *PrometheusReporter) ReportBeforeInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) {
if !reporter.reporterConfig.Enable {
return
}
url := invoker.GetURL()

role := getRole(url)
if role == "" {
return
}
labels := buildLabels(url)

reporter.incRequestsProcessingTotalGaugeVec(role, &labels)
}
30 changes: 30 additions & 0 deletions metrics/prometheus/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
)

import (
"github.com/dubbogo/gost/log/logger"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
Expand All @@ -33,6 +34,35 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
)

var (
defaultHistogramBucket = []float64{10, 50, 100, 200, 500, 1000, 10000}
)

func buildLabels(url *common.URL) prometheus.Labels {
return prometheus.Labels{
applicationNameKey: url.GetParam(constant.ApplicationKey, ""),
groupKey: url.Group(),
hostnameKey: "not implemented yet",
interfaceKey: url.Service(),
ipKey: common.GetLocalIp(),
versionKey: url.GetParam(constant.AppVersionKey, ""),
methodKey: url.GetParam(constant.MethodKey, ""),
}
}

// return the role of the application, provider or consumer, if the url is not a valid one, return empty string
func getRole(url *common.URL) (role string) {
if isProvider(url) {
role = providerField
} else if isConsumer(url) {
role = consumerField
} else {
logger.Warnf("The url belongs neither the consumer nor the provider, "+
"so the invocation will be ignored. url: %s", url.String())
}
return
}

// isProvider shows whether this url represents the application received the request as server
func isProvider(url *common.URL) bool {
role := url.GetParam(constant.RegistryRoleKey, "")
Expand Down
4 changes: 3 additions & 1 deletion metrics/prometheus/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,7 @@ const (
counterField = "counter"
summaryField = "summary"

totalField = "total"
totalField = "total"
processingField = "processing"
succeedField = "succeed"
)
24 changes: 17 additions & 7 deletions metrics/prometheus/metric_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,20 @@ type metricSet struct {
consumerRTSummaryVec *prometheus.SummaryVec
// report the provider-side's rt gauge data
providerRTSummaryVec *prometheus.SummaryVec

// report the provider-side's request total counter data
providerRequestTotalCounterVec *prometheus.CounterVec
// report the consumer-side's request total counter data
consumerRequestTotalCounterVec *prometheus.CounterVec
providerRequestsTotalCounterVec *prometheus.CounterVec
// report the provider-side's processing request counter data
// providerRequestProcessingGaugeVec *prometheus.GaugeVec
providerRequestsProcessingTotalGaugeVec *prometheus.GaugeVec
// The number of requests successfully received by the provider
providerRequestsSucceedTotalCounterVec *prometheus.CounterVec

// report the consumer-side's request total counter data
consumerRequestsTotalCounterVec *prometheus.CounterVec
// report the consumer-side's processing request counter data
// consumerRequestProcessingGaugeVec *prometheus.GaugeVec
consumerRequestsProcessingTotalGaugeVec *prometheus.GaugeVec
// The number of successful requests sent by consumers
consumerRequestsSucceedTotalCounterVec *prometheus.CounterVec
}

var labelNames = []string{applicationNameKey, groupKey, hostnameKey, interfaceKey, ipKey, methodKey, versionKey}
Expand All @@ -51,8 +57,12 @@ var labelNames = []string{applicationNameKey, groupKey, hostnameKey, interfaceKe
func (ms *metricSet) initAndRegister(reporterConfig *metrics.ReporterConfig) {
ms.consumerRTSummaryVec = newAutoSummaryVec(buildMetricsName(consumerField, rtField, milliSecondsField, summaryField), reporterConfig.Namespace, labelNames, reporterConfig.SummaryMaxAge)
ms.providerRTSummaryVec = newAutoSummaryVec(buildMetricsName(providerField, rtField, milliSecondsField, summaryField), reporterConfig.Namespace, labelNames, reporterConfig.SummaryMaxAge)
ms.consumerRequestTotalCounterVec = newAutoCounterVec(buildMetricsName(consumerField, requestsField, totalField), reporterConfig.Namespace, labelNames)
ms.providerRequestTotalCounterVec = newAutoCounterVec(buildMetricsName(providerField, requestsField, totalField), reporterConfig.Namespace, labelNames)
ms.consumerRequestsTotalCounterVec = newAutoCounterVec(buildMetricsName(consumerField, requestsField, totalField), reporterConfig.Namespace, labelNames)
ms.providerRequestsTotalCounterVec = newAutoCounterVec(buildMetricsName(providerField, requestsField, totalField), reporterConfig.Namespace, labelNames)
ms.consumerRequestsProcessingTotalGaugeVec = newAutoGaugeVec(buildMetricsName(consumerField, requestsField, processingField, totalField), reporterConfig.Namespace, labelNames)
ms.providerRequestsProcessingTotalGaugeVec = newAutoGaugeVec(buildMetricsName(providerField, requestsField, processingField, totalField), reporterConfig.Namespace, labelNames)
ms.consumerRequestsSucceedTotalCounterVec = newAutoCounterVec(buildMetricsName(consumerField, requestsField, succeedField, totalField), reporterConfig.Namespace, labelNames)
ms.providerRequestsSucceedTotalCounterVec = newAutoCounterVec(buildMetricsName(providerField, requestsField, succeedField, totalField), reporterConfig.Namespace, labelNames)
}

func buildMetricsName(args ...string) string {
Expand Down
51 changes: 48 additions & 3 deletions metrics/prometheus/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

import (
"github.com/dubbogo/gost/log/logger"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

Expand All @@ -34,9 +35,8 @@ import (
)

var (
reporterInstance *PrometheusReporter
reporterInitOnce sync.Once
defaultHistogramBucket = []float64{10, 50, 100, 200, 500, 1000, 10000}
reporterInstance *PrometheusReporter
reporterInitOnce sync.Once
)

// should initialize after loading configuration
Expand Down Expand Up @@ -102,3 +102,48 @@ func (reporter *PrometheusReporter) shutdownServer() {
}
}
}

func (reporter *PrometheusReporter) reportRTSummaryVec(role string, labels *prometheus.Labels, costMs int64) {
switch role {
case providerField:
reporter.providerRTSummaryVec.With(*labels).Observe(float64(costMs))
case consumerField:
reporter.consumerRTSummaryVec.With(*labels).Observe(float64(costMs))
}
}

func (reporter *PrometheusReporter) reportRequestsTotalCounterVec(role string, labels *prometheus.Labels) {
switch role {
case providerField:
reporter.providerRequestsTotalCounterVec.With(*labels).Inc()
case consumerField:
reporter.consumerRequestsTotalCounterVec.With(*labels).Inc()
}
}

func (reporter *PrometheusReporter) incRequestsProcessingTotalGaugeVec(role string, labels *prometheus.Labels) {
switch role {
case providerField:
reporter.providerRequestsProcessingTotalGaugeVec.With(*labels).Inc()
case consumerField:
reporter.consumerRequestsProcessingTotalGaugeVec.With(*labels).Inc()
}
}

func (reporter *PrometheusReporter) decRequestsProcessingTotalGaugeVec(role string, labels *prometheus.Labels) {
switch role {
case providerField:
reporter.providerRequestsProcessingTotalGaugeVec.With(*labels).Dec()
case consumerField:
reporter.consumerRequestsProcessingTotalGaugeVec.With(*labels).Dec()
}
}

func (reporter *PrometheusReporter) incRequestsSucceedTotalCounterVec(role string, labels *prometheus.Labels) {
switch role {
case providerField:
reporter.providerRequestsSucceedTotalCounterVec.With(*labels).Inc()
case consumerField:
reporter.consumerRequestsSucceedTotalCounterVec.With(*labels).Inc()
}
}
3 changes: 3 additions & 0 deletions metrics/prometheus/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func TestPrometheusReporter_Report(t *testing.T) {

assert.False(t, isConsumer(url))
ctx := context.Background()
reporter.ReportBeforeInvocation(ctx, invoker, inv)
reporter.ReportAfterInvocation(ctx, invoker, inv, 100*time.Millisecond, nil)

// consumer side
Expand All @@ -60,6 +61,7 @@ func TestPrometheusReporter_Report(t *testing.T) {
"BDTService&organization=ikurento.com&owner=ZX&registry.role=0&retries=&" +
"service.filter=echo%2Ctoken%2Caccesslog&timestamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100")
invoker = protocol.NewBaseInvoker(url)
reporter.ReportBeforeInvocation(ctx, invoker, inv)
reporter.ReportAfterInvocation(ctx, invoker, inv, 100*time.Millisecond, nil)

// invalid role
Expand All @@ -70,5 +72,6 @@ func TestPrometheusReporter_Report(t *testing.T) {
"BDTService&organization=ikurento.com&owner=ZX&registry.role=9&retries=&" +
"service.filter=echo%2Ctoken%2Caccesslog&timestamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100")
invoker = protocol.NewBaseInvoker(url)
reporter.ReportBeforeInvocation(ctx, invoker, inv)
reporter.ReportAfterInvocation(ctx, invoker, inv, 100*time.Millisecond, nil)
}
1 change: 1 addition & 0 deletions metrics/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,5 @@ func NewReporterConfig() *ReporterConfig {
type Reporter interface {
ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation,
cost time.Duration, res protocol.Result)
ReportBeforeInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation)
}

0 comments on commit ad55d34

Please sign in to comment.