diff --git a/common/extension/metrics_test.go b/common/extension/metrics_test.go index 47395161d4..30df7d3d40 100644 --- a/common/extension/metrics_test.go +++ b/common/extension/metrics_test.go @@ -40,5 +40,8 @@ func TestGetMetricReporter(t *testing.T) { type mockReporter struct{} // implement the interface of Reporter -func (m mockReporter) ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) { +func (m *mockReporter) ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) { +} + +func (m *mockReporter) ReportBeforeInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) { } diff --git a/filter/metrics/filter.go b/filter/metrics/filter.go index 9782c0d4ec..3e36404bcf 100644 --- a/filter/metrics/filter.go +++ b/filter/metrics/filter.go @@ -44,6 +44,11 @@ type Filter struct { // Invoke collect the duration of invocation and then report the duration by using goroutine func (p *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + go func() { + for _, reporter := range p.reporters { + reporter.ReportBeforeInvocation(ctx, invoker, invocation) + } + }() start := time.Now() res := invoker.Invoke(ctx, invocation) end := time.Now() diff --git a/filter/metrics/filter_test.go b/filter/metrics/filter_test.go index c12247d478..f18b760e70 100644 --- a/filter/metrics/filter_test.go +++ b/filter/metrics/filter_test.go @@ -76,3 +76,8 @@ func (m *mockReporter) ReportAfterInvocation(ctx context.Context, invoker protoc m.Called(ctx, invoker, invocation) m.wg.Done() } + +func (m *mockReporter) ReportBeforeInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) { + m.Called(ctx, invoker, invocation) + m.wg.Done() +} diff --git a/metrics/prometheus/after_invocation.go b/metrics/prometheus/after_invocation.go index 62e05016fe..f1235a2d75 100644 --- a/metrics/prometheus/after_invocation.go +++ b/metrics/prometheus/after_invocation.go @@ -23,61 +23,27 @@ import ( ) import ( - "dubbo.apache.org/dubbo-go/v3/common" - "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/protocol" ) -import ( - "github.com/dubbogo/gost/log/logger" - "github.com/prometheus/client_golang/prometheus" -) - func (reporter *PrometheusReporter) ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) { if !reporter.reporterConfig.Enable { return } - url := invoker.GetURL() - var role string // provider or consumer - if isProvider(url) { - role = providerField - } else if isConsumer(url) { - role = consumerField - } else { - logger.Warnf("The url belongs neither the consumer nor the provider, "+ - "so the invocation will be ignored. url: %s", url.String()) + role := getRole(url) + if role == "" { return } - labels := prometheus.Labels{ - applicationNameKey: url.GetParam(constant.ApplicationKey, ""), - groupKey: url.Group(), - hostnameKey: "", - interfaceKey: url.Service(), - ipKey: common.GetLocalIp(), - versionKey: url.GetParam(constant.AppVersionKey, ""), - methodKey: invocation.MethodName(), - } + labels := buildLabels(url) reporter.reportRTSummaryVec(role, &labels, cost.Milliseconds()) - reporter.reportRequestTotalCounterVec(role, &labels) -} - -func (r *PrometheusReporter) reportRTSummaryVec(role string, labels *prometheus.Labels, costMs int64) { - switch role { - case providerField: - r.providerRTSummaryVec.With(*labels).Observe(float64(costMs)) - case consumerField: - r.consumerRTSummaryVec.With(*labels).Observe(float64(costMs)) - } -} + reporter.reportRequestsTotalCounterVec(role, &labels) + reporter.decRequestsProcessingTotalGaugeVec(role, &labels) -func (r *PrometheusReporter) reportRequestTotalCounterVec(role string, labels *prometheus.Labels) { - switch role { - case providerField: - r.providerRequestTotalCounterVec.With(*labels).Inc() - case consumerField: - r.consumerRequestTotalCounterVec.With(*labels).Inc() + if res != nil && res.Error() == nil { + // succeed + reporter.incRequestsSucceedTotalCounterVec(role, &labels) } } diff --git a/metrics/prometheus/before_invocation.go b/metrics/prometheus/before_invocation.go index aa288e5572..7477b13626 100644 --- a/metrics/prometheus/before_invocation.go +++ b/metrics/prometheus/before_invocation.go @@ -16,3 +16,25 @@ */ package prometheus + +import ( + "context" +) +import ( + "dubbo.apache.org/dubbo-go/v3/protocol" +) + +func (reporter *PrometheusReporter) ReportBeforeInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) { + if !reporter.reporterConfig.Enable { + return + } + url := invoker.GetURL() + + role := getRole(url) + if role == "" { + return + } + labels := buildLabels(url) + + reporter.incRequestsProcessingTotalGaugeVec(role, &labels) +} diff --git a/metrics/prometheus/common.go b/metrics/prometheus/common.go index 7df0389a98..32137d1e2a 100644 --- a/metrics/prometheus/common.go +++ b/metrics/prometheus/common.go @@ -24,6 +24,7 @@ import ( ) import ( + "github.com/dubbogo/gost/log/logger" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) @@ -33,6 +34,35 @@ import ( "dubbo.apache.org/dubbo-go/v3/common/constant" ) +var ( + defaultHistogramBucket = []float64{10, 50, 100, 200, 500, 1000, 10000} +) + +func buildLabels(url *common.URL) prometheus.Labels { + return prometheus.Labels{ + applicationNameKey: url.GetParam(constant.ApplicationKey, ""), + groupKey: url.Group(), + hostnameKey: "not implemented yet", + interfaceKey: url.Service(), + ipKey: common.GetLocalIp(), + versionKey: url.GetParam(constant.AppVersionKey, ""), + methodKey: url.GetParam(constant.MethodKey, ""), + } +} + +// return the role of the application, provider or consumer, if the url is not a valid one, return empty string +func getRole(url *common.URL) (role string) { + if isProvider(url) { + role = providerField + } else if isConsumer(url) { + role = consumerField + } else { + logger.Warnf("The url belongs neither the consumer nor the provider, "+ + "so the invocation will be ignored. url: %s", url.String()) + } + return +} + // isProvider shows whether this url represents the application received the request as server func isProvider(url *common.URL) bool { role := url.GetParam(constant.RegistryRoleKey, "") diff --git a/metrics/prometheus/constant.go b/metrics/prometheus/constant.go index b4d5e1ca9a..95e090d466 100644 --- a/metrics/prometheus/constant.go +++ b/metrics/prometheus/constant.go @@ -41,5 +41,7 @@ const ( counterField = "counter" summaryField = "summary" - totalField = "total" + totalField = "total" + processingField = "processing" + succeedField = "succeed" ) diff --git a/metrics/prometheus/metric_set.go b/metrics/prometheus/metric_set.go index 4226771fe2..79c0203690 100644 --- a/metrics/prometheus/metric_set.go +++ b/metrics/prometheus/metric_set.go @@ -35,14 +35,20 @@ type metricSet struct { consumerRTSummaryVec *prometheus.SummaryVec // report the provider-side's rt gauge data providerRTSummaryVec *prometheus.SummaryVec + // report the provider-side's request total counter data - providerRequestTotalCounterVec *prometheus.CounterVec - // report the consumer-side's request total counter data - consumerRequestTotalCounterVec *prometheus.CounterVec + providerRequestsTotalCounterVec *prometheus.CounterVec // report the provider-side's processing request counter data - // providerRequestProcessingGaugeVec *prometheus.GaugeVec + providerRequestsProcessingTotalGaugeVec *prometheus.GaugeVec + // The number of requests successfully received by the provider + providerRequestsSucceedTotalCounterVec *prometheus.CounterVec + + // report the consumer-side's request total counter data + consumerRequestsTotalCounterVec *prometheus.CounterVec // report the consumer-side's processing request counter data - // consumerRequestProcessingGaugeVec *prometheus.GaugeVec + consumerRequestsProcessingTotalGaugeVec *prometheus.GaugeVec + // The number of successful requests sent by consumers + consumerRequestsSucceedTotalCounterVec *prometheus.CounterVec } var labelNames = []string{applicationNameKey, groupKey, hostnameKey, interfaceKey, ipKey, methodKey, versionKey} @@ -51,8 +57,12 @@ var labelNames = []string{applicationNameKey, groupKey, hostnameKey, interfaceKe func (ms *metricSet) initAndRegister(reporterConfig *metrics.ReporterConfig) { ms.consumerRTSummaryVec = newAutoSummaryVec(buildMetricsName(consumerField, rtField, milliSecondsField, summaryField), reporterConfig.Namespace, labelNames, reporterConfig.SummaryMaxAge) ms.providerRTSummaryVec = newAutoSummaryVec(buildMetricsName(providerField, rtField, milliSecondsField, summaryField), reporterConfig.Namespace, labelNames, reporterConfig.SummaryMaxAge) - ms.consumerRequestTotalCounterVec = newAutoCounterVec(buildMetricsName(consumerField, requestsField, totalField), reporterConfig.Namespace, labelNames) - ms.providerRequestTotalCounterVec = newAutoCounterVec(buildMetricsName(providerField, requestsField, totalField), reporterConfig.Namespace, labelNames) + ms.consumerRequestsTotalCounterVec = newAutoCounterVec(buildMetricsName(consumerField, requestsField, totalField), reporterConfig.Namespace, labelNames) + ms.providerRequestsTotalCounterVec = newAutoCounterVec(buildMetricsName(providerField, requestsField, totalField), reporterConfig.Namespace, labelNames) + ms.consumerRequestsProcessingTotalGaugeVec = newAutoGaugeVec(buildMetricsName(consumerField, requestsField, processingField, totalField), reporterConfig.Namespace, labelNames) + ms.providerRequestsProcessingTotalGaugeVec = newAutoGaugeVec(buildMetricsName(providerField, requestsField, processingField, totalField), reporterConfig.Namespace, labelNames) + ms.consumerRequestsSucceedTotalCounterVec = newAutoCounterVec(buildMetricsName(consumerField, requestsField, succeedField, totalField), reporterConfig.Namespace, labelNames) + ms.providerRequestsSucceedTotalCounterVec = newAutoCounterVec(buildMetricsName(providerField, requestsField, succeedField, totalField), reporterConfig.Namespace, labelNames) } func buildMetricsName(args ...string) string { diff --git a/metrics/prometheus/reporter.go b/metrics/prometheus/reporter.go index 249510f558..8c38c6114e 100644 --- a/metrics/prometheus/reporter.go +++ b/metrics/prometheus/reporter.go @@ -25,6 +25,7 @@ import ( import ( "github.com/dubbogo/gost/log/logger" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" ) @@ -34,9 +35,8 @@ import ( ) var ( - reporterInstance *PrometheusReporter - reporterInitOnce sync.Once - defaultHistogramBucket = []float64{10, 50, 100, 200, 500, 1000, 10000} + reporterInstance *PrometheusReporter + reporterInitOnce sync.Once ) // should initialize after loading configuration @@ -102,3 +102,48 @@ func (reporter *PrometheusReporter) shutdownServer() { } } } + +func (reporter *PrometheusReporter) reportRTSummaryVec(role string, labels *prometheus.Labels, costMs int64) { + switch role { + case providerField: + reporter.providerRTSummaryVec.With(*labels).Observe(float64(costMs)) + case consumerField: + reporter.consumerRTSummaryVec.With(*labels).Observe(float64(costMs)) + } +} + +func (reporter *PrometheusReporter) reportRequestsTotalCounterVec(role string, labels *prometheus.Labels) { + switch role { + case providerField: + reporter.providerRequestsTotalCounterVec.With(*labels).Inc() + case consumerField: + reporter.consumerRequestsTotalCounterVec.With(*labels).Inc() + } +} + +func (reporter *PrometheusReporter) incRequestsProcessingTotalGaugeVec(role string, labels *prometheus.Labels) { + switch role { + case providerField: + reporter.providerRequestsProcessingTotalGaugeVec.With(*labels).Inc() + case consumerField: + reporter.consumerRequestsProcessingTotalGaugeVec.With(*labels).Inc() + } +} + +func (reporter *PrometheusReporter) decRequestsProcessingTotalGaugeVec(role string, labels *prometheus.Labels) { + switch role { + case providerField: + reporter.providerRequestsProcessingTotalGaugeVec.With(*labels).Dec() + case consumerField: + reporter.consumerRequestsProcessingTotalGaugeVec.With(*labels).Dec() + } +} + +func (reporter *PrometheusReporter) incRequestsSucceedTotalCounterVec(role string, labels *prometheus.Labels) { + switch role { + case providerField: + reporter.providerRequestsSucceedTotalCounterVec.With(*labels).Inc() + case consumerField: + reporter.consumerRequestsSucceedTotalCounterVec.With(*labels).Inc() + } +} diff --git a/metrics/prometheus/reporter_test.go b/metrics/prometheus/reporter_test.go index af3def4028..de4283ed08 100644 --- a/metrics/prometheus/reporter_test.go +++ b/metrics/prometheus/reporter_test.go @@ -50,6 +50,7 @@ func TestPrometheusReporter_Report(t *testing.T) { assert.False(t, isConsumer(url)) ctx := context.Background() + reporter.ReportBeforeInvocation(ctx, invoker, inv) reporter.ReportAfterInvocation(ctx, invoker, inv, 100*time.Millisecond, nil) // consumer side @@ -60,6 +61,7 @@ func TestPrometheusReporter_Report(t *testing.T) { "BDTService&organization=ikurento.com&owner=ZX®istry.role=0&retries=&" + "service.filter=echo%2Ctoken%2Caccesslog×tamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100") invoker = protocol.NewBaseInvoker(url) + reporter.ReportBeforeInvocation(ctx, invoker, inv) reporter.ReportAfterInvocation(ctx, invoker, inv, 100*time.Millisecond, nil) // invalid role @@ -70,5 +72,6 @@ func TestPrometheusReporter_Report(t *testing.T) { "BDTService&organization=ikurento.com&owner=ZX®istry.role=9&retries=&" + "service.filter=echo%2Ctoken%2Caccesslog×tamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100") invoker = protocol.NewBaseInvoker(url) + reporter.ReportBeforeInvocation(ctx, invoker, inv) reporter.ReportAfterInvocation(ctx, invoker, inv, 100*time.Millisecond, nil) } diff --git a/metrics/reporter.go b/metrics/reporter.go index 9439f02c20..604d412bea 100644 --- a/metrics/reporter.go +++ b/metrics/reporter.go @@ -63,4 +63,5 @@ func NewReporterConfig() *ReporterConfig { type Reporter interface { ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) + ReportBeforeInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) }