Skip to content

Commit

Permalink
add switch for metric collector (#2424)
Browse files Browse the repository at this point in the history
  • Loading branch information
FoghostCn authored Oct 24, 2023
1 parent 8b04140 commit 40dd198
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 71 deletions.
4 changes: 4 additions & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,10 @@ const (

// metrics key
const (
MetadataEnabledKey = "metrics.metadata.enabled"
RegistryEnabledKey = "metrics.registry.enabled"
ConfigCenterEnabledKey = "metrics.config-center.enabled"
RpcEnabledKey = "metrics.rpc.enabled"
AggregationEnabledKey = "aggregation.enabled"
AggregationBucketNumKey = "aggregation.bucket.num"
AggregationTimeWindowSecondsKey = "aggregation.time.window.seconds"
Expand Down
44 changes: 36 additions & 8 deletions config/metric_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@ import (

// MetricConfig This is the config struct for all metrics implementation
type MetricConfig struct {
Enable *bool `default:"false" yaml:"enable" json:"enable,omitempty" property:"enable"`
Port string `default:"9090" yaml:"port" json:"port,omitempty" property:"port"`
Path string `default:"/metrics" yaml:"path" json:"path,omitempty" property:"path"`
Protocol string `default:"prometheus" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
Prometheus *PrometheusConfig `yaml:"prometheus" json:"prometheus" property:"prometheus"`
Aggregation *AggregateConfig `yaml:"aggregation" json:"aggregation" property:"aggregation"`
rootConfig *RootConfig
Enable *bool `default:"false" yaml:"enable" json:"enable,omitempty" property:"enable"`
Port string `default:"9090" yaml:"port" json:"port,omitempty" property:"port"`
Path string `default:"/metrics" yaml:"path" json:"path,omitempty" property:"path"`
Protocol string `default:"prometheus" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
EnableMetadata *bool `default:"true" yaml:"enable-metadata" json:"enable-metadata,omitempty" property:"enable-metadata"`
EnableRegistry *bool `default:"true" yaml:"enable-registry" json:"enable-registry,omitempty" property:"enable-registry"`
EnableConfigCenter *bool `default:"true" yaml:"enable-config-center" json:"enable-config-center,omitempty" property:"enable-config-center"`
EnableRpc *bool `default:"true" yaml:"enable-rpc" json:"enable-rpc,omitempty" property:"enable-rpc"`
Prometheus *PrometheusConfig `yaml:"prometheus" json:"prometheus" property:"prometheus"`
Aggregation *AggregateConfig `yaml:"aggregation" json:"aggregation" property:"aggregation"`
rootConfig *RootConfig
}

type AggregateConfig struct {
Expand Down Expand Up @@ -101,6 +105,26 @@ func NewMetricConfigBuilder() *MetricConfigBuilder {
return &MetricConfigBuilder{metricConfig: &MetricConfig{}}
}

func (mcb *MetricConfigBuilder) SetMetadataEnabled(enabled bool) *MetricConfigBuilder {
mcb.metricConfig.EnableMetadata = &enabled
return mcb
}

func (mcb *MetricConfigBuilder) SetRegistryEnabled(enabled bool) *MetricConfigBuilder {
mcb.metricConfig.EnableRegistry = &enabled
return mcb
}

func (mcb *MetricConfigBuilder) SetConfigCenterEnabled(enabled bool) *MetricConfigBuilder {
mcb.metricConfig.EnableConfigCenter = &enabled
return mcb
}

func (mcb *MetricConfigBuilder) SetRpcEnabled(enabled bool) *MetricConfigBuilder {
mcb.metricConfig.EnableRpc = &enabled
return mcb
}

func (mcb *MetricConfigBuilder) Build() *MetricConfig {
return mcb.metricConfig
}
Expand All @@ -113,11 +137,15 @@ func (mc *MetricConfig) DynamicUpdateProperties(newMetricConfig *MetricConfig) {
// prometheus://localhost:9090?&histogram.enabled=false&prometheus.exporter.enabled=false
func (mc *MetricConfig) toURL() *common.URL {
url, _ := common.NewURL("localhost", common.WithProtocol(mc.Protocol))
url.SetParam(constant.PrometheusExporterEnabledKey, strconv.FormatBool(*mc.Enable))
url.SetParam(constant.PrometheusExporterEnabledKey, strconv.FormatBool(*mc.Enable)) // for compatibility
url.SetParam(constant.PrometheusExporterMetricsPortKey, mc.Port)
url.SetParam(constant.PrometheusExporterMetricsPathKey, mc.Path)
url.SetParam(constant.ApplicationKey, mc.rootConfig.Application.Name)
url.SetParam(constant.AppVersionKey, mc.rootConfig.Application.Version)
url.SetParam(constant.MetadataEnabledKey, strconv.FormatBool(*mc.EnableMetadata))
url.SetParam(constant.RegistryEnabledKey, strconv.FormatBool(*mc.EnableRegistry))
url.SetParam(constant.ConfigCenterEnabledKey, strconv.FormatBool(*mc.EnableConfigCenter))
url.SetParam(constant.RpcEnabledKey, strconv.FormatBool(*mc.EnableRpc))
if mc.Aggregation != nil {
url.SetParam(constant.AggregationEnabledKey, strconv.FormatBool(*mc.Aggregation.Enabled))
url.SetParam(constant.AggregationBucketNumKey, strconv.Itoa(mc.Aggregation.BucketNum))
Expand Down
18 changes: 13 additions & 5 deletions config/metric_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,17 @@ import (
)

func TestMetricConfigBuilder(t *testing.T) {
config := NewMetricConfigBuilder().Build()
err := config.Init(&RootConfig{Application: &ApplicationConfig{Name: "dubbo", Version: "1.0.0"}})
assert.NoError(t, err)
reporterConfig := config.ToReporterConfig()
assert.Equal(t, string(reporterConfig.Mode), "pull")
config := NewMetricConfigBuilder().
SetConfigCenterEnabled(false).
SetMetadataEnabled(false).
SetRegistryEnabled(false).
SetRpcEnabled(false).
Build()
enable := false
assert.Equal(t, &MetricConfig{
EnableConfigCenter: &enable,
EnableMetadata: &enable,
EnableRegistry: &enable,
EnableRpc: &enable,
}, config)
}
8 changes: 5 additions & 3 deletions metrics/config_center/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ var ch = make(chan metrics.MetricsEvent, 10)
var info = metrics.NewMetricKey("dubbo_configcenter_total", "Config Changed Total")

func init() {
metrics.AddCollector("config_center", func(mr metrics.MetricRegistry, _ *common.URL) {
c := &configCenterCollector{r: mr}
c.start()
metrics.AddCollector("config_center", func(mr metrics.MetricRegistry, url *common.URL) {
if url.GetParamBool(constant.ConfigCenterEnabledKey, true) {
c := &configCenterCollector{r: mr}
c.start()
}
})
}

Expand Down
8 changes: 5 additions & 3 deletions metrics/metadata/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ const eventType = constant.MetricsMetadata
var ch = make(chan metrics.MetricsEvent, 10)

func init() {
metrics.AddCollector("metadata", func(mr metrics.MetricRegistry, _ *common.URL) {
l := &MetadataMetricCollector{metrics.BaseCollector{R: mr}}
l.start()
metrics.AddCollector("metadata", func(mr metrics.MetricRegistry, url *common.URL) {
if url.GetParamBool(constant.MetadataEnabledKey, true) {
l := &MetadataMetricCollector{metrics.BaseCollector{R: mr}}
l.start()
}
})
}

Expand Down
94 changes: 50 additions & 44 deletions metrics/prometheus/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,56 +143,62 @@ func (p *promMetricRegistry) Rt(m *metrics.MetricId, opts *metrics.RtOpts) metri

func (p *promMetricRegistry) Export() {
if p.url.GetParamBool(constant.PrometheusExporterEnabledKey, false) {
go func() {
mux := http.NewServeMux()
path := p.url.GetParam(constant.PrometheusDefaultMetricsPath, constant.PrometheusDefaultMetricsPath)
port := p.url.GetParam(constant.PrometheusExporterMetricsPortKey, constant.PrometheusDefaultMetricsPort)
mux.Handle(path, promhttp.InstrumentMetricHandler(p.r, promhttp.HandlerFor(p.gather, promhttp.HandlerOpts{})))
srv := &http.Server{Addr: ":" + port, Handler: mux}
extension.AddCustomShutdownCallback(func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); nil != err {
logger.Fatalf("prometheus server shutdown failed, err: %v", err)
} else {
logger.Info("prometheus server gracefully shutdown success")
}
})
logger.Infof("prometheus endpoint :%s%s", port, path)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { // except Shutdown or Close
logger.Errorf("new prometheus server with error = %v", err)
}
}()
go p.exportHttp()
}
if p.url.GetParamBool(constant.PrometheusPushgatewayEnabledKey, false) {
baseUrl, exist := p.url.GetNonDefaultParam(constant.PrometheusPushgatewayBaseUrlKey)
if !exist {
logger.Error("no pushgateway url found in config path: metrics.prometheus.pushgateway.bash-url, please check your config file")
return
}
username := p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "")
password := p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "")
job := p.url.GetParam(constant.PrometheusPushgatewayJobKey, constant.PrometheusDefaultJobName)
pushInterval := p.url.GetParamByIntValue(constant.PrometheusPushgatewayPushIntervalKey, constant.PrometheusDefaultPushInterval)
pusher := push.New(baseUrl, job).Gatherer(p.gather)
if len(username) != 0 {
pusher.BasicAuth(username, password)
p.exportPushgateway()
}
}

func (p *promMetricRegistry) exportHttp() {
mux := http.NewServeMux()
path := p.url.GetParam(constant.PrometheusDefaultMetricsPath, constant.PrometheusDefaultMetricsPath)
port := p.url.GetParam(constant.PrometheusExporterMetricsPortKey, constant.PrometheusDefaultMetricsPort)
mux.Handle(path, promhttp.InstrumentMetricHandler(p.r, promhttp.HandlerFor(p.gather, promhttp.HandlerOpts{})))
srv := &http.Server{Addr: ":" + port, Handler: mux}
extension.AddCustomShutdownCallback(func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); nil != err {
logger.Fatalf("prometheus server shutdown failed, err: %v", err)
} else {
logger.Info("prometheus server gracefully shutdown success")
}
logger.Infof("prometheus pushgateway will push to %s every %d seconds", baseUrl, pushInterval)
ticker := time.NewTicker(time.Duration(pushInterval) * time.Second)
go func() {
for range ticker.C {
err := pusher.Add()
if err != nil {
logger.Errorf("push metric data to prometheus push gateway error", err)
} else {
logger.Debugf("prometheus pushgateway push to %s success", baseUrl)
}
}
}()
})
logger.Infof("prometheus endpoint :%s%s", port, path)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { // except Shutdown or Close
logger.Errorf("new prometheus server with error: %v", err)
}
}

func (p *promMetricRegistry) exportPushgateway() {
baseUrl, exist := p.url.GetNonDefaultParam(constant.PrometheusPushgatewayBaseUrlKey)
if !exist {
logger.Error("no pushgateway base url found in config path: metrics.prometheus.pushgateway.base-url, please check your config")
return
}
username := p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "")
password := p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "")
job := p.url.GetParam(constant.PrometheusPushgatewayJobKey, constant.PrometheusDefaultJobName)
pushInterval := p.url.GetParamByIntValue(constant.PrometheusPushgatewayPushIntervalKey, constant.PrometheusDefaultPushInterval)
pusher := push.New(baseUrl, job).Gatherer(p.gather)
if len(username) != 0 {
pusher.BasicAuth(username, password)
}
logger.Infof("prometheus pushgateway will push to %s every %d seconds", baseUrl, pushInterval)
ticker := time.NewTicker(time.Duration(pushInterval) * time.Second)
go func() {
for range ticker.C {
err := pusher.Add()
if err != nil {
logger.Errorf("push metric data to prometheus pushgateway error: %v", err)
} else {
logger.Debugf("prometheus pushgateway push to %s success", baseUrl)
}
}
}()
}

func (p *promMetricRegistry) Scrape() (string, error) {
gathering, err := p.gather.Gather()
if err != nil {
Expand Down
8 changes: 5 additions & 3 deletions metrics/registry/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ var (
)

func init() {
metrics.AddCollector("registry", func(m metrics.MetricRegistry, _ *common.URL) {
rc := &registryCollector{metrics.BaseCollector{R: m}}
go rc.start()
metrics.AddCollector("registry", func(m metrics.MetricRegistry, url *common.URL) {
if url.GetParamBool(constant.RegistryEnabledKey, true) {
rc := &registryCollector{metrics.BaseCollector{R: m}}
go rc.start()
}
})
}

Expand Down
12 changes: 7 additions & 5 deletions metrics/rpc/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ var (

// init will add the rpc collectorFunc to metrics.collectors slice, and lazy start the rpc collector goroutine
func init() {
collectorFunc := func(registry metrics.MetricRegistry, c *common.URL) {
rc := &rpcCollector{
registry: registry,
metricSet: buildMetricSet(registry),
collectorFunc := func(registry metrics.MetricRegistry, url *common.URL) {
if url.GetParamBool(constant.RpcEnabledKey, true) {
rc := &rpcCollector{
registry: registry,
metricSet: buildMetricSet(registry),
}
go rc.start()
}
go rc.start()
}

metrics.AddCollector("rpc", collectorFunc)
Expand Down

0 comments on commit 40dd198

Please sign in to comment.