Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mdb no db size log #1665

Merged
merged 11 commits into from
Apr 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ grpc:
--go_opt=Mtypes/types.proto=github.com/ledgerwatch/turbo-geth/gointerfaces/types \
types/types.proto \
p2psentry/sentry.proto \
remote/kv.proto remote/db.proto remote/ethbackend.proto \
remote/kv.proto remote/ethbackend.proto \
snapshot_downloader/external_downloader.proto

prometheus:
Expand Down
4 changes: 2 additions & 2 deletions eth/stagedsync/stage_call_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ func promoteCallTraces(logPrefix string, tx ethdb.Database, startBlock, endBlock
select {
default:
case <-logEvery.C:
sz, err := tx.(ethdb.HasTx).Tx().BucketSize(dbutils.CallFromIndex)
sz, err := tx.(ethdb.HasTx).Tx().(ethdb.HasStats).BucketSize(dbutils.CallFromIndex)
if err != nil {
return err
}
sz2, err := tx.(ethdb.HasTx).Tx().BucketSize(dbutils.CallToIndex)
sz2, err := tx.(ethdb.HasTx).Tx().(ethdb.HasStats).BucketSize(dbutils.CallToIndex)
if err != nil {
return err
}
Expand Down
6 changes: 0 additions & 6 deletions eth/stagedsync/stage_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,6 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
if err = tx.CommitAndBegin(context.Background()); err != nil {
return err
}
if err = printBucketsSize(tx); err != nil {
return err
}
chainContext.SetDB(tx)
}
}
Expand All @@ -242,9 +239,6 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
if err = tx.CommitAndBegin(context.Background()); err != nil {
return err
}
if err = printBucketsSize(tx); err != nil {
return err
}
chainContext.SetDB(tx)
}
//start = time.Now()
Expand Down
43 changes: 0 additions & 43 deletions eth/stagedsync/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
"bytes"
"fmt"
"runtime"
"sort"
"time"

"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
Expand Down Expand Up @@ -212,47 +210,6 @@ func (s *State) Run(db ethdb.GetterPutter, tx ethdb.GetterPutter) error {
} else {
log.Info("Timings", timings...)
}
if err := printBucketsSize(tx); err != nil {
return err
}
return nil
}

//nolint
func printBucketsSize(dbTx ethdb.Getter) error {
hasTx, ok := dbTx.(ethdb.HasTx)
if !ok {
return nil
}
tx := hasTx.Tx()
if tx == nil {
return nil
}
buckets, err := tx.(ethdb.BucketMigrator).ExistingBuckets()
if err != nil {
return err
}
sort.Strings(buckets)
bucketSizes := make([]interface{}, 0, 2*len(buckets))
for _, bucket := range buckets {
sz, err1 := tx.BucketSize(bucket)
if err1 != nil {
return err1
}
if sz < uint64(10*datasize.GB) {
continue
}
bucketSizes = append(bucketSizes, bucket, common.StorageSize(sz))
}
if len(bucketSizes) == 0 {
return nil
}
sz, err1 := tx.BucketSize("freelist")
if err1 != nil {
return err1
}
bucketSizes = append(bucketSizes, "freelist", common.StorageSize(sz))
log.Info("Tables", bucketSizes...)
return nil
}

Expand Down
25 changes: 2 additions & 23 deletions ethdb/kv_abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,7 @@ var (
ErrAttemptToDeleteNonDeprecatedBucket = errors.New("only buckets from dbutils.DeprecatedBuckets can be deleted")
ErrUnknownBucket = errors.New("unknown bucket. add it to dbutils.Buckets")

dbSize = metrics.GetOrRegisterGauge("db/size", metrics.DefaultRegistry)
tableScsLeaf = metrics.GetOrRegisterGauge("table/scs/leaf", metrics.DefaultRegistry) //nolint
tableScsBranch = metrics.GetOrRegisterGauge("table/scs/branch", metrics.DefaultRegistry) //nolint
tableScsOverflow = metrics.GetOrRegisterGauge("table/scs/overflow", metrics.DefaultRegistry) //nolint
tableScsEntries = metrics.GetOrRegisterGauge("table/scs/entries", metrics.DefaultRegistry) //nolint
tableStateLeaf = metrics.GetOrRegisterGauge("table/state/leaf", metrics.DefaultRegistry) //nolint
tableStateBranch = metrics.GetOrRegisterGauge("table/state/branch", metrics.DefaultRegistry) //nolint
tableStateOverflow = metrics.GetOrRegisterGauge("table/state/overflow", metrics.DefaultRegistry) //nolint
tableStateEntries = metrics.GetOrRegisterGauge("table/state/entries", metrics.DefaultRegistry) //nolint
tableLogLeaf = metrics.GetOrRegisterGauge("table/log/leaf", metrics.DefaultRegistry) //nolint
tableLogBranch = metrics.GetOrRegisterGauge("table/log/branch", metrics.DefaultRegistry) //nolint
tableLogOverflow = metrics.GetOrRegisterGauge("table/log/overflow", metrics.DefaultRegistry) //nolint
tableLogEntries = metrics.GetOrRegisterGauge("table/log/entries", metrics.DefaultRegistry) //nolint
tableTxLeaf = metrics.GetOrRegisterGauge("table/tx/leaf", metrics.DefaultRegistry) //nolint
tableTxBranch = metrics.GetOrRegisterGauge("table/tx/branch", metrics.DefaultRegistry) //nolint
tableTxOverflow = metrics.GetOrRegisterGauge("table/tx/overflow", metrics.DefaultRegistry) //nolint
tableTxEntries = metrics.GetOrRegisterGauge("table/tx/entries", metrics.DefaultRegistry) //nolint
tableGcLeaf = metrics.GetOrRegisterGauge("table/gc/leaf", metrics.DefaultRegistry) //nolint
tableGcBranch = metrics.GetOrRegisterGauge("table/gc/branch", metrics.DefaultRegistry) //nolint
tableGcOverflow = metrics.GetOrRegisterGauge("table/gc/overflow", metrics.DefaultRegistry) //nolint
tableGcEntries = metrics.GetOrRegisterGauge("table/gc/entries", metrics.DefaultRegistry) //nolint
dbSize = metrics.GetOrRegisterGauge("db/size", metrics.DefaultRegistry) //nolint
)

// Read-only version of KV.
Expand Down Expand Up @@ -116,8 +96,6 @@ type Tx interface {
Commit(ctx context.Context) error // Commit all the operations of a transaction into the database.
Rollback() // Rollback - abandon all the operations of the transaction instead of saving them.

BucketSize(name string) (uint64, error)

Comparator(bucket string) dbutils.CmpFunc

// ReadSequence - allows to create a linear sequence of unique positive integers for each table.
Expand Down Expand Up @@ -215,5 +193,6 @@ type RwCursorDupSort interface {
}

type HasStats interface {
BucketSize(name string) (uint64, error)
DiskSize(context.Context) (uint64, error) // db size
}
75 changes: 38 additions & 37 deletions ethdb/kv_lmdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,43 +321,44 @@ func (db *LmdbKV) DiskSize(_ context.Context) (uint64, error) {
}

func (db *LmdbKV) CollectMetrics() {
fileInfo, _ := os.Stat(path.Join(db.opts.path, "data.mdb"))
dbSize.Update(fileInfo.Size())

if err := db.View(context.Background(), func(tx Tx) error {
stat, _ := tx.(*lmdbTx).BucketStat(dbutils.PlainStorageChangeSetBucket)
tableScsLeaf.Update(int64(stat.LeafPages))
tableScsBranch.Update(int64(stat.BranchPages))
tableScsOverflow.Update(int64(stat.OverflowPages))
tableScsEntries.Update(int64(stat.Entries))

stat, _ = tx.(*lmdbTx).BucketStat(dbutils.PlainStateBucket)
tableStateLeaf.Update(int64(stat.LeafPages))
tableStateBranch.Update(int64(stat.BranchPages))
tableStateOverflow.Update(int64(stat.OverflowPages))
tableStateEntries.Update(int64(stat.Entries))

stat, _ = tx.(*lmdbTx).BucketStat(dbutils.Log)
tableLogLeaf.Update(int64(stat.LeafPages))
tableLogBranch.Update(int64(stat.BranchPages))
tableLogOverflow.Update(int64(stat.OverflowPages))
tableLogEntries.Update(int64(stat.Entries))

stat, _ = tx.(*lmdbTx).BucketStat(dbutils.EthTx)
tableTxLeaf.Update(int64(stat.LeafPages))
tableTxBranch.Update(int64(stat.BranchPages))
tableTxOverflow.Update(int64(stat.OverflowPages))
tableTxEntries.Update(int64(stat.Entries))

stat, _ = tx.(*lmdbTx).BucketStat("gc")
tableGcLeaf.Update(int64(stat.LeafPages))
tableGcBranch.Update(int64(stat.BranchPages))
tableGcOverflow.Update(int64(stat.OverflowPages))
tableGcEntries.Update(int64(stat.Entries))
return nil
}); err != nil {
log.Error("collecting metrics failed", "err", err)
}
/*
fileInfo, _ := os.Stat(path.Join(db.opts.path, "data.mdb"))
dbSize.Update(fileInfo.Size())
if err := db.View(context.Background(), func(tx Tx) error {
stat, _ := tx.(*lmdbTx).BucketStat(dbutils.PlainStorageChangeSetBucket)
tableScsLeaf.Update(int64(stat.LeafPages))
tableScsBranch.Update(int64(stat.BranchPages))
tableScsOverflow.Update(int64(stat.OverflowPages))
tableScsEntries.Update(int64(stat.Entries))

stat, _ = tx.(*lmdbTx).BucketStat(dbutils.PlainStateBucket)
tableStateLeaf.Update(int64(stat.LeafPages))
tableStateBranch.Update(int64(stat.BranchPages))
tableStateOverflow.Update(int64(stat.OverflowPages))
tableStateEntries.Update(int64(stat.Entries))

stat, _ = tx.(*lmdbTx).BucketStat(dbutils.Log)
tableLogLeaf.Update(int64(stat.LeafPages))
tableLogBranch.Update(int64(stat.BranchPages))
tableLogOverflow.Update(int64(stat.OverflowPages))
tableLogEntries.Update(int64(stat.Entries))

stat, _ = tx.(*lmdbTx).BucketStat(dbutils.EthTx)
tableTxLeaf.Update(int64(stat.LeafPages))
tableTxBranch.Update(int64(stat.BranchPages))
tableTxOverflow.Update(int64(stat.OverflowPages))
tableTxEntries.Update(int64(stat.Entries))

stat, _ = tx.(*lmdbTx).BucketStat("gc")
tableGcLeaf.Update(int64(stat.LeafPages))
tableGcBranch.Update(int64(stat.BranchPages))
tableGcOverflow.Update(int64(stat.OverflowPages))
tableGcEntries.Update(int64(stat.Entries))
return nil
}); err != nil {
log.Error("collecting metrics failed", "err", err)
}
*/
}

func (db *LmdbKV) Begin(_ context.Context) (txn Tx, err error) {
Expand Down
81 changes: 42 additions & 39 deletions ethdb/kv_mdbx.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,43 +313,45 @@ func (db *MdbxKV) DiskSize(_ context.Context) (uint64, error) {
}

func (db *MdbxKV) CollectMetrics() {
info, _ := db.env.Info()
dbSize.Update(int64(info.Geo.Current))

if err := db.View(context.Background(), func(tx Tx) error {
stat, _ := tx.(*MdbxTx).BucketStat(dbutils.PlainStorageChangeSetBucket)
tableScsLeaf.Update(int64(stat.LeafPages))
tableScsBranch.Update(int64(stat.BranchPages))
tableScsOverflow.Update(int64(stat.OverflowPages))
tableScsEntries.Update(int64(stat.Entries))

stat, _ = tx.(*MdbxTx).BucketStat(dbutils.PlainStateBucket)
tableStateLeaf.Update(int64(stat.LeafPages))
tableStateBranch.Update(int64(stat.BranchPages))
tableStateOverflow.Update(int64(stat.OverflowPages))
tableStateEntries.Update(int64(stat.Entries))

stat, _ = tx.(*MdbxTx).BucketStat(dbutils.Log)
tableLogLeaf.Update(int64(stat.LeafPages))
tableLogBranch.Update(int64(stat.BranchPages))
tableLogOverflow.Update(int64(stat.OverflowPages))
tableLogEntries.Update(int64(stat.Entries))

stat, _ = tx.(*MdbxTx).BucketStat(dbutils.EthTx)
tableTxLeaf.Update(int64(stat.LeafPages))
tableTxBranch.Update(int64(stat.BranchPages))
tableTxOverflow.Update(int64(stat.OverflowPages))
tableTxEntries.Update(int64(stat.Entries))

stat, _ = tx.(*MdbxTx).BucketStat("gc")
tableGcLeaf.Update(int64(stat.LeafPages))
tableGcBranch.Update(int64(stat.BranchPages))
tableGcOverflow.Update(int64(stat.OverflowPages))
tableGcEntries.Update(int64(stat.Entries))
return nil
}); err != nil {
log.Error("collecting metrics failed", "err", err)
}
/*
info, _ := db.env.Info()
dbSize.Update(int64(info.Geo.Current))

if err := db.View(context.Background(), func(tx Tx) error {
stat, _ := tx.(*MdbxTx).BucketStat(dbutils.PlainStorageChangeSetBucket)
tableScsLeaf.Update(int64(stat.LeafPages))
tableScsBranch.Update(int64(stat.BranchPages))
tableScsOverflow.Update(int64(stat.OverflowPages))
tableScsEntries.Update(int64(stat.Entries))

stat, _ = tx.(*MdbxTx).BucketStat(dbutils.PlainStateBucket)
tableStateLeaf.Update(int64(stat.LeafPages))
tableStateBranch.Update(int64(stat.BranchPages))
tableStateOverflow.Update(int64(stat.OverflowPages))
tableStateEntries.Update(int64(stat.Entries))

stat, _ = tx.(*MdbxTx).BucketStat(dbutils.Log)
tableLogLeaf.Update(int64(stat.LeafPages))
tableLogBranch.Update(int64(stat.BranchPages))
tableLogOverflow.Update(int64(stat.OverflowPages))
tableLogEntries.Update(int64(stat.Entries))

stat, _ = tx.(*MdbxTx).BucketStat(dbutils.EthTx)
tableTxLeaf.Update(int64(stat.LeafPages))
tableTxBranch.Update(int64(stat.BranchPages))
tableTxOverflow.Update(int64(stat.OverflowPages))
tableTxEntries.Update(int64(stat.Entries))

stat, _ = tx.(*MdbxTx).BucketStat("gc")
tableGcLeaf.Update(int64(stat.LeafPages))
tableGcBranch.Update(int64(stat.BranchPages))
tableGcOverflow.Update(int64(stat.OverflowPages))
tableGcEntries.Update(int64(stat.Entries))
return nil
}); err != nil {
log.Error("collecting metrics failed", "err", err)
}
*/
}

func (db *MdbxKV) Begin(_ context.Context) (txn Tx, err error) {
Expand Down Expand Up @@ -642,7 +644,7 @@ func (tx *MdbxTx) Commit(ctx context.Context) error {
slowTx = debug.SlowCommit()
}

tx.printDebugInfo()
//tx.printDebugInfo()

latency, err := tx.tx.Commit()
if err != nil {
Expand Down Expand Up @@ -679,10 +681,11 @@ func (tx *MdbxTx) Rollback() {
}
}()
tx.closeCursors()
tx.printDebugInfo()
//tx.printDebugInfo()
tx.tx.Abort()
}

//nolint
func (tx *MdbxTx) printDebugInfo() {
if debug.BigRoTxKb() > 0 || debug.BigRwTxKb() > 0 {
txInfo, err := tx.tx.Info(true)
Expand Down
18 changes: 0 additions & 18 deletions ethdb/kv_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type remoteOpts struct {
type RemoteKV struct {
opts remoteOpts
remoteKV remote.KVClient
remoteDB remote.DBClient
conn *grpc.ClientConn
log log.Logger
buckets dbutils.BucketsCfg
Expand Down Expand Up @@ -147,7 +146,6 @@ func (opts remoteOpts) Open(certFile, keyFile, caCert string) (RwKV, error) {
opts: opts,
conn: conn,
remoteKV: remote.NewKVClient(conn),
remoteDB: remote.NewDBClient(conn),
log: log.New("remote_db", opts.DialAddress),
buckets: dbutils.BucketsCfg{},
}
Expand Down Expand Up @@ -192,14 +190,6 @@ func (db *RemoteKV) Close() {
}
}

func (db *RemoteKV) DiskSize(ctx context.Context) (uint64, error) {
sizeReply, err := db.remoteDB.Size(ctx, &remote.SizeRequest{})
if err != nil {
return 0, err
}
return sizeReply.Size, nil
}

func (db *RemoteKV) CollectMetrics() {}

func (db *RemoteKV) Begin(ctx context.Context) (Tx, error) {
Expand Down Expand Up @@ -259,14 +249,6 @@ func (c *remoteCursor) Prefetch(v uint) Cursor {
return c
}

func (tx *remoteTx) BucketSize(name string) (uint64, error) {
sizeReply, err := tx.db.remoteDB.BucketSize(tx.ctx, &remote.BucketSizeRequest{BucketName: name})
if err != nil {
return 0, err
}
return sizeReply.Size, nil
}

func (tx *remoteTx) GetOne(bucket string, key []byte) (val []byte, err error) {
c, _ := tx.Cursor(bucket)
defer c.Close()
Expand Down
11 changes: 0 additions & 11 deletions ethdb/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,6 @@ func (m *mutation) Has(table string, key []byte) (bool, error) {
return false, nil
}

func (m *mutation) DiskSize(ctx context.Context) (common.StorageSize, error) {
if m.db == nil {
return 0, nil
}
sz, err := m.db.(HasStats).DiskSize(ctx)
if err != nil {
return 0, err
}
return common.StorageSize(sz), nil
}

func (m *mutation) Put(table string, key []byte, value []byte) error {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down
Loading