Skip to content

Commit

Permalink
Mdb no db size log (erigontech#1665)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored and sam committed Apr 5, 2021
1 parent 9890b76 commit 30faf17
Show file tree
Hide file tree
Showing 15 changed files with 145 additions and 768 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ grpc:
--go_opt=Mtypes/types.proto=github.com/ledgerwatch/turbo-geth/gointerfaces/types \
types/types.proto \
p2psentry/sentry.proto \
remote/kv.proto remote/db.proto remote/ethbackend.proto \
remote/kv.proto remote/ethbackend.proto \
snapshot_downloader/external_downloader.proto

prometheus:
Expand Down
4 changes: 2 additions & 2 deletions eth/stagedsync/stage_call_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ func promoteCallTraces(logPrefix string, tx ethdb.Database, startBlock, endBlock
select {
default:
case <-logEvery.C:
sz, err := tx.(ethdb.HasTx).Tx().BucketSize(dbutils.CallFromIndex)
sz, err := tx.(ethdb.HasTx).Tx().(ethdb.HasStats).BucketSize(dbutils.CallFromIndex)
if err != nil {
return err
}
sz2, err := tx.(ethdb.HasTx).Tx().BucketSize(dbutils.CallToIndex)
sz2, err := tx.(ethdb.HasTx).Tx().(ethdb.HasStats).BucketSize(dbutils.CallToIndex)
if err != nil {
return err
}
Expand Down
6 changes: 0 additions & 6 deletions eth/stagedsync/stage_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,6 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
if err = tx.CommitAndBegin(context.Background()); err != nil {
return err
}
if err = printBucketsSize(tx); err != nil {
return err
}
chainContext.SetDB(tx)
}
}
Expand All @@ -242,9 +239,6 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
if err = tx.CommitAndBegin(context.Background()); err != nil {
return err
}
if err = printBucketsSize(tx); err != nil {
return err
}
chainContext.SetDB(tx)
}
//start = time.Now()
Expand Down
43 changes: 0 additions & 43 deletions eth/stagedsync/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
"bytes"
"fmt"
"runtime"
"sort"
"time"

"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
Expand Down Expand Up @@ -212,47 +210,6 @@ func (s *State) Run(db ethdb.GetterPutter, tx ethdb.GetterPutter) error {
} else {
log.Info("Timings", timings...)
}
if err := printBucketsSize(tx); err != nil {
return err
}
return nil
}

//nolint
func printBucketsSize(dbTx ethdb.Getter) error {
hasTx, ok := dbTx.(ethdb.HasTx)
if !ok {
return nil
}
tx := hasTx.Tx()
if tx == nil {
return nil
}
buckets, err := tx.(ethdb.BucketMigrator).ExistingBuckets()
if err != nil {
return err
}
sort.Strings(buckets)
bucketSizes := make([]interface{}, 0, 2*len(buckets))
for _, bucket := range buckets {
sz, err1 := tx.BucketSize(bucket)
if err1 != nil {
return err1
}
if sz < uint64(10*datasize.GB) {
continue
}
bucketSizes = append(bucketSizes, bucket, common.StorageSize(sz))
}
if len(bucketSizes) == 0 {
return nil
}
sz, err1 := tx.BucketSize("freelist")
if err1 != nil {
return err1
}
bucketSizes = append(bucketSizes, "freelist", common.StorageSize(sz))
log.Info("Tables", bucketSizes...)
return nil
}

Expand Down
25 changes: 2 additions & 23 deletions ethdb/kv_abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,7 @@ var (
ErrAttemptToDeleteNonDeprecatedBucket = errors.New("only buckets from dbutils.DeprecatedBuckets can be deleted")
ErrUnknownBucket = errors.New("unknown bucket. add it to dbutils.Buckets")

dbSize = metrics.GetOrRegisterGauge("db/size", metrics.DefaultRegistry)
tableScsLeaf = metrics.GetOrRegisterGauge("table/scs/leaf", metrics.DefaultRegistry) //nolint
tableScsBranch = metrics.GetOrRegisterGauge("table/scs/branch", metrics.DefaultRegistry) //nolint
tableScsOverflow = metrics.GetOrRegisterGauge("table/scs/overflow", metrics.DefaultRegistry) //nolint
tableScsEntries = metrics.GetOrRegisterGauge("table/scs/entries", metrics.DefaultRegistry) //nolint
tableStateLeaf = metrics.GetOrRegisterGauge("table/state/leaf", metrics.DefaultRegistry) //nolint
tableStateBranch = metrics.GetOrRegisterGauge("table/state/branch", metrics.DefaultRegistry) //nolint
tableStateOverflow = metrics.GetOrRegisterGauge("table/state/overflow", metrics.DefaultRegistry) //nolint
tableStateEntries = metrics.GetOrRegisterGauge("table/state/entries", metrics.DefaultRegistry) //nolint
tableLogLeaf = metrics.GetOrRegisterGauge("table/log/leaf", metrics.DefaultRegistry) //nolint
tableLogBranch = metrics.GetOrRegisterGauge("table/log/branch", metrics.DefaultRegistry) //nolint
tableLogOverflow = metrics.GetOrRegisterGauge("table/log/overflow", metrics.DefaultRegistry) //nolint
tableLogEntries = metrics.GetOrRegisterGauge("table/log/entries", metrics.DefaultRegistry) //nolint
tableTxLeaf = metrics.GetOrRegisterGauge("table/tx/leaf", metrics.DefaultRegistry) //nolint
tableTxBranch = metrics.GetOrRegisterGauge("table/tx/branch", metrics.DefaultRegistry) //nolint
tableTxOverflow = metrics.GetOrRegisterGauge("table/tx/overflow", metrics.DefaultRegistry) //nolint
tableTxEntries = metrics.GetOrRegisterGauge("table/tx/entries", metrics.DefaultRegistry) //nolint
tableGcLeaf = metrics.GetOrRegisterGauge("table/gc/leaf", metrics.DefaultRegistry) //nolint
tableGcBranch = metrics.GetOrRegisterGauge("table/gc/branch", metrics.DefaultRegistry) //nolint
tableGcOverflow = metrics.GetOrRegisterGauge("table/gc/overflow", metrics.DefaultRegistry) //nolint
tableGcEntries = metrics.GetOrRegisterGauge("table/gc/entries", metrics.DefaultRegistry) //nolint
dbSize = metrics.GetOrRegisterGauge("db/size", metrics.DefaultRegistry) //nolint
)

// Read-only version of KV.
Expand Down Expand Up @@ -116,8 +96,6 @@ type Tx interface {
Commit(ctx context.Context) error // Commit all the operations of a transaction into the database.
Rollback() // Rollback - abandon all the operations of the transaction instead of saving them.

BucketSize(name string) (uint64, error)

Comparator(bucket string) dbutils.CmpFunc

// ReadSequence - allows to create a linear sequence of unique positive integers for each table.
Expand Down Expand Up @@ -215,5 +193,6 @@ type RwCursorDupSort interface {
}

type HasStats interface {
BucketSize(name string) (uint64, error)
DiskSize(context.Context) (uint64, error) // db size
}
75 changes: 38 additions & 37 deletions ethdb/kv_lmdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,43 +321,44 @@ func (db *LmdbKV) DiskSize(_ context.Context) (uint64, error) {
}

func (db *LmdbKV) CollectMetrics() {
fileInfo, _ := os.Stat(path.Join(db.opts.path, "data.mdb"))
dbSize.Update(fileInfo.Size())

if err := db.View(context.Background(), func(tx Tx) error {
stat, _ := tx.(*lmdbTx).BucketStat(dbutils.PlainStorageChangeSetBucket)
tableScsLeaf.Update(int64(stat.LeafPages))
tableScsBranch.Update(int64(stat.BranchPages))
tableScsOverflow.Update(int64(stat.OverflowPages))
tableScsEntries.Update(int64(stat.Entries))

stat, _ = tx.(*lmdbTx).BucketStat(dbutils.PlainStateBucket)
tableStateLeaf.Update(int64(stat.LeafPages))
tableStateBranch.Update(int64(stat.BranchPages))
tableStateOverflow.Update(int64(stat.OverflowPages))
tableStateEntries.Update(int64(stat.Entries))

stat, _ = tx.(*lmdbTx).BucketStat(dbutils.Log)
tableLogLeaf.Update(int64(stat.LeafPages))
tableLogBranch.Update(int64(stat.BranchPages))
tableLogOverflow.Update(int64(stat.OverflowPages))
tableLogEntries.Update(int64(stat.Entries))

stat, _ = tx.(*lmdbTx).BucketStat(dbutils.EthTx)
tableTxLeaf.Update(int64(stat.LeafPages))
tableTxBranch.Update(int64(stat.BranchPages))
tableTxOverflow.Update(int64(stat.OverflowPages))
tableTxEntries.Update(int64(stat.Entries))

stat, _ = tx.(*lmdbTx).BucketStat("gc")
tableGcLeaf.Update(int64(stat.LeafPages))
tableGcBranch.Update(int64(stat.BranchPages))
tableGcOverflow.Update(int64(stat.OverflowPages))
tableGcEntries.Update(int64(stat.Entries))
return nil
}); err != nil {
log.Error("collecting metrics failed", "err", err)
}
/*
fileInfo, _ := os.Stat(path.Join(db.opts.path, "data.mdb"))
dbSize.Update(fileInfo.Size())
if err := db.View(context.Background(), func(tx Tx) error {
stat, _ := tx.(*lmdbTx).BucketStat(dbutils.PlainStorageChangeSetBucket)
tableScsLeaf.Update(int64(stat.LeafPages))
tableScsBranch.Update(int64(stat.BranchPages))
tableScsOverflow.Update(int64(stat.OverflowPages))
tableScsEntries.Update(int64(stat.Entries))
stat, _ = tx.(*lmdbTx).BucketStat(dbutils.PlainStateBucket)
tableStateLeaf.Update(int64(stat.LeafPages))
tableStateBranch.Update(int64(stat.BranchPages))
tableStateOverflow.Update(int64(stat.OverflowPages))
tableStateEntries.Update(int64(stat.Entries))
stat, _ = tx.(*lmdbTx).BucketStat(dbutils.Log)
tableLogLeaf.Update(int64(stat.LeafPages))
tableLogBranch.Update(int64(stat.BranchPages))
tableLogOverflow.Update(int64(stat.OverflowPages))
tableLogEntries.Update(int64(stat.Entries))
stat, _ = tx.(*lmdbTx).BucketStat(dbutils.EthTx)
tableTxLeaf.Update(int64(stat.LeafPages))
tableTxBranch.Update(int64(stat.BranchPages))
tableTxOverflow.Update(int64(stat.OverflowPages))
tableTxEntries.Update(int64(stat.Entries))
stat, _ = tx.(*lmdbTx).BucketStat("gc")
tableGcLeaf.Update(int64(stat.LeafPages))
tableGcBranch.Update(int64(stat.BranchPages))
tableGcOverflow.Update(int64(stat.OverflowPages))
tableGcEntries.Update(int64(stat.Entries))
return nil
}); err != nil {
log.Error("collecting metrics failed", "err", err)
}
*/
}

func (db *LmdbKV) Begin(_ context.Context) (txn Tx, err error) {
Expand Down
81 changes: 42 additions & 39 deletions ethdb/kv_mdbx.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,43 +313,45 @@ func (db *MdbxKV) DiskSize(_ context.Context) (uint64, error) {
}

func (db *MdbxKV) CollectMetrics() {
info, _ := db.env.Info()
dbSize.Update(int64(info.Geo.Current))

if err := db.View(context.Background(), func(tx Tx) error {
stat, _ := tx.(*MdbxTx).BucketStat(dbutils.PlainStorageChangeSetBucket)
tableScsLeaf.Update(int64(stat.LeafPages))
tableScsBranch.Update(int64(stat.BranchPages))
tableScsOverflow.Update(int64(stat.OverflowPages))
tableScsEntries.Update(int64(stat.Entries))

stat, _ = tx.(*MdbxTx).BucketStat(dbutils.PlainStateBucket)
tableStateLeaf.Update(int64(stat.LeafPages))
tableStateBranch.Update(int64(stat.BranchPages))
tableStateOverflow.Update(int64(stat.OverflowPages))
tableStateEntries.Update(int64(stat.Entries))

stat, _ = tx.(*MdbxTx).BucketStat(dbutils.Log)
tableLogLeaf.Update(int64(stat.LeafPages))
tableLogBranch.Update(int64(stat.BranchPages))
tableLogOverflow.Update(int64(stat.OverflowPages))
tableLogEntries.Update(int64(stat.Entries))

stat, _ = tx.(*MdbxTx).BucketStat(dbutils.EthTx)
tableTxLeaf.Update(int64(stat.LeafPages))
tableTxBranch.Update(int64(stat.BranchPages))
tableTxOverflow.Update(int64(stat.OverflowPages))
tableTxEntries.Update(int64(stat.Entries))

stat, _ = tx.(*MdbxTx).BucketStat("gc")
tableGcLeaf.Update(int64(stat.LeafPages))
tableGcBranch.Update(int64(stat.BranchPages))
tableGcOverflow.Update(int64(stat.OverflowPages))
tableGcEntries.Update(int64(stat.Entries))
return nil
}); err != nil {
log.Error("collecting metrics failed", "err", err)
}
/*
info, _ := db.env.Info()
dbSize.Update(int64(info.Geo.Current))
if err := db.View(context.Background(), func(tx Tx) error {
stat, _ := tx.(*MdbxTx).BucketStat(dbutils.PlainStorageChangeSetBucket)
tableScsLeaf.Update(int64(stat.LeafPages))
tableScsBranch.Update(int64(stat.BranchPages))
tableScsOverflow.Update(int64(stat.OverflowPages))
tableScsEntries.Update(int64(stat.Entries))
stat, _ = tx.(*MdbxTx).BucketStat(dbutils.PlainStateBucket)
tableStateLeaf.Update(int64(stat.LeafPages))
tableStateBranch.Update(int64(stat.BranchPages))
tableStateOverflow.Update(int64(stat.OverflowPages))
tableStateEntries.Update(int64(stat.Entries))
stat, _ = tx.(*MdbxTx).BucketStat(dbutils.Log)
tableLogLeaf.Update(int64(stat.LeafPages))
tableLogBranch.Update(int64(stat.BranchPages))
tableLogOverflow.Update(int64(stat.OverflowPages))
tableLogEntries.Update(int64(stat.Entries))
stat, _ = tx.(*MdbxTx).BucketStat(dbutils.EthTx)
tableTxLeaf.Update(int64(stat.LeafPages))
tableTxBranch.Update(int64(stat.BranchPages))
tableTxOverflow.Update(int64(stat.OverflowPages))
tableTxEntries.Update(int64(stat.Entries))
stat, _ = tx.(*MdbxTx).BucketStat("gc")
tableGcLeaf.Update(int64(stat.LeafPages))
tableGcBranch.Update(int64(stat.BranchPages))
tableGcOverflow.Update(int64(stat.OverflowPages))
tableGcEntries.Update(int64(stat.Entries))
return nil
}); err != nil {
log.Error("collecting metrics failed", "err", err)
}
*/
}

func (db *MdbxKV) Begin(_ context.Context) (txn Tx, err error) {
Expand Down Expand Up @@ -642,7 +644,7 @@ func (tx *MdbxTx) Commit(ctx context.Context) error {
slowTx = debug.SlowCommit()
}

tx.printDebugInfo()
//tx.printDebugInfo()

latency, err := tx.tx.Commit()
if err != nil {
Expand Down Expand Up @@ -679,10 +681,11 @@ func (tx *MdbxTx) Rollback() {
}
}()
tx.closeCursors()
tx.printDebugInfo()
//tx.printDebugInfo()
tx.tx.Abort()
}

//nolint
func (tx *MdbxTx) printDebugInfo() {
if debug.BigRoTxKb() > 0 || debug.BigRwTxKb() > 0 {
txInfo, err := tx.tx.Info(true)
Expand Down
18 changes: 0 additions & 18 deletions ethdb/kv_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type remoteOpts struct {
type RemoteKV struct {
opts remoteOpts
remoteKV remote.KVClient
remoteDB remote.DBClient
conn *grpc.ClientConn
log log.Logger
buckets dbutils.BucketsCfg
Expand Down Expand Up @@ -147,7 +146,6 @@ func (opts remoteOpts) Open(certFile, keyFile, caCert string) (RwKV, error) {
opts: opts,
conn: conn,
remoteKV: remote.NewKVClient(conn),
remoteDB: remote.NewDBClient(conn),
log: log.New("remote_db", opts.DialAddress),
buckets: dbutils.BucketsCfg{},
}
Expand Down Expand Up @@ -192,14 +190,6 @@ func (db *RemoteKV) Close() {
}
}

func (db *RemoteKV) DiskSize(ctx context.Context) (uint64, error) {
sizeReply, err := db.remoteDB.Size(ctx, &remote.SizeRequest{})
if err != nil {
return 0, err
}
return sizeReply.Size, nil
}

func (db *RemoteKV) CollectMetrics() {}

func (db *RemoteKV) Begin(ctx context.Context) (Tx, error) {
Expand Down Expand Up @@ -259,14 +249,6 @@ func (c *remoteCursor) Prefetch(v uint) Cursor {
return c
}

func (tx *remoteTx) BucketSize(name string) (uint64, error) {
sizeReply, err := tx.db.remoteDB.BucketSize(tx.ctx, &remote.BucketSizeRequest{BucketName: name})
if err != nil {
return 0, err
}
return sizeReply.Size, nil
}

func (tx *remoteTx) GetOne(bucket string, key []byte) (val []byte, err error) {
c, _ := tx.Cursor(bucket)
defer c.Close()
Expand Down
11 changes: 0 additions & 11 deletions ethdb/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,6 @@ func (m *mutation) Has(table string, key []byte) (bool, error) {
return false, nil
}

func (m *mutation) DiskSize(ctx context.Context) (common.StorageSize, error) {
if m.db == nil {
return 0, nil
}
sz, err := m.db.(HasStats).DiskSize(ctx)
if err != nil {
return 0, err
}
return common.StorageSize(sz), nil
}

func (m *mutation) Put(table string, key []byte, value []byte) error {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down
Loading

0 comments on commit 30faf17

Please sign in to comment.