Skip to content

core, ethdb, metrics, p2p: expose various counter metrics for grafana #19692

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ import (
)

var (
headBlockGauge = metrics.NewRegisteredGauge("chain/head/block", nil)
headHeaderGauge = metrics.NewRegisteredGauge("chain/head/header", nil)
headFastBlockGauge = metrics.NewRegisteredGauge("chain/head/receipt", nil)

accountReadTimer = metrics.NewRegisteredTimer("chain/account/reads", nil)
accountHashTimer = metrics.NewRegisteredTimer("chain/account/hashes", nil)
accountUpdateTimer = metrics.NewRegisteredTimer("chain/account/updates", nil)
Expand Down Expand Up @@ -332,6 +336,7 @@ func (bc *BlockChain) loadLastState() error {
}
// Everything seems to be fine, set as the head block
bc.currentBlock.Store(currentBlock)
headBlockGauge.Update(int64(currentBlock.NumberU64()))

// Restore the last known head header
currentHeader := currentBlock.Header()
Expand All @@ -344,12 +349,14 @@ func (bc *BlockChain) loadLastState() error {

// Restore the last known head fast block
bc.currentFastBlock.Store(currentBlock)
headFastBlockGauge.Update(int64(currentBlock.NumberU64()))

if head := rawdb.ReadHeadFastBlockHash(bc.db); head != (common.Hash{}) {
if block := bc.GetBlockByHash(head); block != nil {
bc.currentFastBlock.Store(block)
headFastBlockGauge.Update(int64(block.NumberU64()))
}
}

// Issue a status log for the user
currentFastBlock := bc.CurrentFastBlock()

Expand Down Expand Up @@ -388,6 +395,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
}
rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash())
bc.currentBlock.Store(newHeadBlock)
headBlockGauge.Update(int64(newHeadBlock.NumberU64()))
}

// Rewind the fast block in a simpleton way to the target head
Expand All @@ -399,6 +407,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
}
rawdb.WriteHeadFastBlockHash(db, newHeadFastBlock.Hash())
bc.currentFastBlock.Store(newHeadFastBlock)
headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64()))
}
}

Expand Down Expand Up @@ -450,6 +459,7 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
// If all checks out, manually set the head block
bc.chainmu.Lock()
bc.currentBlock.Store(block)
headBlockGauge.Update(int64(block.NumberU64()))
bc.chainmu.Unlock()

log.Info("Committed new head block", "number", block.Number(), "hash", hash)
Expand Down Expand Up @@ -522,9 +532,12 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
bc.genesisBlock = genesis
bc.insert(bc.genesisBlock)
bc.currentBlock.Store(bc.genesisBlock)
headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))

bc.hc.SetGenesis(bc.genesisBlock.Header())
bc.hc.SetCurrentHeader(bc.genesisBlock.Header())
bc.currentFastBlock.Store(bc.genesisBlock)
headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))

return nil
}
Expand Down Expand Up @@ -598,13 +611,15 @@ func (bc *BlockChain) insert(block *types.Block) {
rawdb.WriteHeadBlockHash(bc.db, block.Hash())

bc.currentBlock.Store(block)
headBlockGauge.Update(int64(block.NumberU64()))

// If the block is better than our head or is on a different chain, force update heads
if updateHeads {
bc.hc.SetCurrentHeader(block.Header())
rawdb.WriteHeadFastBlockHash(bc.db, block.Hash())

bc.currentFastBlock.Store(block)
headFastBlockGauge.Update(int64(block.NumberU64()))
}
}

Expand Down Expand Up @@ -862,11 +877,13 @@ func (bc *BlockChain) Rollback(chain []common.Hash) {
newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1)
rawdb.WriteHeadFastBlockHash(bc.db, newFastBlock.Hash())
bc.currentFastBlock.Store(newFastBlock)
headFastBlockGauge.Update(int64(newFastBlock.NumberU64()))
}
if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash {
newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1)
rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash())
bc.currentBlock.Store(newBlock)
headBlockGauge.Update(int64(newBlock.NumberU64()))
}
}
// Truncate ancient data which exceeds the current header.
Expand Down Expand Up @@ -952,6 +969,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64()).Cmp(td) < 0 {
rawdb.WriteHeadFastBlockHash(bc.db, head.Hash())
bc.currentFastBlock.Store(head)
headFastBlockGauge.Update(int64(head.NumberU64()))
isCanonical = true
}
}
Expand Down
5 changes: 4 additions & 1 deletion core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine c
}
}
hc.currentHeaderHash = hc.CurrentHeader().Hash()
headHeaderGauge.Update(hc.CurrentHeader().Number.Int64())

return hc, nil
}
Expand Down Expand Up @@ -185,12 +186,12 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er

hc.currentHeaderHash = hash
hc.currentHeader.Store(types.CopyHeader(header))
headHeaderGauge.Update(header.Number.Int64())

status = CanonStatTy
} else {
status = SideStatTy
}

hc.headerCache.Add(hash, header)
hc.numberCache.Add(hash, number)

Expand Down Expand Up @@ -456,6 +457,7 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.Header) {

hc.currentHeader.Store(head)
hc.currentHeaderHash = head.Hash()
headHeaderGauge.Update(head.Number.Int64())
}

type (
Expand Down Expand Up @@ -508,6 +510,7 @@ func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, d

hc.currentHeader.Store(parent)
hc.currentHeaderHash = parentHash
headHeaderGauge.Update(parent.Number.Int64())
}
batch.Write()

Expand Down
7 changes: 4 additions & 3 deletions core/rawdb/freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ type freezer struct {
func newFreezer(datadir string, namespace string) (*freezer, error) {
// Create the initial freezer object
var (
readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil)
writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil)
readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil)
writeMeter = metrics.NewRegisteredMeter(namespace+"ancient/write", nil)
sizeCounter = metrics.NewRegisteredCounter(namespace+"ancient/size", nil)
)
// Ensure the datadir is not a symbolic link if it exists.
if info, err := os.Lstat(datadir); !os.IsNotExist(err) {
Expand All @@ -102,7 +103,7 @@ func newFreezer(datadir string, namespace string) (*freezer, error) {
instanceLock: lock,
}
for name, disableSnappy := range freezerNoSnappy {
table, err := newTable(datadir, name, readMeter, writeMeter, disableSnappy)
table, err := newTable(datadir, name, readMeter, writeMeter, sizeCounter, disableSnappy)
if err != nil {
for _, table := range freezer.tables {
table.Close()
Expand Down
44 changes: 38 additions & 6 deletions core/rawdb/freezer_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,18 @@ type freezerTable struct {
// to count how many historic items have gone missing.
itemOffset uint32 // Offset (number of discarded items)

headBytes uint32 // Number of bytes written to the head file
readMeter metrics.Meter // Meter for measuring the effective amount of data read
writeMeter metrics.Meter // Meter for measuring the effective amount of data written
headBytes uint32 // Number of bytes written to the head file
readMeter metrics.Meter // Meter for measuring the effective amount of data read
writeMeter metrics.Meter // Meter for measuring the effective amount of data written
sizeCounter metrics.Counter // Counter for tracking the combined size of all freezer tables

logger log.Logger // Logger with database path and table name ambedded
lock sync.RWMutex // Mutex protecting the data file descriptors
}

// newTable opens a freezer table with default settings - 2G files
func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, disableSnappy bool) (*freezerTable, error) {
return newCustomTable(path, name, readMeter, writeMeter, 2*1000*1000*1000, disableSnappy)
func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeCounter metrics.Counter, disableSnappy bool) (*freezerTable, error) {
return newCustomTable(path, name, readMeter, writeMeter, sizeCounter, 2*1000*1000*1000, disableSnappy)
}

// openFreezerFileForAppend opens a freezer table file and seeks to the end
Expand Down Expand Up @@ -148,7 +149,7 @@ func truncateFreezerFile(file *os.File, size int64) error {
// newCustomTable opens a freezer table, creating the data and index files if they are
// non existent. Both files are truncated to the shortest common length to ensure
// they don't go out of sync.
func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, maxFilesize uint32, noCompression bool) (*freezerTable, error) {
func newCustomTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeCounter metrics.Counter, maxFilesize uint32, noCompression bool) (*freezerTable, error) {
// Ensure the containing directory exists and open the indexEntry file
if err := os.MkdirAll(path, 0755); err != nil {
return nil, err
Expand All @@ -171,6 +172,7 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete
files: make(map[uint32]*os.File),
readMeter: readMeter,
writeMeter: writeMeter,
sizeCounter: sizeCounter,
name: name,
path: path,
logger: log.New("database", path, "table", name),
Expand All @@ -181,6 +183,14 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete
tab.Close()
return nil, err
}
// Initialize the starting size counter
size, err := tab.sizeNolock()
if err != nil {
tab.Close()
return nil, err
}
tab.sizeCounter.Inc(int64(size))

return tab, nil
}

Expand Down Expand Up @@ -321,6 +331,11 @@ func (t *freezerTable) truncate(items uint64) error {
if atomic.LoadUint64(&t.items) <= items {
return nil
}
// We need to truncate, save the old size for metrics tracking
oldSize, err := t.sizeNolock()
if err != nil {
return err
}
// Something's out of sync, truncate the table's offset index
t.logger.Warn("Truncating freezer table", "items", t.items, "limit", items)
if err := truncateFreezerFile(t.index, int64(items+1)*indexEntrySize); err != nil {
Expand Down Expand Up @@ -355,6 +370,14 @@ func (t *freezerTable) truncate(items uint64) error {
// All data files truncated, set internal counters and return
atomic.StoreUint64(&t.items, items)
atomic.StoreUint32(&t.headBytes, expected.offset)

// Retrieve the new size and update the total size counter
newSize, err := t.sizeNolock()
if err != nil {
return err
}
t.sizeCounter.Dec(int64(oldSize - newSize))

return nil
}

Expand Down Expand Up @@ -483,7 +506,10 @@ func (t *freezerTable) Append(item uint64, blob []byte) error {
}
// Write indexEntry
t.index.Write(idx.marshallBinary())

t.writeMeter.Mark(int64(bLen + indexEntrySize))
t.sizeCounter.Inc(int64(bLen + indexEntrySize))

atomic.AddUint64(&t.items, 1)
return nil
}
Expand Down Expand Up @@ -562,6 +588,12 @@ func (t *freezerTable) size() (uint64, error) {
t.lock.RLock()
defer t.lock.RUnlock()

return t.sizeNolock()
}

// sizeNolock returns the total data size in the freezer table without obtaining
// the mutex first.
func (t *freezerTable) sizeNolock() (uint64, error) {
stat, err := t.index.Stat()
if err != nil {
return 0, err
Expand Down
Loading