From b86ec48fc291897b2af9a2126080f085cd68abc5 Mon Sep 17 00:00:00 2001 From: "sean.liu" Date: Thu, 15 Dec 2022 19:22:17 +0800 Subject: [PATCH] add metrics and fix unit test. --- metrics/metrics.go | 2 ++ metrics/server.go | 8 ++++++++ store/copr/mpp_probe.go | 45 +++++++++++++++++++++++++++++------------ 3 files changed, 42 insertions(+), 13 deletions(-) diff --git a/metrics/metrics.go b/metrics/metrics.go index 2984b66ddb27c..70eff78bc4318 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -182,6 +182,7 @@ func RegisterMetrics() { prometheus.MustRegister(TokenGauge) prometheus.MustRegister(ConfigStatus) prometheus.MustRegister(TiFlashQueryTotalCounter) + prometheus.MustRegister(TiFlashFailedMPPStoreState) prometheus.MustRegister(SmallTxnWriteDuration) prometheus.MustRegister(TxnWriteThroughput) prometheus.MustRegister(LoadSysVarCacheCounter) @@ -236,6 +237,7 @@ func ToggleSimplifiedMode(simplified bool) { InfoCacheCounters, ReadFromTableCacheCounter, TiFlashQueryTotalCounter, + TiFlashFailedMPPStoreState, CampaignOwnerCounter, NonTransactionalDMLCount, MemoryUsage, diff --git a/metrics/server.go b/metrics/server.go index 116b02eb122b6..9425bbf94e960 100644 --- a/metrics/server.go +++ b/metrics/server.go @@ -279,6 +279,14 @@ var ( Help: "Counter of TiFlash queries.", }, []string{LblType, LblResult}) + TiFlashFailedMPPStoreState = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "tidb", + Subsystem: "server", + Name: "tiflash_failed_mpp_store", + Help: "Statues of failed tiflash mpp store,-1 means detector heartbeat,0 means reachable,1 means abnormal.", + }, []string{LblAddress}) + PDAPIExecutionHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "tidb", diff --git a/store/copr/mpp_probe.go b/store/copr/mpp_probe.go index 78150a2de87b8..a5fb4e8335cca 100644 --- a/store/copr/mpp_probe.go +++ b/store/copr/mpp_probe.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/mpp" + "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/util/logutil" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" @@ -51,6 +52,7 @@ type MPPSotreState struct { // MPPFailedStoreProbe use for detecting of failed TiFlash instance type MPPFailedStoreProbe struct { failedMPPStores *sync.Map + lock *sync.Mutex } func (t *MPPSotreState) detect(ctx context.Context) { @@ -64,9 +66,10 @@ func (t *MPPSotreState) detect(ctx context.Context) { } defer func() { t.lastDetectTime = time.Now() }() - + metrics.TiFlashFailedMPPStoreState.WithLabelValues(t.address).Set(0) err := detectMPPStore(ctx, t.store.GetTiKVClient(), t.address) if err != nil { + metrics.TiFlashFailedMPPStoreState.WithLabelValues(t.address).Set(1) t.recoveryTime = time.Time{} // if detect failed,reset recovery time to zero. return } @@ -105,7 +108,7 @@ func (t MPPFailedStoreProbe) scan(ctx context.Context) { logutil.BgLogger().Warn("MPPSotreState struct assert failed,will be clean", zap.String("address", address), zap.Any("state", v)) - t.failedMPPStores.Delete(address) + t.Delete(address) return } @@ -113,10 +116,10 @@ func (t MPPFailedStoreProbe) scan(ctx context.Context) { // clean restored store if !state.recoveryTime.IsZero() && time.Since(state.recoveryTime) > MaxRecoveryTimeLimit { - t.failedMPPStores.Delete(address) + t.Delete(address) // clean store that may be obsolete } else if !state.recoveryTime.IsZero() && time.Since(state.lastLookupTime) > MaxObsoletTimeLimit { - t.failedMPPStores.Delete(address) + t.Delete(address) } } @@ -129,7 +132,10 @@ func (t MPPFailedStoreProbe) scan(ctx context.Context) { } // Add add a store when sync probe failed -func (t MPPFailedStoreProbe) Add(ctx context.Context, address string, store *kvStore) { +func (t *MPPFailedStoreProbe) Add(ctx context.Context, address string, store *kvStore) { + // run a background probe process,if not start + globalMPPFailedStoreProbe.Run() + state := MPPSotreState{ address: address, store: store, @@ -139,8 +145,9 @@ func (t MPPFailedStoreProbe) Add(ctx context.Context, address string, store *kvS } // IsRecovery check whether the store is recovery -func (t MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, recoveryTTL time.Duration) bool { - logutil.Logger(ctx).Debug("check failed store recovery", zap.String("address", address), zap.Duration("ttl", recoveryTTL)) +func (t *MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, recoveryTTL time.Duration) bool { + logutil.Logger(ctx).Debug("check failed store recovery", + zap.String("address", address), zap.Duration("ttl", recoveryTTL)) v, ok := t.failedMPPStores.Load(address) if !ok { // store not in failed map @@ -152,7 +159,7 @@ func (t MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, rec logutil.BgLogger().Warn("MPPSotreState struct assert failed,will be clean", zap.String("address", address), zap.Any("state", v)) - t.failedMPPStores.Delete(address) + t.Delete(address) return false } @@ -162,10 +169,23 @@ func (t MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, rec // Run a loop of scan // there can be only one background task func (t *MPPFailedStoreProbe) Run() { - for { - t.scan(context.Background()) - time.Sleep(time.Second) + if !t.lock.TryLock() { + return } + go func() { + defer t.lock.Unlock() + for { + metrics.TiFlashFailedMPPStoreState.WithLabelValues("probe").Set(-1) //probe heartbeat + t.scan(context.Background()) + time.Sleep(time.Second) + } + }() + +} + +func (t *MPPFailedStoreProbe) Delete(address string) { + metrics.TiFlashFailedMPPStoreState.DeleteLabelValues(address) + t.failedMPPStores.Delete(address) } // MPPStore detect function @@ -191,7 +211,6 @@ func detectMPPStore(ctx context.Context, client tikv.Client, address string) err func init() { globalMPPFailedStoreProbe = &MPPFailedStoreProbe{ failedMPPStores: &sync.Map{}, + lock: &sync.Mutex{}, } - // run a background probe process - go globalMPPFailedStoreProbe.Run() }