Skip to content

Commit

Permalink
metric[engine]:Replace framework metric (#5526)
Browse files Browse the repository at this point in the history
close #5525
  • Loading branch information
maxshuang authored May 24, 2022
1 parent a04fc1b commit b160a9a
Show file tree
Hide file tree
Showing 15 changed files with 66 additions and 95 deletions.
4 changes: 2 additions & 2 deletions engine/executor/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

"github.com/pingcap/tiflow/dm/dm/common"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/pingcap/tiflow/engine/pkg/promutil"
)

func httpHandler(lis net.Listener) error {
Expand All @@ -31,7 +31,7 @@ func httpHandler(lis net.Listener) error {
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
mux.Handle("/metrics", promhttp.Handler())
mux.Handle("/metrics", promutil.HTTPHandlerForMetric())

httpS := &http.Server{
Handler: mux,
Expand Down
27 changes: 0 additions & 27 deletions engine/executor/metrics.go

This file was deleted.

23 changes: 11 additions & 12 deletions engine/executor/metrics_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@
package executor

import (
"github.com/pingcap/tiflow/engine/pkg/promutil"
"github.com/prometheus/client_golang/prometheus"
)

var executorTaskNumGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dataflow",
Subsystem: "executor",
Name: "task_num",
Help: "number of task in this executor",
}, []string{"status"})

// initServerMetrics registers statistics of executor server
func initServerMetrics(registry *prometheus.Registry) {
registry.MustRegister(executorTaskNumGauge)
}
var (
executorFactory = promutil.NewFactory4Framework()
executorTaskNumGauge = executorFactory.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dataflow",
Subsystem: "executor",
Name: "task_num",
Help: "number of task in this executor",
}, []string{"status"})
)
2 changes: 0 additions & 2 deletions engine/executor/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,6 @@ func (s *Server) Run(ctx context.Context) error {
return s.startForTest(ctx)
}

registerMetrics()

wg, ctx := errgroup.WithContext(ctx)
s.taskRunner = worker.NewTaskRunner(defaultRuntimeIncomingQueueLen, defaultRuntimeInitConcurrency)
s.taskCommitter = worker.NewTaskCommitter(s.taskRunner, defaultTaskPreDispatchRequestTTL)
Expand Down
2 changes: 0 additions & 2 deletions engine/executor/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func TestStartTCPSrv(t *testing.T) {
s := NewServer(cfg, nil)

s.grpcSrv = grpc.NewServer()
registerMetrics()
wg, ctx := errgroup.WithContext(context.Background())
err = s.startTCPService(ctx, wg)
require.Nil(t, err)
Expand Down Expand Up @@ -114,7 +113,6 @@ func TestCollectMetric(t *testing.T) {
s.taskRunner = worker.NewTaskRunner(defaultRuntimeIncomingQueueLen, defaultRuntimeInitConcurrency)

s.grpcSrv = grpc.NewServer()
registerMetrics()
err = s.startTCPService(ctx, wg)
require.Nil(t, err)

Expand Down
10 changes: 7 additions & 3 deletions engine/lib/base_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/dm/pkg/log"
"go.uber.org/zap"

"github.com/pingcap/tiflow/engine/executor/worker"
libModel "github.com/pingcap/tiflow/engine/lib/model"
Expand Down Expand Up @@ -179,13 +180,16 @@ func (d *DefaultBaseJobMaster) GetWorkers() map[libModel.WorkerID]WorkerHandle {

// Close implements BaseJobMaster.Close
func (d *DefaultBaseJobMaster) Close(ctx context.Context) error {
if err := d.impl.CloseImpl(ctx); err != nil {
return errors.Trace(err)
err := d.impl.CloseImpl(ctx)
// We don't return here if CloseImpl return error to ensure
// that we can close inner resources of the framework
if err != nil {
log.L().Error("Failed to close JobMasterImpl", zap.Error(err))
}

d.master.doClose()
d.worker.doClose()
return nil
return errors.Trace(err)
}

// OnError implements BaseJobMaster.OnError
Expand Down
10 changes: 7 additions & 3 deletions engine/lib/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,16 +435,20 @@ func (m *DefaultBaseMaster) doClose() {
log.L().Warn("Failed to clean up message handlers",
zap.String("master-id", m.id))
}
promutil.UnregisterWorkerMetrics(m.id)
}

// Close implements BaseMaster.Close
func (m *DefaultBaseMaster) Close(ctx context.Context) error {
if err := m.Impl.CloseImpl(ctx); err != nil {
return errors.Trace(err)
err := m.Impl.CloseImpl(ctx)
// We don't return here if CloseImpl return error to ensure
// that we can close inner resources of the framework
if err != nil {
log.L().Error("Failed to close MasterImpl", zap.Error(err))
}

m.doClose()
return nil
return errors.Trace(err)
}

// OnError implements BaseMaster.OnError
Expand Down
9 changes: 6 additions & 3 deletions engine/lib/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,17 +342,20 @@ func (w *DefaultBaseWorker) doClose() {
}

w.wg.Wait()
promutil.UnregisterWorkerMetrics(w.id)
}

// Close implements BaseWorker.Close
func (w *DefaultBaseWorker) Close(ctx context.Context) error {
if err := w.Impl.CloseImpl(ctx); err != nil {
err := w.Impl.CloseImpl(ctx)
// We don't return here if CloseImpl return error to ensure
// that we can close inner resources of the framework
if err != nil {
log.L().Error("Failed to close WorkerImpl", zap.Error(err))
return errors.Trace(err)
}

w.doClose()
return nil
return errors.Trace(err)
}

// ID implements BaseWorker.ID
Expand Down
9 changes: 9 additions & 0 deletions engine/pkg/promutil/implement.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,25 @@ func (f *wrappingFactory) NewHistogramVec(opts prometheus.HistogramOpts, labelNa
}

func wrapCounterOpts(prefix string, constLabels prometheus.Labels, opts *prometheus.CounterOpts) *prometheus.CounterOpts {
if opts.ConstLabels == nil && constLabels != nil {
opts.ConstLabels = make(prometheus.Labels)
}
wrapOptsCommon(prefix, constLabels, &opts.Namespace, opts.ConstLabels)
return opts
}

func wrapGaugeOpts(prefix string, constLabels prometheus.Labels, opts *prometheus.GaugeOpts) *prometheus.GaugeOpts {
if opts.ConstLabels == nil && constLabels != nil {
opts.ConstLabels = make(prometheus.Labels)
}
wrapOptsCommon(prefix, constLabels, &opts.Namespace, opts.ConstLabels)
return opts
}

func wrapHistogramOpts(prefix string, constLabels prometheus.Labels, opts *prometheus.HistogramOpts) *prometheus.HistogramOpts {
if opts.ConstLabels == nil && constLabels != nil {
opts.ConstLabels = make(prometheus.Labels)
}
wrapOptsCommon(prefix, constLabels, &opts.Namespace, opts.ConstLabels)
return opts
}
Expand Down
11 changes: 11 additions & 0 deletions engine/pkg/promutil/implement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ func TestWrapCounterOpts(t *testing.T) {
},
},
},
{
constLabels: prometheus.Labels{
"k2": "v2",
},
inputOpts: &prometheus.CounterOpts{},
outputOpts: &prometheus.CounterOpts{
ConstLabels: prometheus.Labels{
"k2": "v2",
},
},
},
}

for _, c := range cases {
Expand Down
7 changes: 7 additions & 0 deletions engine/pkg/promutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,10 @@ func NewFactory4Worker(info tenant.ProjectInfo, jobType libModel.JobType, jobID
func NewFactory4Framework() Factory {
return NewFactory4FrameworkImpl(globalMetricRegistry)
}

// UnregisterWorkerMetrics unregisters all metrics of workerID
// IF 'worker' is a job master, use job id as workerID
// IF 'worker' is a worker, use worker id as workerID
func UnregisterWorkerMetrics(workerID libModel.WorkerID) {
globalMetricRegistry.Unregister(workerID)
}
27 changes: 0 additions & 27 deletions engine/servermaster/metrics.go

This file was deleted.

12 changes: 4 additions & 8 deletions engine/servermaster/metrics_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,24 @@
package servermaster

import (
"github.com/pingcap/tiflow/engine/pkg/promutil"
"github.com/prometheus/client_golang/prometheus"
)

var (
serverExecutorNumGauge = prometheus.NewGaugeVec(
serverFactory = promutil.NewFactory4Framework()
serverExecutorNumGauge = serverFactory.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dataflow",
Subsystem: "server_master",
Name: "executor_num",
Help: "number of executor servers in this cluster",
}, []string{"status"})
serverJobNumGauge = prometheus.NewGaugeVec(
serverJobNumGauge = serverFactory.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dataflow",
Subsystem: "server_master",
Name: "job_num",
Help: "number of jobs in this cluster",
}, []string{"status"})
)

// initServerMetrics registers statistics of server
func initServerMetrics(registry *prometheus.Registry) {
registry.MustRegister(serverExecutorNumGauge)
registry.MustRegister(serverJobNumGauge)
}
6 changes: 2 additions & 4 deletions engine/servermaster/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/tiflow/dm/pkg/log"
p2pProtocol "github.com/pingcap/tiflow/proto/p2p"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/server/v3/embed"
Expand Down Expand Up @@ -56,6 +55,7 @@ import (
"github.com/pingcap/tiflow/engine/pkg/meta/metaclient"
pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm"
"github.com/pingcap/tiflow/engine/pkg/p2p"
"github.com/pingcap/tiflow/engine/pkg/promutil"
"github.com/pingcap/tiflow/engine/pkg/rpcutil"
"github.com/pingcap/tiflow/engine/pkg/serverutils"
"github.com/pingcap/tiflow/engine/pkg/tenant"
Expand Down Expand Up @@ -447,8 +447,6 @@ func (s *Server) Run(ctx context.Context) (err error) {
return s.startForTest(ctx)
}

registerMetrics()

err = s.registerMetaStore()
if err != nil {
return err
Expand Down Expand Up @@ -570,7 +568,7 @@ func (s *Server) startGrpcSrv(ctx context.Context) (err error) {

httpHandlers := map[string]http.Handler{
"/debug/": getDebugHandler(),
"/metrics": promhttp.Handler(),
"/metrics": promutil.HTTPHandlerForMetric(),
}

// generate grpcServer
Expand Down
2 changes: 0 additions & 2 deletions engine/servermaster/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func TestStartGrpcSrv(t *testing.T) {
defer cleanup()

s := &Server{cfg: cfg}
registerMetrics()
ctx := context.Background()
err := s.startGrpcSrv(ctx)
require.Nil(t, err)
Expand Down Expand Up @@ -320,7 +319,6 @@ func TestCollectMetric(t *testing.T) {
masterAddr, cfg, cleanup := prepareServerEnv(t, "test-collect-metric")
defer cleanup()

registerMetrics()
s := &Server{
cfg: cfg,
metrics: newServerMasterMetric(),
Expand Down

0 comments on commit b160a9a

Please sign in to comment.