Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
836 changes: 0 additions & 836 deletions channeldb/payment_control.go

This file was deleted.

1,328 changes: 0 additions & 1,328 deletions channeldb/payments.go

Large diffs are not rendered by default.

2,181 changes: 2,181 additions & 0 deletions channeldb/payments_kv_store.go

Large diffs are not rendered by default.

742 changes: 578 additions & 164 deletions channeldb/payment_control_test.go → channeldb/payments_kv_store_test.go

Large diffs are not rendered by default.

390 changes: 21 additions & 369 deletions channeldb/payments_test.go

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions config_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,10 @@ type DatabaseInstances struct {
// InvoiceDB is the database that stores information about invoices.
InvoiceDB invoices.InvoiceDB

// KVPaymentsDB is the database that stores all payment related
// information.
KVPaymentsDB *channeldb.KVPaymentsDB
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can just name it PaymentsDB given it will be an interface supporting multiple types of DBs.


// MacaroonDB is the database that stores macaroon root keys.
MacaroonDB kvdb.Backend

Expand Down Expand Up @@ -1215,6 +1219,14 @@ func (d *DefaultDatabaseBuilder) BuildDatabase(
return nil, nil, err
}

// Mount the payments DB which is only KV for now.
//
// TODO(ziggie): Add support for SQL payments DB.
kvPaymentsDB := channeldb.NewKVPaymentsDB(
dbs.ChanStateDB,
)
dbs.KVPaymentsDB = kvPaymentsDB

// Wrap the watchtower client DB and make sure we clean up.
if cfg.WtClient.Active {
dbs.TowerClientDB, err = wtdb.OpenClientDB(
Expand Down
4 changes: 2 additions & 2 deletions routing/control_tower.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (s *controlTowerSubscriberImpl) Updates() <-chan interface{} {
// controlTower is persistent implementation of ControlTower to restrict
// double payment sending.
type controlTower struct {
db *channeldb.PaymentControl
db *channeldb.KVPaymentsDB

// subscriberIndex is used to provide a unique id for each subscriber
// to all payments. This is used to easily remove the subscriber when
Expand All @@ -168,7 +168,7 @@ type controlTower struct {
}

// NewControlTower creates a new instance of the controlTower.
func NewControlTower(db *channeldb.PaymentControl) ControlTower {
func NewControlTower(db *channeldb.KVPaymentsDB) ControlTower {
return &controlTower{
db: db,
subscribersAllPayments: make(
Expand Down
38 changes: 19 additions & 19 deletions routing/control_tower_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestControlTowerSubscribeUnknown(t *testing.T) {

db := initDB(t, false)

pControl := NewControlTower(channeldb.NewPaymentControl(db))
pControl := NewControlTower(channeldb.NewKVPaymentsDB(db))

// Subscription should fail when the payment is not known.
_, err := pControl.SubscribePayment(lntypes.Hash{1})
Expand All @@ -63,7 +63,7 @@ func TestControlTowerSubscribeSuccess(t *testing.T) {

db := initDB(t, false)

pControl := NewControlTower(channeldb.NewPaymentControl(db))
pControl := NewControlTower(channeldb.NewKVPaymentsDB(db))

// Initiate a payment.
info, attempt, preimg, err := genInfo()
Expand Down Expand Up @@ -156,33 +156,33 @@ func TestControlTowerSubscribeSuccess(t *testing.T) {
}
}

// TestPaymentControlSubscribeFail tests that payment updates for a
// TestKVPaymentsDBSubscribeFail tests that payment updates for a
// failed payment are properly sent to subscribers.
func TestPaymentControlSubscribeFail(t *testing.T) {
func TestKVPaymentsDBSubscribeFail(t *testing.T) {
t.Parallel()

t.Run("register attempt, keep failed payments", func(t *testing.T) {
testPaymentControlSubscribeFail(t, true, true)
testKVPaymentsDBSubscribeFail(t, true, true)
})
t.Run("register attempt, delete failed payments", func(t *testing.T) {
testPaymentControlSubscribeFail(t, true, false)
testKVPaymentsDBSubscribeFail(t, true, false)
})
t.Run("no register attempt, keep failed payments", func(t *testing.T) {
testPaymentControlSubscribeFail(t, false, true)
testKVPaymentsDBSubscribeFail(t, false, true)
})
t.Run("no register attempt, delete failed payments", func(t *testing.T) {
testPaymentControlSubscribeFail(t, false, false)
testKVPaymentsDBSubscribeFail(t, false, false)
})
}

// TestPaymentControlSubscribeAllSuccess tests that multiple payments are
// TestKVPaymentsDBSubscribeAllSuccess tests that multiple payments are
// properly sent to subscribers of TrackPayments.
func TestPaymentControlSubscribeAllSuccess(t *testing.T) {
func TestKVPaymentsDBSubscribeAllSuccess(t *testing.T) {
t.Parallel()

db := initDB(t, true)

pControl := NewControlTower(channeldb.NewPaymentControl(db))
pControl := NewControlTower(channeldb.NewKVPaymentsDB(db))

// Initiate a payment.
info1, attempt1, preimg1, err := genInfo()
Expand Down Expand Up @@ -288,14 +288,14 @@ func TestPaymentControlSubscribeAllSuccess(t *testing.T) {
require.Equal(t, attempt2.Route, htlc2.Route, "unexpected htlc route.")
}

// TestPaymentControlSubscribeAllImmediate tests whether already inflight
// TestKVPaymentsDBSubscribeAllImmediate tests whether already inflight
// payments are reported at the start of the SubscribeAllPayments subscription.
func TestPaymentControlSubscribeAllImmediate(t *testing.T) {
func TestKVPaymentsDBSubscribeAllImmediate(t *testing.T) {
t.Parallel()

db := initDB(t, true)

pControl := NewControlTower(channeldb.NewPaymentControl(db))
pControl := NewControlTower(channeldb.NewKVPaymentsDB(db))

// Initiate a payment.
info, attempt, _, err := genInfo()
Expand Down Expand Up @@ -325,14 +325,14 @@ func TestPaymentControlSubscribeAllImmediate(t *testing.T) {
}
}

// TestPaymentControlUnsubscribeSuccess tests that when unsubscribed, there are
// TestKVPaymentsDBUnsubscribeSuccess tests that when unsubscribed, there are
// no more notifications to that specific subscription.
func TestPaymentControlUnsubscribeSuccess(t *testing.T) {
func TestKVPaymentsDBUnsubscribeSuccess(t *testing.T) {
t.Parallel()

db := initDB(t, true)

pControl := NewControlTower(channeldb.NewPaymentControl(db))
pControl := NewControlTower(channeldb.NewKVPaymentsDB(db))

subscription1, err := pControl.SubscribeAllPayments()
require.NoError(t, err, "expected subscribe to succeed, but got: %v")
Expand Down Expand Up @@ -396,12 +396,12 @@ func TestPaymentControlUnsubscribeSuccess(t *testing.T) {
require.Len(t, subscription2.Updates(), 0)
}

func testPaymentControlSubscribeFail(t *testing.T, registerAttempt,
func testKVPaymentsDBSubscribeFail(t *testing.T, registerAttempt,
keepFailedPaymentAttempts bool) {

db := initDB(t, keepFailedPaymentAttempts)

pControl := NewControlTower(channeldb.NewPaymentControl(db))
pControl := NewControlTower(channeldb.NewKVPaymentsDB(db))

// Initiate a payment.
info, attempt, _, err := genInfo()
Expand Down
6 changes: 3 additions & 3 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7527,7 +7527,7 @@ func (r *rpcServer) ListPayments(ctx context.Context,
query.MaxPayments = math.MaxUint64
}

paymentsQuerySlice, err := r.server.miscDB.QueryPayments(query)
paymentsQuerySlice, err := r.server.kvPaymentsDB.QueryPayments(query)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -7608,7 +7608,7 @@ func (r *rpcServer) DeletePayment(ctx context.Context,
rpcsLog.Infof("[DeletePayment] payment_identifier=%v, "+
"failed_htlcs_only=%v", hash, req.FailedHtlcsOnly)

err = r.server.miscDB.DeletePayment(hash, req.FailedHtlcsOnly)
err = r.server.kvPaymentsDB.DeletePayment(hash, req.FailedHtlcsOnly)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -7648,7 +7648,7 @@ func (r *rpcServer) DeleteAllPayments(ctx context.Context,
"failed_htlcs_only=%v", req.FailedPaymentsOnly,
req.FailedHtlcsOnly)

numDeletedPayments, err := r.server.miscDB.DeletePayments(
numDeletedPayments, err := r.server.kvPaymentsDB.DeletePayments(
req.FailedPaymentsOnly, req.FailedHtlcsOnly,
)
if err != nil {
Expand Down
11 changes: 8 additions & 3 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,12 @@ type server struct {

invoicesDB invoices.InvoiceDB

// kvPaymentsDB is the DB that contains all functions for managing
// payments.
//
// TODO(ziggie): Replace with interface.
kvPaymentsDB *channeldb.KVPaymentsDB

aliasMgr *aliasmgr.Manager

htlcSwitch *htlcswitch.Switch
Expand Down Expand Up @@ -678,6 +684,7 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr,
addrSource: addrSource,
miscDB: dbs.ChanStateDB,
invoicesDB: dbs.InvoiceDB,
kvPaymentsDB: dbs.KVPaymentsDB,
cc: cc,
sigPool: lnwallet.NewSigPool(cfg.Workers.Sig, cc.Signer),
writePool: writePool,
Expand Down Expand Up @@ -1127,9 +1134,7 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr,
PathFindingConfig: pathFindingConfig,
}

paymentControl := channeldb.NewPaymentControl(dbs.ChanStateDB)

s.controlTower = routing.NewControlTower(paymentControl)
s.controlTower = routing.NewControlTower(dbs.KVPaymentsDB)

strictPruning := cfg.Bitcoin.Node == "neutrino" ||
cfg.Routing.StrictZombiePruning
Expand Down
Loading