Skip to content

Commit

Permalink
Merge pull request #6322 from Crypt-iQ/handoff_deadlock_fix
Browse files Browse the repository at this point in the history
contractcourt: deadlock fix via temporary unmerged map
  • Loading branch information
Roasbeef authored May 3, 2022
2 parents 05fd4f9 + 0f74c6b commit 54adc69
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 76 deletions.
3 changes: 2 additions & 1 deletion contractcourt/chain_arbitrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,8 @@ func (c *ChainArbitrator) NotifyContractUpdate(chanPoint wire.OutPoint,
return fmt.Errorf("can't find arbitrator for %v", chanPoint)
}

return arbitrator.notifyContractUpdate(update)
arbitrator.notifyContractUpdate(update)
return nil
}

// GetChannelArbitrator safely returns the channel arbitrator for a given
Expand Down
122 changes: 58 additions & 64 deletions contractcourt/channel_arbitrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,6 @@ var (
// close a channel that's already in the process of doing so.
errAlreadyForceClosed = errors.New("channel is already in the " +
"process of being force closed")

// errChanArbShuttingDown is an error returned when the channel arb is
// shutting down during the hand-off in notifyContractUpdate. This is
// mainly used to be able to notify the original caller (the link) that
// an error occurred.
errChanArbShuttingDown = errors.New("channel arb shutting down")
)

const (
Expand Down Expand Up @@ -335,6 +329,13 @@ type ChannelArbitrator struct {
// currently valid commitment transactions.
activeHTLCs map[HtlcSetKey]htlcSet

// unmergedSet is used to update the activeHTLCs map in two callsites:
// checkLocalChainActions and sweepAnchors. It contains the latest
// updates from the link. Entries are never deleted from it, but they may
// be replaced on subsequent calls to notifyContractUpdate.
unmergedSet map[HtlcSetKey]htlcSet
unmergedMtx sync.RWMutex

// cfg contains all the functionality that the ChannelArbitrator requires
// to do its duty.
cfg ChannelArbitratorConfig
Expand All @@ -348,11 +349,6 @@ type ChannelArbitrator struct {
// we're watching over will be sent.
signalUpdates chan *signalUpdateMsg

// htlcUpdates is a channel that is sent upon with new updates from the
// active channel. Each time a new commitment state is accepted, the
// set of HTLC's on the new state should be sent across this channel.
htlcUpdates chan *contractUpdateSignal

// activeResolvers is a slice of any active resolvers. This is used to
// be able to signal them for shutdown in the case that we shutdown.
activeResolvers []ContractResolver
Expand Down Expand Up @@ -383,14 +379,27 @@ type ChannelArbitrator struct {
func NewChannelArbitrator(cfg ChannelArbitratorConfig,
htlcSets map[HtlcSetKey]htlcSet, log ArbitratorLog) *ChannelArbitrator {

// Create a new map for unmerged HTLC's as we will overwrite the values
// and want to avoid modifying activeHTLCs directly. This soft copying
// is done to ensure that activeHTLCs isn't reset as an empty map later
// on.
unmerged := make(map[HtlcSetKey]htlcSet)
unmerged[LocalHtlcSet] = htlcSets[LocalHtlcSet]
unmerged[RemoteHtlcSet] = htlcSets[RemoteHtlcSet]

// If the pending set exists, write that as well.
if _, ok := htlcSets[RemotePendingHtlcSet]; ok {
unmerged[RemotePendingHtlcSet] = htlcSets[RemotePendingHtlcSet]
}

return &ChannelArbitrator{
log: log,
blocks: make(chan int32, arbitratorBlockBufferSize),
signalUpdates: make(chan *signalUpdateMsg),
htlcUpdates: make(chan *contractUpdateSignal),
resolutionSignal: make(chan struct{}),
forceCloseReqs: make(chan *forceCloseReq),
activeHTLCs: htlcSets,
unmergedSet: unmerged,
cfg: cfg,
quit: make(chan struct{}),
}
Expand Down Expand Up @@ -819,6 +828,10 @@ func (c *ChannelArbitrator) stateStep(
if confCommitSet != nil {
htlcs = confCommitSet.toActiveHTLCSets()
} else {
// Update the set of activeHTLCs so
// checkLocalChainActions has an up-to-date view of the
// commitments.
c.updateActiveHTLCs()
htlcs = c.activeHTLCs
}
chainActions, err := c.checkLocalChainActions(
Expand Down Expand Up @@ -1215,6 +1228,10 @@ func (c *ChannelArbitrator) sweepAnchors(anchors *lnwallet.AnchorResolutions,
return nil
}

// Update the set of activeHTLCs so that the sweeping routine has an
// up-to-date view of the set of commitments.
c.updateActiveHTLCs()

// Sweep anchors based on different HTLC sets. Notice the HTLC sets may
// differ across commitments, thus their deadline values could vary.
for htlcSet, htlcs := range c.activeHTLCs {
Expand Down Expand Up @@ -2394,39 +2411,40 @@ func (c *ChannelArbitrator) UpdateContractSignals(newSignals *ContractSignals) {
}
}

// contractUpdateSignal is a struct that carries the latest set of
// ContractUpdate for a particular key. It also carries a done chan that should
// be closed by the recipient.
type contractUpdateSignal struct {
// newUpdate contains the latest ContractUpdate for a key.
newUpdate *ContractUpdate

// doneChan is an acknowledgement channel.
doneChan chan struct{}
// notifyContractUpdate stores the HTLC set carried by upd in the
// ChannelArbitrator's unmergedSet, replacing whatever was previously recorded
// under the same HtlcKey. activeHTLCs is not modified here; the unmerged
// entries are copied into it later by updateActiveHTLCs. The write is
// performed while holding unmergedMtx exclusively.
func (c *ChannelArbitrator) notifyContractUpdate(upd *ContractUpdate) {
	c.unmergedMtx.Lock()
	defer c.unmergedMtx.Unlock()

	// Overwrite any set previously stored for this key.
	key := upd.HtlcKey
	c.unmergedSet[key] = newHtlcSet(upd.Htlcs)

	// Hand the dump to the logger as a closure rather than a
	// pre-rendered string.
	dumpUpdate := func() string {
		return spew.Sdump(upd)
	}
	log.Tracef("ChannelArbitrator(%v): fresh set of htlcs=%v",
		c.cfg.ChanPoint, newLogClosure(dumpUpdate))
}

// notifyContractUpdate notifies the ChannelArbitrator that a new
// ContractUpdate is available from the link. The link will be paused until
// this function returns.
func (c *ChannelArbitrator) notifyContractUpdate(upd *ContractUpdate) error {
done := make(chan struct{})
// updateActiveHTLCs merges the unmerged set of HTLCs from the link with
// activeHTLCs.
func (c *ChannelArbitrator) updateActiveHTLCs() {
c.unmergedMtx.RLock()
defer c.unmergedMtx.RUnlock()

select {
case c.htlcUpdates <- &contractUpdateSignal{
newUpdate: upd,
doneChan: done,
}:
case <-c.quit:
return errChanArbShuttingDown
}
// Update the mapping.
c.activeHTLCs[LocalHtlcSet] = c.unmergedSet[LocalHtlcSet]
c.activeHTLCs[RemoteHtlcSet] = c.unmergedSet[RemoteHtlcSet]

select {
case <-done:
case <-c.quit:
return errChanArbShuttingDown
// If the pending set exists, update that as well.
if _, ok := c.unmergedSet[RemotePendingHtlcSet]; ok {
pendingSet := c.unmergedSet[RemotePendingHtlcSet]
c.activeHTLCs[RemotePendingHtlcSet] = pendingSet
}

return nil
}

// channelAttendant is the primary goroutine that acts at the judicial
Expand Down Expand Up @@ -2498,30 +2516,6 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
// registered the new ShortChannelID.
close(signalUpdate.doneChan)

// A new set of HTLC's has been added or removed from the
// commitment transaction. So we'll update our activeHTLCs map
// accordingly.
case htlcUpdate := <-c.htlcUpdates:
// We'll wipe out our old set of HTLC's for each
// htlcSetKey type included in this update in order to
// only monitor the HTLCs that are still active on this
// target commitment.
htlcKey := htlcUpdate.newUpdate.HtlcKey
c.activeHTLCs[htlcKey] = newHtlcSet(
htlcUpdate.newUpdate.Htlcs,
)

// Now that the activeHTLCs have been updated, we'll
// close the done channel.
close(htlcUpdate.doneChan)

log.Tracef("ChannelArbitrator(%v): fresh set of htlcs=%v",
c.cfg.ChanPoint,
newLogClosure(func() string {
return spew.Sdump(htlcUpdate.newUpdate)
}),
)

// We've cooperatively closed the channel, so we're no longer
// needed. We'll mark the channel as resolved and exit.
case closeInfo := <-c.cfg.ChainEvents.CooperativeClosure:
Expand Down
39 changes: 29 additions & 10 deletions contractcourt/channel_arbitrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -864,8 +864,7 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) {
HtlcKey: LocalHtlcSet,
Htlcs: htlcSet,
}
err = chanArb.notifyContractUpdate(newUpdate)
require.NoError(t, err)
chanArb.notifyContractUpdate(newUpdate)

errChan := make(chan error, 1)
respChan := make(chan *wire.MsgTx, 1)
Expand Down Expand Up @@ -1872,8 +1871,7 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) {
HtlcKey: htlcKey,
Htlcs: []channeldb.HTLC{danglingHTLC},
}
err = chanArb.notifyContractUpdate(newUpdate)
require.NoError(t, err)
chanArb.notifyContractUpdate(newUpdate)

// At this point, we now have a split commitment state
// from the PoV of the channel arb. There's now an HTLC
Expand Down Expand Up @@ -2061,8 +2059,7 @@ func TestChannelArbitratorPendingExpiredHTLC(t *testing.T) {
HtlcKey: RemoteHtlcSet,
Htlcs: []channeldb.HTLC{pendingHTLC},
}
err = chanArb.notifyContractUpdate(newUpdate)
require.NoError(t, err)
chanArb.notifyContractUpdate(newUpdate)

// We will advance the uptime to 10 seconds which should be still within
// the grace period and should not trigger going to chain.
Expand Down Expand Up @@ -2408,6 +2405,14 @@ func TestSweepAnchors(t *testing.T) {
htlcDust.HtlcIndex: htlcDust,
},
}
chanArb.unmergedSet[LocalHtlcSet] = htlcSet{
incomingHTLCs: map[uint64]channeldb.HTLC{
htlcWithPreimage.HtlcIndex: htlcWithPreimage,
},
outgoingHTLCs: map[uint64]channeldb.HTLC{
htlcDust.HtlcIndex: htlcDust,
},
}

// Setup our remote HTLC set such that no valid HTLCs can be used, thus
// we default to anchorSweepConfTarget.
Expand All @@ -2420,6 +2425,14 @@ func TestSweepAnchors(t *testing.T) {
htlcDust.HtlcIndex: htlcDust,
},
}
chanArb.unmergedSet[RemoteHtlcSet] = htlcSet{
incomingHTLCs: map[uint64]channeldb.HTLC{
htlcSmallExipry.HtlcIndex: htlcSmallExipry,
},
outgoingHTLCs: map[uint64]channeldb.HTLC{
htlcDust.HtlcIndex: htlcDust,
},
}

// Setup our pending remote HTLC set such that we will use the HTLC's
// CLTV from the outgoing HTLC set.
Expand All @@ -2432,6 +2445,14 @@ func TestSweepAnchors(t *testing.T) {
htlcSmallExipry.HtlcIndex: htlcSmallExipry,
},
}
chanArb.unmergedSet[RemotePendingHtlcSet] = htlcSet{
incomingHTLCs: map[uint64]channeldb.HTLC{
htlcDust.HtlcIndex: htlcDust,
},
outgoingHTLCs: map[uint64]channeldb.HTLC{
htlcSmallExipry.HtlcIndex: htlcSmallExipry,
},
}

// Create AnchorResolutions.
anchors := &lnwallet.AnchorResolutions{
Expand Down Expand Up @@ -2561,8 +2582,7 @@ func TestChannelArbitratorAnchors(t *testing.T) {
// preimage available.
Htlcs: []channeldb.HTLC{htlc, htlcWithPreimage},
}
err = chanArb.notifyContractUpdate(newUpdate)
require.NoError(t, err)
chanArb.notifyContractUpdate(newUpdate)

newUpdate = &ContractUpdate{
HtlcKey: RemoteHtlcSet,
Expand All @@ -2571,8 +2591,7 @@ func TestChannelArbitratorAnchors(t *testing.T) {
// incoming HTLC (toRemoteHTLCs) has a lower CLTV.
Htlcs: []channeldb.HTLC{htlc, htlcWithPreimage},
}
err = chanArb.notifyContractUpdate(newUpdate)
require.NoError(t, err)
chanArb.notifyContractUpdate(newUpdate)

errChan := make(chan error, 1)
respChan := make(chan *wire.MsgTx, 1)
Expand Down
2 changes: 1 addition & 1 deletion docs/release-notes/release-notes-0.15.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ gRPC performance metrics (latency to process `GetInfo`, etc)](https://github.com
`golangci-lint` in your `$GOPATH/bin` directory. `make lint` does not
automatically replace it with the new version if the binary already exists!

* [`ChannelLink` in the `htlcswitch` now performs a 2-way handoff instead of a 1-way handoff with its `ChannelArbitrator`.](https://github.com/lightningnetwork/lnd/pull/6221)
* [`ChannelLink` in the `htlcswitch` now performs a 1-way handoff to its `ChannelArbitrator` via a temporary map.](https://github.com/lightningnetwork/lnd/pull/6322)

* [The channel-commit-interval is now clamped to a reasonable timeframe of 1h.](https://github.com/lightningnetwork/lnd/pull/6220)

Expand Down

0 comments on commit 54adc69

Please sign in to comment.