Skip to content

Commit

Permalink
Use BTree instead of maps for Mutation (batch) (erigontech#1308)
Browse files Browse the repository at this point in the history
* Try mutation based on B-tree

* Use pointers

* Fix NPE

* Cleanup
  • Loading branch information
AlexeyAkhunov authored Oct 27, 2020
1 parent 9ad6978 commit 3227853
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 159 deletions.
219 changes: 143 additions & 76 deletions ethdb/mutation.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package ethdb

import (
"bytes"
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"unsafe"

"github.com/c2h5oh/datasize"
"github.com/google/btree"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/metrics"
"sort"
"sync"
"sync/atomic"
)

var (
Expand All @@ -17,10 +20,26 @@ var (
)

type mutation struct {
puts *puts // Map buckets to map[key]value
mu sync.RWMutex
db Database
tuples MultiPutTuples
puts *btree.BTree
mu sync.RWMutex
searchItem MutationItem
size int
db Database
}

type MutationItem struct {
table string
key []byte
value []byte
}

func (mi *MutationItem) Less(than btree.Item) bool {
i := than.(*MutationItem)
c := strings.Compare(mi.table, i.table)
if c != 0 {
return c < 0
}
return bytes.Compare(mi.key, i.key) < 0
}

func (m *mutation) KV() KV {
Expand All @@ -30,54 +49,61 @@ func (m *mutation) KV() KV {
return nil
}

func (m *mutation) getMem(bucket string, key []byte) ([]byte, bool) {
func (m *mutation) getMem(table string, key []byte) ([]byte, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
return m.puts.get(bucket, key)
m.searchItem.table = table
m.searchItem.key = key
i := m.puts.Get(&m.searchItem)
if i == nil {
return nil, false
}
return i.(*MutationItem).value, true
}

// Can only be called from the worker thread
func (m *mutation) Get(bucket string, key []byte) ([]byte, error) {
if value, ok := m.getMem(bucket, key); ok {
func (m *mutation) Get(table string, key []byte) ([]byte, error) {
if value, ok := m.getMem(table, key); ok {
if value == nil {
return nil, ErrKeyNotFound
}
return value, nil
}
if m.db != nil {
return m.db.Get(bucket, key)
return m.db.Get(table, key)
}
return nil, ErrKeyNotFound
}

func (m *mutation) Last(bucket string) ([]byte, []byte, error) {
return m.db.Last(bucket)
func (m *mutation) Last(table string) ([]byte, []byte, error) {
return m.db.Last(table)
}

func (m *mutation) Reserve(bucket string, key []byte, i int) ([]byte, error) {
return m.db.(DbWithPendingMutations).Reserve(bucket, key, i)
func (m *mutation) Reserve(table string, key []byte, i int) ([]byte, error) {
return m.db.(DbWithPendingMutations).Reserve(table, key, i)
}

func (m *mutation) GetIndexChunk(bucket string, key []byte, timestamp uint64) ([]byte, error) {
func (m *mutation) GetIndexChunk(table string, key []byte, timestamp uint64) ([]byte, error) {
if m.db != nil {
return m.db.GetIndexChunk(bucket, key, timestamp)
return m.db.GetIndexChunk(table, key, timestamp)
}
return nil, ErrKeyNotFound
}

func (m *mutation) hasMem(bucket string, key []byte) bool {
func (m *mutation) hasMem(table string, key []byte) bool {
m.mu.RLock()
defer m.mu.RUnlock()
_, ok := m.puts.get(bucket, key)
return ok
m.searchItem.table = table
m.searchItem.key = key
return m.puts.Has(&m.searchItem)
}

func (m *mutation) Has(bucket string, key []byte) (bool, error) {
if m.hasMem(bucket, key) {
func (m *mutation) Has(table string, key []byte) (bool, error) {
if m.hasMem(table, key) {
return true, nil
}
if m.db != nil {
return m.db.Has(bucket, key)
return m.db.Has(table, key)
}
return false, nil
}
Expand All @@ -93,36 +119,44 @@ func (m *mutation) DiskSize(ctx context.Context) (common.StorageSize, error) {
return common.StorageSize(sz), nil
}

func (m *mutation) Put(bucket string, key []byte, value []byte) error {
func (m *mutation) Put(table string, key []byte, value []byte) error {
m.mu.Lock()
defer m.mu.Unlock()

m.puts.set(bucket, key, value)
newMi := &MutationItem{table: table, key: key, value: value}
i := m.puts.ReplaceOrInsert(newMi)
m.size += int(unsafe.Sizeof(newMi)) + len(key) + len(value)
if i != nil {
oldMi := i.(*MutationItem)
m.size -= (int(unsafe.Sizeof(oldMi)) + len(oldMi.key) + len(oldMi.value))
}
return nil
}

func (m *mutation) Append(bucket string, key []byte, value []byte) error {
m.mu.Lock()
defer m.mu.Unlock()

m.puts.set(bucket, key, value)
return nil
func (m *mutation) Append(table string, key []byte, value []byte) error {
return m.Put(table, key, value)
}

func (m *mutation) MultiPut(tuples ...[]byte) (uint64, error) {
m.mu.Lock()
defer m.mu.Unlock()
l := len(tuples)
for i := 0; i < l; i += 3 {
m.puts.set(string(tuples[i]), tuples[i+1], tuples[i+2])
newMi := &MutationItem{table: string(tuples[i]), key: tuples[i+1], value: tuples[i+2]}
i := m.puts.ReplaceOrInsert(newMi)
m.size += int(unsafe.Sizeof(newMi)) + len(newMi.key) + len(newMi.value)
if i != nil {
oldMi := i.(*MutationItem)
m.size -= (int(unsafe.Sizeof(oldMi)) + len(oldMi.key) + len(oldMi.value))
}
}
return 0, nil
}

func (m *mutation) BatchSize() int {
m.mu.RLock()
defer m.mu.RUnlock()
return m.puts.Size()
return m.size
}

// IdealBatchSize defines the size of the data batches should ideally add in one write.
Expand All @@ -131,22 +165,19 @@ func (m *mutation) IdealBatchSize() int {
}

// WARNING: Merged mem/DB walk is not implemented
func (m *mutation) Walk(bucket string, startkey []byte, fixedbits int, walker func([]byte, []byte) (bool, error)) error {
func (m *mutation) Walk(table string, startkey []byte, fixedbits int, walker func([]byte, []byte) (bool, error)) error {
m.panicOnEmptyDB()
return m.db.Walk(bucket, startkey, fixedbits, walker)
return m.db.Walk(table, startkey, fixedbits, walker)
}

// WARNING: Merged mem/DB walk is not implemented
func (m *mutation) MultiWalk(bucket string, startkeys [][]byte, fixedbits []int, walker func(int, []byte, []byte) error) error {
func (m *mutation) MultiWalk(table string, startkeys [][]byte, fixedbits []int, walker func(int, []byte, []byte) error) error {
m.panicOnEmptyDB()
return m.db.MultiWalk(bucket, startkeys, fixedbits, walker)
return m.db.MultiWalk(table, startkeys, fixedbits, walker)
}

func (m *mutation) Delete(bucket string, key []byte) error {
m.mu.Lock()
defer m.mu.Unlock()
m.puts.Delete(bucket, key)
return nil
func (m *mutation) Delete(table string, key []byte) error {
return m.Put(table, key, nil)
}

func (m *mutation) CommitAndBegin(ctx context.Context) error {
Expand All @@ -159,57 +190,93 @@ func (m *mutation) RollbackAndBegin(ctx context.Context) error {
return nil
}

func (m *mutation) doCommit(tx Tx) error {
var prevTable string
var c Cursor
var innerErr error
var isEndOfBucket bool
m.puts.Ascend(func(i btree.Item) bool {
mi := i.(*MutationItem)
if mi.table != prevTable {
if c != nil {
c.Close()
}
c = tx.Cursor(mi.table)
prevTable = mi.table
firstKey, _, err := c.Seek(mi.key)
if err != nil {
innerErr = err
return false
}
isEndOfBucket = firstKey == nil
}
if isEndOfBucket {
if len(mi.value) > 0 {
if err := c.Append(mi.key, mi.value); err != nil {
innerErr = err
return false
}
}
} else if len(mi.value) == 0 {
if err := c.Delete(mi.key); err != nil {
innerErr = err
return false
}
} else {
if err := c.Put(mi.key, mi.value); err != nil {
innerErr = err
return false
}
}
return true
})
return innerErr
}

func (m *mutation) Commit() (uint64, error) {
if m.db == nil {
return 0, nil
}
m.mu.Lock()
defer m.mu.Unlock()
if m.tuples == nil {
m.tuples = make(MultiPutTuples, 0, m.puts.Len()*3)
}
m.tuples = m.tuples[:0]
for bucketStr, bt := range m.puts.mp {
bucketB := []byte(bucketStr)
for key := range bt {
value, _ := bt.GetStr(key)
m.tuples = append(m.tuples, bucketB, []byte(key), value)
if tx, ok := m.db.(HasTx); ok {
if err := m.doCommit(tx.Tx()); err != nil {
return 0, err
}
} else {
if err := m.db.(HasKV).KV().Update(context.Background(), func(tx Tx) error {
return m.doCommit(tx)
}); err != nil {
return 0, err
}
delete(m.puts.mp, bucketStr)
}
sort.Sort(m.tuples)

written, err := m.db.MultiPut(m.tuples...)
if err != nil {
return 0, fmt.Errorf("db.MultiPut failed: %w", err)
}

m.puts = newPuts()
m.tuples = nil
return written, nil
m.puts.Clear(false /* addNodesToFreelist */)
m.size = 0
return 0, nil
}

func (m *mutation) Rollback() {
m.mu.Lock()
defer m.mu.Unlock()
m.puts = newPuts()
m.tuples = nil
m.puts.Clear(false /* addNodesToFreelist */)
m.size = 0
}

func (m *mutation) Keys() ([][]byte, error) {
m.mu.RLock()
defer m.mu.RUnlock()
tuples := common.NewTuples(m.puts.Len(), 2, 1)
for bucketStr, bt := range m.puts.mp {
bucketB := []byte(bucketStr)
for key := range bt {
if err := tuples.Append(bucketB, []byte(key)); err != nil {
return nil, err
}
var innerErr error
m.puts.Ascend(func(i btree.Item) bool {
mi := i.(*MutationItem)
if err := tuples.Append([]byte(mi.table), mi.key); err != nil {
innerErr = err
return false
}
}
sort.Sort(tuples)
return tuples.Values, nil
return true
})
return tuples.Values, innerErr
}

func (m *mutation) Close() {
Expand All @@ -219,7 +286,7 @@ func (m *mutation) Close() {
func (m *mutation) NewBatch() DbWithPendingMutations {
mm := &mutation{
db: m,
puts: newPuts(),
puts: btree.New(32),
}
return mm
}
Expand Down Expand Up @@ -309,7 +376,7 @@ func (d *RWCounterDecorator) MultiPut(tuples ...[]byte) (uint64, error) {
func (d *RWCounterDecorator) NewBatch() DbWithPendingMutations {
mm := &mutation{
db: d,
puts: newPuts(),
puts: btree.New(32),
}
return mm
}
Loading

0 comments on commit 3227853

Please sign in to comment.