Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dedup interceptor #5901

Merged
merged 3 commits into from
Oct 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/release-notes/release-notes-0.14.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,10 @@ messages directly. There is no routing/path finding involved.
fixed](https://github.com/lightningnetwork/lnd/pull/5893) in the
[`btcwallet` dependency](https://github.com/btcsuite/btcwallet/pull/773).

* [A bug has been fixed that would at times cause intercepted HTLCs to be
re-notified](https://github.com/lightningnetwork/lnd/pull/5901), which could
lead to higher-level HTLC mismanagement issues.

## Documentation

The [code contribution guidelines have been updated to mention the new
Expand Down
5 changes: 5 additions & 0 deletions lnrpc/routerrpc/forward_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ func (r *forwardInterceptor) holdAndForwardToClient(
htlc := forward.Packet()
inKey := htlc.IncomingCircuit

// Ignore already held htlcs.
if _, ok := r.holdForwards[inKey]; ok {
return nil
}

// First hold the forward, then send to client.
r.holdForwards[inKey] = forward
interceptionRequest := &ForwardHtlcInterceptRequest{
Expand Down
165 changes: 163 additions & 2 deletions lntest/itest/lnd_forward_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,168 @@ type interceptorTestCase struct {
interceptorAction routerrpc.ResolveHoldForwardAction
}

// testForwardInterceptor tests the forward interceptor RPC layer.
// testForwardInterceptorDedupHtlc tests that upon reconnection, duplicate
// HTLCs aren't re-notified using the HTLC interceptor API.
func testForwardInterceptorDedupHtlc(net *lntest.NetworkHarness, t *harnessTest) {
// Initialize the test context with 3 connected nodes.
alice := net.NewNode(t.t, "alice", nil)
defer shutdownAndAssert(net, t, alice)

bob := net.NewNode(t.t, "bob", nil)
defer shutdownAndAssert(net, t, alice)

carol := net.NewNode(t.t, "carol", nil)
defer shutdownAndAssert(net, t, alice)

tc := newInterceptorTestContext(t, net, alice, bob, carol)

const (
chanAmt = btcutil.Amount(300000)
)

// Open and wait for channels.
tc.openChannel(tc.alice, tc.bob, chanAmt)
tc.openChannel(tc.bob, tc.carol, chanAmt)
defer tc.closeChannels()
tc.waitForChannels()

ctxb := context.Background()
ctxt, cancelInterceptor := context.WithCancel(ctxb)
interceptor, err := tc.bob.RouterClient.HtlcInterceptor(ctxt)
require.NoError(tc.t.t, err, "failed to create HtlcInterceptor")

addResponse, err := tc.carol.AddInvoice(ctxb, &lnrpc.Invoice{
ValueMsat: 1000,
})
require.NoError(tc.t.t, err, "unable to add invoice")

invoice, err := tc.carol.LookupInvoice(ctxb, &lnrpc.PaymentHash{
RHashStr: hex.EncodeToString(addResponse.RHash),
})
require.NoError(tc.t.t, err, "unable to find invoice")

// We start the htlc interceptor with a simple implementation that
// saves all intercepted packets. These packets are held to simulate a
// pending payment.
interceptedPacketstMap := &sync.Map{}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
packet, err := interceptor.Recv()
if err != nil {
// If it is just the error result of the
// context cancellation the we exit silently.
status, ok := status.FromError(err)
if ok && status.Code() == codes.Canceled {
return
}

// Otherwise it an unexpected error, we fail
// the test.
require.NoError(
tc.t.t, err,
"unexpected error in interceptor.Recv()",
)
return
}
interceptedPacketstMap.Store(
packet.IncomingCircuitKey.HtlcId, packet,
)
}
}()

// We initiate a payment from Alice.
wg.Add(1)
go func() {
defer wg.Done()
_, _ = tc.sendAliceToCarolPayment(
ctxb, 1000,
invoice.RHash, invoice.PaymentAddr,
)
}()

// Here we should wait for the channel to contain a pending htlc, and
// also be shown as being active.
err = wait.Predicate(func() bool {
channels, err := tc.bob.ListChannels(ctxt, &lnrpc.ListChannelsRequest{
ActiveOnly: true,
Peer: tc.alice.PubKey[:],
})
if err != nil {
return false
}
if len(channels.Channels) == 0 {
return false
}

aliceChan := channels.Channels[0]
if len(aliceChan.PendingHtlcs) == 0 {
return false
}
return aliceChan.Active
}, defaultTimeout)
require.NoError(
tc.t.t, err, "alice <> bob channel pending htlc never arrived",
)

// At this point we want to make bob's link send all pending htlcs to
// the switch again. We force this behavior by disconnecting and
// connecting to the peer.
if err := tc.net.DisconnectNodes(tc.bob, tc.alice); err != nil {
tc.t.Fatalf("failed to disconnect alice and bob")
}
tc.net.EnsureConnected(tc.t.t, tc.bob, tc.alice)

// Here we wait for the channel to be active again.
err = wait.Predicate(func() bool {
req := &lnrpc.ListChannelsRequest{
ActiveOnly: true,
Peer: tc.alice.PubKey[:],
}

channels, err := tc.bob.ListChannels(ctxt, req)
return err == nil && len(channels.Channels) > 0
}, defaultTimeout)
require.NoError(
tc.t.t, err, "alice <> bob channel didn't re-activate",
)

// Now that the channel is active we make sure the test passes as
// expected.
payments, err := tc.alice.ListPayments(ctxb, &lnrpc.ListPaymentsRequest{
IncludeIncomplete: true,
})
require.NoError(tc.t.t, err, "failed to fetch payment")

// We expect one in flight payment since we held the htlcs.
require.Equal(tc.t.t, len(payments.Payments), 1)
require.Equal(tc.t.t, payments.Payments[0].Status, lnrpc.Payment_IN_FLIGHT)

// We now fail all htlcs to cancel the payment.
packetsCount := 0
interceptedPacketstMap.Range(func(_, packet interface{}) bool {
p := packet.(*routerrpc.ForwardHtlcInterceptRequest)
_ = interceptor.Send(&routerrpc.ForwardHtlcInterceptResponse{
IncomingCircuitKey: p.IncomingCircuitKey,
Action: routerrpc.ResolveHoldForwardAction_FAIL,
})
packetsCount++
return true
})

// At this point if we have more than one held htlcs then we should
// fail. This means we hold the same htlc twice which is a risk we
// want to eliminate. If we don't have the same htlc twice in theory we
// can cancel one and settle the other by mistake.
require.Equal(tc.t.t, packetsCount, 1)

cancelInterceptor()
wg.Wait()
}

// testForwardInterceptorBasic tests the forward interceptor RPC layer.
// The test creates a cluster of 3 connected nodes: Alice -> Bob -> Carol
// Alice sends 4 different payments to Carol while the interceptor handles
// differently the htlcs.
Expand All @@ -43,7 +204,7 @@ type interceptorTestCase struct {
// 3. Intercepted held htlcs result in no payment (invoice is not settled).
// 4. When Interceptor disconnects it resumes all held htlcs, which result in
// valid payment (invoice is settled).
func testForwardInterceptor(net *lntest.NetworkHarness, t *harnessTest) {
func testForwardInterceptorBasic(net *lntest.NetworkHarness, t *harnessTest) {
// Initialize the test context with 3 connected nodes.
alice := net.NewNode(t.t, "alice", nil)
defer shutdownAndAssert(net, t, alice)
Expand Down
8 changes: 6 additions & 2 deletions lntest/itest/lnd_test_list_on_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,12 @@ var allTestCases = []*testCase{
test: testRestAPI,
},
{
name: "intercept forwarded htlc packets",
test: testForwardInterceptor,
name: "forward interceptor",
test: testForwardInterceptorBasic,
},
{
name: "forward interceptor dedup htlcs",
test: testForwardInterceptorDedupHtlc,
},
{
name: "wumbo channels",
Expand Down