diff --git a/ethdb/mutation.go b/ethdb/mutation.go index 4627fe91434..a1573f59d67 100644 --- a/ethdb/mutation.go +++ b/ethdb/mutation.go @@ -1,14 +1,17 @@ package ethdb import ( + "bytes" "context" - "fmt" + "strings" + "sync" + "sync/atomic" + "unsafe" + "github.com/c2h5oh/datasize" + "github.com/google/btree" "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/metrics" - "sort" - "sync" - "sync/atomic" ) var ( @@ -17,10 +20,26 @@ var ( ) type mutation struct { - puts *puts // Map buckets to map[key]value - mu sync.RWMutex - db Database - tuples MultiPutTuples + puts *btree.BTree + mu sync.RWMutex + searchItem MutationItem + size int + db Database +} + +type MutationItem struct { + table string + key []byte + value []byte +} + +func (mi *MutationItem) Less(than btree.Item) bool { + i := than.(*MutationItem) + c := strings.Compare(mi.table, i.table) + if c != 0 { + return c < 0 + } + return bytes.Compare(mi.key, i.key) < 0 } func (m *mutation) KV() KV { @@ -30,54 +49,61 @@ func (m *mutation) KV() KV { return nil } -func (m *mutation) getMem(bucket string, key []byte) ([]byte, bool) { +func (m *mutation) getMem(table string, key []byte) ([]byte, bool) { m.mu.RLock() defer m.mu.RUnlock() - return m.puts.get(bucket, key) + m.searchItem.table = table + m.searchItem.key = key + i := m.puts.Get(&m.searchItem) + if i == nil { + return nil, false + } + return i.(*MutationItem).value, true } // Can only be called from the worker thread -func (m *mutation) Get(bucket string, key []byte) ([]byte, error) { - if value, ok := m.getMem(bucket, key); ok { +func (m *mutation) Get(table string, key []byte) ([]byte, error) { + if value, ok := m.getMem(table, key); ok { if value == nil { return nil, ErrKeyNotFound } return value, nil } if m.db != nil { - return m.db.Get(bucket, key) + return m.db.Get(table, key) } return nil, ErrKeyNotFound } -func (m *mutation) Last(bucket string) ([]byte, []byte, error) { - return m.db.Last(bucket) +func (m *mutation) Last(table string) ([]byte, []byte, error) { + return m.db.Last(table) } -func (m *mutation) Reserve(bucket string, key []byte, i int) ([]byte, error) { - return m.db.(DbWithPendingMutations).Reserve(bucket, key, i) +func (m *mutation) Reserve(table string, key []byte, i int) ([]byte, error) { + return m.db.(DbWithPendingMutations).Reserve(table, key, i) } -func (m *mutation) GetIndexChunk(bucket string, key []byte, timestamp uint64) ([]byte, error) { +func (m *mutation) GetIndexChunk(table string, key []byte, timestamp uint64) ([]byte, error) { if m.db != nil { - return m.db.GetIndexChunk(bucket, key, timestamp) + return m.db.GetIndexChunk(table, key, timestamp) } return nil, ErrKeyNotFound } -func (m *mutation) hasMem(bucket string, key []byte) bool { +func (m *mutation) hasMem(table string, key []byte) bool { m.mu.RLock() defer m.mu.RUnlock() - _, ok := m.puts.get(bucket, key) - return ok + m.searchItem.table = table + m.searchItem.key = key + return m.puts.Has(&m.searchItem) } -func (m *mutation) Has(bucket string, key []byte) (bool, error) { - if m.hasMem(bucket, key) { +func (m *mutation) Has(table string, key []byte) (bool, error) { + if m.hasMem(table, key) { return true, nil } if m.db != nil { - return m.db.Has(bucket, key) + return m.db.Has(table, key) } return false, nil } @@ -93,20 +119,22 @@ func (m *mutation) DiskSize(ctx context.Context) (common.StorageSize, error) { return common.StorageSize(sz), nil } -func (m *mutation) Put(bucket string, key []byte, value []byte) error { +func (m *mutation) Put(table string, key []byte, value []byte) error { m.mu.Lock() defer m.mu.Unlock() - m.puts.set(bucket, key, value) + newMi := &MutationItem{table: table, key: key, value: value} + i := m.puts.ReplaceOrInsert(newMi) + m.size += int(unsafe.Sizeof(newMi)) + len(key) + len(value) + if i != nil { + oldMi := i.(*MutationItem) + m.size -= (int(unsafe.Sizeof(oldMi)) + len(oldMi.key) + len(oldMi.value)) + } return nil } -func (m *mutation) Append(bucket string, key []byte, value []byte) error { - m.mu.Lock() - defer m.mu.Unlock() - - m.puts.set(bucket, key, value) - return nil +func (m *mutation) Append(table string, key []byte, value []byte) error { + return m.Put(table, key, value) } func (m *mutation) MultiPut(tuples ...[]byte) (uint64, error) { @@ -114,7 +142,13 @@ func (m *mutation) MultiPut(tuples ...[]byte) (uint64, error) { defer m.mu.Unlock() l := len(tuples) for i := 0; i < l; i += 3 { - m.puts.set(string(tuples[i]), tuples[i+1], tuples[i+2]) + newMi := &MutationItem{table: string(tuples[i]), key: tuples[i+1], value: tuples[i+2]} + i := m.puts.ReplaceOrInsert(newMi) + m.size += int(unsafe.Sizeof(newMi)) + len(newMi.key) + len(newMi.value) + if i != nil { + oldMi := i.(*MutationItem) + m.size -= (int(unsafe.Sizeof(oldMi)) + len(oldMi.key) + len(oldMi.value)) + } } return 0, nil } @@ -122,7 +156,7 @@ func (m *mutation) MultiPut(tuples ...[]byte) (uint64, error) { func (m *mutation) BatchSize() int { m.mu.RLock() defer m.mu.RUnlock() - return m.puts.Size() + return m.size } // IdealBatchSize defines the size of the data batches should ideally add in one write. @@ -131,22 +165,19 @@ func (m *mutation) IdealBatchSize() int { } // WARNING: Merged mem/DB walk is not implemented -func (m *mutation) Walk(bucket string, startkey []byte, fixedbits int, walker func([]byte, []byte) (bool, error)) error { +func (m *mutation) Walk(table string, startkey []byte, fixedbits int, walker func([]byte, []byte) (bool, error)) error { m.panicOnEmptyDB() - return m.db.Walk(bucket, startkey, fixedbits, walker) + return m.db.Walk(table, startkey, fixedbits, walker) } // WARNING: Merged mem/DB walk is not implemented -func (m *mutation) MultiWalk(bucket string, startkeys [][]byte, fixedbits []int, walker func(int, []byte, []byte) error) error { +func (m *mutation) MultiWalk(table string, startkeys [][]byte, fixedbits []int, walker func(int, []byte, []byte) error) error { m.panicOnEmptyDB() - return m.db.MultiWalk(bucket, startkeys, fixedbits, walker) + return m.db.MultiWalk(table, startkeys, fixedbits, walker) } -func (m *mutation) Delete(bucket string, key []byte) error { - m.mu.Lock() - defer m.mu.Unlock() - m.puts.Delete(bucket, key) - return nil +func (m *mutation) Delete(table string, key []byte) error { + return m.Put(table, key, nil) } func (m *mutation) CommitAndBegin(ctx context.Context) error { @@ -159,57 +190,93 @@ func (m *mutation) RollbackAndBegin(ctx context.Context) error { return nil } +func (m *mutation) doCommit(tx Tx) error { + var prevTable string + var c Cursor + var innerErr error + var isEndOfBucket bool + m.puts.Ascend(func(i btree.Item) bool { + mi := i.(*MutationItem) + if mi.table != prevTable { + if c != nil { + c.Close() + } + c = tx.Cursor(mi.table) + prevTable = mi.table + firstKey, _, err := c.Seek(mi.key) + if err != nil { + innerErr = err + return false + } + isEndOfBucket = firstKey == nil + } + if isEndOfBucket { + if len(mi.value) > 0 { + if err := c.Append(mi.key, mi.value); err != nil { + innerErr = err + return false + } + } + } else if len(mi.value) == 0 { + if err := c.Delete(mi.key); err != nil { + innerErr = err + return false + } + } else { + if err := c.Put(mi.key, mi.value); err != nil { + innerErr = err + return false + } + } + return true + }) + return innerErr +} + func (m *mutation) Commit() (uint64, error) { if m.db == nil { return 0, nil } m.mu.Lock() defer m.mu.Unlock() - if m.tuples == nil { - m.tuples = make(MultiPutTuples, 0, m.puts.Len()*3) - } - m.tuples = m.tuples[:0] - for bucketStr, bt := range m.puts.mp { - bucketB := []byte(bucketStr) - for key := range bt { - value, _ := bt.GetStr(key) - m.tuples = append(m.tuples, bucketB, []byte(key), value) + if tx, ok := m.db.(HasTx); ok { + if err := m.doCommit(tx.Tx()); err != nil { + return 0, err + } + } else { + if err := m.db.(HasKV).KV().Update(context.Background(), func(tx Tx) error { + return m.doCommit(tx) + }); err != nil { + return 0, err } - delete(m.puts.mp, bucketStr) - } - sort.Sort(m.tuples) - - written, err := m.db.MultiPut(m.tuples...) - if err != nil { - return 0, fmt.Errorf("db.MultiPut failed: %w", err) } - m.puts = newPuts() - m.tuples = nil - return written, nil + m.puts.Clear(false /* addNodesToFreelist */) + m.size = 0 + return 0, nil } func (m *mutation) Rollback() { m.mu.Lock() defer m.mu.Unlock() - m.puts = newPuts() - m.tuples = nil + m.puts.Clear(false /* addNodesToFreelist */) + m.size = 0 } func (m *mutation) Keys() ([][]byte, error) { m.mu.RLock() defer m.mu.RUnlock() tuples := common.NewTuples(m.puts.Len(), 2, 1) - for bucketStr, bt := range m.puts.mp { - bucketB := []byte(bucketStr) - for key := range bt { - if err := tuples.Append(bucketB, []byte(key)); err != nil { - return nil, err - } + var innerErr error + m.puts.Ascend(func(i btree.Item) bool { + mi := i.(*MutationItem) + if err := tuples.Append([]byte(mi.table), mi.key); err != nil { + innerErr = err + return false } - } - sort.Sort(tuples) - return tuples.Values, nil + return true + }) + return tuples.Values, innerErr } func (m *mutation) Close() { @@ -219,7 +286,7 @@ func (m *mutation) Close() { func (m *mutation) NewBatch() DbWithPendingMutations { mm := &mutation{ db: m, - puts: newPuts(), + puts: btree.New(32), } return mm } @@ -309,7 +376,7 @@ func (d *RWCounterDecorator) MultiPut(tuples ...[]byte) (uint64, error) { func (d *RWCounterDecorator) NewBatch() DbWithPendingMutations { mm := &mutation{ db: d, - puts: newPuts(), + puts: btree.New(32), } return mm } diff --git a/ethdb/mutation_puts.go b/ethdb/mutation_puts.go deleted file mode 100644 index 9fa554c355d..00000000000 --- a/ethdb/mutation_puts.go +++ /dev/null @@ -1,81 +0,0 @@ -package ethdb - -type puts struct { - mp map[string]putsBucket //map[dbi]putsBucket - size int - len int -} - -func newPuts() *puts { - return &puts{ - mp: make(map[string]putsBucket), - } -} - -func (p *puts) set(bucket string, key, value []byte) { - var bucketPuts putsBucket - var ok bool - if bucketPuts, ok = p.mp[bucket]; !ok { - bucketPuts = make(putsBucket) - p.mp[bucket] = bucketPuts - } - oldLen := len(bucketPuts) - skey := string(key) - if oldVal, ok := bucketPuts[skey]; ok { - p.size -= len(oldVal) - } else { - p.size += len(skey) + 32 // Add fixed overhead per key - } - bucketPuts[skey] = value - p.size += len(value) - p.len += len(bucketPuts) - oldLen -} - -func (p *puts) get(bucket string, key []byte) ([]byte, bool) { - var bucketPuts putsBucket - var ok bool - if bucketPuts, ok = p.mp[bucket]; !ok { - return nil, false - } - return bucketPuts.Get(key) -} - -func (p *puts) Delete(bucket string, key []byte) { - p.set(bucket, key, nil) -} - -func (p *puts) Len() int { - return p.len -} - -func (p *puts) Size() int { - return p.size -} - -type putsBucket map[string][]byte //map[key]value - -func (pb putsBucket) Get(key []byte) ([]byte, bool) { - value, ok := pb[string(key)] - if !ok { - return nil, false - } - - if value == nil { - return nil, true - } - - return value, true -} - -func (pb putsBucket) GetStr(key string) ([]byte, bool) { - value, ok := pb[key] - if !ok { - return nil, false - } - - if value == nil { - return nil, true - } - - return value, true -} diff --git a/ethdb/object_db.go b/ethdb/object_db.go index 871c68a67af..b95f2ea7a7e 100644 --- a/ethdb/object_db.go +++ b/ethdb/object_db.go @@ -25,6 +25,7 @@ import ( "strings" "time" + "github.com/google/btree" "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/common/dbutils" "github.com/ledgerwatch/turbo-geth/common/debug" @@ -350,7 +351,7 @@ func (db *ObjectDatabase) MemCopy() *ObjectDatabase { func (db *ObjectDatabase) NewBatch() DbWithPendingMutations { m := &mutation{ db: db, - puts: newPuts(), + puts: btree.New(32), } return m } diff --git a/ethdb/tx_db.go b/ethdb/tx_db.go index 9ce80e498ce..fb2947e01ae 100644 --- a/ethdb/tx_db.go +++ b/ethdb/tx_db.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + "github.com/google/btree" "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/common/dbutils" "github.com/ledgerwatch/turbo-geth/log" @@ -82,7 +83,7 @@ func (m *TxDb) Delete(bucket string, key []byte) error { func (m *TxDb) NewBatch() DbWithPendingMutations { return &mutation{ db: m, - puts: newPuts(), + puts: btree.New(32), } } diff --git a/go.mod b/go.mod index 0edfb4c94cc..4baec04bcc9 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/go-stack/stack v1.8.0 github.com/golang/protobuf v1.4.2 github.com/golang/snappy v0.0.2-0.20200707131729-196ae77b8a26 + github.com/google/btree v1.0.0 github.com/google/gofuzz v1.1.1-0.20200604201612-c04b05f3adfa github.com/gorilla/websocket v1.4.2 github.com/graph-gophers/graphql-go v0.0.0-20200819123640-3b5ddcd884ae