Skip to content

Commit

Permalink
add metrics and fix unit test.
Browse files Browse the repository at this point in the history
  • Loading branch information
hackersean committed Dec 16, 2022
1 parent aedd8a4 commit b86ec48
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 13 deletions.
2 changes: 2 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TokenGauge)
prometheus.MustRegister(ConfigStatus)
prometheus.MustRegister(TiFlashQueryTotalCounter)
prometheus.MustRegister(TiFlashFailedMPPStoreState)
prometheus.MustRegister(SmallTxnWriteDuration)
prometheus.MustRegister(TxnWriteThroughput)
prometheus.MustRegister(LoadSysVarCacheCounter)
Expand Down Expand Up @@ -236,6 +237,7 @@ func ToggleSimplifiedMode(simplified bool) {
InfoCacheCounters,
ReadFromTableCacheCounter,
TiFlashQueryTotalCounter,
TiFlashFailedMPPStoreState,
CampaignOwnerCounter,
NonTransactionalDMLCount,
MemoryUsage,
Expand Down
8 changes: 8 additions & 0 deletions metrics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,14 @@ var (
Help: "Counter of TiFlash queries.",
}, []string{LblType, LblResult})

TiFlashFailedMPPStoreState = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "server",
Name: "tiflash_failed_mpp_store",
Help: "Statues of failed tiflash mpp store,-1 means detector heartbeat,0 means reachable,1 means abnormal.",
}, []string{LblAddress})

PDAPIExecutionHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tidb",
Expand Down
45 changes: 32 additions & 13 deletions store/copr/mpp_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/mpp"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/util/logutil"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
Expand Down Expand Up @@ -51,6 +52,7 @@ type MPPSotreState struct {
// MPPFailedStoreProbe is used to detect failed TiFlash instances.
type MPPFailedStoreProbe struct {
// failedMPPStores holds the stores currently tracked as failed, keyed by store address.
failedMPPStores *sync.Map
// lock is acquired with TryLock by Run so that only one background scan loop is active.
lock *sync.Mutex
}

func (t *MPPSotreState) detect(ctx context.Context) {
Expand All @@ -64,9 +66,10 @@ func (t *MPPSotreState) detect(ctx context.Context) {
}

defer func() { t.lastDetectTime = time.Now() }()

metrics.TiFlashFailedMPPStoreState.WithLabelValues(t.address).Set(0)
err := detectMPPStore(ctx, t.store.GetTiKVClient(), t.address)
if err != nil {
metrics.TiFlashFailedMPPStoreState.WithLabelValues(t.address).Set(1)
t.recoveryTime = time.Time{} // if detect failed,reset recovery time to zero.
return
}
Expand Down Expand Up @@ -105,18 +108,18 @@ func (t MPPFailedStoreProbe) scan(ctx context.Context) {
logutil.BgLogger().Warn("MPPSotreState struct assert failed,will be clean",
zap.String("address", address),
zap.Any("state", v))
t.failedMPPStores.Delete(address)
t.Delete(address)
return
}

state.detect(ctx)

// remove a store that has stayed recovered for longer than MaxRecoveryTimeLimit
if !state.recoveryTime.IsZero() && time.Since(state.recoveryTime) > MaxRecoveryTimeLimit {
t.failedMPPStores.Delete(address)
t.Delete(address)
// clean store that may be obsolete
} else if !state.recoveryTime.IsZero() && time.Since(state.lastLookupTime) > MaxObsoletTimeLimit {
t.failedMPPStores.Delete(address)
t.Delete(address)
}
}

Expand All @@ -129,7 +132,10 @@ func (t MPPFailedStoreProbe) scan(ctx context.Context) {
}

// Add adds a store whose synchronous probe has failed
func (t MPPFailedStoreProbe) Add(ctx context.Context, address string, store *kvStore) {
func (t *MPPFailedStoreProbe) Add(ctx context.Context, address string, store *kvStore) {
// start the background probe process if it is not already running
globalMPPFailedStoreProbe.Run()

state := MPPSotreState{
address: address,
store: store,
Expand All @@ -139,8 +145,9 @@ func (t MPPFailedStoreProbe) Add(ctx context.Context, address string, store *kvS
}

// IsRecovery checks whether the store has recovered
func (t MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, recoveryTTL time.Duration) bool {
logutil.Logger(ctx).Debug("check failed store recovery", zap.String("address", address), zap.Duration("ttl", recoveryTTL))
func (t *MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, recoveryTTL time.Duration) bool {
logutil.Logger(ctx).Debug("check failed store recovery",
zap.String("address", address), zap.Duration("ttl", recoveryTTL))
v, ok := t.failedMPPStores.Load(address)
if !ok {
// store not in failed map
Expand All @@ -152,7 +159,7 @@ func (t MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, rec
logutil.BgLogger().Warn("MPPSotreState struct assert failed,will be clean",
zap.String("address", address),
zap.Any("state", v))
t.failedMPPStores.Delete(address)
t.Delete(address)
return false
}

Expand All @@ -162,10 +169,23 @@ func (t MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, rec
// Run runs the scan loop.
// There can be only one background task at a time.
func (t *MPPFailedStoreProbe) Run() {
for {
t.scan(context.Background())
time.Sleep(time.Second)
if !t.lock.TryLock() {
return
}
go func() {
defer t.lock.Unlock()
for {
metrics.TiFlashFailedMPPStoreState.WithLabelValues("probe").Set(-1) //probe heartbeat
t.scan(context.Background())
time.Sleep(time.Second)
}
}()

}

// Delete stops tracking the store at address: it drops that address's
// TiFlashFailedMPPStoreState metric series and removes its entry from
// failedMPPStores.
func (t *MPPFailedStoreProbe) Delete(address string) {
metrics.TiFlashFailedMPPStoreState.DeleteLabelValues(address)
t.failedMPPStores.Delete(address)
}

// MPPStore detect function
Expand All @@ -191,7 +211,6 @@ func detectMPPStore(ctx context.Context, client tikv.Client, address string) err
func init() {
globalMPPFailedStoreProbe = &MPPFailedStoreProbe{
failedMPPStores: &sync.Map{},
lock: &sync.Mutex{},
}
// run a background probe process
go globalMPPFailedStoreProbe.Run()
}

0 comments on commit b86ec48

Please sign in to comment.