From 62395c767e81ef83420aa6124c9812ee25042b45 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Mon, 9 Oct 2023 15:47:54 +0700 Subject: [PATCH] move NewHashBatch to erigon-lib, remove oddb package (#8408) --- cmd/hack/db/lmdb.go | 10 +- core/rawdb/accessors_account.go | 2 +- erigon-lib/kv/kv_interface.go | 13 +- erigon-lib/kv/mdbx/kv_migrator_test.go | 5 +- .../kv/membatch}/database_test.go | 44 +-- .../kv/membatch}/mapmutation.go | 93 ++--- erigon-lib/kv/memdb/memory_mutation.go | 2 +- eth/stagedsync/stage_execute.go | 21 +- eth/stagedsync/stage_mining_exec.go | 13 +- ethdb/db_interface.go | 80 ---- ethdb/olddb/mutation.go | 345 ------------------ ethdb/olddb/object_db.go | 248 ------------- ethdb/olddb/tx_db.go | 238 ------------ 13 files changed, 66 insertions(+), 1048 deletions(-) rename {ethdb/olddb => erigon-lib/kv/membatch}/database_test.go (84%) rename {ethdb/olddb => erigon-lib/kv/membatch}/mapmutation.go (66%) delete mode 100644 ethdb/olddb/mutation.go delete mode 100644 ethdb/olddb/object_db.go delete mode 100644 ethdb/olddb/tx_db.go diff --git a/cmd/hack/db/lmdb.go b/cmd/hack/db/lmdb.go index 288bcf10208..10df7a5e92a 100644 --- a/cmd/hack/db/lmdb.go +++ b/cmd/hack/db/lmdb.go @@ -577,7 +577,7 @@ func generate6(_ kv.RwDB, tx kv.RwTx) (bool, error) { } func dropT(_ kv.RwDB, tx kv.RwTx) (bool, error) { - if err := tx.(kv.BucketMigrator).ClearBucket("t"); err != nil { + if err := tx.ClearBucket("t"); err != nil { return false, err } return true, nil @@ -607,14 +607,14 @@ func generate7(_ kv.RwDB, tx kv.RwTx) (bool, error) { } func dropT1(_ kv.RwDB, tx kv.RwTx) (bool, error) { - if err := tx.(kv.BucketMigrator).ClearBucket("t1"); err != nil { + if err := tx.ClearBucket("t1"); err != nil { return false, err } return true, nil } func dropT2(_ kv.RwDB, tx kv.RwTx) (bool, error) { - if err := tx.(kv.BucketMigrator).ClearBucket("t2"); err != nil { + if err := tx.ClearBucket("t2"); err != nil { return false, err } return true, nil @@ -624,7 +624,7 @@ func dropT2(_ kv.RwDB, tx kv.RwTx) (bool, error) { func generate8(_ kv.RwDB, tx kv.RwTx) (bool, error) { for i := 0; i < 100; i++ { k := fmt.Sprintf("table_%05d", i) - if err := tx.(kv.BucketMigrator).CreateBucket(k); err != nil { + if err := tx.CreateBucket(k); err != nil { return false, err } } @@ -656,7 +656,7 @@ func generate9(tx kv.RwTx, entries int) error { func dropAll(_ kv.RwDB, tx kv.RwTx) (bool, error) { for i := 0; i < 100; i++ { k := fmt.Sprintf("table_%05d", i) - if err := tx.(kv.BucketMigrator).DropBucket(k); err != nil { + if err := tx.DropBucket(k); err != nil { return false, err } } diff --git a/core/rawdb/accessors_account.go b/core/rawdb/accessors_account.go index 254d4d9bb02..0607f04648e 100644 --- a/core/rawdb/accessors_account.go +++ b/core/rawdb/accessors_account.go @@ -23,7 +23,7 @@ import ( "github.com/ledgerwatch/erigon/core/types/accounts" ) -func ReadAccount(db kv.Tx, addr libcommon.Address, acc *accounts.Account) (bool, error) { +func ReadAccount(db kv.Getter, addr libcommon.Address, acc *accounts.Account) (bool, error) { enc, err := db.GetOne(kv.PlainState, addr[:]) if err != nil { return false, err diff --git a/erigon-lib/kv/kv_interface.go b/erigon-lib/kv/kv_interface.go index aad6d93c7f8..adbd8b23c32 100644 --- a/erigon-lib/kv/kv_interface.go +++ b/erigon-lib/kv/kv_interface.go @@ -303,8 +303,6 @@ type StatelessReadTx interface { // Sequence changes become visible outside the current write transaction after it is committed, and discarded on abort. // Starts from 0. ReadSequence(table string) (uint64, error) - - BucketSize(table string) (uint64, error) } type StatelessWriteTx interface { @@ -340,6 +338,16 @@ type StatelessRwTx interface { StatelessWriteTx } +// PendingMutations in-memory storage of changes +// Later they can either be flushed to the database or abandon +type PendingMutations interface { + StatelessRwTx + // Flush all in-memory data into `tx` + Flush(ctx context.Context, tx RwTx) error + Close() + BatchSize() int +} + // Tx // WARNING: // - Tx is not threadsafe and may only be used in the goroutine that created it @@ -397,6 +405,7 @@ type Tx interface { // Pointer to the underlying C transaction handle (e.g. *C.MDBX_txn) CHandle() unsafe.Pointer + BucketSize(table string) (uint64, error) } // RwTx diff --git a/erigon-lib/kv/mdbx/kv_migrator_test.go b/erigon-lib/kv/mdbx/kv_migrator_test.go index 62be6bcb86b..05eef49fd91 100644 --- a/erigon-lib/kv/mdbx/kv_migrator_test.go +++ b/erigon-lib/kv/mdbx/kv_migrator_test.go @@ -37,10 +37,7 @@ func TestBucketCRUD(t *testing.T) { normalBucket := kv.ChaindataTables[15] deprecatedBucket := kv.ChaindataDeprecatedTables[0] - migrator, ok := tx.(kv.BucketMigrator) - if !ok { - return - } + migrator := tx // check thad buckets have unique DBI's uniquness := map[kv.DBI]bool{} diff --git a/ethdb/olddb/database_test.go b/erigon-lib/kv/membatch/database_test.go similarity index 84% rename from ethdb/olddb/database_test.go rename to erigon-lib/kv/membatch/database_test.go index 7767f3005af..c2b789162cf 100644 --- a/ethdb/olddb/database_test.go +++ b/erigon-lib/kv/membatch/database_test.go @@ -16,7 +16,7 @@ //go:build !js -package olddb +package membatch import ( "bytes" @@ -29,9 +29,6 @@ import ( "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/memdb" - "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/erigon/ethdb" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -207,42 +204,3 @@ func TestParallelPutGet(t *testing.T) { } pending.Wait() } - -var hexEntries = map[string]string{ - "6b": "89c6", - "91": "c476", - "a8": "0a514e", - "bb": "7a", - "bd": "fe76", - "c0": "12", -} - -var startKey = common.FromHex("a0") -var fixedBits = 3 - -var keysInRange = [][]byte{common.FromHex("a8"), common.FromHex("bb"), common.FromHex("bd")} - -func TestWalk(t *testing.T) { - _, tx := memdb.NewTestTx(t) - - for k, v := range hexEntries { - err := tx.Put(testBucket, common.FromHex(k), common.FromHex(v)) - if err != nil { - t.Fatalf("put failed: %v", err) - } - } - - var gotKeys [][]byte - c, err := tx.Cursor(testBucket) - if err != nil { - panic(err) - } - defer c.Close() - err = ethdb.Walk(c, startKey, fixedBits, func(key, val []byte) (bool, error) { - gotKeys = append(gotKeys, common.CopyBytes(key)) - return true, nil - }) - assert.NoError(t, err) - - assert.Equal(t, keysInRange, gotKeys) -} diff --git a/ethdb/olddb/mapmutation.go b/erigon-lib/kv/membatch/mapmutation.go similarity index 66% rename from ethdb/olddb/mapmutation.go rename to erigon-lib/kv/membatch/mapmutation.go index 385d201ddd0..0a38f39feff 100644 --- a/ethdb/olddb/mapmutation.go +++ b/erigon-lib/kv/membatch/mapmutation.go @@ -1,4 +1,4 @@ -package olddb +package membatch import ( "context" @@ -11,13 +11,11 @@ import ( "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/log/v3" - - "github.com/ledgerwatch/erigon/ethdb" ) -type mapmutation struct { +type Mapmutation struct { puts map[string]map[string][]byte // table -> key -> value ie. blocks -> hash -> blockBod - db kv.RwTx + db kv.Tx quit <-chan struct{} clean func() mu sync.RWMutex @@ -32,10 +30,10 @@ type mapmutation struct { // Common pattern: // // batch := db.NewBatch() -// defer batch.Rollback() +// defer batch.Close() // ... some calculations on `batch` // batch.Commit() -func NewHashBatch(tx kv.RwTx, quit <-chan struct{}, tmpdir string, logger log.Logger) *mapmutation { +func NewHashBatch(tx kv.Tx, quit <-chan struct{}, tmpdir string, logger log.Logger) *Mapmutation { clean := func() {} if quit == nil { ch := make(chan struct{}) @@ -43,7 +41,7 @@ func NewHashBatch(tx kv.RwTx, quit <-chan struct{}, tmpdir string, logger log.Lo quit = ch } - return &mapmutation{ + return &Mapmutation{ db: tx, puts: make(map[string]map[string][]byte), quit: quit, @@ -53,14 +51,7 @@ func NewHashBatch(tx kv.RwTx, quit <-chan struct{}, tmpdir string, logger log.Lo } } -func (m *mapmutation) RwKV() kv.RwDB { - if casted, ok := m.db.(ethdb.HasRwKV); ok { - return casted.RwKV() - } - return nil -} - -func (m *mapmutation) getMem(table string, key []byte) ([]byte, bool) { +func (m *Mapmutation) getMem(table string, key []byte) ([]byte, bool) { m.mu.RLock() defer m.mu.RUnlock() if _, ok := m.puts[table]; !ok { @@ -73,7 +64,7 @@ func (m *mapmutation) getMem(table string, key []byte) ([]byte, bool) { return nil, false } -func (m *mapmutation) IncrementSequence(bucket string, amount uint64) (res uint64, err error) { +func (m *Mapmutation) IncrementSequence(bucket string, amount uint64) (res uint64, err error) { v, ok := m.getMem(kv.Sequence, []byte(bucket)) if !ok && m.db != nil { v, err = m.db.GetOne(kv.Sequence, []byte(bucket)) @@ -95,7 +86,7 @@ func (m *mapmutation) IncrementSequence(bucket string, amount uint64) (res uint6 return currentV, nil } -func (m *mapmutation) ReadSequence(bucket string) (res uint64, err error) { +func (m *Mapmutation) ReadSequence(bucket string) (res uint64, err error) { v, ok := m.getMem(kv.Sequence, []byte(bucket)) if !ok && m.db != nil { v, err = m.db.GetOne(kv.Sequence, []byte(bucket)) @@ -112,7 +103,7 @@ func (m *mapmutation) ReadSequence(bucket string) (res uint64, err error) { } // Can only be called from the worker thread -func (m *mapmutation) GetOne(table string, key []byte) ([]byte, error) { +func (m *Mapmutation) GetOne(table string, key []byte) ([]byte, error) { if value, ok := m.getMem(table, key); ok { return value, nil } @@ -127,21 +118,7 @@ func (m *mapmutation) GetOne(table string, key []byte) ([]byte, error) { return nil, nil } -// Can only be called from the worker thread -func (m *mapmutation) Get(table string, key []byte) ([]byte, error) { - value, err := m.GetOne(table, key) - if err != nil { - return nil, err - } - - if value == nil { - return nil, ethdb.ErrKeyNotFound - } - - return value, nil -} - -func (m *mapmutation) Last(table string) ([]byte, []byte, error) { +func (m *Mapmutation) Last(table string) ([]byte, []byte, error) { c, err := m.db.Cursor(table) if err != nil { return nil, nil, err @@ -150,7 +127,7 @@ func (m *mapmutation) Last(table string) ([]byte, []byte, error) { return c.Last() } -func (m *mapmutation) Has(table string, key []byte) (bool, error) { +func (m *Mapmutation) Has(table string, key []byte) (bool, error) { if _, ok := m.getMem(table, key); ok { return ok, nil } @@ -161,7 +138,7 @@ func (m *mapmutation) Has(table string, key []byte) (bool, error) { } // puts a table key with a value and if the table is not found then it appends a table -func (m *mapmutation) Put(table string, k, v []byte) error { +func (m *Mapmutation) Put(table string, k, v []byte) error { m.mu.Lock() defer m.mu.Unlock() if _, ok := m.puts[table]; !ok { @@ -183,40 +160,40 @@ func (m *mapmutation) Put(table string, k, v []byte) error { return nil } -func (m *mapmutation) Append(table string, key []byte, value []byte) error { +func (m *Mapmutation) Append(table string, key []byte, value []byte) error { return m.Put(table, key, value) } -func (m *mapmutation) AppendDup(table string, key []byte, value []byte) error { +func (m *Mapmutation) AppendDup(table string, key []byte, value []byte) error { return m.Put(table, key, value) } -func (m *mapmutation) BatchSize() int { +func (m *Mapmutation) BatchSize() int { m.mu.RLock() defer m.mu.RUnlock() return m.size } -func (m *mapmutation) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error { +func (m *Mapmutation) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error { m.panicOnEmptyDB() return m.db.ForEach(bucket, fromPrefix, walker) } -func (m *mapmutation) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error { +func (m *Mapmutation) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error { m.panicOnEmptyDB() return m.db.ForPrefix(bucket, prefix, walker) } -func (m *mapmutation) ForAmount(bucket string, prefix []byte, amount uint32, walker func(k, v []byte) error) error { +func (m *Mapmutation) ForAmount(bucket string, prefix []byte, amount uint32, walker func(k, v []byte) error) error { m.panicOnEmptyDB() return m.db.ForAmount(bucket, prefix, amount, walker) } -func (m *mapmutation) Delete(table string, k []byte) error { +func (m *Mapmutation) Delete(table string, k []byte) error { return m.Put(table, k, nil) } -func (m *mapmutation) doCommit(tx kv.RwTx) error { +func (m *Mapmutation) doCommit(tx kv.RwTx) error { logEvery := time.NewTicker(30 * time.Second) defer logEvery.Stop() count := 0 @@ -235,7 +212,7 @@ func (m *mapmutation) doCommit(tx kv.RwTx) error { tx.CollectMetrics() } } - if err := collector.Load(m.db, table, etl.IdentityLoadFunc, etl.TransformArgs{Quit: m.quit}); err != nil { + if err := collector.Load(tx, table, etl.IdentityLoadFunc, etl.TransformArgs{Quit: m.quit}); err != nil { return err } } @@ -244,13 +221,13 @@ func (m *mapmutation) doCommit(tx kv.RwTx) error { return nil } -func (m *mapmutation) Commit() error { +func (m *Mapmutation) Flush(ctx context.Context, tx kv.RwTx) error { if m.db == nil { return nil } m.mu.Lock() defer m.mu.Unlock() - if err := m.doCommit(m.db); err != nil { + if err := m.doCommit(tx); err != nil { return err } @@ -261,7 +238,7 @@ func (m *mapmutation) Commit() error { return nil } -func (m *mapmutation) Rollback() { +func (m *Mapmutation) Close() { m.mu.Lock() defer m.mu.Unlock() m.puts = map[string]map[string][]byte{} @@ -270,25 +247,11 @@ func (m *mapmutation) Rollback() { m.size = 0 m.clean() } +func (m *Mapmutation) Commit() error { panic("not db txn, use .Flush method") } +func (m *Mapmutation) Rollback() { panic("not db txn, use .Close method") } -func (m *mapmutation) Close() { - m.Rollback() -} - -func (m *mapmutation) Begin(ctx context.Context, flags ethdb.TxFlags) (ethdb.DbWithPendingMutations, error) { - panic("mutation can't start transaction, because doesn't own it") -} - -func (m *mapmutation) panicOnEmptyDB() { +func (m *Mapmutation) panicOnEmptyDB() { if m.db == nil { panic("Not implemented") } } - -func (m *mapmutation) SetRwKV(kv kv.RwDB) { - hasRwKV, ok := m.db.(ethdb.HasRwKV) - if !ok { - log.Warn("Failed to convert mapmutation type to HasRwKV interface") - } - hasRwKV.SetRwKV(kv) -} diff --git a/erigon-lib/kv/memdb/memory_mutation.go b/erigon-lib/kv/memdb/memory_mutation.go index c5182cb624b..0e45737bc01 100644 --- a/erigon-lib/kv/memdb/memory_mutation.go +++ b/erigon-lib/kv/memdb/memory_mutation.go @@ -41,7 +41,7 @@ type MemoryMutation struct { // Common pattern: // // batch := NewMemoryBatch(db, tmpDir) -// defer batch.Rollback() +// defer batch.Close() // ... some calculations on `batch` // batch.Commit() func NewMemoryBatch(tx kv.Tx, tmpDir string) *MemoryMutation { diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index 39010bc1c5d..24e88dccaef 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -10,6 +10,7 @@ import ( "time" "github.com/c2h5oh/datasize" + "github.com/ledgerwatch/erigon-lib/kv/membatch" "github.com/ledgerwatch/log/v3" "golang.org/x/sync/errgroup" @@ -42,8 +43,6 @@ import ( "github.com/ledgerwatch/erigon/eth/ethconfig/estimate" "github.com/ledgerwatch/erigon/eth/stagedsync/stages" trace_logger "github.com/ledgerwatch/erigon/eth/tracers/logger" - "github.com/ledgerwatch/erigon/ethdb" - "github.com/ledgerwatch/erigon/ethdb/olddb" "github.com/ledgerwatch/erigon/ethdb/prune" "github.com/ledgerwatch/erigon/turbo/services" "github.com/ledgerwatch/erigon/turbo/shards" @@ -140,7 +139,7 @@ func StageExecuteBlocksCfg( func executeBlock( block *types.Block, tx kv.RwTx, - batch ethdb.Database, + batch kv.StatelessRwTx, cfg ExecuteBlockCfg, vmConfig vm.Config, // emit copy, because will modify it writeChangesets bool, @@ -206,7 +205,7 @@ func executeBlock( } func newStateReaderWriter( - batch ethdb.Database, + batch kv.StatelessRwTx, tx kv.RwTx, block *types.Block, writeChangesets bool, @@ -415,12 +414,12 @@ func SpawnExecuteBlocksStage(s *StageState, u Unwinder, tx kv.RwTx, toBlock uint var stoppedErr error - var batch ethdb.DbWithPendingMutations + var batch kv.PendingMutations // state is stored through ethdb batches - batch = olddb.NewHashBatch(tx, quit, cfg.dirs.Tmp, logger) + batch = membatch.NewHashBatch(tx, quit, cfg.dirs.Tmp, logger) // avoids stacking defers within the loop defer func() { - batch.Rollback() + batch.Close() }() var readAhead chan uint64 @@ -508,7 +507,7 @@ Loop: if shouldUpdateProgress { logger.Info("Committed State", "gas reached", currentStateGas, "gasTarget", gasState) currentStateGas = 0 - if err = batch.Commit(); err != nil { + if err = batch.Flush(ctx, tx); err != nil { return err } if err = s.Update(tx, stageProgress); err != nil { @@ -525,7 +524,7 @@ Loop: // TODO: This creates stacked up deferrals defer tx.Rollback() } - batch = olddb.NewHashBatch(tx, quit, cfg.dirs.Tmp, logger) + batch = membatch.NewHashBatch(tx, quit, cfg.dirs.Tmp, logger) } gas = gas + block.GasUsed() @@ -543,7 +542,7 @@ Loop: if err = s.Update(batch, stageProgress); err != nil { return err } - if err = batch.Commit(); err != nil { + if err = batch.Flush(ctx, tx); err != nil { return fmt.Errorf("batch commit: %w", err) } @@ -647,7 +646,7 @@ func blocksReadAheadFunc(ctx context.Context, tx kv.Tx, cfg *ExecuteBlockCfg, bl } func logProgress(logPrefix string, prevBlock uint64, prevTime time.Time, currentBlock uint64, prevTx, currentTx uint64, gas uint64, - gasState float64, batch ethdb.DbWithPendingMutations, logger log.Logger) (uint64, uint64, time.Time) { + gasState float64, batch kv.PendingMutations, logger log.Logger) (uint64, uint64, time.Time) { currentTime := time.Now() interval := currentTime.Sub(prevTime) speed := float64(currentBlock-prevBlock) / (float64(interval) / float64(time.Second)) diff --git a/eth/stagedsync/stage_mining_exec.go b/eth/stagedsync/stage_mining_exec.go index 4bb14b2f0b1..afd017fead6 100644 --- a/eth/stagedsync/stage_mining_exec.go +++ b/eth/stagedsync/stage_mining_exec.go @@ -10,6 +10,7 @@ import ( mapset "github.com/deckarep/golang-set/v2" "github.com/holiman/uint256" + "github.com/ledgerwatch/erigon-lib/kv/membatch" "github.com/ledgerwatch/log/v3" "golang.org/x/net/context" @@ -17,7 +18,6 @@ import ( libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/fixedgas" "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon-lib/kv/memdb" types2 "github.com/ledgerwatch/erigon-lib/types" "github.com/ledgerwatch/erigon/consensus" @@ -115,8 +115,11 @@ func SpawnMiningExecStage(s *StageState, tx kv.RwTx, cfg MiningExecCfg, quit <-c } else { yielded := mapset.NewSet[[32]byte]() - simulationTx := memdb.NewMemoryBatch(tx, cfg.tmpdir) - defer simulationTx.Rollback() + var simulationTx kv.StatelessRwTx + m := membatch.NewHashBatch(tx, quit, cfg.tmpdir, logger) + defer m.Close() + simulationTx = m + executionAt, err := s.ExecutionAt(tx) if err != nil { return err @@ -180,7 +183,7 @@ func getNextTransactions( header *types.Header, amount uint16, executionAt uint64, - simulationTx *memdb.MemoryMutation, + simulationTx kv.StatelessRwTx, alreadyYielded mapset.Set[[32]byte], logger log.Logger, ) (types.TransactionsStream, int, error) { @@ -237,7 +240,7 @@ func getNextTransactions( return types.NewTransactionsFixedOrder(txs), count, nil } -func filterBadTransactions(transactions []types.Transaction, config chain.Config, blockNumber uint64, baseFee *big.Int, simulationTx *memdb.MemoryMutation, logger log.Logger) ([]types.Transaction, error) { +func filterBadTransactions(transactions []types.Transaction, config chain.Config, blockNumber uint64, baseFee *big.Int, simulationTx kv.StatelessRwTx, logger log.Logger) ([]types.Transaction, error) { initialCnt := len(transactions) var filtered []types.Transaction gasBailout := false diff --git a/ethdb/db_interface.go b/ethdb/db_interface.go index d8a1cf590f9..ab131be8800 100644 --- a/ethdb/db_interface.go +++ b/ethdb/db_interface.go @@ -17,7 +17,6 @@ package ethdb import ( - "context" "errors" "github.com/ledgerwatch/erigon-lib/kv" @@ -28,85 +27,6 @@ import ( // ErrKeyNotFound is returned when key isn't found in the database. var ErrKeyNotFound = errors.New("db: key not found") -type TxFlags uint - -const ( - RW TxFlags = 0x00 // default - RO TxFlags = 0x02 -) - -// DBGetter wraps the database read operations. -type DBGetter interface { - kv.Getter - - // Get returns the value for a given key if it's present. - Get(bucket string, key []byte) ([]byte, error) -} - -// Database wraps all database operations. All methods are safe for concurrent use. -type Database interface { - DBGetter - kv.Putter - kv.Deleter - kv.Closer - - Begin(ctx context.Context, flags TxFlags) (DbWithPendingMutations, error) // starts db transaction - Last(bucket string) ([]byte, []byte, error) - - IncrementSequence(bucket string, amount uint64) (uint64, error) - ReadSequence(bucket string) (uint64, error) - RwKV() kv.RwDB -} - -// MinDatabase is a minimalistic version of the Database interface. -type MinDatabase interface { - Get(bucket string, key []byte) ([]byte, error) - Put(table string, k, v []byte) error - Delete(table string, k []byte) error -} - -// DbWithPendingMutations is an extended version of the Database, -// where all changes are first made in memory. -// Later they can either be committed to the database or rolled back. -type DbWithPendingMutations interface { - Database - - // Commit - commits transaction (or flush data into underlying db object in case of `mutation`) - // - // Common pattern: - // - // tx := db.Begin() - // defer tx.Rollback() - // ... some calculations on `tx` - // tx.Commit() - // - Commit() error - - Rollback() - BatchSize() int -} - -type HasRwKV interface { - RwKV() kv.RwDB - SetRwKV(kv kv.RwDB) -} - type HasTx interface { Tx() kv.Tx } - -type BucketsMigrator interface { - BucketExists(bucket string) (bool, error) // makes them empty - ClearBuckets(buckets ...string) error // makes them empty - DropBuckets(buckets ...string) error // drops them, use of them after drop will panic -} - -func GetOneWrapper(dat []byte, err error) ([]byte, error) { - if err != nil { - return nil, err - } - if dat == nil { - return nil, ErrKeyNotFound - } - return dat, nil -} diff --git a/ethdb/olddb/mutation.go b/ethdb/olddb/mutation.go deleted file mode 100644 index ee39868dabb..00000000000 --- a/ethdb/olddb/mutation.go +++ /dev/null @@ -1,345 +0,0 @@ -package olddb - -import ( - "bytes" - "context" - "encoding/binary" - "fmt" - "strings" - "sync" - "time" - "unsafe" - - "github.com/google/btree" - "github.com/ledgerwatch/erigon-lib/common" - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/ethdb" - "github.com/ledgerwatch/log/v3" -) - -type mutation struct { - puts *btree.BTree - db kv.RwTx - quit <-chan struct{} - clean func() - searchItem MutationItem - mu sync.RWMutex - size int -} - -type MutationItem struct { - table string - key []byte - value []byte -} - -// NewBatch - starts in-mem batch -// -// Common pattern: -// -// batch := db.NewBatch() -// defer batch.Rollback() -// ... some calculations on `batch` -// batch.Commit() -func NewBatch(tx kv.RwTx, quit <-chan struct{}) *mutation { - clean := func() {} - if quit == nil { - ch := make(chan struct{}) - clean = func() { close(ch) } - quit = ch - } - return &mutation{ - db: tx, - puts: btree.New(32), - quit: quit, - clean: clean, - } -} - -func (mi *MutationItem) Less(than btree.Item) bool { - i, ok := than.(*MutationItem) - if !ok { - log.Warn("Failed to convert btree.Item to MutationItem pointer") - } - c := strings.Compare(mi.table, i.table) - if c != 0 { - return c < 0 - } - return bytes.Compare(mi.key, i.key) < 0 -} - -func (m *mutation) ReadOnly() bool { return false } -func (m *mutation) RwKV() kv.RwDB { - if casted, ok := m.db.(ethdb.HasRwKV); ok { - return casted.RwKV() - } - return nil -} - -func (m *mutation) getMem(table string, key []byte) ([]byte, bool) { - m.mu.RLock() - defer m.mu.RUnlock() - m.searchItem.table = table - m.searchItem.key = key - i := m.puts.Get(&m.searchItem) - if i == nil { - return nil, false - } - return i.(*MutationItem).value, true -} - -func (m *mutation) IncrementSequence(bucket string, amount uint64) (res uint64, err error) { - v, ok := m.getMem(kv.Sequence, []byte(bucket)) - if !ok && m.db != nil { - v, err = m.db.GetOne(kv.Sequence, []byte(bucket)) - if err != nil { - return 0, err - } - } - - var currentV uint64 = 0 - if len(v) > 0 { - currentV = binary.BigEndian.Uint64(v) - } - - newVBytes := make([]byte, 8) - binary.BigEndian.PutUint64(newVBytes, currentV+amount) - if err = m.Put(kv.Sequence, []byte(bucket), newVBytes); err != nil { - return 0, err - } - - return currentV, nil -} -func (m *mutation) ReadSequence(bucket string) (res uint64, err error) { - v, ok := m.getMem(kv.Sequence, []byte(bucket)) - if !ok && m.db != nil { - v, err = m.db.GetOne(kv.Sequence, []byte(bucket)) - if err != nil { - return 0, err - } - } - var currentV uint64 = 0 - if len(v) > 0 { - currentV = binary.BigEndian.Uint64(v) - } - - return currentV, nil -} - -// Can only be called from the worker thread -func (m *mutation) GetOne(table string, key []byte) ([]byte, error) { - if value, ok := m.getMem(table, key); ok { - if value == nil { - return nil, nil - } - return value, nil - } - if m.db != nil { - // TODO: simplify when tx can no longer be parent of mutation - value, err := m.db.GetOne(table, key) - if err != nil { - return nil, err - } - - return value, nil - } - return nil, nil -} - -// Can only be called from the worker thread -func (m *mutation) Get(table string, key []byte) ([]byte, error) { - value, err := m.GetOne(table, key) - if err != nil { - return nil, err - } - - if value == nil { - return nil, ethdb.ErrKeyNotFound - } - - return value, nil -} - -func (m *mutation) Last(table string) ([]byte, []byte, error) { - c, err := m.db.Cursor(table) - if err != nil { - return nil, nil, err - } - defer c.Close() - return c.Last() -} - -func (m *mutation) hasMem(table string, key []byte) bool { - m.mu.RLock() - defer m.mu.RUnlock() - m.searchItem.table = table - m.searchItem.key = key - return m.puts.Has(&m.searchItem) -} - -func (m *mutation) Has(table string, key []byte) (bool, error) { - if m.hasMem(table, key) { - return true, nil - } - if m.db != nil { - return m.db.Has(table, key) - } - return false, nil -} - -func (m *mutation) Put(table string, k, v []byte) error { - m.mu.Lock() - defer m.mu.Unlock() - - newMi := &MutationItem{table: table, key: k, value: v} - i := m.puts.ReplaceOrInsert(newMi) - m.size += int(unsafe.Sizeof(newMi)) + len(k) + len(v) - if i != nil { - oldMi := i.(*MutationItem) - m.size -= int(unsafe.Sizeof(oldMi)) + len(oldMi.key) + len(oldMi.value) - } - return nil -} - -func (m *mutation) Append(table string, key []byte, value []byte) error { - return m.Put(table, key, value) -} - -func (m *mutation) AppendDup(table string, key []byte, value []byte) error { - return m.Put(table, key, value) -} - -func (m *mutation) BatchSize() int { - m.mu.RLock() - defer m.mu.RUnlock() - return m.size -} - -func (m *mutation) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error { - m.panicOnEmptyDB() - return m.db.ForEach(bucket, fromPrefix, walker) -} - -func (m *mutation) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error { - m.panicOnEmptyDB() - return m.db.ForPrefix(bucket, prefix, walker) -} - -func (m *mutation) ForAmount(bucket string, prefix []byte, amount uint32, walker func(k, v []byte) error) error { - m.panicOnEmptyDB() - return m.db.ForAmount(bucket, prefix, amount, walker) -} - -func (m *mutation) Delete(table string, k []byte) error { - //m.puts.Delete(table, k) - return m.Put(table, k, nil) -} - -func (m *mutation) doCommit(tx kv.RwTx) error { - var prevTable string - var c kv.RwCursor - var innerErr error - var isEndOfBucket bool - logEvery := time.NewTicker(30 * time.Second) - defer logEvery.Stop() - count := 0 - total := float64(m.puts.Len()) - - m.puts.Ascend(func(i btree.Item) bool { - mi := i.(*MutationItem) - if mi.table != prevTable { - if c != nil { - c.Close() - } - var err error - c, err = tx.RwCursor(mi.table) - if err != nil { - innerErr = err - return false - } - prevTable = mi.table - firstKey, _, err := c.Seek(mi.key) - if err != nil { - innerErr = err - return false - } - isEndOfBucket = firstKey == nil - } - if isEndOfBucket { - if len(mi.value) > 0 { - if err := c.Append(mi.key, mi.value); err != nil { - innerErr = err - return false - } - } - } else if len(mi.value) == 0 { - if err := c.Delete(mi.key); err != nil { - innerErr = err - return false - } - } else { - if err := c.Put(mi.key, mi.value); err != nil { - innerErr = err - return false - } - } - - count++ - - select { - default: - case <-logEvery.C: - progress := fmt.Sprintf("%.1fM/%.1fM", float64(count)/1_000_000, total/1_000_000) - log.Info("Write to db", "progress", progress, "current table", mi.table) - tx.CollectMetrics() - case <-m.quit: - innerErr = common.ErrStopped - return false - } - return true - }) - tx.CollectMetrics() - return innerErr -} - -func (m *mutation) Commit() error { - if m.db == nil { - return nil - } - m.mu.Lock() - defer m.mu.Unlock() - if err := m.doCommit(m.db); err != nil { - return err - } - - m.puts.Clear(false /* addNodesToFreelist */) - m.size = 0 - m.clean() - return nil -} - -func (m *mutation) Rollback() { - m.mu.Lock() - defer m.mu.Unlock() - m.puts.Clear(false /* addNodesToFreelist */) - m.size = 0 - m.clean() -} - -func (m *mutation) Close() { - m.Rollback() -} - -func (m *mutation) Begin(ctx context.Context, flags ethdb.TxFlags) (ethdb.DbWithPendingMutations, error) { - panic("mutation can't start transaction, because doesn't own it") -} - -func (m *mutation) panicOnEmptyDB() { - if m.db == nil { - panic("Not implemented") - } -} - -func (m *mutation) SetRwKV(kv kv.RwDB) { - m.db.(ethdb.HasRwKV).SetRwKV(kv) -} diff --git a/ethdb/olddb/object_db.go b/ethdb/olddb/object_db.go deleted file mode 100644 index 24d03523175..00000000000 --- a/ethdb/olddb/object_db.go +++ /dev/null @@ -1,248 +0,0 @@ -// Copyright 2014 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -// Package ethdb defines the interfaces for an Ethereum data store. -package olddb - -import ( - "context" - "fmt" - - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/common" - "github.com/ledgerwatch/erigon/ethdb" - "github.com/ledgerwatch/log/v3" -) - -// ObjectDatabase - is an object-style interface of DB accessing -type ObjectDatabase struct { - kv kv.RwDB -} - -// NewObjectDatabase returns a AbstractDB wrapper. -// Deprecated -func NewObjectDatabase(kv kv.RwDB) *ObjectDatabase { - return &ObjectDatabase{ - kv: kv, - } -} - -// Put inserts or updates a single entry. -func (db *ObjectDatabase) Put(table string, k, v []byte) error { - err := db.kv.Update(context.Background(), func(tx kv.RwTx) error { - return tx.Put(table, k, v) - }) - return err -} - -// Append appends a single entry to the end of the bucket. -func (db *ObjectDatabase) Append(bucket string, key []byte, value []byte) error { - err := db.kv.Update(context.Background(), func(tx kv.RwTx) error { - c, err := tx.RwCursor(bucket) - if err != nil { - return err - } - return c.Append(key, value) - }) - return err -} - -// AppendDup appends a single entry to the end of the bucket. -func (db *ObjectDatabase) AppendDup(bucket string, key []byte, value []byte) error { - err := db.kv.Update(context.Background(), func(tx kv.RwTx) error { - c, err := tx.RwCursorDupSort(bucket) - if err != nil { - return err - } - return c.AppendDup(key, value) - }) - return err -} - -func (db *ObjectDatabase) Has(bucket string, key []byte) (bool, error) { - var has bool - err := db.kv.View(context.Background(), func(tx kv.Tx) error { - v, err := tx.GetOne(bucket, key) - if err != nil { - return err - } - has = v != nil - return nil - }) - return has, err -} - -func (db *ObjectDatabase) IncrementSequence(bucket string, amount uint64) (res uint64, err error) { - err = db.kv.Update(context.Background(), func(tx kv.RwTx) error { - res, err = tx.IncrementSequence(bucket, amount) - return err - }) - return res, err -} -func (db *ObjectDatabase) ReadSequence(bucket string) (res uint64, err error) { - err = db.kv.View(context.Background(), func(tx kv.Tx) error { - res, err = tx.ReadSequence(bucket) - return err - }) - return res, err -} - -// Get returns the value for a given key if it's present. -func (db *ObjectDatabase) GetOne(bucket string, key []byte) ([]byte, error) { - var dat []byte - err := db.kv.View(context.Background(), func(tx kv.Tx) error { - v, err := tx.GetOne(bucket, key) - if err != nil { - return err - } - if v != nil { - dat = make([]byte, len(v)) - copy(dat, v) - } - return nil - }) - return dat, err -} - -func (db *ObjectDatabase) Get(bucket string, key []byte) ([]byte, error) { - dat, err := db.GetOne(bucket, key) - return ethdb.GetOneWrapper(dat, err) -} - -func (db *ObjectDatabase) Last(bucket string) ([]byte, []byte, error) { - var key, value []byte - if err := db.kv.View(context.Background(), func(tx kv.Tx) error { - c, err := tx.Cursor(bucket) - if err != nil { - return err - } - k, v, err := c.Last() - if err != nil { - return err - } - if k != nil { - key, value = common.CopyBytes(k), common.CopyBytes(v) - } - return nil - }); err != nil { - return nil, nil, err - } - return key, value, nil -} - -func (db *ObjectDatabase) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error { - return db.kv.View(context.Background(), func(tx kv.Tx) error { - return tx.ForEach(bucket, fromPrefix, walker) - }) -} -func (db *ObjectDatabase) ForAmount(bucket string, fromPrefix []byte, amount uint32, walker func(k, v []byte) error) error { - return db.kv.View(context.Background(), func(tx kv.Tx) error { - return tx.ForAmount(bucket, fromPrefix, amount, walker) - }) -} - -func (db *ObjectDatabase) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error { - return db.kv.View(context.Background(), func(tx kv.Tx) error { - return tx.ForPrefix(bucket, prefix, walker) - }) -} - -// Delete deletes the key from the queue and database -func (db *ObjectDatabase) Delete(table string, k []byte) error { - // Execute the actual operation - err := db.kv.Update(context.Background(), func(tx kv.RwTx) error { - return tx.Delete(table, k) - }) - return err -} - -func (db *ObjectDatabase) BucketExists(name string) (bool, error) { - exists := false - if err := db.kv.View(context.Background(), func(tx kv.Tx) (err error) { - migrator, ok := tx.(kv.BucketMigrator) - if !ok { - return fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", db.kv) - } - exists, err = migrator.ExistsBucket(name) - if err != nil { - return err - } - return nil - }); err != nil { - return false, err - } - return exists, nil -} - -func (db *ObjectDatabase) ClearBuckets(buckets ...string) error { - for i := range buckets { - name := buckets[i] - if err := db.kv.Update(context.Background(), func(tx kv.RwTx) error { - migrator, ok := tx.(kv.BucketMigrator) - if !ok { - return fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", db.kv) - } - if err := migrator.ClearBucket(name); err != nil { - return err - } - return nil - }); err != nil { - return err - } - } - - return nil -} - -func (db *ObjectDatabase) DropBuckets(buckets ...string) error { - for i := range buckets { - name := buckets[i] - log.Info("Dropping bucket", "name", name) - if err := db.kv.Update(context.Background(), func(tx kv.RwTx) error { - migrator, ok := tx.(kv.BucketMigrator) - if !ok { - return fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", db.kv) - } - if err := migrator.DropBucket(name); err != nil { - return err - } - return nil - }); err != nil { - return err - } - } - return nil -} - -func (db *ObjectDatabase) Close() { - db.kv.Close() -} - -func (db *ObjectDatabase) RwKV() kv.RwDB { - return db.kv -} - -func (db *ObjectDatabase) SetRwKV(kv kv.RwDB) { - db.kv = kv -} - -func (db *ObjectDatabase) Begin(ctx context.Context, flags ethdb.TxFlags) (ethdb.DbWithPendingMutations, error) { - batch := &TxDb{db: db} - if err := batch.begin(ctx, flags); err != nil { - return batch, err - } - return batch, nil -} diff --git a/ethdb/olddb/tx_db.go b/ethdb/olddb/tx_db.go deleted file mode 100644 index f54727ba04e..00000000000 --- a/ethdb/olddb/tx_db.go +++ /dev/null @@ -1,238 +0,0 @@ -package olddb - -import ( - "context" - "fmt" - - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/ethdb" - "github.com/ledgerwatch/log/v3" -) - -// TxDb - provides Database interface around ethdb.Tx -// It's not thread-safe! -// TxDb not usable after .Commit()/.Rollback() call, but usable after .CommitAndBegin() call -// you can put unlimited amount of data into this class -// Walk and MultiWalk methods - work outside of Tx object yet, will implement it later -// Deprecated -// nolint -type TxDb struct { - db ethdb.Database - tx kv.Tx - cursors map[string]kv.Cursor - txFlags ethdb.TxFlags - len uint64 -} - -// nolint -func WrapIntoTxDB(tx kv.RwTx) *TxDb { - return &TxDb{tx: tx, cursors: map[string]kv.Cursor{}} -} - -func (m *TxDb) Close() { - panic("don't call me") -} - -func (m *TxDb) Begin(ctx context.Context, flags ethdb.TxFlags) (ethdb.DbWithPendingMutations, error) { - batch := m - if m.tx != nil { - panic("nested transactions not supported") - } - - if err := batch.begin(ctx, flags); err != nil { - return nil, err - } - return batch, nil -} - -func (m *TxDb) cursor(bucket string) (kv.Cursor, error) { - c, ok := m.cursors[bucket] - if !ok { - var err error - c, err = m.tx.Cursor(bucket) - if err != nil { - return nil, err - } - m.cursors[bucket] = c - } - return c, nil -} - -func (m *TxDb) IncrementSequence(bucket string, amount uint64) (res uint64, err error) { - return m.tx.(kv.RwTx).IncrementSequence(bucket, amount) -} - -func (m *TxDb) ReadSequence(bucket string) (res uint64, err error) { - return m.tx.ReadSequence(bucket) -} - -func (m *TxDb) Put(table string, k, v []byte) error { - m.len += uint64(len(k) + len(v)) - c, err := m.cursor(table) - if err != nil { - return err - } - return c.(kv.RwCursor).Put(k, v) -} - -func (m *TxDb) Append(bucket string, key []byte, value []byte) error { - m.len += uint64(len(key) + len(value)) - c, err := m.cursor(bucket) - if err != nil { - return err - } - return c.(kv.RwCursor).Append(key, value) -} - -func (m *TxDb) AppendDup(bucket string, key []byte, value []byte) error { - m.len += uint64(len(key) + len(value)) - c, err := m.cursor(bucket) - if err != nil { - return err - } - return c.(kv.RwCursorDupSort).AppendDup(key, value) -} - -func (m *TxDb) Delete(table string, k []byte) error { - m.len += uint64(len(k)) - c, err := m.cursor(table) - if err != nil { - return err - } - return c.(kv.RwCursor).Delete(k) -} - -func (m *TxDb) begin(ctx context.Context, flags ethdb.TxFlags) error { - db := m.db.(ethdb.HasRwKV).RwKV() - - var tx kv.Tx - var err error - if flagsðdb.RO != 0 { - tx, err = db.BeginRo(ctx) - } else { - tx, err = db.BeginRw(ctx) - } - if err != nil { - return err - } - m.tx = tx - m.cursors = make(map[string]kv.Cursor, 16) - return nil -} - -func (m *TxDb) RwKV() kv.RwDB { - panic("not allowed to get KV interface because you will loose transaction, please use .Tx() method") -} - -// Last can only be called from the transaction thread -func (m *TxDb) Last(bucket string) ([]byte, []byte, error) { - c, err := m.cursor(bucket) - if err != nil { - return []byte{}, nil, err - } - return c.Last() -} - -func (m *TxDb) GetOne(bucket string, key []byte) ([]byte, error) { - c, err := m.cursor(bucket) - if err != nil { - return nil, err - } - _, v, err := c.SeekExact(key) - return v, err -} - -func (m *TxDb) Get(bucket string, key []byte) ([]byte, error) { - dat, err := m.GetOne(bucket, key) - return ethdb.GetOneWrapper(dat, err) -} - -func (m *TxDb) Has(bucket string, key []byte) (bool, error) { - v, err := m.Get(bucket, key) - if err != nil { - return false, err - } - return v != nil, nil -} - -func (m *TxDb) BatchSize() int { - return int(m.len) -} - -func (m *TxDb) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error { - return m.tx.ForEach(bucket, fromPrefix, walker) -} - -func (m *TxDb) ForPrefix(bucket string, prefix []byte, walker func(k, v []byte) error) error { - return m.tx.ForPrefix(bucket, prefix, walker) -} - -func (m *TxDb) ForAmount(bucket string, prefix []byte, amount uint32, walker func(k, v []byte) error) error { - return m.tx.ForAmount(bucket, prefix, amount, walker) -} - -func (m *TxDb) Commit() error { - if m.tx == nil { - return fmt.Errorf("second call .Commit() on same transaction") - } - if err := m.tx.Commit(); err != nil { - return err - } - m.tx = nil - m.cursors = nil - m.len = 0 - return nil -} - -func (m *TxDb) Rollback() { - if m.tx == nil { - return - } - m.tx.Rollback() - m.cursors = nil - m.tx = nil - m.len = 0 -} - -func (m *TxDb) Tx() kv.Tx { - return m.tx -} - -func (m *TxDb) BucketExists(name string) (bool, error) { - migrator, ok := m.tx.(kv.BucketMigrator) - if !ok { - return false, fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", m.tx) - } - return migrator.ExistsBucket(name) -} - -func (m *TxDb) ClearBuckets(buckets ...string) error { - for i := range buckets { - name := buckets[i] - - migrator, ok := m.tx.(kv.BucketMigrator) - if !ok { - return fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", m.tx) - } - if err := migrator.ClearBucket(name); err != nil { - return err - } - } - - return nil -} - -func (m *TxDb) DropBuckets(buckets ...string) error { - for i := range buckets { - name := buckets[i] - log.Info("Dropping bucket", "name", name) - migrator, ok := m.tx.(kv.BucketMigrator) - if !ok { - return fmt.Errorf("%T doesn't implement ethdb.TxMigrator interface", m.tx) - } - if err := migrator.DropBucket(name); err != nil { - return err - } - } - return nil -}