Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add request processing total and request succeed total metrics #2331

Merged
merged 7 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion common/extension/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,8 @@ func TestGetMetricReporter(t *testing.T) {
type mockReporter struct{}

// implement the interface of Reporter
func (m mockReporter) ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
func (m *mockReporter) ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
}

func (m *mockReporter) ReportBeforeInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) {
}
5 changes: 5 additions & 0 deletions filter/metrics/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ type Filter struct {

// Invoke collect the duration of invocation and then report the duration by using goroutine
func (p *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
go func() {
for _, reporter := range p.reporters {
reporter.ReportBeforeInvocation(ctx, invoker, invocation)
}
}()
start := time.Now()
res := invoker.Invoke(ctx, invocation)
end := time.Now()
Expand Down
5 changes: 5 additions & 0 deletions filter/metrics/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,8 @@ func (m *mockReporter) ReportAfterInvocation(ctx context.Context, invoker protoc
m.Called(ctx, invoker, invocation)
m.wg.Done()
}

func (m *mockReporter) ReportBeforeInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) {
m.Called(ctx, invoker, invocation)
m.wg.Done()
}
50 changes: 8 additions & 42 deletions metrics/prometheus/after_invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,61 +23,27 @@ import (
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

import (
"github.com/dubbogo/gost/log/logger"
"github.com/prometheus/client_golang/prometheus"
)

func (reporter *PrometheusReporter) ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
if !reporter.reporterConfig.Enable {
return
}

url := invoker.GetURL()

var role string // provider or consumer
if isProvider(url) {
role = providerField
} else if isConsumer(url) {
role = consumerField
} else {
logger.Warnf("The url belongs neither the consumer nor the provider, "+
"so the invocation will be ignored. url: %s", url.String())
role := getRole(url)
if role == "" {
return
}
labels := prometheus.Labels{
applicationNameKey: url.GetParam(constant.ApplicationKey, ""),
groupKey: url.Group(),
hostnameKey: "",
interfaceKey: url.Service(),
ipKey: common.GetLocalIp(),
versionKey: url.GetParam(constant.AppVersionKey, ""),
methodKey: invocation.MethodName(),
}
labels := buildLabels(url)

reporter.reportRTSummaryVec(role, &labels, cost.Milliseconds())
reporter.reportRequestTotalCounterVec(role, &labels)
}

func (r *PrometheusReporter) reportRTSummaryVec(role string, labels *prometheus.Labels, costMs int64) {
switch role {
case providerField:
r.providerRTSummaryVec.With(*labels).Observe(float64(costMs))
case consumerField:
r.consumerRTSummaryVec.With(*labels).Observe(float64(costMs))
}
}
reporter.reportRequestsTotalCounterVec(role, &labels)
reporter.decRequestsProcessingTotalGaugeVec(role, &labels)

func (r *PrometheusReporter) reportRequestTotalCounterVec(role string, labels *prometheus.Labels) {
switch role {
case providerField:
r.providerRequestTotalCounterVec.With(*labels).Inc()
case consumerField:
r.consumerRequestTotalCounterVec.With(*labels).Inc()
if res != nil && res.Error() == nil {
// succeed
reporter.incRequestsSucceedTotalCounterVec(role, &labels)
}
}
22 changes: 22 additions & 0 deletions metrics/prometheus/before_invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,25 @@
*/

package prometheus

import (
"context"
)
import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)

func (reporter *PrometheusReporter) ReportBeforeInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) {
if !reporter.reporterConfig.Enable {
return
}
url := invoker.GetURL()

role := getRole(url)
if role == "" {
return
}
labels := buildLabels(url)

reporter.incRequestsProcessingTotalGaugeVec(role, &labels)
}
30 changes: 30 additions & 0 deletions metrics/prometheus/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
)

import (
"github.com/dubbogo/gost/log/logger"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
Expand All @@ -33,6 +34,35 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
)

var (
defaultHistogramBucket = []float64{10, 50, 100, 200, 500, 1000, 10000}
)

func buildLabels(url *common.URL) prometheus.Labels {
return prometheus.Labels{
applicationNameKey: url.GetParam(constant.ApplicationKey, ""),
groupKey: url.Group(),
hostnameKey: "not implemented yet",
interfaceKey: url.Service(),
ipKey: common.GetLocalIp(),
versionKey: url.GetParam(constant.AppVersionKey, ""),
methodKey: url.GetParam(constant.MethodKey, ""),
}
}

// return the role of the application, provider or consumer, if the url is not a valid one, return empty string
func getRole(url *common.URL) (role string) {
if isProvider(url) {
role = providerField
} else if isConsumer(url) {
role = consumerField
} else {
logger.Warnf("The url belongs neither the consumer nor the provider, "+
"so the invocation will be ignored. url: %s", url.String())
}
return
}

// isProvider shows whether this url represents the application received the request as server
func isProvider(url *common.URL) bool {
role := url.GetParam(constant.RegistryRoleKey, "")
Expand Down
4 changes: 3 additions & 1 deletion metrics/prometheus/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,7 @@ const (
counterField = "counter"
summaryField = "summary"

totalField = "total"
totalField = "total"
processingField = "processing"
succeedField = "succeed"
)
24 changes: 17 additions & 7 deletions metrics/prometheus/metric_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,20 @@ type metricSet struct {
consumerRTSummaryVec *prometheus.SummaryVec
// report the provider-side's rt gauge data
providerRTSummaryVec *prometheus.SummaryVec

// report the provider-side's request total counter data
providerRequestTotalCounterVec *prometheus.CounterVec
// report the consumer-side's request total counter data
consumerRequestTotalCounterVec *prometheus.CounterVec
providerRequestsTotalCounterVec *prometheus.CounterVec
// report the provider-side's processing request counter data
// providerRequestProcessingGaugeVec *prometheus.GaugeVec
providerRequestsProcessingTotalGaugeVec *prometheus.GaugeVec
// The number of requests successfully received by the provider
providerRequestsSucceedTotalCounterVec *prometheus.CounterVec

// report the consumer-side's request total counter data
consumerRequestsTotalCounterVec *prometheus.CounterVec
// report the consumer-side's processing request counter data
// consumerRequestProcessingGaugeVec *prometheus.GaugeVec
consumerRequestsProcessingTotalGaugeVec *prometheus.GaugeVec
// The number of successful requests sent by consumers
consumerRequestsSucceedTotalCounterVec *prometheus.CounterVec
}

var labelNames = []string{applicationNameKey, groupKey, hostnameKey, interfaceKey, ipKey, methodKey, versionKey}
Expand All @@ -51,8 +57,12 @@ var labelNames = []string{applicationNameKey, groupKey, hostnameKey, interfaceKe
func (ms *metricSet) initAndRegister(reporterConfig *metrics.ReporterConfig) {
ms.consumerRTSummaryVec = newAutoSummaryVec(buildMetricsName(consumerField, rtField, milliSecondsField, summaryField), reporterConfig.Namespace, labelNames, reporterConfig.SummaryMaxAge)
ms.providerRTSummaryVec = newAutoSummaryVec(buildMetricsName(providerField, rtField, milliSecondsField, summaryField), reporterConfig.Namespace, labelNames, reporterConfig.SummaryMaxAge)
ms.consumerRequestTotalCounterVec = newAutoCounterVec(buildMetricsName(consumerField, requestsField, totalField), reporterConfig.Namespace, labelNames)
ms.providerRequestTotalCounterVec = newAutoCounterVec(buildMetricsName(providerField, requestsField, totalField), reporterConfig.Namespace, labelNames)
ms.consumerRequestsTotalCounterVec = newAutoCounterVec(buildMetricsName(consumerField, requestsField, totalField), reporterConfig.Namespace, labelNames)
ms.providerRequestsTotalCounterVec = newAutoCounterVec(buildMetricsName(providerField, requestsField, totalField), reporterConfig.Namespace, labelNames)
ms.consumerRequestsProcessingTotalGaugeVec = newAutoGaugeVec(buildMetricsName(consumerField, requestsField, processingField, totalField), reporterConfig.Namespace, labelNames)
ms.providerRequestsProcessingTotalGaugeVec = newAutoGaugeVec(buildMetricsName(providerField, requestsField, processingField, totalField), reporterConfig.Namespace, labelNames)
ms.consumerRequestsSucceedTotalCounterVec = newAutoCounterVec(buildMetricsName(consumerField, requestsField, succeedField, totalField), reporterConfig.Namespace, labelNames)
ms.providerRequestsSucceedTotalCounterVec = newAutoCounterVec(buildMetricsName(providerField, requestsField, succeedField, totalField), reporterConfig.Namespace, labelNames)
}

func buildMetricsName(args ...string) string {
Expand Down
51 changes: 48 additions & 3 deletions metrics/prometheus/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

import (
"github.com/dubbogo/gost/log/logger"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

Expand All @@ -34,9 +35,8 @@ import (
)

var (
reporterInstance *PrometheusReporter
reporterInitOnce sync.Once
defaultHistogramBucket = []float64{10, 50, 100, 200, 500, 1000, 10000}
reporterInstance *PrometheusReporter
reporterInitOnce sync.Once
)

// should initialize after loading configuration
Expand Down Expand Up @@ -102,3 +102,48 @@ func (reporter *PrometheusReporter) shutdownServer() {
}
}
}

func (reporter *PrometheusReporter) reportRTSummaryVec(role string, labels *prometheus.Labels, costMs int64) {
switch role {
case providerField:
reporter.providerRTSummaryVec.With(*labels).Observe(float64(costMs))
case consumerField:
reporter.consumerRTSummaryVec.With(*labels).Observe(float64(costMs))
}
}

func (reporter *PrometheusReporter) reportRequestsTotalCounterVec(role string, labels *prometheus.Labels) {
switch role {
case providerField:
reporter.providerRequestsTotalCounterVec.With(*labels).Inc()
case consumerField:
reporter.consumerRequestsTotalCounterVec.With(*labels).Inc()
}
}

func (reporter *PrometheusReporter) incRequestsProcessingTotalGaugeVec(role string, labels *prometheus.Labels) {
switch role {
case providerField:
reporter.providerRequestsProcessingTotalGaugeVec.With(*labels).Inc()
case consumerField:
reporter.consumerRequestsProcessingTotalGaugeVec.With(*labels).Inc()
}
}

func (reporter *PrometheusReporter) decRequestsProcessingTotalGaugeVec(role string, labels *prometheus.Labels) {
switch role {
case providerField:
reporter.providerRequestsProcessingTotalGaugeVec.With(*labels).Dec()
case consumerField:
reporter.consumerRequestsProcessingTotalGaugeVec.With(*labels).Dec()
}
}

func (reporter *PrometheusReporter) incRequestsSucceedTotalCounterVec(role string, labels *prometheus.Labels) {
switch role {
case providerField:
reporter.providerRequestsSucceedTotalCounterVec.With(*labels).Inc()
case consumerField:
reporter.consumerRequestsSucceedTotalCounterVec.With(*labels).Inc()
}
}
3 changes: 3 additions & 0 deletions metrics/prometheus/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func TestPrometheusReporter_Report(t *testing.T) {

assert.False(t, isConsumer(url))
ctx := context.Background()
reporter.ReportBeforeInvocation(ctx, invoker, inv)
reporter.ReportAfterInvocation(ctx, invoker, inv, 100*time.Millisecond, nil)

// consumer side
Expand All @@ -60,6 +61,7 @@ func TestPrometheusReporter_Report(t *testing.T) {
"BDTService&organization=ikurento.com&owner=ZX&registry.role=0&retries=&" +
"service.filter=echo%2Ctoken%2Caccesslog&timestamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100")
invoker = protocol.NewBaseInvoker(url)
reporter.ReportBeforeInvocation(ctx, invoker, inv)
reporter.ReportAfterInvocation(ctx, invoker, inv, 100*time.Millisecond, nil)

// invalid role
Expand All @@ -70,5 +72,6 @@ func TestPrometheusReporter_Report(t *testing.T) {
"BDTService&organization=ikurento.com&owner=ZX&registry.role=9&retries=&" +
"service.filter=echo%2Ctoken%2Caccesslog&timestamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100")
invoker = protocol.NewBaseInvoker(url)
reporter.ReportBeforeInvocation(ctx, invoker, inv)
reporter.ReportAfterInvocation(ctx, invoker, inv, 100*time.Millisecond, nil)
}
1 change: 1 addition & 0 deletions metrics/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,5 @@ func NewReporterConfig() *ReporterConfig {
type Reporter interface {
ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation,
cost time.Duration, res protocol.Result)
ReportBeforeInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation)
}