diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 4935210c8..1e03feecc 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -12,11 +12,11 @@ jobs: name: golangci-lint runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions/setup-go@v4 + - name: Check out repository code + uses: actions/checkout@v4 + - name: 🐿 Setup Golang + uses: actions/setup-go@v4 with: - go-version: '^1.20.0' + go-version: 1.21 - name: golangci-lint - uses: golangci/golangci-lint-action@v3 - with: - version: v1.51.2 + run: make lint diff --git a/CHANGELOG.md b/CHANGELOG.md index ce47b3094..df5d7f7d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## Unreleased + +### Improvements + +- [#876](https://github.com/cosmos/iavl/pull/876) Make pruning of legacy orphan nodes asynchronous. + ## v1.0.0 (October 30, 2023) ### Improvements diff --git a/batch.go b/batch.go index b25e879a2..a7e5e20b0 100644 --- a/batch.go +++ b/batch.go @@ -1,6 +1,8 @@ package iavl import ( + "sync" + dbm "github.com/cosmos/cosmos-db" ) @@ -11,6 +13,7 @@ type BatchWithFlusher struct { db dbm.DB // This is only used to create new batch batch dbm.Batch // Batched writing buffer. + mtx sync.Mutex flushThreshold int // The threshold to flush the batch to disk. } @@ -46,6 +49,9 @@ func (b *BatchWithFlusher) estimateSizeAfterSetting(key []byte, value []byte) (i // the batch is flushed to disk, cleared, and a new one is created with buffer pre-allocated to threshold. // The addition entry is then added to the batch. func (b *BatchWithFlusher) Set(key, value []byte) error { + b.mtx.Lock() + defer b.mtx.Unlock() + batchSizeAfter, err := b.estimateSizeAfterSetting(key, value) if err != nil { return err @@ -67,6 +73,9 @@ func (b *BatchWithFlusher) Set(key, value []byte) error { // the batch is flushed to disk, cleared, and a new one is created with buffer pre-allocated to threshold. // The deletion entry is then added to the batch. func (b *BatchWithFlusher) Delete(key []byte) error { + b.mtx.Lock() + defer b.mtx.Unlock() + batchSizeAfter, err := b.estimateSizeAfterSetting(key, []byte{}) if err != nil { return err diff --git a/fastnode/fast_node.go b/fastnode/fast_node.go index 149ac9b26..5d00bd997 100644 --- a/fastnode/fast_node.go +++ b/fastnode/fast_node.go @@ -30,6 +30,7 @@ func NewNode(key []byte, value []byte, version int64) *Node { } // DeserializeNode constructs an *FastNode from an encoded byte slice. +// It assumes we do not mutate this input []byte. func DeserializeNode(key []byte, buf []byte) (*Node, error) { ver, n, err := encoding.DecodeVarint(buf) if err != nil { diff --git a/go.mod b/go.mod index b38602555..356a691a7 100644 --- a/go.mod +++ b/go.mod @@ -9,8 +9,8 @@ require ( github.com/emicklei/dot v1.4.2 github.com/golang/mock v1.6.0 github.com/stretchr/testify v1.8.4 - google.golang.org/protobuf v1.30.0 golang.org/x/crypto v0.12.0 + google.golang.org/protobuf v1.30.0 ) require ( @@ -49,8 +49,8 @@ require ( ) retract ( - v0.18.0 // This version is not used by the Cosmos SDK and adds a maintenance burden. // Use v1.x.x instead. [v0.21.0, v0.21.2] + v0.18.0 ) diff --git a/internal/encoding/encoding.go b/internal/encoding/encoding.go index 17a994bc1..78a4f9899 100644 --- a/internal/encoding/encoding.go +++ b/internal/encoding/encoding.go @@ -30,6 +30,7 @@ var uvarintPool = &sync.Pool{ // decodeBytes decodes a varint length-prefixed byte slice, returning it along with the number // of input bytes read. +// Assumes bz will not be mutated. func DecodeBytes(bz []byte) ([]byte, int, error) { s, n, err := DecodeUvarint(bz) if err != nil { @@ -51,9 +52,7 @@ func DecodeBytes(bz []byte) ([]byte, int, error) { if len(bz) < end { return nil, n, fmt.Errorf("insufficient bytes decoding []byte of length %v", size) } - bz2 := make([]byte, size) - copy(bz2, bz[n:end]) - return bz2, end, nil + return bz[n:end], end, nil } // decodeUvarint decodes a varint-encoded unsigned integer from a byte slice, returning it and the @@ -97,6 +96,23 @@ func EncodeBytes(w io.Writer, bz []byte) error { return err } +var hashLenBz []byte + +func init() { + hashLenBz = make([]byte, 1) + binary.PutUvarint(hashLenBz, 32) +} + +// Encode 32 byte long hash +func Encode32BytesHash(w io.Writer, bz []byte) error { + _, err := w.Write(hashLenBz) + if err != nil { + return err + } + _, err = w.Write(bz) + return err +} + // encodeBytesSlice length-prefixes the byte slice and returns it. func EncodeBytesSlice(bz []byte) ([]byte, error) { buf := bufPool.Get().(*bytes.Buffer) diff --git a/keyformat/prefix_formatter.go b/keyformat/prefix_formatter.go new file mode 100644 index 000000000..873c3a7e1 --- /dev/null +++ b/keyformat/prefix_formatter.go @@ -0,0 +1,42 @@ +package keyformat + +import "encoding/binary" + +// This file builds some dedicated key formatters for what appears in benchmarks. + +// Prefixes a single byte before a 32 byte hash. +type FastPrefixFormatter struct { + prefix byte + length int + prefixSlice []byte +} + +func NewFastPrefixFormatter(prefix byte, length int) *FastPrefixFormatter { + return &FastPrefixFormatter{prefix: prefix, length: length, prefixSlice: []byte{prefix}} +} + +func (f *FastPrefixFormatter) Key(bz []byte) []byte { + key := make([]byte, 1+f.length) + key[0] = f.prefix + copy(key[1:], bz) + return key +} + +func (f *FastPrefixFormatter) Scan(key []byte, a interface{}) { + scan(a, key[1:]) +} + +func (f *FastPrefixFormatter) KeyInt64(bz int64) []byte { + key := make([]byte, 1+f.length) + key[0] = f.prefix + binary.BigEndian.PutUint64(key[1:], uint64(bz)) + return key +} + +func (f *FastPrefixFormatter) Prefix() []byte { + return f.prefixSlice +} + +func (f *FastPrefixFormatter) Length() int { + return 1 + f.length +} diff --git a/mutable_tree.go b/mutable_tree.go index e65dbd109..358a366e1 100644 --- a/mutable_tree.go +++ b/mutable_tree.go @@ -266,64 +266,68 @@ func (tree *MutableTree) set(key []byte, value []byte) (updated bool, err error) func (tree *MutableTree) recursiveSet(node *Node, key []byte, value []byte) ( newSelf *Node, updated bool, err error, ) { - version := tree.version + 1 - if node.isLeaf() { - if !tree.skipFastStorageUpgrade { - tree.addUnsavedAddition(key, fastnode.NewNode(key, value, version)) - } - switch bytes.Compare(key, node.key) { - case -1: // setKey < leafKey - return &Node{ - key: node.key, - subtreeHeight: 1, - size: 2, - nodeKey: nil, - leftNode: NewNode(key, value), - rightNode: node, - }, false, nil - case 1: // setKey > leafKey - return &Node{ - key: key, - subtreeHeight: 1, - size: 2, - nodeKey: nil, - leftNode: node, - rightNode: NewNode(key, value), - }, false, nil - default: - return NewNode(key, value), true, nil + return tree.recursiveSetLeaf(node, key, value) + } + node, err = node.clone(tree) + if err != nil { + return nil, false, err + } + + if bytes.Compare(key, node.key) < 0 { + node.leftNode, updated, err = tree.recursiveSet(node.leftNode, key, value) + if err != nil { + return nil, updated, err } } else { - node, err = node.clone(tree) + node.rightNode, updated, err = tree.recursiveSet(node.rightNode, key, value) if err != nil { - return nil, false, err + return nil, updated, err } + } - if bytes.Compare(key, node.key) < 0 { - node.leftNode, updated, err = tree.recursiveSet(node.leftNode, key, value) - if err != nil { - return nil, updated, err - } - } else { - node.rightNode, updated, err = tree.recursiveSet(node.rightNode, key, value) - if err != nil { - return nil, updated, err - } - } + if updated { + return node, updated, nil + } + err = node.calcHeightAndSize(tree.ImmutableTree) + if err != nil { + return nil, false, err + } + newNode, err := tree.balance(node) + if err != nil { + return nil, false, err + } + return newNode, updated, err +} - if updated { - return node, updated, nil - } - err = node.calcHeightAndSize(tree.ImmutableTree) - if err != nil { - return nil, false, err - } - newNode, err := tree.balance(node) - if err != nil { - return nil, false, err - } - return newNode, updated, err +func (tree *MutableTree) recursiveSetLeaf(node *Node, key []byte, value []byte) ( + newSelf *Node, updated bool, err error, +) { + version := tree.version + 1 + if !tree.skipFastStorageUpgrade { + tree.addUnsavedAddition(key, fastnode.NewNode(key, value, version)) + } + switch bytes.Compare(key, node.key) { + case -1: // setKey < leafKey + return &Node{ + key: node.key, + subtreeHeight: 1, + size: 2, + nodeKey: nil, + leftNode: NewNode(key, value), + rightNode: node, + }, false, nil + case 1: // setKey > leafKey + return &Node{ + key: key, + subtreeHeight: 1, + size: 2, + nodeKey: nil, + leftNode: node, + rightNode: NewNode(key, value), + }, false, nil + default: + return NewNode(key, value), true, nil } } diff --git a/node.go b/node.go index 97d54a865..0f1bc0542 100644 --- a/node.go +++ b/node.go @@ -57,11 +57,15 @@ func GetRootKey(version int64) []byte { // Node represents a node in a Tree. type Node struct { - key []byte - value []byte - hash []byte - nodeKey *NodeKey - leftNodeKey []byte + key []byte + value []byte + hash []byte + nodeKey *NodeKey + // Legacy: LeftNodeHash + // v1: Left node ptr via Version/key + leftNodeKey []byte + // Legacy: RightNodeHash + // v1: Right node ptr via Version/key rightNodeKey []byte size int64 leftNode *Node @@ -517,19 +521,29 @@ func (node *Node) writeHashBytes(w io.Writer, version int64) error { // (e.g. ProofLeafNode.ValueHash) valueHash := sha256.Sum256(node.value) - err = encoding.EncodeBytes(w, valueHash[:]) + err = encoding.Encode32BytesHash(w, valueHash[:]) if err != nil { return fmt.Errorf("writing value, %w", err) } } else { - if node.leftNode == nil || node.rightNode == nil { + if (node.leftNode == nil && len(node.leftNodeKey) != 32) || (node.rightNode == nil && len(node.rightNodeKey) != 32) { return ErrEmptyChild } - err = encoding.EncodeBytes(w, node.leftNode.hash) + // If left/rightNodeKey is 32 bytes, it is a legacy node whose value is just the hash. + // We may have skipped fetching leftNode/rightNode. + if len(node.leftNodeKey) == 32 { + err = encoding.Encode32BytesHash(w, node.leftNodeKey) + } else { + err = encoding.Encode32BytesHash(w, node.leftNode.hash) + } if err != nil { return fmt.Errorf("writing left hash, %w", err) } - err = encoding.EncodeBytes(w, node.rightNode.hash) + if len(node.rightNodeKey) == 32 { + err = encoding.Encode32BytesHash(w, node.rightNodeKey) + } else { + err = encoding.Encode32BytesHash(w, node.rightNode.hash) + } if err != nil { return fmt.Errorf("writing right hash, %w", err) } diff --git a/nodedb.go b/nodedb.go index d446b7c1d..0dfe45464 100644 --- a/nodedb.go +++ b/nodedb.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" "sync" + "time" "cosmossdk.io/log" dbm "github.com/cosmos/cosmos-db" @@ -40,10 +41,10 @@ const ( var ( // All new node keys are prefixed with the byte 's'. This ensures no collision is // possible with the legacy nodes, and makes them easier to traverse. They are indexed by the version and the local nonce. - nodeKeyFormat = keyformat.NewKeyFormat('s', int64Size+int32Size) // s + nodeKeyFormat = keyformat.NewFastPrefixFormatter('s', int64Size+int32Size) // s // This is only used for the iteration purpose. - nodeKeyPrefixFormat = keyformat.NewKeyFormat('s', int64Size) // s + nodeKeyPrefixFormat = keyformat.NewFastPrefixFormatter('s', int64Size) // s // Key Format for making reads and iterates go through a data-locality preserving db. // The value at an entry will list what version it was written to. @@ -58,7 +59,7 @@ var ( metadataKeyFormat = keyformat.NewKeyFormat('m', 0) // m // All legacy node keys are prefixed with the byte 'n'. - legacyNodeKeyFormat = keyformat.NewKeyFormat('n', hashSize) // n + legacyNodeKeyFormat = keyformat.NewFastPrefixFormatter('n', hashSize) // n // All legacy orphan keys are prefixed with the byte 'o'. legacyOrphanKeyFormat = keyformat.NewKeyFormat('o', int64Size, int64Size, hashSize) // o @@ -419,60 +420,124 @@ func (ndb *nodeDB) deleteLegacyNodes(version int64, nk []byte) error { return ndb.batch.Delete(ndb.legacyNodeKey(nk)) } +var ( + isDeletingLegacyVersionsMutex = &sync.Mutex{} + isDeletingLegacyVersions = false +) + // deleteLegacyVersions deletes all legacy versions from disk. func (ndb *nodeDB) deleteLegacyVersions() error { - // Check if we have a legacy version - itr, err := dbm.IteratePrefix(ndb.db, legacyRootKeyFormat.Key()) - if err != nil { - return err + isDeletingLegacyVersionsMutex.Lock() + if isDeletingLegacyVersions { + isDeletingLegacyVersionsMutex.Unlock() + return nil } - defer itr.Close() + isDeletingLegacyVersions = true + isDeletingLegacyVersionsMutex.Unlock() - // Delete orphans for all legacy versions - var prevVersion, curVersion int64 - var rootKeys [][]byte - for ; itr.Valid(); itr.Next() { - legacyRootKeyFormat.Scan(itr.Key(), &curVersion) - rootKeys = append(rootKeys, itr.Key()) - if prevVersion > 0 { - if err := ndb.traverseOrphans(prevVersion, curVersion, func(orphan *Node) error { + go func() { + defer func() { + isDeletingLegacyVersionsMutex.Lock() + isDeletingLegacyVersions = false + isDeletingLegacyVersionsMutex.Unlock() + }() + + // Check if we have a legacy version + itr, err := dbm.IteratePrefix(ndb.db, legacyRootKeyFormat.Key()) + if err != nil { + ndb.logger.Error(err.Error()) + return + } + defer itr.Close() + + // Delete orphans for all legacy versions + var prevVersion, curVersion int64 + var rootKeys [][]byte + counter := 0 + for ; itr.Valid(); itr.Next() { + legacyRootKeyFormat.Scan(itr.Key(), &curVersion) + rootKeys = append(rootKeys, itr.Key()) + if prevVersion > 0 { + if err := ndb.traverseOrphans(prevVersion, curVersion, func(orphan *Node) error { + counter++ + if counter == 1000 { + counter = 0 + time.Sleep(1000 * time.Millisecond) + fmt.Println("IAVL sleep happening") + } + return ndb.batch.Delete(ndb.nodeKey(orphan.GetKey())) + }); err != nil { + ndb.logger.Error(err.Error()) + return + } + } + prevVersion = curVersion + } + // Delete the last version for the legacyLastVersion + if curVersion > 0 { + legacyLatestVersion, err := ndb.getLegacyLatestVersion() + if err != nil { + ndb.logger.Error(err.Error()) + return + } + if curVersion != legacyLatestVersion { + ndb.logger.Error("expected legacyLatestVersion to be %d, got %d", legacyLatestVersion, curVersion) + return + } + if err := ndb.traverseOrphans(curVersion, curVersion+1, func(orphan *Node) error { return ndb.batch.Delete(ndb.nodeKey(orphan.GetKey())) }); err != nil { - return err + ndb.logger.Error("failed to clean legacy orphans between versions", "err", err) + return } } - prevVersion = curVersion - } - // Delete the last version for the legacyLastVersion - if curVersion > 0 { - legacyLatestVersion, err := ndb.getLegacyLatestVersion() - if err != nil { - return err - } - if curVersion != legacyLatestVersion { - return fmt.Errorf("expected legacyLatestVersion to be %d, got %d", legacyLatestVersion, curVersion) + + // Delete all roots of the legacy versions + for _, rootKey := range rootKeys { + if err := ndb.batch.Delete(rootKey); err != nil { + ndb.logger.Error("failed to clean legacy orphans root keys", "err", err) + return + } } - if err := ndb.traverseOrphans(curVersion, curVersion+1, func(orphan *Node) error { - return ndb.batch.Delete(ndb.nodeKey(orphan.GetKey())) - }); err != nil { - return err + + // Initialize the legacy latest version to -1 to demonstrate that all legacy versions have been deleted + ndb.legacyLatestVersion = -1 + + // Delete all orphan nodes of the legacy versions + // TODO: Is this just deadcode????? + if err := ndb.deleteOrphans(); err != nil { + ndb.logger.Error("failed to clean legacy orphans", "err", err) + return } + }() + + return nil +} + +// deleteOrphans cleans all legacy orphans from the nodeDB. +func (ndb *nodeDB) deleteOrphans() error { + itr, err := dbm.IteratePrefix(ndb.db, legacyOrphanKeyFormat.Key()) + if err != nil { + return err } + defer itr.Close() - // Delete all roots of the legacy versions - for _, rootKey := range rootKeys { - if err := ndb.batch.Delete(rootKey); err != nil { + count := 0 + for ; itr.Valid(); itr.Next() { + if err := ndb.batch.Delete(itr.Key()); err != nil { return err } - } - // Initialize the legacy latest version to -1 to demonstrate that all legacy versions have been deleted - ndb.legacyLatestVersion = -1 + // Sleep for a while to avoid blocking the main thread i/o. + count++ + if count > 1000 { + count = 0 + time.Sleep(100 * time.Millisecond) + } - // Delete all orphan nodes of the legacy versions - return ndb.traversePrefix(legacyOrphanKeyFormat.Key(), func(key, value []byte) error { - return ndb.batch.Delete(key) - }) + } + + return nil } // DeleteVersionsFrom permanently deletes all tree versions from the given version upwards. @@ -520,7 +585,7 @@ func (ndb *nodeDB) DeleteVersionsFrom(fromVersion int64) error { } // Delete the nodes for new format - err = ndb.traverseRange(nodeKeyPrefixFormat.Key(fromVersion), nodeKeyPrefixFormat.Key(latest+1), func(k, v []byte) error { + err = ndb.traverseRange(nodeKeyPrefixFormat.KeyInt64(fromVersion), nodeKeyPrefixFormat.KeyInt64(latest+1), func(k, v []byte) error { return ndb.batch.Delete(k) }) @@ -600,7 +665,7 @@ func (ndb *nodeDB) nodeKey(nk []byte) []byte { } func (ndb *nodeDB) nodeKeyPrefix(version int64) []byte { - return nodeKeyPrefixFormat.Key(version) + return nodeKeyPrefixFormat.KeyInt64(version) } func (ndb *nodeDB) fastNodeKey(key []byte) []byte { @@ -696,8 +761,8 @@ func (ndb *nodeDB) getLatestVersion() (int64, error) { } itr, err := ndb.db.ReverseIterator( - nodeKeyPrefixFormat.Key(int64(1)), - nodeKeyPrefixFormat.Key(int64(math.MaxInt64)), + nodeKeyPrefixFormat.KeyInt64(int64(1)), + nodeKeyPrefixFormat.KeyInt64(int64(math.MaxInt64)), ) if err != nil { return 0, err @@ -1016,7 +1081,7 @@ func isReferenceToRoot(bz []byte) bool { func (ndb *nodeDB) traverseNodes(fn func(node *Node) error) error { nodes := []*Node{} - if err := ndb.traversePrefix(nodeKeyFormat.Key(), func(key, value []byte) error { + if err := ndb.traversePrefix(nodeKeyFormat.Prefix(), func(key, value []byte) error { if isReferenceToRoot(value) { return nil } @@ -1099,7 +1164,7 @@ func (ndb *nodeDB) String() (string, error) { index := 0 - err := ndb.traversePrefix(nodeKeyFormat.Key(), func(key, value []byte) error { + err := ndb.traversePrefix(nodeKeyFormat.Prefix(), func(key, value []byte) error { fmt.Fprintf(buf, "%s: %x\n", key, value) return nil })