Reattempt proof delivery on node restart #1055

Merged
merged 11 commits on Aug 8, 2024
2 changes: 1 addition & 1 deletion fn/option.go
@@ -59,7 +59,7 @@ func (o Option[A]) UnwrapOr(a A) A {
return a
}

// UnwrapPtr is used to extract a reference to a value from an option, and we
// UnwrapToPtr is used to extract a reference to a value from an option, and we
// supply an empty pointer in the case when the Option is empty.
func (o Option[A]) UnwrapToPtr() *A {
var v *A
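The comment fix brings the doc comment in line with the method's actual name, UnwrapToPtr. For readers who have not used the fn package, the sketch below illustrates the behaviour the corrected comment describes: a pointer to the value when the Option is populated, and a nil pointer when it is empty. The Option type shown here is a simplified stand-in, not the fn package's real implementation.

```go
package main

import "fmt"

// Option is a minimal stand-in for fn.Option: it either holds a value
// (isSome == true) or is empty.
type Option[A any] struct {
	isSome bool
	some   A
}

// Some and None are sketch constructors mirroring the fn package's helpers.
func Some[A any](a A) Option[A] { return Option[A]{isSome: true, some: a} }
func None[A any]() Option[A]    { return Option[A]{} }

// UnwrapToPtr returns a pointer to the contained value, or a nil pointer
// when the Option is empty -- the behaviour the corrected comment describes.
func (o Option[A]) UnwrapToPtr() *A {
	var v *A
	if o.isSome {
		v = &o.some
	}
	return v
}

func main() {
	fmt.Println(Some(42).UnwrapToPtr() != nil) // true
	fmt.Println(None[int]().UnwrapToPtr())     // <nil>
}
```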
4 changes: 2 additions & 2 deletions itest/psbt_test.go
@@ -1042,8 +1042,8 @@ func testPsbtMultiSend(t *harnessTest) {
ctxt, cancel := context.WithTimeout(ctxb, defaultWaitTimeout)
defer cancel()

// Now that we have the asset created, we'll make a new node that'll
// serve as the node which'll receive the assets.
// With the asset created, we'll set up a new node that will act as the
// receiver of the transfer.
secondTapd := setupTapdHarness(
t.t, t, t.lndHarness.Bob, t.universeServer,
)
254 changes: 220 additions & 34 deletions itest/send_test.go
@@ -16,6 +16,7 @@ import (
wrpc "github.com/lightninglabs/taproot-assets/taprpc/assetwalletrpc"
"github.com/lightninglabs/taproot-assets/taprpc/mintrpc"
"github.com/lightninglabs/taproot-assets/taprpc/tapdevrpc"
unirpc "github.com/lightninglabs/taproot-assets/taprpc/universerpc"
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/stretchr/testify/require"
)
@@ -573,7 +574,8 @@ func testBasicSendPassiveAsset(t *harnessTest) {

// testReattemptFailedSendHashmailCourier tests that a failed attempt at
// sending an asset proof will be reattempted by the tapd node. This test
// targets the hashmail courier.
// targets the hashmail courier. The proof courier is specified in the test
// list entry.
func testReattemptFailedSendHashmailCourier(t *harnessTest) {
var (
ctxb = context.Background()
@@ -654,11 +656,6 @@ func testReattemptFailedSendHashmailCourier(t *harnessTest) {

// Simulate a failed attempt at sending the asset proof by stopping
// the receiver node.
//
// The receiving tapd node does not return a proof received confirmation
// message via the universe RPC courier. We can simulate a proof
// transfer failure by stopping the courier service directly and not the
// receiving tapd node.
require.NoError(t.t, t.tapd.stop(false))

// Send asset and then mine to confirm the associated on-chain tx.
@@ -668,33 +665,80 @@ func testReattemptFailedSendHashmailCourier(t *harnessTest) {
wg.Wait()
}

// testReattemptFailedSendUniCourier tests that a failed attempt at
// sending an asset proof will be reattempted by the tapd node. This test
// targets the universe proof courier.
func testReattemptFailedSendUniCourier(t *harnessTest) {
// testReattemptProofTransferOnTapdRestart tests that a failed attempt at
// transferring a transfer output proof to a proof courier will be reattempted
// by the sending tapd node upon restart. This test targets the universe
// courier.
func testReattemptProofTransferOnTapdRestart(t *harnessTest) {
var (
ctxb = context.Background()
wg sync.WaitGroup
)

// Make a new node which will send the asset to the primary tapd node.
// We expect this node to fail because our send call will time out
// whilst the porter continues to attempt to send the asset.
// For this test we will use the universe server as the proof courier.
proofCourier := t.universeServer

// Make a new tapd node which will send an asset to a receiving tapd
// node.
sendTapd := setupTapdHarness(
t.t, t, t.lndHarness.Bob, t.universeServer,
func(params *tapdHarnessParams) {
params.expectErrExit = true
params.proofCourier = proofCourier
},
)
defer func() {
// Any node that has been started within an itest should be
// explicitly stopped within the same itest.
require.NoError(t.t, sendTapd.stop(!*noDelete))
}()

// Use the primary tapd node as the receiver node.
recvTapd := t.tapd

// Use the sending node to mint an asset for sending.
rpcAssets := MintAssetsConfirmBatch(
t.t, t.lndHarness.Miner.Client, sendTapd,
[]*mintrpc.MintAssetRequest{simpleAssets[0]},
)

genInfo := rpcAssets[0].AssetGenesis

// After minting an asset with the sending node, we need to synchronize
// the Universe state to ensure the receiving node is updated and aware
// of the asset.
t.syncUniverseState(sendTapd, recvTapd, len(rpcAssets))

// Create a new address for the receiver node. We will use the universe
// server as the proof courier.
proofCourierAddr := fmt.Sprintf(
"%s://%s", proof.UniverseRpcCourierType,
proofCourier.service.rpcHost(),
)
t.Logf("Proof courier address: %s", proofCourierAddr)

recvAddr, err := recvTapd.NewAddr(ctxb, &taprpc.NewAddrRequest{
AssetId: genInfo.AssetId,
Amt: 10,
ProofCourierAddr: proofCourierAddr,
})
require.NoError(t.t, err)
AssertAddrCreated(t.t, recvTapd, rpcAssets[0], recvAddr)

// Soon we will be attempting to send an asset to the receiver node. We
// want the attempt to fail until we restart the sending node.
// Therefore, we will take the proof courier service offline.
t.Log("Stopping proof courier service")
require.NoError(t.t, proofCourier.Stop())

// Subscribe to receive asset send events from the sending tapd node.
// Now that the proof courier service is offline, the sending node's
// attempt to transfer the asset proof should fail.
//
// We will soon start the asset transfer process. However, before we
// start, we subscribe to the send events from the sending tapd node so
// that we can be sure that a transfer has been attempted.
events := SubscribeSendEvents(t.t, sendTapd)

// Test to ensure that we receive the expected number of backoff wait
// event notifications.
// This test is executed in a goroutine to ensure that we can receive
// the event notification(s) from the tapd node as the rest of the test
// proceeds.
wg.Add(1)
go func() {
defer wg.Done()
@@ -712,7 +756,8 @@ func testReattemptFailedSendUniCourier(t *harnessTest) {
// Expected number of events is one less than the number of
// tries because the first attempt does not count as a backoff
// event.
nodeBackoffCfg := t.tapd.clientCfg.HashMailCourier.BackoffCfg
nodeBackoffCfg :=
sendTapd.clientCfg.UniverseRpcCourier.BackoffCfg
expectedEventCount := nodeBackoffCfg.NumTries - 1

// Context timeout scales with expected number of events.
@@ -729,33 +774,174 @@ func testReattemptFailedSendUniCourier(t *harnessTest) {
)
}()

// Mint an asset for sending.
// Start asset transfer and then mine to confirm the associated on-chain
// tx. The on-chain tx should be mined successfully, but we expect the
// asset proof transfer to be unsuccessful.
sendResp, _ := sendAssetsToAddr(t, sendTapd, recvAddr)
MineBlocks(t.t, t.lndHarness.Miner.Client, 1, 1)

// Wait to ensure that the asset transfer attempt has been made.
wg.Wait()

// Stop the sending tapd node. This downtime will give us the
// opportunity to restart the proof courier service.
t.Log("Stopping sending tapd node")
require.NoError(t.t, sendTapd.stop(false))

// Restart the proof courier service.
t.Log("Starting proof courier service")
require.NoError(t.t, proofCourier.Start(nil))
t.Logf("Proof courier address: %s", proofCourier.service.rpcHost())

// Ensure that the proof courier address has not changed on restart.
// The port is currently selected opportunistically.
// If the proof courier address has changed, the tap address will be
// stale.
newProofCourierAddr := fmt.Sprintf(
"%s://%s", proof.UniverseRpcCourierType,
proofCourier.service.rpcHost(),
)
require.Equal(t.t, proofCourierAddr, newProofCourierAddr)

// Identify receiver's asset transfer output.
require.Len(t.t, sendResp.Transfer.Outputs, 2)
recvOutput := sendResp.Transfer.Outputs[0]

// If the script key of the output is local to the sending node, then
// the receiver's output is the second output.
if recvOutput.ScriptKeyIsLocal {
recvOutput = sendResp.Transfer.Outputs[1]
}

// Formulate a universe key to query the proof courier for the asset
// transfer proof.
uniKey := unirpc.UniverseKey{
Id: &unirpc.ID{
Id: &unirpc.ID_AssetId{
AssetId: genInfo.AssetId,
},
ProofType: unirpc.ProofType_PROOF_TYPE_TRANSFER,
},
LeafKey: &unirpc.AssetKey{
Outpoint: &unirpc.AssetKey_OpStr{
OpStr: recvOutput.Anchor.Outpoint,
},
ScriptKey: &unirpc.AssetKey_ScriptKeyBytes{
ScriptKeyBytes: recvOutput.ScriptKey,
},
},
}

// Ensure that the transfer proof has not reached the proof courier yet.
resp, err := proofCourier.service.QueryProof(ctxb, &uniKey)
require.Nil(t.t, resp)
require.ErrorContains(t.t, err, "no universe proof found")

// Restart the sending tapd node. The node should reattempt to transfer
// the asset proof to the proof courier.
t.Log("Restarting sending tapd node")
require.NoError(t.t, sendTapd.start(false))

require.Eventually(t.t, func() bool {
resp, err = proofCourier.service.QueryProof(ctxb, &uniKey)
return err == nil && resp != nil
}, defaultWaitTimeout, 200*time.Millisecond)

// TODO(ffranr): Modify the receiver node proof retrieval backoff
// schedule such that we can assert that the transfer fully completes
// in a timely and predictable manner.
// AssertNonInteractiveRecvComplete(t.t, recvTapd, 1)
}
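The event-subscription goroutine above (like its twin in testReattemptFailedSendUniCourier) derives the expected event count and wait timeout from the courier backoff config: numTries attempts produce numTries-1 backoff wait events, and the timeout is that count times the per-event receiver-ACK timeout plus a margin. A minimal sketch of that arithmetic, using hypothetical values in place of the node's UniverseRpcCourier backoff config and the itest constants (defaultProofTransferReceiverAckTimeout, timeoutMargin):

```go
package main

import (
	"fmt"
	"time"
)

// expectedBackoffEvents mirrors the reasoning in the test comments: the
// first delivery attempt is not preceded by a backoff wait, so numTries
// total attempts yield numTries-1 backoff wait events.
func expectedBackoffEvents(numTries int) int {
	return numTries - 1
}

// eventWaitTimeout scales the subscription timeout with the expected
// number of events and adds a margin for work that is not pure waiting
// on the receiver ACK.
func eventWaitTimeout(events int, perEvent, margin time.Duration) time.Duration {
	return time.Duration(events)*perEvent + margin
}

func main() {
	// Hypothetical values standing in for the backoff config and the
	// itest timeout constants.
	numTries := 5
	perEvent := 5 * time.Second
	margin := 10 * time.Second

	n := expectedBackoffEvents(numTries)
	fmt.Println(n, eventWaitTimeout(n, perEvent, margin)) // 4 30s
}
```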

// testReattemptFailedSendUniCourier tests that a failed attempt at
// sending an asset proof will be reattempted by the tapd node. This test
// targets the universe proof courier.
func testReattemptFailedSendUniCourier(t *harnessTest) {
var (
ctxb = context.Background()
wg sync.WaitGroup
)

// Make a new node which will send the asset to the primary tapd node.
// We expect this node to fail because our send call will time out
// whilst the porter continues to attempt to send the asset.
sendTapd := setupTapdHarness(
t.t, t, t.lndHarness.Bob, t.universeServer,
func(params *tapdHarnessParams) {
params.expectErrExit = true
},
)

// Use the primary tapd node as the receiver node.
recvTapd := t.tapd

// Use the sending node to mint an asset for sending.
rpcAssets := MintAssetsConfirmBatch(
t.t, t.lndHarness.Miner.Client, sendTapd,
[]*mintrpc.MintAssetRequest{simpleAssets[0]},
)

genInfo := rpcAssets[0].AssetGenesis

// Synchronize the Universe state of the second node, with the main
// node.
t.syncUniverseState(sendTapd, t.tapd, len(rpcAssets))
// After minting an asset with the sending node, we need to synchronize
// the Universe state to ensure the receiving node is updated and aware
// of the asset.
t.syncUniverseState(sendTapd, recvTapd, len(rpcAssets))

// Create a new address for the receiver node.
recvAddr, err := t.tapd.NewAddr(ctxb, &taprpc.NewAddrRequest{
recvAddr, err := recvTapd.NewAddr(ctxb, &taprpc.NewAddrRequest{
AssetId: genInfo.AssetId,
Amt: 10,
})
require.NoError(t.t, err)
AssertAddrCreated(t.t, t.tapd, rpcAssets[0], recvAddr)
AssertAddrCreated(t.t, recvTapd, rpcAssets[0], recvAddr)

// Now we will ensure that the expected number of backoff wait event
// notifications are emitted from the sending node.
//
// We identify backoff wait events in a goroutine to ensure that we can
// capture event notifications from the send node while the main
// test continues.
//
// Subscribe to proof transfer send events from the sending tapd node.
events := SubscribeSendEvents(t.t, sendTapd)

wg.Add(1)
go func() {
defer wg.Done()

// Define a target event selector to match the backoff wait
// event. This function selects for a specific event type.
targetEventSelector := func(
event *tapdevrpc.SendAssetEvent) bool {

return AssertSendEventProofTransferBackoffWaitTypeSend(
t, event,
)
}

// Expected number of events is one less than the number of
// tries because the first attempt does not count as a backoff
// event.
nodeBackoffCfg := sendTapd.clientCfg.HashMailCourier.BackoffCfg
expectedEventCount := nodeBackoffCfg.NumTries - 1

// Context timeout scales with expected number of events.
timeout := time.Duration(expectedEventCount) *
defaultProofTransferReceiverAckTimeout

// Allow for some margin for the operations that aren't pure
// waiting on the receiver ACK.
timeout += timeoutMargin

assertAssetNtfsEvent(
t, events, timeout, targetEventSelector,
expectedEventCount,
)
}()

// Simulate a failed attempt at sending the asset proof by stopping
// the proof courier service.
//
// In following the hashmail proof courier protocol, the receiver node
// returns a proof received confirmation message via the courier.
// We can simulate a proof transfer failure by stopping the receiving
// tapd node. The courier service should still be operational.
require.NoError(t.t, t.proofCourier.Stop())

// Send asset and then mine to confirm the associated on-chain tx.
@@ -765,9 +951,9 @@ func testReattemptFailedSendUniCourier(t *harnessTest) {
wg.Wait()
}

// testReattemptFailedReceiveUniCourier tests that a failed attempt at
// receiving an asset proof will be reattempted by the receiving tapd node. This
// test targets the universe proof courier.
// testReattemptFailedReceiveUniCourier ensures that a failed attempt to receive
// an asset proof is retried by the receiving Tapd node. This test focuses on
// the universe proof courier.
func testReattemptFailedReceiveUniCourier(t *harnessTest) {
ctxb := context.Background()

4 changes: 4 additions & 0 deletions itest/test_list_on_test.go
@@ -92,6 +92,10 @@ var testCases = []*testCase{
name: "reattempt failed send uni courier",
test: testReattemptFailedSendUniCourier,
},
{
name: "reattempt proof transfer on tapd restart",
test: testReattemptProofTransferOnTapdRestart,
},
{
name: "reattempt failed receive uni courier",
test: testReattemptFailedReceiveUniCourier,
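The harness picks up the new itest purely from this table entry, dispatching on the human-readable name. A minimal sketch of that registration pattern follows; the testCase struct and runner here are hypothetical stand-ins for the harness types, shown only to illustrate how the name maps to testReattemptProofTransferOnTapdRestart.

```go
package main

import "fmt"

// harnessTest is a hypothetical stand-in for the itest harness handle.
type harnessTest struct{ name string }

// testCase pairs a human-readable name with the function that runs it,
// mirroring the shape of the entries added in test_list_on_test.go.
type testCase struct {
	name string
	test func(t *harnessTest)
}

var testCases = []*testCase{
	{
		name: "reattempt proof transfer on tapd restart",
		test: func(t *harnessTest) { fmt.Println("running", t.name) },
	},
}

func main() {
	// Sketch runner: iterate the table and invoke each registered test.
	for _, tc := range testCases {
		tc.test(&harnessTest{name: tc.name})
	}
}
```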