Skip to content

Commit

Permalink
feat(v1.x.x): async pruning of orphan nodes (#876)
Browse files Browse the repository at this point in the history
Co-authored-by: Dev Ojha <dojha@berkeley.edu>
Co-authored-by: cool-developer <51834436+cool-develope@users.noreply.github.com>
  • Loading branch information
3 people authored Feb 15, 2024
1 parent b0d383c commit e1fa67e
Show file tree
Hide file tree
Showing 10 changed files with 275 additions and 118 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ jobs:
name: golangci-lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v4
- name: Check out repository code
uses: actions/checkout@v4
- name: 🐿 Setup Golang
uses: actions/setup-go@v4
with:
go-version: '^1.20.0'
go-version: 1.21
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.51.2
run: make lint
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## Unreleased

### Improvements

- [#876](https://github.com/cosmos/iavl/pull/876) Make pruning of legacy orphan nodes asynchronous.

## v1.0.0 (October 30, 2023)

### Improvements
Expand Down
9 changes: 9 additions & 0 deletions batch.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package iavl

import (
"sync"

dbm "github.com/cosmos/cosmos-db"
)

Expand All @@ -11,6 +13,7 @@ type BatchWithFlusher struct {
db dbm.DB // This is only used to create new batch
batch dbm.Batch // Batched writing buffer.

mtx sync.Mutex
flushThreshold int // The threshold to flush the batch to disk.
}

Expand Down Expand Up @@ -46,6 +49,9 @@ func (b *BatchWithFlusher) estimateSizeAfterSetting(key []byte, value []byte) (i
// the batch is flushed to disk, cleared, and a new one is created with buffer pre-allocated to threshold.
// The addition entry is then added to the batch.
func (b *BatchWithFlusher) Set(key, value []byte) error {
b.mtx.Lock()
defer b.mtx.Unlock()

batchSizeAfter, err := b.estimateSizeAfterSetting(key, value)
if err != nil {
return err
Expand All @@ -67,6 +73,9 @@ func (b *BatchWithFlusher) Set(key, value []byte) error {
// the batch is flushed to disk, cleared, and a new one is created with buffer pre-allocated to threshold.
// The deletion entry is then added to the batch.
func (b *BatchWithFlusher) Delete(key []byte) error {
b.mtx.Lock()
defer b.mtx.Unlock()

batchSizeAfter, err := b.estimateSizeAfterSetting(key, []byte{})
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions fastnode/fast_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func NewNode(key []byte, value []byte, version int64) *Node {
}

// DeserializeNode constructs an *FastNode from an encoded byte slice.
// It assumes we do not mutate this input []byte.
func DeserializeNode(key []byte, buf []byte) (*Node, error) {
ver, n, err := encoding.DecodeVarint(buf)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ require (
github.com/emicklei/dot v1.4.2
github.com/golang/mock v1.6.0
github.com/stretchr/testify v1.8.4
google.golang.org/protobuf v1.30.0
golang.org/x/crypto v0.12.0
google.golang.org/protobuf v1.30.0
)

require (
Expand Down Expand Up @@ -49,8 +49,8 @@ require (
)

retract (
v0.18.0
// This version is not used by the Cosmos SDK and adds a maintenance burden.
// Use v1.x.x instead.
[v0.21.0, v0.21.2]
v0.18.0
)
22 changes: 19 additions & 3 deletions internal/encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var uvarintPool = &sync.Pool{

// decodeBytes decodes a varint length-prefixed byte slice, returning it along with the number
// of input bytes read.
// Assumes bz will not be mutated.
func DecodeBytes(bz []byte) ([]byte, int, error) {
s, n, err := DecodeUvarint(bz)
if err != nil {
Expand All @@ -51,9 +52,7 @@ func DecodeBytes(bz []byte) ([]byte, int, error) {
if len(bz) < end {
return nil, n, fmt.Errorf("insufficient bytes decoding []byte of length %v", size)
}
bz2 := make([]byte, size)
copy(bz2, bz[n:end])
return bz2, end, nil
return bz[n:end], end, nil
}

// decodeUvarint decodes a varint-encoded unsigned integer from a byte slice, returning it and the
Expand Down Expand Up @@ -97,6 +96,23 @@ func EncodeBytes(w io.Writer, bz []byte) error {
return err
}

var hashLenBz []byte

func init() {
hashLenBz = make([]byte, 1)
binary.PutUvarint(hashLenBz, 32)
}

// Encode 32 byte long hash
func Encode32BytesHash(w io.Writer, bz []byte) error {
_, err := w.Write(hashLenBz)
if err != nil {
return err
}
_, err = w.Write(bz)
return err
}

// encodeBytesSlice length-prefixes the byte slice and returns it.
func EncodeBytesSlice(bz []byte) ([]byte, error) {
buf := bufPool.Get().(*bytes.Buffer)
Expand Down
42 changes: 42 additions & 0 deletions keyformat/prefix_formatter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package keyformat

import "encoding/binary"

// This file builds some dedicated key formatters for what appears in benchmarks.

// Prefixes a single byte before a 32 byte hash.
type FastPrefixFormatter struct {
prefix byte
length int
prefixSlice []byte
}

func NewFastPrefixFormatter(prefix byte, length int) *FastPrefixFormatter {
return &FastPrefixFormatter{prefix: prefix, length: length, prefixSlice: []byte{prefix}}
}

func (f *FastPrefixFormatter) Key(bz []byte) []byte {
key := make([]byte, 1+f.length)
key[0] = f.prefix
copy(key[1:], bz)
return key
}

func (f *FastPrefixFormatter) Scan(key []byte, a interface{}) {
scan(a, key[1:])
}

func (f *FastPrefixFormatter) KeyInt64(bz int64) []byte {
key := make([]byte, 1+f.length)
key[0] = f.prefix
binary.BigEndian.PutUint64(key[1:], uint64(bz))
return key
}

func (f *FastPrefixFormatter) Prefix() []byte {
return f.prefixSlice
}

func (f *FastPrefixFormatter) Length() int {
return 1 + f.length
}
106 changes: 55 additions & 51 deletions mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,64 +266,68 @@ func (tree *MutableTree) set(key []byte, value []byte) (updated bool, err error)
func (tree *MutableTree) recursiveSet(node *Node, key []byte, value []byte) (
newSelf *Node, updated bool, err error,
) {
version := tree.version + 1

if node.isLeaf() {
if !tree.skipFastStorageUpgrade {
tree.addUnsavedAddition(key, fastnode.NewNode(key, value, version))
}
switch bytes.Compare(key, node.key) {
case -1: // setKey < leafKey
return &Node{
key: node.key,
subtreeHeight: 1,
size: 2,
nodeKey: nil,
leftNode: NewNode(key, value),
rightNode: node,
}, false, nil
case 1: // setKey > leafKey
return &Node{
key: key,
subtreeHeight: 1,
size: 2,
nodeKey: nil,
leftNode: node,
rightNode: NewNode(key, value),
}, false, nil
default:
return NewNode(key, value), true, nil
return tree.recursiveSetLeaf(node, key, value)
}
node, err = node.clone(tree)
if err != nil {
return nil, false, err
}

if bytes.Compare(key, node.key) < 0 {
node.leftNode, updated, err = tree.recursiveSet(node.leftNode, key, value)
if err != nil {
return nil, updated, err
}
} else {
node, err = node.clone(tree)
node.rightNode, updated, err = tree.recursiveSet(node.rightNode, key, value)
if err != nil {
return nil, false, err
return nil, updated, err
}
}

if bytes.Compare(key, node.key) < 0 {
node.leftNode, updated, err = tree.recursiveSet(node.leftNode, key, value)
if err != nil {
return nil, updated, err
}
} else {
node.rightNode, updated, err = tree.recursiveSet(node.rightNode, key, value)
if err != nil {
return nil, updated, err
}
}
if updated {
return node, updated, nil
}
err = node.calcHeightAndSize(tree.ImmutableTree)
if err != nil {
return nil, false, err
}
newNode, err := tree.balance(node)
if err != nil {
return nil, false, err
}
return newNode, updated, err
}

if updated {
return node, updated, nil
}
err = node.calcHeightAndSize(tree.ImmutableTree)
if err != nil {
return nil, false, err
}
newNode, err := tree.balance(node)
if err != nil {
return nil, false, err
}
return newNode, updated, err
func (tree *MutableTree) recursiveSetLeaf(node *Node, key []byte, value []byte) (
newSelf *Node, updated bool, err error,
) {
version := tree.version + 1
if !tree.skipFastStorageUpgrade {
tree.addUnsavedAddition(key, fastnode.NewNode(key, value, version))
}
switch bytes.Compare(key, node.key) {
case -1: // setKey < leafKey
return &Node{
key: node.key,
subtreeHeight: 1,
size: 2,
nodeKey: nil,
leftNode: NewNode(key, value),
rightNode: node,
}, false, nil
case 1: // setKey > leafKey
return &Node{
key: key,
subtreeHeight: 1,
size: 2,
nodeKey: nil,
leftNode: node,
rightNode: NewNode(key, value),
}, false, nil
default:
return NewNode(key, value), true, nil
}
}

Expand Down
32 changes: 23 additions & 9 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,15 @@ func GetRootKey(version int64) []byte {

// Node represents a node in a Tree.
type Node struct {
key []byte
value []byte
hash []byte
nodeKey *NodeKey
leftNodeKey []byte
key []byte
value []byte
hash []byte
nodeKey *NodeKey
// Legacy: LeftNodeHash
// v1: Left node ptr via Version/key
leftNodeKey []byte
// Legacy: RightNodeHash
// v1: Right node ptr via Version/key
rightNodeKey []byte
size int64
leftNode *Node
Expand Down Expand Up @@ -517,19 +521,29 @@ func (node *Node) writeHashBytes(w io.Writer, version int64) error {
// (e.g. ProofLeafNode.ValueHash)
valueHash := sha256.Sum256(node.value)

err = encoding.EncodeBytes(w, valueHash[:])
err = encoding.Encode32BytesHash(w, valueHash[:])
if err != nil {
return fmt.Errorf("writing value, %w", err)
}
} else {
if node.leftNode == nil || node.rightNode == nil {
if (node.leftNode == nil && len(node.leftNodeKey) != 32) || (node.rightNode == nil && len(node.rightNodeKey) != 32) {
return ErrEmptyChild
}
err = encoding.EncodeBytes(w, node.leftNode.hash)
// If left/rightNodeKey is 32 bytes, it is a legacy node whose value is just the hash.
// We may have skipped fetching leftNode/rightNode.
if len(node.leftNodeKey) == 32 {
err = encoding.Encode32BytesHash(w, node.leftNodeKey)
} else {
err = encoding.Encode32BytesHash(w, node.leftNode.hash)
}
if err != nil {
return fmt.Errorf("writing left hash, %w", err)
}
err = encoding.EncodeBytes(w, node.rightNode.hash)
if len(node.rightNodeKey) == 32 {
err = encoding.Encode32BytesHash(w, node.rightNodeKey)
} else {
err = encoding.Encode32BytesHash(w, node.rightNode.hash)
}
if err != nil {
return fmt.Errorf("writing right hash, %w", err)
}
Expand Down
Loading

0 comments on commit e1fa67e

Please sign in to comment.