Skip to content

Commit

Permalink
Merge 642e0e0 into d0c3564
Browse files Browse the repository at this point in the history
  • Loading branch information
FinalT authored Jul 28, 2023
2 parents d0c3564 + 642e0e0 commit 8592e11
Show file tree
Hide file tree
Showing 8 changed files with 341 additions and 117 deletions.
153 changes: 153 additions & 0 deletions metrics/registry/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package registry

import (
"time"
)

import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/metrics"
)

var (
registryChan = make(chan metrics.MetricsEvent, 128)
)

func init() {
metrics.AddCollector("registry", func(m metrics.MetricRegistry, c *metrics.ReporterConfig) {
rc := &registryCollector{regRegistry: m}
go rc.start()
})
}

// registryCollector is the registry's metrics collector
type registryCollector struct {
regRegistry metrics.MetricRegistry
}

func (rc *registryCollector) start() {
metrics.Subscribe(constant.MetricsRegistry, registryChan)
for event := range registryChan {
if registryEvent, ok := event.(*RegistryMetricsEvent); ok {
switch registryEvent.Name {
case Reg:
rc.regHandler(registryEvent)
case Sub:
rc.subHandler(registryEvent)
case Notify:
rc.notifyHandler(registryEvent)
case ServerReg:
rc.serverRegHandler(registryEvent)
case ServerSub:
rc.serverSubHandler(registryEvent)
default:
}
}
}
}

func newStatesMetricFunc(total *metrics.MetricKey, succ *metrics.MetricKey, fail *metrics.MetricKey,
level metrics.MetricLevel, reg metrics.MetricRegistry) metrics.StatesMetrics {
return metrics.NewStatesMetrics(metrics.NewMetricId(total, level), metrics.NewMetricId(succ, level),
metrics.NewMetricId(fail, level), reg)
}

func newTimeMetrics(min, max, avg, sum, last *metrics.MetricKey, level metrics.MetricLevel, mr metrics.MetricRegistry) metrics.TimeMetric {
return metrics.NewTimeMetric(metrics.NewMetricId(min, level), metrics.NewMetricId(max, level), metrics.NewMetricId(avg, level),
metrics.NewMetricId(sum, level), metrics.NewMetricId(last, level), mr)
}

// regHandler handles register metrics
func (rc *registryCollector) regHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics
// Save metrics to the MetricRegistry
m := metrics.ComputeIfAbsentCache(dubboRegNum, func() interface{} {
return newStatesMetricFunc(RegisterMetricRequests, RegisterMetricRequestsSucceed, RegisterMetricRequestsFailed, metrics.GetApplicationLevel(), rc.regRegistry)
}).(metrics.StatesMetrics)
m.Inc(event.Succ)
metric := metrics.ComputeIfAbsentCache(dubboRegRt, func() interface{} {
return newTimeMetrics(RegisterRtMillisecondsMin, RegisterRtMillisecondsMax, RegisterRtMillisecondsAvg, RegisterRtMillisecondsSum, RegisterRtMillisecondsLast, metrics.GetApplicationLevel(), rc.regRegistry)
}).(metrics.TimeMetric)
metric.Record(event.CostMs())
}

// subHandler handles subscribe metrics
func (rc *registryCollector) subHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics
// Save metrics to the MetricRegistry
m := newStatesMetricFunc(SubscribeMetricNum, SubscribeMetricNumSucceed, SubscribeMetricNumFailed, metrics.GetApplicationLevel(), rc.regRegistry)
m.Inc(event.Succ)
}

// notifyHandler handles notify metrics
func (rc *registryCollector) notifyHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics
// Save metrics to the MetricRegistry
rc.regRegistry.Counter(metrics.NewMetricId(NotifyMetricRequests, metrics.GetApplicationLevel())).Inc()
rc.regRegistry.Histogram(metrics.NewMetricId(NotifyMetricNumLast, metrics.GetApplicationLevel())).Record(float64(event.End.UnixNano()) / float64(time.Second))
metric := metrics.ComputeIfAbsentCache(dubboNotifyRt, func() interface{} {
return newTimeMetrics(NotifyRtMillisecondsMin, NotifyRtMillisecondsMax, NotifyRtMillisecondsAvg, NotifyRtMillisecondsSum, NotifyRtMillisecondsLast, metrics.GetApplicationLevel(), rc.regRegistry)
}).(metrics.TimeMetric)
metric.Record(event.CostMs())
}

// directoryHandler handles directory metrics
func (rc *registryCollector) directoryHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics
// Save metrics to the MetricRegistry
level := metrics.GetApplicationLevel()
typ := event.Attachment["DirTyp"]
switch typ {
case NumAllInc:
rc.regRegistry.Counter(metrics.NewMetricId(DirectoryMetricNumAll, level)).Inc()
case NumAllDec:
rc.regRegistry.Counter(metrics.NewMetricId(DirectoryMetricNumAll, level)).Add(-1)
case NumDisableTotal:
rc.regRegistry.Counter(metrics.NewMetricId(DirectoryMetricNumDisable, level)).Inc()
case NumToReconnectTotal:
rc.regRegistry.Counter(metrics.NewMetricId(DirectoryMetricNumToReconnect, level)).Inc()
case NumValidTotal:
rc.regRegistry.Counter(metrics.NewMetricId(DirectoryMetricNumValid, level)).Inc()
default:
}

}

// serverRegHandler handles server register metrics
func (rc *registryCollector) serverRegHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics
// Save metrics to the MetricRegistry
m := metrics.ComputeIfAbsentCache(dubboRegServerNum, func() interface{} {
return newStatesMetricFunc(ServiceRegisterMetricRequests, ServiceRegisterMetricRequestsSucceed, ServiceRegisterMetricRequestsFailed, metrics.GetApplicationLevel(), rc.regRegistry)
}).(metrics.StatesMetrics)
m.Inc(event.Succ)
metric := metrics.ComputeIfAbsentCache(dubboRegServerRt, func() interface{} {
return newTimeMetrics(RegisterServiceRtMillisecondsMin, RegisterServiceRtMillisecondsMax, RegisterServiceRtMillisecondsAvg, RegisterServiceRtMillisecondsSum, RegisterServiceRtMillisecondsLast, metrics.GetApplicationLevel(), rc.regRegistry)
}).(metrics.TimeMetric)
metric.Record(event.CostMs())
}

// serverSubHandler handles server subscribe metrics
func (rc *registryCollector) serverSubHandler(event *RegistryMetricsEvent) {
// Event is converted to metrics
// Save metrics to the MetricRegistry
m := newStatesMetricFunc(ServiceSubscribeMetricNum, ServiceSubscribeMetricNumSucceed, ServiceSubscribeMetricNumFailed, metrics.GetApplicationLevel(), rc.regRegistry)
m.Inc(event.Succ)
}
78 changes: 62 additions & 16 deletions metrics/registry/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,81 @@

package registry

import (
"time"
)

import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/metrics"
)

// RegistryMetricsEvent contains info about register metrics
type RegistryMetricsEvent struct {
//Contains some information, such as time, success, failure

// PostType MetricKey
// FinishType MetricKey
// ErrorType MetricKey
// Level MetricsLevel

// Time
// Start time.time
// End time.time
Name MetricName
Succ bool
Start time.Time
End time.Time
Attachment map[string]string
}

func (r RegistryMetricsEvent) Type() string {
return constant.MetricsRegistry
}

// NewRegistryEvent
func (r *RegistryMetricsEvent) CostMs() float64 {
return float64(r.End.Sub(r.Start)) / float64(time.Millisecond)
}

// NewSubscribeEvent
// NewRegisterEvent for register metrics
func NewRegisterEvent(succ bool, start time.Time) metrics.MetricsEvent {
return &RegistryMetricsEvent{
Name: Reg,
Succ: succ,
Start: start,
End: time.Now(),
}
}

// NewNotifyEvent
// NewSubscribeEvent for subscribe metrics
func NewSubscribeEvent(succ bool) metrics.MetricsEvent {
return &RegistryMetricsEvent{
Name: Sub,
Succ: succ,
}
}

// NewNotifyEvent for notify metrics
func NewNotifyEvent(start time.Time) metrics.MetricsEvent {
return &RegistryMetricsEvent{
Name: Notify,
Start: start,
End: time.Now(),
}
}

// NewDirectoryEvent
// NewDirectoryEvent for directory metrics
func NewDirectoryEvent(dirTyp string) metrics.MetricsEvent {
return &RegistryMetricsEvent{
Name: Directory,
Attachment: map[string]string{"DirTyp": dirTyp},
}
}

// NewServerRegistryEvent
// NewServerRegisterEvent for server register metrics
func NewServerRegisterEvent(succ bool, start time.Time) metrics.MetricsEvent {
return &RegistryMetricsEvent{
Name: ServerReg,
Succ: succ,
Start: start,
End: time.Now(),
}
}

// NewServerSubscribeEvent
// NewServerSubscribeEvent for server subscribe metrics
func NewServerSubscribeEvent(succ bool) metrics.MetricsEvent {
return &RegistryMetricsEvent{
Name: ServerSub,
Succ: succ,
}
}
98 changes: 0 additions & 98 deletions metrics/registry/listener.go

This file was deleted.

Loading

0 comments on commit 8592e11

Please sign in to comment.