Skip to content

Commit

Permalink
Merge pull request #5484 from guggero/database-full-remote
Browse files Browse the repository at this point in the history
etcd: enable full remote database support
  • Loading branch information
guggero authored Aug 4, 2021
2 parents 44971f0 + 66ed64d commit 4c010e8
Show file tree
Hide file tree
Showing 52 changed files with 1,011 additions and 767 deletions.
2 changes: 1 addition & 1 deletion chainntnfs/bitcoindnotify/bitcoind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func initHintCache(t *testing.T) *chainntnfs.HeightHintCache {
testCfg := chainntnfs.CacheConfig{
QueryDisable: false,
}
hintCache, err := chainntnfs.NewHeightHintCache(testCfg, db)
hintCache, err := chainntnfs.NewHeightHintCache(testCfg, db.Backend)
if err != nil {
t.Fatalf("unable to create hint cache: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion chainntnfs/btcdnotify/btcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func initHintCache(t *testing.T) *chainntnfs.HeightHintCache {
testCfg := chainntnfs.CacheConfig{
QueryDisable: false,
}
hintCache, err := chainntnfs.NewHeightHintCache(testCfg, db)
hintCache, err := chainntnfs.NewHeightHintCache(testCfg, db.Backend)
if err != nil {
t.Fatalf("unable to create hint cache: %v", err)
}
Expand Down
16 changes: 9 additions & 7 deletions chainntnfs/height_hint_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type ConfirmHintCache interface {
// will be stored.
type HeightHintCache struct {
cfg CacheConfig
db *channeldb.DB
db kvdb.Backend
}

// Compile-time checks to ensure HeightHintCache satisfies the SpendHintCache
Expand All @@ -93,7 +93,9 @@ var _ SpendHintCache = (*HeightHintCache)(nil)
var _ ConfirmHintCache = (*HeightHintCache)(nil)

// NewHeightHintCache returns a new height hint cache backed by a database.
func NewHeightHintCache(cfg CacheConfig, db *channeldb.DB) (*HeightHintCache, error) {
func NewHeightHintCache(cfg CacheConfig, db kvdb.Backend) (*HeightHintCache,
error) {

cache := &HeightHintCache{cfg, db}
if err := cache.initBuckets(); err != nil {
return nil, err
Expand All @@ -105,7 +107,7 @@ func NewHeightHintCache(cfg CacheConfig, db *channeldb.DB) (*HeightHintCache, er
// initBuckets ensures that the primary buckets used by the circuit are
// initialized so that we can assume their existence after startup.
func (c *HeightHintCache) initBuckets() error {
return kvdb.Batch(c.db.Backend, func(tx kvdb.RwTx) error {
return kvdb.Batch(c.db, func(tx kvdb.RwTx) error {
_, err := tx.CreateTopLevelBucket(spendHintBucket)
if err != nil {
return err
Expand All @@ -127,7 +129,7 @@ func (c *HeightHintCache) CommitSpendHint(height uint32,
Log.Tracef("Updating spend hint to height %d for %v", height,
spendRequests)

return kvdb.Batch(c.db.Backend, func(tx kvdb.RwTx) error {
return kvdb.Batch(c.db, func(tx kvdb.RwTx) error {
spendHints := tx.ReadWriteBucket(spendHintBucket)
if spendHints == nil {
return ErrCorruptedHeightHintCache
Expand Down Expand Up @@ -197,7 +199,7 @@ func (c *HeightHintCache) PurgeSpendHint(spendRequests ...SpendRequest) error {

Log.Tracef("Removing spend hints for %v", spendRequests)

return kvdb.Batch(c.db.Backend, func(tx kvdb.RwTx) error {
return kvdb.Batch(c.db, func(tx kvdb.RwTx) error {
spendHints := tx.ReadWriteBucket(spendHintBucket)
if spendHints == nil {
return ErrCorruptedHeightHintCache
Expand Down Expand Up @@ -228,7 +230,7 @@ func (c *HeightHintCache) CommitConfirmHint(height uint32,
Log.Tracef("Updating confirm hints to height %d for %v", height,
confRequests)

return kvdb.Batch(c.db.Backend, func(tx kvdb.RwTx) error {
return kvdb.Batch(c.db, func(tx kvdb.RwTx) error {
confirmHints := tx.ReadWriteBucket(confirmHintBucket)
if confirmHints == nil {
return ErrCorruptedHeightHintCache
Expand Down Expand Up @@ -299,7 +301,7 @@ func (c *HeightHintCache) PurgeConfirmHint(confRequests ...ConfRequest) error {

Log.Tracef("Removing confirm hints for %v", confRequests)

return kvdb.Batch(c.db.Backend, func(tx kvdb.RwTx) error {
return kvdb.Batch(c.db, func(tx kvdb.RwTx) error {
confirmHints := tx.ReadWriteBucket(confirmHintBucket)
if confirmHints == nil {
return ErrCorruptedHeightHintCache
Expand Down
2 changes: 1 addition & 1 deletion chainntnfs/height_hint_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func initHintCacheWithConfig(t *testing.T, cfg CacheConfig) *HeightHintCache {
if err != nil {
t.Fatalf("unable to create db: %v", err)
}
hintCache, err := NewHeightHintCache(cfg, db)
hintCache, err := NewHeightHintCache(cfg, db.Backend)
if err != nil {
t.Fatalf("unable to create hint cache: %v", err)
}
Expand Down
4 changes: 3 additions & 1 deletion chainntnfs/test/test_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -1926,7 +1926,9 @@ func TestInterfaces(t *testing.T, targetBackEnd string) {
testCfg := chainntnfs.CacheConfig{
QueryDisable: false,
}
hintCache, err := chainntnfs.NewHeightHintCache(testCfg, db)
hintCache, err := chainntnfs.NewHeightHintCache(
testCfg, db.Backend,
)
if err != nil {
t.Fatalf("unable to create height hint cache: %v", err)
}
Expand Down
15 changes: 9 additions & 6 deletions chainreg/chainregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lncfg"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/btcwallet"
Expand Down Expand Up @@ -68,11 +69,13 @@ type Config struct {
// LtcdMode defines settings for connecting to an ltcd node.
LtcdMode *lncfg.Btcd

// LocalChanDB is a pointer to the local backing channel database.
LocalChanDB *channeldb.DB
// HeightHintDB is a pointer to the database that stores the height
// hints.
HeightHintDB kvdb.Backend

// RemoteChanDB is a pointer to the remote backing channel database.
RemoteChanDB *channeldb.DB
// ChanStateDB is a pointer to the database that stores the channel
// state.
ChanStateDB *channeldb.DB

// BlockCacheSize is the size (in bytes) of blocks kept in memory.
BlockCacheSize uint64
Expand Down Expand Up @@ -304,7 +307,7 @@ func NewChainControl(cfg *Config, blockCache *blockcache.BlockCache) (

// Initialize the height hint cache within the chain directory.
hintCache, err := chainntnfs.NewHeightHintCache(
heightHintCacheConfig, cfg.LocalChanDB,
heightHintCacheConfig, cfg.HeightHintDB,
)
if err != nil {
return nil, nil, fmt.Errorf("unable to initialize height hint "+
Expand Down Expand Up @@ -684,7 +687,7 @@ func NewChainControl(cfg *Config, blockCache *blockcache.BlockCache) (
// Create, and start the lnwallet, which handles the core payment
// channel logic, and exposes control via proxy state machines.
walletCfg := lnwallet.Config{
Database: cfg.RemoteChanDB,
Database: cfg.ChanStateDB,
Notifier: cc.ChainNotifier,
WalletController: wc,
Signer: cc.Signer,
Expand Down
68 changes: 13 additions & 55 deletions channeldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/walletdb"
"github.com/go-errors/errors"
mig "github.com/lightningnetwork/lnd/channeldb/migration"
"github.com/lightningnetwork/lnd/channeldb/migration12"
Expand All @@ -25,8 +24,7 @@ import (
)

const (
dbName = "channel.db"
dbFilePermission = 0600
dbName = "channel.db"
)

var (
Expand Down Expand Up @@ -214,46 +212,6 @@ type DB struct {
dryRun bool
}

// Update is a wrapper around walletdb.Update which calls into the extended
// backend when available. This call is needed to be able to cast DB to
// ExtendedBackend. The passed reset function is called before the start of the
// transaction and can be used to reset intermediate state. As callers may
// expect retries of the f closure (depending on the database backend used), the
// reset function will be called before each retry respectively.
func (db *DB) Update(f func(tx walletdb.ReadWriteTx) error, reset func()) error {
if v, ok := db.Backend.(kvdb.ExtendedBackend); ok {
return v.Update(f, reset)
}

reset()
return walletdb.Update(db, f)
}

// View is a wrapper around walletdb.View which calls into the extended
// backend when available. This call is needed to be able to cast DB to
// ExtendedBackend. The passed reset function is called before the start of the
// transaction and can be used to reset intermediate state. As callers may
// expect retries of the f closure (depending on the database backend used), the
// reset function will be called before each retry respectively.
func (db *DB) View(f func(tx walletdb.ReadTx) error, reset func()) error {
if v, ok := db.Backend.(kvdb.ExtendedBackend); ok {
return v.View(f, reset)
}

reset()
return walletdb.View(db, f)
}

// PrintStats calls into the extended backend if available. This call is needed
// to be able to cast DB to ExtendedBackend.
func (db *DB) PrintStats() string {
if v, ok := db.Backend.(kvdb.ExtendedBackend); ok {
return v.PrintStats()
}

return "unimplemented"
}

// Open opens or creates channeldb. Any necessary schemas migrations due
// to updates will take place as necessary.
// TODO(bhandras): deprecate this function.
Expand Down Expand Up @@ -449,7 +407,7 @@ func (d *DB) FetchOpenChannels(nodeID *btcec.PublicKey) ([]*OpenChannel, error)
// stored currently active/open channels associated with the target nodeID. In
// the case that no active channels are known to have been created with this
// node, then a zero-length slice is returned.
func (db *DB) fetchOpenChannels(tx kvdb.RTx,
func (d *DB) fetchOpenChannels(tx kvdb.RTx,
nodeID *btcec.PublicKey) ([]*OpenChannel, error) {

// Get the bucket dedicated to storing the metadata for open channels.
Expand Down Expand Up @@ -485,7 +443,7 @@ func (db *DB) fetchOpenChannels(tx kvdb.RTx,

// Finally, we both of the necessary buckets retrieved, fetch
// all the active channels related to this node.
nodeChannels, err := db.fetchNodeChannels(chainBucket)
nodeChannels, err := d.fetchNodeChannels(chainBucket)
if err != nil {
return fmt.Errorf("unable to read channel for "+
"chain_hash=%x, node_key=%x: %v",
Expand All @@ -502,7 +460,7 @@ func (db *DB) fetchOpenChannels(tx kvdb.RTx,
// fetchNodeChannels retrieves all active channels from the target chainBucket
// which is under a node's dedicated channel bucket. This function is typically
// used to fetch all the active channels related to a particular node.
func (db *DB) fetchNodeChannels(chainBucket kvdb.RBucket) ([]*OpenChannel, error) {
func (d *DB) fetchNodeChannels(chainBucket kvdb.RBucket) ([]*OpenChannel, error) {

var channels []*OpenChannel

Expand All @@ -528,7 +486,7 @@ func (db *DB) fetchNodeChannels(chainBucket kvdb.RBucket) ([]*OpenChannel, error
return fmt.Errorf("unable to read channel data for "+
"chan_point=%v: %v", outPoint, err)
}
oChannel.Db = db
oChannel.Db = d

channels = append(channels, oChannel)

Expand Down Expand Up @@ -990,8 +948,8 @@ func (d *DB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
// pruneLinkNode determines whether we should garbage collect a link node from
// the database due to no longer having any open channels with it. If there are
// any left, then this acts as a no-op.
func (db *DB) pruneLinkNode(tx kvdb.RwTx, remotePub *btcec.PublicKey) error {
openChannels, err := db.fetchOpenChannels(tx, remotePub)
func (d *DB) pruneLinkNode(tx kvdb.RwTx, remotePub *btcec.PublicKey) error {
openChannels, err := d.fetchOpenChannels(tx, remotePub)
if err != nil {
return fmt.Errorf("unable to fetch open channels for peer %x: "+
"%v", remotePub.SerializeCompressed(), err)
Expand All @@ -1004,7 +962,7 @@ func (db *DB) pruneLinkNode(tx kvdb.RwTx, remotePub *btcec.PublicKey) error {
log.Infof("Pruning link node %x with zero open channels from database",
remotePub.SerializeCompressed())

return db.deleteLinkNode(tx, remotePub)
return d.deleteLinkNode(tx, remotePub)
}

// PruneLinkNodes attempts to prune all link nodes found within the databse with
Expand Down Expand Up @@ -1140,16 +1098,16 @@ func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr, error) {
// database. If the channel was already removed (has a closed channel entry),
// then we'll return a nil error. Otherwise, we'll insert a new close summary
// into the database.
func (db *DB) AbandonChannel(chanPoint *wire.OutPoint, bestHeight uint32) error {
func (d *DB) AbandonChannel(chanPoint *wire.OutPoint, bestHeight uint32) error {
// With the chanPoint constructed, we'll attempt to find the target
// channel in the database. If we can't find the channel, then we'll
// return the error back to the caller.
dbChan, err := db.FetchChannel(*chanPoint)
dbChan, err := d.FetchChannel(*chanPoint)
switch {
// If the channel wasn't found, then it's possible that it was already
// abandoned from the database.
case err == ErrChannelNotFound:
_, closedErr := db.FetchClosedChannel(chanPoint)
_, closedErr := d.FetchClosedChannel(chanPoint)
if closedErr != nil {
return closedErr
}
Expand Down Expand Up @@ -1312,9 +1270,9 @@ func fetchHistoricalChanBucket(tx kvdb.RTx,

// FetchHistoricalChannel fetches open channel data from the historical channel
// bucket.
func (db *DB) FetchHistoricalChannel(outPoint *wire.OutPoint) (*OpenChannel, error) {
func (d *DB) FetchHistoricalChannel(outPoint *wire.OutPoint) (*OpenChannel, error) {
var channel *OpenChannel
err := kvdb.View(db, func(tx kvdb.RTx) error {
err := kvdb.View(d, func(tx kvdb.RTx) error {
chanBucket, err := fetchHistoricalChanBucket(tx, outPoint)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion channeldb/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -3406,7 +3406,7 @@ func (c *ChannelGraph) MarkEdgeZombie(chanID uint64,
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

err := kvdb.Batch(c.db, func(tx kvdb.RwTx) error {
err := kvdb.Batch(c.db.Backend, func(tx kvdb.RwTx) error {
edges := tx.ReadWriteBucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
Expand Down
24 changes: 12 additions & 12 deletions channeldb/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ type LinkNode struct {

// NewLinkNode creates a new LinkNode from the provided parameters, which is
// backed by an instance of channeldb.
func (db *DB) NewLinkNode(bitNet wire.BitcoinNet, pub *btcec.PublicKey,
func (d *DB) NewLinkNode(bitNet wire.BitcoinNet, pub *btcec.PublicKey,
addrs ...net.Addr) *LinkNode {

return &LinkNode{
Network: bitNet,
IdentityPub: pub,
LastSeen: time.Now(),
Addresses: addrs,
db: db,
db: d,
}
}

Expand Down Expand Up @@ -129,13 +129,13 @@ func putLinkNode(nodeMetaBucket kvdb.RwBucket, l *LinkNode) error {

// DeleteLinkNode removes the link node with the given identity from the
// database.
func (db *DB) DeleteLinkNode(identity *btcec.PublicKey) error {
return kvdb.Update(db, func(tx kvdb.RwTx) error {
return db.deleteLinkNode(tx, identity)
func (d *DB) DeleteLinkNode(identity *btcec.PublicKey) error {
return kvdb.Update(d, func(tx kvdb.RwTx) error {
return d.deleteLinkNode(tx, identity)
}, func() {})
}

func (db *DB) deleteLinkNode(tx kvdb.RwTx, identity *btcec.PublicKey) error {
func (d *DB) deleteLinkNode(tx kvdb.RwTx, identity *btcec.PublicKey) error {
nodeMetaBucket := tx.ReadWriteBucket(nodeInfoBucket)
if nodeMetaBucket == nil {
return ErrLinkNodesNotFound
Expand All @@ -148,9 +148,9 @@ func (db *DB) deleteLinkNode(tx kvdb.RwTx, identity *btcec.PublicKey) error {
// FetchLinkNode attempts to lookup the data for a LinkNode based on a target
// identity public key. If a particular LinkNode for the passed identity public
// key cannot be found, then ErrNodeNotFound if returned.
func (db *DB) FetchLinkNode(identity *btcec.PublicKey) (*LinkNode, error) {
func (d *DB) FetchLinkNode(identity *btcec.PublicKey) (*LinkNode, error) {
var linkNode *LinkNode
err := kvdb.View(db, func(tx kvdb.RTx) error {
err := kvdb.View(d, func(tx kvdb.RTx) error {
node, err := fetchLinkNode(tx, identity)
if err != nil {
return err
Expand Down Expand Up @@ -191,10 +191,10 @@ func fetchLinkNode(tx kvdb.RTx, targetPub *btcec.PublicKey) (*LinkNode, error) {

// FetchAllLinkNodes starts a new database transaction to fetch all nodes with
// whom we have active channels with.
func (db *DB) FetchAllLinkNodes() ([]*LinkNode, error) {
func (d *DB) FetchAllLinkNodes() ([]*LinkNode, error) {
var linkNodes []*LinkNode
err := kvdb.View(db, func(tx kvdb.RTx) error {
nodes, err := db.fetchAllLinkNodes(tx)
err := kvdb.View(d, func(tx kvdb.RTx) error {
nodes, err := d.fetchAllLinkNodes(tx)
if err != nil {
return err
}
Expand All @@ -213,7 +213,7 @@ func (db *DB) FetchAllLinkNodes() ([]*LinkNode, error) {

// fetchAllLinkNodes uses an existing database transaction to fetch all nodes
// with whom we have active channels with.
func (db *DB) fetchAllLinkNodes(tx kvdb.RTx) ([]*LinkNode, error) {
func (d *DB) fetchAllLinkNodes(tx kvdb.RTx) ([]*LinkNode, error) {
nodeMetaBucket := tx.ReadBucket(nodeInfoBucket)
if nodeMetaBucket == nil {
return nil, ErrLinkNodesNotFound
Expand Down
Loading

0 comments on commit 4c010e8

Please sign in to comment.