Commit

add prometheus metrics for write procedure (ngaut#231)
coocood authored Aug 21, 2019
1 parent 7872afe commit d0f3b7d
Showing 8 changed files with 166 additions and 20 deletions.
1 change: 1 addition & 0 deletions go.mod
@@ -17,6 +17,7 @@ require (
github.com/pingcap/pd v2.1.0-rc.4+incompatible
github.com/pingcap/tidb v0.0.0-20190325083614-d6490c1cab3a
github.com/pingcap/tipb v0.0.0-20190107072121-abbec73437b7
github.com/prometheus/client_golang v0.9.0
github.com/shirou/gopsutil v2.18.10+incompatible
github.com/stretchr/testify v1.3.0
github.com/uber-go/atomic v1.3.2
100 changes: 100 additions & 0 deletions metrics/metrics.go
@@ -0,0 +1,100 @@
package metrics

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
)

const (
	namespace = "unistore"
	raft      = "raft"
)

var (
	RaftWriterWait = prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: namespace,
			Subsystem: raft,
			Name:      "writer_wait",
			Buckets:   prometheus.ExponentialBuckets(0.001, 1.5, 20),
		})
	WriteWaiteStepOne = prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: namespace,
			Subsystem: raft,
			Name:      "writer_wait_step_1",
			Buckets:   prometheus.ExponentialBuckets(0.001, 1.5, 20),
		})
	WriteWaiteStepTwo = prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: namespace,
			Subsystem: raft,
			Name:      "writer_wait_step_2",
			Buckets:   prometheus.ExponentialBuckets(0.001, 1.5, 20),
		})
	WriteWaiteStepThree = prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: namespace,
			Subsystem: raft,
			Name:      "writer_wait_step_3",
			Buckets:   prometheus.ExponentialBuckets(0.001, 1.5, 20),
		})
	WriteWaiteStepFour = prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: namespace,
			Subsystem: raft,
			Name:      "writer_wait_step_4",
			Buckets:   prometheus.ExponentialBuckets(0.001, 1.5, 20),
		})

	RaftDBUpdate = prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: namespace,
			Subsystem: raft,
			Name:      "raft_db_update",
			Buckets:   prometheus.ExponentialBuckets(0.001, 1.5, 20),
		})
	KVDBUpdate = prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: namespace,
			Subsystem: raft,
			Name:      "kv_db_update",
			Buckets:   prometheus.ExponentialBuckets(0.001, 1.5, 20),
		})
	LockUpdate = prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: namespace,
			Subsystem: raft,
			Name:      "lock_update",
			Buckets:   prometheus.ExponentialBuckets(0.0001, 2, 15),
		})
	LatchWait = prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: namespace,
			Subsystem: raft,
			Name:      "latch_wait",
			Buckets:   prometheus.ExponentialBuckets(0.0001, 2, 15),
		})
	RaftBatchSize = prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: namespace,
			Subsystem: raft,
			Name:      "batch_size",
			Buckets:   prometheus.ExponentialBuckets(1, 1.5, 20),
		})
)

func init() {
	prometheus.MustRegister(RaftWriterWait)
	prometheus.MustRegister(WriteWaiteStepOne)
	prometheus.MustRegister(WriteWaiteStepTwo)
	prometheus.MustRegister(WriteWaiteStepThree)
	prometheus.MustRegister(WriteWaiteStepFour)
	prometheus.MustRegister(RaftDBUpdate)
	prometheus.MustRegister(KVDBUpdate)
	prometheus.MustRegister(LockUpdate)
	prometheus.MustRegister(RaftBatchSize)
	prometheus.MustRegister(LatchWait)
	http.Handle("/metrics", prometheus.Handler())
}
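For the timing histograms, prometheus.ExponentialBuckets(0.001, 1.5, 20) spans roughly 1 ms to 2.2 s (0.001 x 1.5^19), which fits write-path latencies; the lock and latch histograms start at 0.1 ms. Note that init() registers the collectors with the default registry and mounts the /metrics handler on http.DefaultServeMux, but it does not start an HTTP server, so the process must listen somewhere before Prometheus can scrape. A minimal sketch, assuming a hypothetical listen address:

package main

import (
	"net/http"

	// Blank import for side effects only: the package's init registers
	// the histograms and mounts /metrics on http.DefaultServeMux.
	_ "github.com/ngaut/unistore/metrics"
)

func main() {
	// :9100 is an illustrative address, not one used by unistore.
	// ListenAndServe with a nil handler serves http.DefaultServeMux,
	// where init mounted the /metrics endpoint.
	if err := http.ListenAndServe(":9100", nil); err != nil {
		panic(err)
	}
}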
13 changes: 12 additions & 1 deletion tikv/raftstore/db_writer.go
@@ -4,6 +4,7 @@ import (
"fmt"
"time"

"github.com/ngaut/unistore/metrics"
"github.com/ngaut/unistore/tikv/mvcc"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/errorpb"
@@ -139,12 +140,22 @@ func (writer *raftDBWriter) Write(batch mvcc.WriteBatch) error {
Request: request,
Callback: NewCallback(),
}
start := time.Now()
err := writer.router.sendRaftCommand(cmd)
if err != nil {
return err
}
cmd.Callback.wg.Wait()
waitDoneTime := time.Now()
metrics.RaftWriterWait.Observe(waitDoneTime.Sub(start).Seconds())
cb := cmd.Callback
if !cb.raftBeginTime.IsZero() {
metrics.WriteWaiteStepOne.Observe(cb.raftBeginTime.Sub(start).Seconds())
metrics.WriteWaiteStepTwo.Observe(cb.raftDoneTime.Sub(cb.raftBeginTime).Seconds())
metrics.WriteWaiteStepThree.Observe(cb.applyBeginTime.Sub(cb.raftDoneTime).Seconds())
metrics.WriteWaiteStepFour.Observe(cb.applyDoneTime.Sub(cb.applyBeginTime).Seconds())
}
return writer.checkResponse(cb.resp, len(b.requests))
}

type RaftError struct {
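Taken together with the worker changes below, the four step histograms split the total writer wait into phases: step 1 covers the queueing delay from sendRaftCommand until the raft worker begins its batch, step 2 the raft processing itself, step 3 the hand-off from the raft worker to the apply worker, and step 4 the apply. The IsZero check guards against callbacks that were completed without ever being stamped by the raft batch path. Reading the timestamps here without a lock is safe because each one is stamped before cb.wg.Done() and this goroutine has already returned from cb.wg.Wait(); a minimal sketch of that synchronization pattern, with illustrative names that are not from the repository:

package main

import (
	"fmt"
	"sync"
	"time"
)

// callback mimics the relevant Callback fields: a WaitGroup plus a
// timestamp stamped by another goroutine before Done is called.
type callback struct {
	wg   sync.WaitGroup
	done time.Time
}

func main() {
	cb := &callback{}
	cb.wg.Add(1)
	go func() {
		cb.done = time.Now() // stamp happens before Done below
		cb.wg.Done()
	}()
	cb.wg.Wait() // returns only after Done, so cb.done is visible here
	fmt.Println(cb.done)
}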
8 changes: 8 additions & 0 deletions tikv/raftstore/engine.go
@@ -3,11 +3,13 @@ package raftstore
import (
"bytes"
"math"
"time"

"github.com/coocood/badger"
"github.com/cznic/mathutil"
"github.com/golang/protobuf/proto"
"github.com/ngaut/unistore/lockstore"
"github.com/ngaut/unistore/metrics"
"github.com/ngaut/unistore/tikv/dbreader"
"github.com/ngaut/unistore/tikv/mvcc"
"github.com/pingcap/errors"
@@ -221,6 +223,7 @@ func (wb *WriteBatch) RollbackToSafePoint() {
// 2. Update lockStore. The data in lockStore may be older than the DB, so we need to restore the entries from the raft log.
func (wb *WriteBatch) WriteToKV(bundle *mvcc.DBBundle) error {
if len(wb.entries) > 0 {
start := time.Now()
err := bundle.DB.Update(func(txn *badger.Txn) error {
for _, entry := range wb.entries {
var err1 error
@@ -235,11 +238,13 @@ func (wb *WriteBatch) WriteToKV(bundle *mvcc.DBBundle) error {
}
return nil
})
metrics.KVDBUpdate.Observe(time.Since(start).Seconds())
if err != nil {
return errors.WithStack(err)
}
}
if len(wb.lockEntries) > 0 {
start := time.Now()
bundle.MemStoreMu.Lock()
for _, entry := range wb.lockEntries {
switch entry.UserMeta[0] {
@@ -258,12 +263,14 @@ func (wb *WriteBatch) WriteToKV(bundle *mvcc.DBBundle) error {
}
}
bundle.MemStoreMu.Unlock()
metrics.LockUpdate.Observe(time.Since(start).Seconds())
}
return nil
}

func (wb *WriteBatch) WriteToRaft(db *badger.DB) error {
if len(wb.entries) > 0 {
start := time.Now()
err := db.Update(func(txn *badger.Txn) error {
var err1 error
for _, entry := range wb.entries {
@@ -278,6 +285,7 @@ func (wb *WriteBatch) WriteToRaft(db *badger.DB) error {
}
return nil
})
metrics.RaftDBUpdate.Observe(time.Since(start).Seconds())
if err != nil {
return errors.WithStack(err)
}
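Each database update above is bracketed by the same start/Observe pattern. If the repetition grew, it could be factored into a small helper; a sketch under that assumption (ObserveDuration is an illustrative name, not part of the commit):

package metrics

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// ObserveDuration runs fn and records its wall-clock duration, in
// seconds, in h, mirroring the pattern inlined in WriteToKV and
// WriteToRaft.
func ObserveDuration(h prometheus.Histogram, fn func() error) error {
	start := time.Now()
	err := fn()
	h.Observe(time.Since(start).Seconds())
	return err
}

A call site would then read, for example, err := metrics.ObserveDuration(metrics.KVDBUpdate, func() error { return bundle.DB.Update(...) }).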
6 changes: 4 additions & 2 deletions tikv/raftstore/fsm_apply.go
@@ -175,10 +175,11 @@ type applyCallback struct {
cbs []*Callback
}

func (c *applyCallback) invokeAll(host *CoprocessorHost, doneApplyTime time.Time) {
for _, cb := range c.cbs {
if cb != nil {
host.postApply(c.region, cb.resp)
cb.applyDoneTime = doneApplyTime
cb.wg.Done()
}
}
@@ -353,8 +354,9 @@ func (ac *applyContext) writeToDB() {
}
ac.wb.Reset()
}
doneApply := time.Now()
for _, cb := range ac.cbs {
cb.invokeAll(ac.host, doneApply)
}
ac.cbs = ac.cbs[:0]
}
8 changes: 6 additions & 2 deletions tikv/raftstore/msg.go
@@ -71,8 +71,12 @@ func NewMsg(tp MsgType, data interface{}) Msg {
}

type Callback struct {
resp *raft_cmdpb.RaftCmdResponse
wg sync.WaitGroup
raftBeginTime time.Time
raftDoneTime time.Time
applyBeginTime time.Time
applyDoneTime time.Time
}

func (cb *Callback) Done(resp *raft_cmdpb.RaftCmdResponse) {
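The four new timestamps partition a write's lifetime; their ordering, as inferred from the call sites elsewhere in this commit, is summarized below:

// start           raftDBWriter.Write records it before sendRaftCommand
// raftBeginTime   copied from the raft worker's raftStartTime for every
//                 callback in the batch
// raftDoneTime    stamped after the raft ready state has been handled,
//                 just before the apply messages are forwarded
// applyBeginTime  stamped when the apply worker dequeues the batch
// applyDoneTime   stamped in invokeAll once the apply results are written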
48 changes: 33 additions & 15 deletions tikv/raftstore/peer_state.go
@@ -8,6 +8,7 @@ import (
"unsafe"

"github.com/ngaut/log"
"github.com/ngaut/unistore/metrics"
)

// peerState contains the peer states that need to run raft commands and apply commands.
@@ -69,19 +70,29 @@ type workerHandle struct {
}

type applyBatch struct {
msgs []Msg
peers map[uint64]*peerState
barriers []*sync.WaitGroup
proposals []*regionProposal
}

func (b *applyBatch) iterCallbacks(f func(cb *Callback)) {
for _, rp := range b.proposals {
for _, p := range rp.Props {
if p.cb != nil {
f(p.cb)
}
}
}
}

// raftWorker is responsible for running raft commands and applying raft logs.
type raftWorker struct {
pr *router

raftCh chan Msg
raftCtx *PollContext
raftStartTime time.Time

applyCh chan *applyBatch
applyCtx *applyContext
@@ -123,6 +134,7 @@ func (rw *raftWorker) run(closeCh <-chan struct{}, wg *sync.WaitGroup) {
for i := 0; i < pending; i++ {
msgs = append(msgs, <-rw.raftCh)
}
metrics.RaftBatchSize.Observe(float64(len(msgs)))
atomic.AddUint64(&rw.msgCnt, uint64(len(msgs)))
peerStateMap := make(map[uint64]*peerState)
rw.raftCtx.pendingCount = 0
@@ -144,13 +156,18 @@ func (rw *raftWorker) run(closeCh <-chan struct{}, wg *sync.WaitGroup) {
for id, peerState := range peerStateMap {
movePeer = id
delegate := &peerFsmDelegate{peerFsm: peerState.peer, ctx: rw.raftCtx}
batch.proposals = delegate.collectReady(batch.proposals)
}
// Pick one peer as the candidate to be moved to other workers.
atomic.StoreUint64(&rw.movePeerCandidate, movePeer)
if rw.raftCtx.hasReady {
rw.handleRaftReady(peerStateMap, batch)
}
doneRaftTime := time.Now()
batch.iterCallbacks(func(cb *Callback) {
cb.raftBeginTime = rw.raftStartTime
cb.raftDoneTime = doneRaftTime
})
applyMsgs := rw.raftCtx.applyMsgs
batch.msgs = append(batch.msgs, applyMsgs.msgs...)
applyMsgs.msgs = applyMsgs.msgs[:0]
@@ -168,13 +185,10 @@ func (rw *raftWorker) getPeerState(peersMap map[uint64]*peerState, regionID uint
return peer
}

func (rw *raftWorker) handleRaftReady(peers map[uint64]*peerState, batch *applyBatch) {
for _, proposal := range batch.proposals {
msg := Msg{Type: MsgTypeApplyProposal, Data: proposal}
rw.raftCtx.applyMsgs.appendMsg(proposal.RegionId, msg)
}
kvWB := rw.raftCtx.kvWB
if len(kvWB.entries) > 0 {
Expand Down Expand Up @@ -233,6 +247,10 @@ func (rw *raftWorker) runApply(wg *sync.WaitGroup) {
wg.Done()
return
}
begin := time.Now()
batch.iterCallbacks(func(cb *Callback) {
cb.applyBeginTime = begin
})
for _, peer := range batch.peers {
peer.apply.redoIndex = peer.apply.applyState.appliedIndex + 1
}
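The run loop receives one message, then drains everything already queued before handling the batch, so the new RaftBatchSize histogram directly reflects how much batching the worker achieves under load. A runnable sketch of that drain pattern, with illustrative types and names:

package main

import "fmt"

// drainBatch blocks for the first message, then takes whatever is
// already buffered, so the batch grows as the channel backlog grows.
func drainBatch(ch chan int) []int {
	batch := []int{<-ch}
	for pending := len(ch); pending > 0; pending-- {
		batch = append(batch, <-ch)
	}
	return batch
}

func main() {
	ch := make(chan int, 8)
	for i := 0; i < 5; i++ {
		ch <- i
	}
	fmt.Println(len(drainBatch(ch))) // 5: one blocking receive plus the backlog
}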
2 changes: 2 additions & 0 deletions tikv/region.go
@@ -12,6 +12,7 @@ import (
"github.com/coocood/badger"
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/ngaut/unistore/metrics"
"github.com/ngaut/unistore/pd"
"github.com/ngaut/unistore/tikv/raftstore"
"github.com/pingcap/kvproto/pkg/errorpb"
@@ -154,6 +155,7 @@ func (ri *regionCtx) AcquireLatches(hashVals []uint64) {
ok, wg := ri.tryAcquireLatches(hashVals)
if ok {
dur := time.Since(start)
metrics.LatchWait.Observe(dur.Seconds())
if dur > time.Millisecond*50 {
log.Warnf("region %d acquire %d locks takes %v", ri.meta.Id, len(hashVals), dur)
}
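With this change, latch waits are always recorded in the LatchWait histogram rather than only surfacing through the 50 ms warning log. To verify that observations reach the default registry, for example in a test, the registry can be gathered directly; a sketch, where the fully qualified metric name unistore_raft_latch_wait follows from the namespace, subsystem, and name in metrics.go:

package main

import (
	"fmt"

	_ "github.com/ngaut/unistore/metrics" // init registers the histograms
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	mfs, err := prometheus.DefaultGatherer.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		if mf.GetName() != "unistore_raft_latch_wait" {
			continue
		}
		for _, m := range mf.GetMetric() {
			h := m.GetHistogram()
			fmt.Printf("observations=%d sum=%.3fs\n", h.GetSampleCount(), h.GetSampleSum())
		}
	}
}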
