Skip to content

Commit

Permalink
Merge pull request #1104 from halseth/chainwatcher-handoff-race
Browse files Browse the repository at this point in the history
Fix chainwatcher handoff race
  • Loading branch information
Roasbeef authored May 4, 2018
2 parents ecfde2e + 6909920 commit d72f288
Show file tree
Hide file tree
Showing 10 changed files with 501 additions and 661 deletions.
422 changes: 141 additions & 281 deletions breacharbiter.go

Large diffs are not rendered by default.

116 changes: 73 additions & 43 deletions breacharbiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/btcsuite/btclog"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnwallet"
Expand Down Expand Up @@ -949,19 +948,10 @@ func TestBreachHandoffSuccess(t *testing.T) {
defer cleanUpChans()

// Instantiate a breach arbiter to handle the breach of alice's channel.
alicePoint := alice.ChannelPoint()
spendEvents := contractcourt.ChainEventSubscription{
RemoteUnilateralClosure: make(chan *lnwallet.UnilateralCloseSummary, 1),
LocalUnilateralClosure: make(chan *contractcourt.LocalUnilateralCloseInfo, 1),
CooperativeClosure: make(chan struct{}, 1),
ContractBreach: make(chan *lnwallet.BreachRetribution, 1),
ProcessACK: make(chan error, 1),
ChanPoint: *alicePoint,
Cancel: func() {
},
}
contractBreaches := make(chan *ContractBreachEvent)

brar, cleanUpArb, err := createTestArbiter(
t, &spendEvents, alice.State().Db,
t, contractBreaches, alice.State().Db,
)
if err != nil {
t.Fatalf("unable to initialize test breach arbiter: %v", err)
Expand Down Expand Up @@ -1005,13 +995,21 @@ func TestBreachHandoffSuccess(t *testing.T) {

// Signal a spend of the funding transaction and wait for the close
// observer to exit.
spendEvents.ContractBreach <- &lnwallet.BreachRetribution{
BreachTransaction: bobClose.CloseTx,
breach := &ContractBreachEvent{
ChanPoint: *chanPoint,
ProcessACK: make(chan error, 1),
BreachRetribution: &lnwallet.BreachRetribution{
BreachTransaction: bobClose.CloseTx,
},
}
contractBreaches <- breach

// We'll also wait to consume the ACK back from the breach arbiter.
select {
case <-spendEvents.ProcessACK:
case err := <-breach.ProcessACK:
if err != nil {
t.Fatalf("handoff failed: %v", err)
}
case <-time.After(time.Second * 15):
t.Fatalf("breach arbiter didn't send ack back")
}
Expand All @@ -1020,6 +1018,32 @@ func TestBreachHandoffSuccess(t *testing.T) {
// retribution information and the channel should be shown as pending
// force closed.
assertArbiterBreach(t, brar, chanPoint)

// Send another breach event. Since the handoff for this channel was
// already ACKed, the breach arbiter should immediately ACK and ignore
// this event.
breach = &ContractBreachEvent{
ChanPoint: *chanPoint,
ProcessACK: make(chan error, 1),
BreachRetribution: &lnwallet.BreachRetribution{
BreachTransaction: bobClose.CloseTx,
},
}

contractBreaches <- breach

// We'll also wait to consume the ACK back from the breach arbiter.
select {
case err := <-breach.ProcessACK:
if err != nil {
t.Fatalf("handoff failed: %v", err)
}
case <-time.After(time.Second * 15):
t.Fatalf("breach arbiter didn't send ack back")
}

// State should not have changed.
assertArbiterBreach(t, brar, chanPoint)
}

// TestBreachHandoffFail tests that a channel's close observer properly
Expand All @@ -1038,19 +1062,10 @@ func TestBreachHandoffFail(t *testing.T) {
defer cleanUpChans()

// Instantiate a breach arbiter to handle the breach of alice's channel.
alicePoint := alice.ChannelPoint()
spendEvents := contractcourt.ChainEventSubscription{
RemoteUnilateralClosure: make(chan *lnwallet.UnilateralCloseSummary, 1),
LocalUnilateralClosure: make(chan *contractcourt.LocalUnilateralCloseInfo, 1),
CooperativeClosure: make(chan struct{}, 1),
ContractBreach: make(chan *lnwallet.BreachRetribution, 1),
ProcessACK: make(chan error, 1),
ChanPoint: *alicePoint,
Cancel: func() {
},
}
contractBreaches := make(chan *ContractBreachEvent)

brar, cleanUpArb, err := createTestArbiter(
t, &spendEvents, alice.State().Db,
t, contractBreaches, alice.State().Db,
)
if err != nil {
t.Fatalf("unable to initialize test breach arbiter: %v", err)
Expand Down Expand Up @@ -1099,11 +1114,18 @@ func TestBreachHandoffFail(t *testing.T) {
// Signal the notifier to dispatch spend notifications of the funding
// transaction using the transaction from bob's closing summary.
chanPoint := alice.ChanPoint
spendEvents.ContractBreach <- &lnwallet.BreachRetribution{
BreachTransaction: bobClose.CloseTx,
breach := &ContractBreachEvent{
ChanPoint: *chanPoint,
ProcessACK: make(chan error, 1),
BreachRetribution: &lnwallet.BreachRetribution{
BreachTransaction: bobClose.CloseTx,
},
}
contractBreaches <- breach

// We'll also wait to consume the ACK back from the breach arbiter.
select {
case err := <-spendEvents.ProcessACK:
case err := <-breach.ProcessACK:
if err == nil {
t.Fatalf("breach write should have failed")
}
Expand All @@ -1118,7 +1140,7 @@ func TestBreachHandoffFail(t *testing.T) {
assertNotPendingClosed(t, alice)

brar, cleanUpArb, err = createTestArbiter(
t, &spendEvents, alice.State().Db,
t, contractBreaches, alice.State().Db,
)
if err != nil {
t.Fatalf("unable to initialize test breach arbiter: %v", err)
Expand All @@ -1139,11 +1161,21 @@ func TestBreachHandoffFail(t *testing.T) {

// Signal a spend of the funding transaction and wait for the close
// observer to exit. This time we are allowing the handoff to succeed.
spendEvents.ContractBreach <- &lnwallet.BreachRetribution{
BreachTransaction: bobClose.CloseTx,
breach = &ContractBreachEvent{
ChanPoint: *chanPoint,
ProcessACK: make(chan error, 1),
BreachRetribution: &lnwallet.BreachRetribution{
BreachTransaction: bobClose.CloseTx,
},
}

contractBreaches <- breach

select {
case <-spendEvents.ProcessACK:
case err := <-breach.ProcessACK:
if err != nil {
t.Fatalf("handoff failed: %v", err)
}
case <-time.After(time.Second * 15):
t.Fatalf("breach arbiter didn't send ack back")
}
Expand Down Expand Up @@ -1207,7 +1239,7 @@ func assertNotPendingClosed(t *testing.T, c *lnwallet.LightningChannel) {

// createTestArbiter instantiates a breach arbiter with a failing retribution
// store, so that controlled failures can be tested.
func createTestArbiter(t *testing.T, chainEvents *contractcourt.ChainEventSubscription,
func createTestArbiter(t *testing.T, contractBreaches chan *ContractBreachEvent,
db *channeldb.DB) (*breachArbiter, func(), error) {

// Create a failing retribution store, that wraps a normal one.
Expand All @@ -1222,13 +1254,11 @@ func createTestArbiter(t *testing.T, chainEvents *contractcourt.ChainEventSubscr
// Assemble our test arbiter.
notifier := makeMockSpendNotifier()
ba := newBreachArbiter(&BreachConfig{
CloseLink: func(_ *wire.OutPoint, _ htlcswitch.ChannelCloseType) {},
DB: db,
Estimator: &lnwallet.StaticFeeEstimator{FeeRate: 50},
GenSweepScript: func() ([]byte, error) { return nil, nil },
SubscribeChannelEvents: func(_ wire.OutPoint) (*contractcourt.ChainEventSubscription, error) {
return chainEvents, nil
},
CloseLink: func(_ *wire.OutPoint, _ htlcswitch.ChannelCloseType) {},
DB: db,
Estimator: &lnwallet.StaticFeeEstimator{FeeRate: 50},
GenSweepScript: func() ([]byte, error) { return nil, nil },
ContractBreaches: contractBreaches,
Signer: signer,
Notifier: notifier,
PublishTransaction: func(_ *wire.MsgTx) error { return nil },
Expand Down
47 changes: 36 additions & 11 deletions contractcourt/chain_arbitrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ type ChainArbitratorConfig struct {
// TODO(roasbeef): rename, routing based
MarkLinkInactive func(wire.OutPoint) error

// ContractBreach is a function closure that the ChainArbitrator will
// use to notify the breachArbiter about a contract breach. It should
// only return a non-nil error when the breachArbiter has preserved the
// necessary breach info for this channel point, and it is safe to mark
// the channel as pending close in the database.
ContractBreach func(wire.OutPoint, *lnwallet.BreachRetribution) error

// IsOurAddress is a function that returns true if the passed address
// is known to the underlying wallet. Otherwise, false should be
// returned.
Expand Down Expand Up @@ -327,10 +334,19 @@ func (c *ChainArbitrator) Start() error {
// First, we'll create an active chainWatcher for this channel
// to ensure that we detect any relevant on chain events.
chainWatcher, err := newChainWatcher(
channel, c.cfg.Notifier, c.cfg.PreimageDB, c.cfg.Signer,
c.cfg.IsOurAddress, func() error {
// TODO(roasbeef): also need to pass in log?
return c.resolveContract(chanPoint, nil)
chainWatcherConfig{
chanState: channel,
notifier: c.cfg.Notifier,
pCache: c.cfg.PreimageDB,
signer: c.cfg.Signer,
isOurAddr: c.cfg.IsOurAddress,
markChanClosed: func() error {
// TODO(roasbeef): also need to pass in log?
return c.resolveContract(chanPoint, nil)
},
contractBreach: func(retInfo *lnwallet.BreachRetribution) error {
return c.cfg.ContractBreach(chanPoint, retInfo)
},
},
)
if err != nil {
Expand All @@ -339,7 +355,7 @@ func (c *ChainArbitrator) Start() error {

c.activeWatchers[chanPoint] = chainWatcher
channelArb, err := newActiveChannelArbitrator(
channel, c, chainWatcher.SubscribeChannelEvents(false),
channel, c, chainWatcher.SubscribeChannelEvents(),
)
if err != nil {
return err
Expand Down Expand Up @@ -654,9 +670,18 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error
// First, also create an active chainWatcher for this channel to ensure
// that we detect any relevant on chain events.
chainWatcher, err := newChainWatcher(
newChan, c.cfg.Notifier, c.cfg.PreimageDB, c.cfg.Signer,
c.cfg.IsOurAddress, func() error {
return c.resolveContract(chanPoint, nil)
chainWatcherConfig{
chanState: newChan,
notifier: c.cfg.Notifier,
pCache: c.cfg.PreimageDB,
signer: c.cfg.Signer,
isOurAddr: c.cfg.IsOurAddress,
markChanClosed: func() error {
return c.resolveContract(chanPoint, nil)
},
contractBreach: func(retInfo *lnwallet.BreachRetribution) error {
return c.cfg.ContractBreach(chanPoint, retInfo)
},
},
)
if err != nil {
Expand All @@ -668,7 +693,7 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error
// We'll also create a new channel arbitrator instance using this new
// channel, and our internal state.
channelArb, err := newActiveChannelArbitrator(
newChan, c, chainWatcher.SubscribeChannelEvents(false),
newChan, c, chainWatcher.SubscribeChannelEvents(),
)
if err != nil {
return err
Expand Down Expand Up @@ -696,7 +721,7 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error
// TODO(roasbeef): can be used later to provide RPC hook for all channel
// lifetimes
func (c *ChainArbitrator) SubscribeChannelEvents(
chanPoint wire.OutPoint, syncDispatch bool) (*ChainEventSubscription, error) {
chanPoint wire.OutPoint) (*ChainEventSubscription, error) {

// First, we'll attempt to look up the active watcher for this channel.
// If we can't find it, then we'll return an error back to the caller.
Expand All @@ -708,7 +733,7 @@ func (c *ChainArbitrator) SubscribeChannelEvents(

// With the watcher located, we'll request for it to create a new chain
// event subscription client.
return watcher.SubscribeChannelEvents(syncDispatch), nil
return watcher.SubscribeChannelEvents(), nil
}

// BeginCoopChanClose allows the initiator or responder to a cooperative
Expand Down
Loading

0 comments on commit d72f288

Please sign in to comment.