From 4599b0eac358428edab58dab26f5bebf4da3ab78 Mon Sep 17 00:00:00 2001
From: Conner Fromknecht
Date: Thu, 30 Aug 2018 20:33:57 -0700
Subject: [PATCH 1/7] autopilot/agent: track pending connections

This commit modifies the autopilot agent to track all pending
connection requests, and forgo further attempts if a connection request
is already outstanding.

Previously, the agent would try to establish hundreds of connections to
a node, especially if the connections were timing out and not
returning. This resulted in an OOM when cranking up maxchannels to 200,
since there would be close to 10k pending connections before the
program was terminated. The issue was compounded by periodic batch
timeouts, causing autopilot to try to process thousands of triggers for
failing connections to the same peer.

With these fixes, autopilot will skip nodes that we are trying to
connect to during heuristic selection. The CPU and memory utilization
have been significantly reduced as a result.
---
 autopilot/agent.go | 124 ++++++++++++++++++++++++++++++---------------
 1 file changed, 82 insertions(+), 42 deletions(-)

diff --git a/autopilot/agent.go b/autopilot/agent.go
index 8cd93be691..e91a9fc699 100644
--- a/autopilot/agent.go
+++ b/autopilot/agent.go
@@ -301,22 +301,26 @@ func (a *Agent) OnChannelClose(closedChans ...lnwire.ShortChannelID) {
 }
 
 // mergeNodeMaps merges the Agent's set of nodes that it already has active
-// channels open to, with the set of nodes that are pending new channels. This
-// ensures that the Agent doesn't attempt to open any "duplicate" channels to
-// the same node.
-func mergeNodeMaps(a map[NodeID]struct{}, b map[NodeID]struct{},
-	c map[NodeID]Channel) map[NodeID]struct{} {
-
-	res := make(map[NodeID]struct{}, len(a)+len(b)+len(c))
-	for nodeID := range a {
-		res[nodeID] = struct{}{}
-	}
-	for nodeID := range b {
-		res[nodeID] = struct{}{}
+// channels open to, with the other sets of nodes that should be removed from
+// consideration during heuristic selection. This ensures that the Agent doesn't
+// attempt to open any "duplicate" channels to the same node.
+func mergeNodeMaps(c map[NodeID]Channel,
+	skips ...map[NodeID]struct{}) map[NodeID]struct{} {
+
+	numNodes := len(c)
+	for _, skip := range skips {
+		numNodes += len(skip)
 	}
+
+	res := make(map[NodeID]struct{}, len(c)+numNodes)
 	for nodeID := range c {
 		res[nodeID] = struct{}{}
 	}
+	for _, skip := range skips {
+		for nodeID := range skip {
+			res[nodeID] = struct{}{}
+		}
+	}
 	return res
 }
 
@@ -360,6 +364,11 @@ func (a *Agent) controller() {
 	// channels with, but didn't succeed.
 	failedNodes := make(map[NodeID]struct{})
 
+	// pendingConns tracks the nodes that we are attempting to make
+	// connections to. This prevents us from making duplicate connection
+	// requests to the same node.
+	pendingConns := make(map[NodeID]struct{})
+
 	// pendingOpens tracks the channels that we've requested to be
 	// initiated, but haven't yet been confirmed as being fully opened.
 	// This state is required as otherwise, we may go over our allotted
@@ -481,7 +490,9 @@ func (a *Agent) controller() {
 			// duplicate edges.
 			connectedNodes := a.chanState.ConnectedNodes()
 			pendingMtx.Lock()
-			nodesToSkip := mergeNodeMaps(connectedNodes, failedNodes, pendingOpens)
+			nodesToSkip := mergeNodeMaps(pendingOpens,
+				pendingConns, connectedNodes, failedNodes,
+			)
 			pendingMtx.Unlock()
 
 			// If we reach this point, then according to our heuristic we
@@ -507,32 +518,40 @@ func (a *Agent) controller() {
 			log.Infof("Attempting to execute channel attachment "+
 				"directives: %v", spew.Sdump(chanCandidates))
 
+			// Before proceeding, check to see if we have any slots
+			// available to open channels. If there are any, we will attempt
+			// to dispatch the retrieved directives since we can't be
+			// certain which ones may actually succeed. If too many
+			// connections succeed, the excess will be ignored and made
+			// available to future heuristic selections.
+			pendingMtx.Lock()
+			if uint16(len(pendingOpens)) >= a.cfg.MaxPendingOpens {
+				pendingMtx.Unlock()
+				log.Debugf("Reached cap of %v pending "+
+					"channel opens, will retry "+
+					"after success/failure",
+					a.cfg.MaxPendingOpens)
+				continue
+			}
+
 			// For each recommended attachment directive, we'll launch a
 			// new goroutine to attempt to carry out the directive. If any
 			// of these succeed, then we'll receive a new state update,
 			// taking us back to the top of our controller loop.
-			pendingMtx.Lock()
 			for _, chanCandidate := range chanCandidates {
-				// Before we proceed, we'll check to see if this
-				// attempt would take us past the total number of
-				// allowed pending opens. If so, then we'll skip this
-				// round and wait for an attempt to either fail or
-				// succeed.
-				if uint16(len(pendingOpens))+1 >
-					a.cfg.MaxPendingOpens {
-
-					log.Debugf("Reached cap of %v pending "+
-						"channel opens, will retry "+
-						"after success/failure",
-						a.cfg.MaxPendingOpens)
+				// Skip candidates which we are already trying
+				// to establish a connection with.
+				nodeID := chanCandidate.NodeID
+				if _, ok := pendingConns[nodeID]; ok {
 					continue
 				}
+				pendingConns[nodeID] = struct{}{}
 
 				go func(directive AttachmentDirective) {
 					// We'll start out by attempting to connect to
 					// the peer in order to begin the funding
 					// workflow.
-					pub := directive.PeerKey
+					pub := directive.NodeKey
 					alreadyConnected, err := a.cfg.ConnectToPeer(
 						pub, directive.Addrs,
 					)
@@ -548,6 +567,7 @@ func (a *Agent) controller() {
 						// again.
 						nodeID := NewNodeID(pub)
 						pendingMtx.Lock()
+						delete(pendingConns, nodeID)
 						failedNodes[nodeID] = struct{}{}
 						pendingMtx.Unlock()
 
@@ -558,24 +578,31 @@ func (a *Agent) controller() {
 						return
 					}
 
-					// If we were succesful, we'll track this peer
-					// in our set of pending opens. We do this here
-					// to ensure we don't stall on selecting new
-					// peers if the connection attempt happens to
-					// take too long.
+					// The connection was successful, though before
+					// progressing we must check that we have not
+					// already met our quota for max pending open
+					// channels. This can happen if multiple
+					// directives were spawned but fewer slots were
+					// available, and other successful attempts
+					// finished first.
 					pendingMtx.Lock()
-					if uint16(len(pendingOpens))+1 >
+					if uint16(len(pendingOpens)) >=
 						a.cfg.MaxPendingOpens {
-
-						pendingMtx.Unlock()
-
-						// Since we've reached our max number
-						// of pending opens, we'll disconnect
-						// this peer and exit. However, if we
-						// were previously connected to them,
-						// then we'll make sure to maintain the
+						// Since we've reached our max number of
+						// pending opens, we'll disconnect this
+						// peer and exit.
However, if we were + // previously connected to them, then + // we'll make sure to maintain the // connection alive. if alreadyConnected { + // Since we succeeded in + // connecting, we won't add this + // peer to the failed nodes map, + // but we will remove it from + // pendingConns so that it can + // be retried in the future. + delete(pendingConns, nodeID) + pendingMtx.Unlock() return } @@ -589,10 +616,23 @@ func (a *Agent) controller() { pub.SerializeCompressed(), err) } + + // Now that we have disconnected, we can + // remove this node from our pending + // conns map, permitting subsequent + // connection attempts. + delete(pendingConns, nodeID) + pendingMtx.Unlock() return } - nodeID := NewNodeID(directive.PeerKey) + // If we were successful, we'll track this peer + // in our set of pending opens. We do this here + // to ensure we don't stall on selecting new + // peers if the connection attempt happens to + // take too long. + nodeID := directive.NodeID + delete(pendingConns, nodeID) pendingOpens[nodeID] = Channel{ Capacity: directive.ChanAmt, Node: nodeID, From 08b6bf54fb73e84d6478f02fd7778a2e2b4d5c1f Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 30 Aug 2018 20:40:52 -0700 Subject: [PATCH 2/7] autopilot/interface: expose NodeID in AttachmentDirective --- autopilot/interface.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/autopilot/interface.go b/autopilot/interface.go index 7f632d89b7..2b7155fee0 100644 --- a/autopilot/interface.go +++ b/autopilot/interface.go @@ -90,6 +90,9 @@ type AttachmentDirective struct { // a ChannelOpener implementation to execute the directive. PeerKey *btcec.PublicKey + // NodeID is the serialized compressed pubkey of the target node. + NodeID NodeID + // ChanAmt is the size of the channel that should be opened, expressed // in satoshis. ChanAmt btcutil.Amount From c214bea9db867a7ae756eb5d7fdcc2a83fb1b4d7 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 30 Aug 2018 20:41:12 -0700 Subject: [PATCH 3/7] autopilot/prefattach: set NodeID for selection candidates --- autopilot/prefattach.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/autopilot/prefattach.go b/autopilot/prefattach.go index fd1913ed1f..4bf00c25eb 100644 --- a/autopilot/prefattach.go +++ b/autopilot/prefattach.go @@ -249,7 +249,8 @@ func (p *ConstrainedPrefAttachment) Select(self *btcec.PublicKey, g ChannelGraph X: pub.X, Y: pub.Y, }, - Addrs: selectedNode.Addrs(), + NodeID: NewNodeID(pub), + Addrs: selectedNode.Addrs(), }) // With the node selected, we'll add it to the set of visited From 7d9483c20d2525a37cabb2ef7201d82ea9e2d943 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 31 Aug 2018 01:12:20 -0700 Subject: [PATCH 4/7] autopilot/agent_test: adds TestAgentSkipPendingConns Adds a test asserting that the agent prevents itself from making duplicate outstanding connection requests to the same peer. 
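
For reference, the behaviour exercised by this test reduces to a
check-and-set guard around a set of in-flight connection attempts. The
following is a minimal, self-contained sketch of that pattern with
simplified, made-up names (connGuard, tryAcquire, release); it is not
the agent code itself, which keeps pendingConns under the same mutex as
pendingOpens:

    // Sketch only: a mutex-guarded "pending" set that suppresses
    // duplicate dial attempts to the same node, mirroring the
    // pendingConns map added in the first patch of this series.
    package main

    import (
    	"fmt"
    	"sync"
    )

    // NodeID is a serialized compressed public key, as in autopilot.
    type NodeID [33]byte

    type connGuard struct {
    	mu      sync.Mutex
    	pending map[NodeID]struct{}
    }

    // tryAcquire marks nodeID as having an in-flight connection attempt.
    // It returns false if an attempt is already outstanding, in which
    // case the caller should skip the node during this selection round.
    func (g *connGuard) tryAcquire(nodeID NodeID) bool {
    	g.mu.Lock()
    	defer g.mu.Unlock()
    	if _, ok := g.pending[nodeID]; ok {
    		return false
    	}
    	g.pending[nodeID] = struct{}{}
    	return true
    }

    // release removes nodeID from the pending set once the attempt has
    // finished (success or failure), allowing future retries.
    func (g *connGuard) release(nodeID NodeID) {
    	g.mu.Lock()
    	defer g.mu.Unlock()
    	delete(g.pending, nodeID)
    }

    func main() {
    	g := &connGuard{pending: make(map[NodeID]struct{})}
    	var id NodeID
    	fmt.Println(g.tryAcquire(id)) // true: first attempt proceeds
    	fmt.Println(g.tryAcquire(id)) // false: duplicate attempt skipped
    	g.release(id)
    	fmt.Println(g.tryAcquire(id)) // true: retry allowed after release
    }
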
--- autopilot/agent_test.go | 178 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 178 insertions(+) diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index 113aad42ed..e68ff60e04 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -3,6 +3,7 @@ package autopilot import ( "bytes" "errors" + "fmt" "net" "sync" "testing" @@ -1310,3 +1311,180 @@ func TestAgentOnNodeUpdates(t *testing.T) { t.Fatalf("Select was not called but should have been") } } + +// TestAgentSkipPendingConns asserts that the agent will not try to make +// duplicate connection requests to the same node, even if the attachment +// heuristic instructs the agent to do so. It also asserts that the agent +// stops tracking the pending connection once it finishes. Note that in +// practice, a failed connection would be inserted into the skip map passed to +// the attachment heuristic, though this does not assert that case. +func TestAgentSkipPendingConns(t *testing.T) { + t.Parallel() + + // First, we'll create all the dependencies that we'll need in order to + // create the autopilot agent. + self, err := randKey() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } + heuristic := &mockHeuristic{ + moreChansResps: make(chan moreChansResp), + directiveResps: make(chan []AttachmentDirective), + } + chanController := &mockChanController{ + openChanSignals: make(chan openChanIntent), + } + memGraph, _, _ := newMemChanGraph() + + // The wallet will start with 6 BTC available. + const walletBalance = btcutil.SatoshiPerBitcoin * 6 + + connect := make(chan chan error) + + // With the dependencies we created, we can now create the initial + // agent itself. + testCfg := Config{ + Self: self, + Heuristic: heuristic, + ChanController: chanController, + WalletBalance: func() (btcutil.Amount, error) { + return walletBalance, nil + }, + ConnectToPeer: func(*btcec.PublicKey, []net.Addr) (bool, error) { + errChan := make(chan error) + connect <- errChan + err := <-errChan + return false, err + }, + DisconnectPeer: func(*btcec.PublicKey) error { + return nil + }, + Graph: memGraph, + MaxPendingOpens: 10, + } + initialChans := []Channel{} + agent, err := New(testCfg, initialChans) + if err != nil { + t.Fatalf("unable to create agent: %v", err) + } + + // To ensure the heuristic doesn't block on quitting the agent, we'll + // use the agent's quit chan to signal when it should also stop. + heuristic.quit = agent.quit + + // With the autopilot agent and all its dependencies we'll start the + // primary controller goroutine. + if err := agent.Start(); err != nil { + t.Fatalf("unable to start agent: %v", err) + } + defer agent.Stop() + + // We'll send an initial "yes" response to advance the agent past its + // initial check. This will cause it to try to get directives from the + // graph. + select { + case heuristic.moreChansResps <- moreChansResp{ + needMore: true, + numMore: 1, + amt: walletBalance, + }: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + + // Next, the agent should deliver a query to the Select method of the + // heuristic. We'll only return a single directive for a pre-chosen + // node. 
+ nodeKey, err := randKey() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } + nodeDirective := AttachmentDirective{ + PeerKey: nodeKey, + NodeID: NewNodeID(nodeKey), + ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin, + Addrs: []net.Addr{ + &net.TCPAddr{ + IP: bytes.Repeat([]byte("a"), 16), + }, + }, + } + select { + case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + + var errChan chan error + select { + case errChan = <-connect: + case <-time.After(time.Second * 10): + t.Fatalf("agent did not attempt connection") + } + + // Signal the agent to go again, now that we've tried to connect. + agent.OnNodeUpdates() + + // The heuristic again informs the agent that we need more channels. + select { + case heuristic.moreChansResps <- moreChansResp{ + needMore: true, + numMore: 1, + amt: walletBalance, + }: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + + // Send a directive for the same node, which already has a pending conn. + select { + case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + + // This time, the agent should skip trying to connect to the node with a + // pending connection. + select { + case <-connect: + t.Fatalf("agent should not have attempted connection") + case <-time.After(time.Second * 3): + } + + // Now, timeout the original request, which should still be waiting for + // a response. + select { + case errChan <- fmt.Errorf("connection timeout"): + case <-time.After(time.Second * 10): + t.Fatalf("agent did not receive connection timeout") + } + + // Signal the agent to try again, now that there are no pending conns. + agent.OnNodeUpdates() + + // The heuristic again informs the agent that we need more channels. + select { + case heuristic.moreChansResps <- moreChansResp{ + needMore: true, + numMore: 1, + amt: walletBalance, + }: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + + // Send a directive for the same node, which already has a pending conn. + select { + case heuristic.directiveResps <- []AttachmentDirective{nodeDirective}: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } + + // This time, the agent should try the connection since the peer has + // been removed from the pending map. + select { + case <-connect: + case <-time.After(time.Second * 10): + t.Fatalf("agent have attempted connection") + } +} From 2b578e06fc6f8eb0ab458cc3beb7104de610c97e Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 31 Aug 2018 01:02:27 -0700 Subject: [PATCH 5/7] autopilot/agent_test: ensure directives use unique keys This commit ensures that the mock attachment directives use unique keys, ensuring that they aren't skipped due to already having pending connection requests. The tests fail when they're all the same since they collide in the pendingConns map. --- autopilot/agent_test.go | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index e68ff60e04..d572c17c0d 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -345,6 +345,7 @@ func TestAgentChannelFailureSignal(t *testing.T) { // attempt to open a channel. 
var fakeDirective = AttachmentDirective{ PeerKey: self, + NodeID: NewNodeID(self), ChanAmt: btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ &net.TCPAddr{ @@ -726,9 +727,16 @@ func TestAgentImmediateAttach(t *testing.T) { // requests attachment directives. We'll generate 5 mock directives so // it can progress within its loop. directives := make([]AttachmentDirective, numChans) + nodeKeys := make(map[NodeID]struct{}) for i := 0; i < numChans; i++ { + pub, err := randKey() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } + nodeID := NewNodeID(pub) directives[i] = AttachmentDirective{ - PeerKey: self, + PeerKey: pub, + NodeID: nodeID, ChanAmt: btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ &net.TCPAddr{ @@ -736,6 +744,7 @@ func TestAgentImmediateAttach(t *testing.T) { }, }, } + nodeKeys[nodeID] = struct{}{} } wg = sync.WaitGroup{} @@ -767,11 +776,13 @@ func TestAgentImmediateAttach(t *testing.T) { t.Fatalf("invalid chan amt: expected %v, got %v", btcutil.SatoshiPerBitcoin, openChan.amt) } - if !openChan.target.IsEqual(self) { - t.Fatalf("unexpected key: expected %x, got %x", - self.SerializeCompressed(), - openChan.target.SerializeCompressed()) + nodeID := NewNodeID(openChan.target) + _, ok := nodeKeys[nodeID] + if !ok { + t.Fatalf("unexpected key: %v, not found", + nodeID) } + delete(nodeKeys, nodeID) case <-time.After(time.Second * 10): t.Fatalf("channel not opened in time") } @@ -873,8 +884,13 @@ func TestAgentPrivateChannels(t *testing.T) { // it can progress within its loop. directives := make([]AttachmentDirective, numChans) for i := 0; i < numChans; i++ { + pub, err := randKey() + if err != nil { + t.Fatalf("unable to generate key: %v", err) + } directives[i] = AttachmentDirective{ - PeerKey: self, + PeerKey: pub, + NodeID: NewNodeID(pub), ChanAmt: btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ &net.TCPAddr{ @@ -1015,6 +1031,7 @@ func TestAgentPendingChannelState(t *testing.T) { nodeID := NewNodeID(nodeKey) nodeDirective := AttachmentDirective{ PeerKey: nodeKey, + NodeID: nodeID, ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ &net.TCPAddr{ @@ -1073,7 +1090,7 @@ func TestAgentPendingChannelState(t *testing.T) { } if req.chans[0].Node != nodeID { t.Fatalf("wrong node ID: expected %x, got %x", - req.chans[0].Node[:], nodeID) + nodeID, req.chans[0].Node[:]) } case <-time.After(time.Second * 10): t.Fatalf("need more chans wasn't queried in time") From e702a6a2661f8208ddbd109896c88a04a975f82e Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Sat, 1 Sep 2018 18:30:24 -0700 Subject: [PATCH 6/7] autopilot/agent_test: remove Fatalf calls from goroutines --- autopilot/agent_test.go | 386 +++++++++++++--------------------------- 1 file changed, 121 insertions(+), 265 deletions(-) diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index d572c17c0d..61f0ded8a9 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -187,22 +187,13 @@ func TestAgentChannelOpenSignal(t *testing.T) { } defer agent.Stop() - var wg sync.WaitGroup - // We'll send an initial "no" response to advance the agent past its // initial check. 
- wg.Add(1) - go func() { - select { - case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: - wg.Done() - return - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() - - wg.Wait() + select { + case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // Next we'll signal a new channel being opened by the backing LN node, // with a capacity of 1 BTC. @@ -212,34 +203,20 @@ func TestAgentChannelOpenSignal(t *testing.T) { } agent.OnChannelOpen(newChan) - wg = sync.WaitGroup{} - // The agent should now query the heuristic in order to determine its // next action as it local state has now been modified. - wg.Add(1) - go func() { - select { - case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: - // At this point, the local state of the agent should - // have also been updated to reflect that the LN node - // now has an additional channel with one BTC. - if _, ok := agent.chanState[newChan.ChanID]; !ok { - t.Fatalf("internal channel state wasn't updated") - } - - // With all of our assertions passed, we'll signal the - // main test goroutine to continue the test. - wg.Done() - return - - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") + select { + case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: + // At this point, the local state of the agent should + // have also been updated to reflect that the LN node + // now has an additional channel with one BTC. + if _, ok := agent.chanState[newChan.ChanID]; !ok { + t.Fatalf("internal channel state wasn't updated") } - }() - // We'll wait here for either the agent to query the heuristic to be - // queried, or for the timeout above to tick. - wg.Wait() + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // There shouldn't be a call to the Select method as we've returned // "false" for NeedMoreChans above. @@ -443,55 +420,32 @@ func TestAgentChannelCloseSignal(t *testing.T) { } defer agent.Stop() - var wg sync.WaitGroup - // We'll send an initial "no" response to advance the agent past its // initial check. - wg.Add(1) - go func() { - select { - case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: - wg.Done() - return - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() - - wg.Wait() + select { + case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // Next, we'll close both channels which should force the agent to // re-query the heuristic. agent.OnChannelClose(initialChans[0].ChanID, initialChans[1].ChanID) - wg = sync.WaitGroup{} - // The agent should now query the heuristic in order to determine its // next action as it local state has now been modified. - wg.Add(1) - go func() { - select { - case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: - // At this point, the local state of the agent should - // have also been updated to reflect that the LN node - // has no existing open channels. - if len(agent.chanState) != 0 { - t.Fatalf("internal channel state wasn't updated") - } - - // With all of our assertions passed, we'll signal the - // main test goroutine to continue the test. 
- wg.Done() - return - - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") + select { + case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: + // At this point, the local state of the agent should + // have also been updated to reflect that the LN node + // has no existing open channels. + if len(agent.chanState) != 0 { + t.Fatalf("internal channel state wasn't updated") } - }() - // We'll wait here for either the agent to query the heuristic to be - // queried, or for the timeout above to tick. - wg.Wait() + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // There shouldn't be a call to the Select method as we've returned // "false" for NeedMoreChans above. @@ -569,22 +523,13 @@ func TestAgentBalanceUpdate(t *testing.T) { } defer agent.Stop() - var wg sync.WaitGroup - // We'll send an initial "no" response to advance the agent past its // initial check. - wg.Add(1) - go func() { - select { - case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: - wg.Done() - return - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() - - wg.Wait() + select { + case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // Next we'll send a new balance update signal to the agent, adding 5 // BTC to the amount of available funds. @@ -594,36 +539,22 @@ func TestAgentBalanceUpdate(t *testing.T) { agent.OnBalanceChange() - wg = sync.WaitGroup{} - // The agent should now query the heuristic in order to determine its // next action as it local state has now been modified. - wg.Add(1) - go func() { - select { - case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: - // At this point, the local state of the agent should - // have also been updated to reflect that the LN node - // now has an additional 5BTC available. - if agent.totalBalance != walletBalance { - t.Fatalf("expected %v wallet balance "+ - "instead have %v", agent.totalBalance, - walletBalance) - } - - // With all of our assertions passed, we'll signal the - // main test goroutine to continue the test. - wg.Done() - return - - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") + select { + case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: + // At this point, the local state of the agent should + // have also been updated to reflect that the LN node + // now has an additional 5BTC available. + if agent.totalBalance != walletBalance { + t.Fatalf("expected %v wallet balance "+ + "instead have %v", agent.totalBalance, + walletBalance) } - }() - // We'll wait here for either the agent to query the heuristic to be - // queried, or for the timeout above to tick. - wg.Wait() + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // There shouldn't be a call to the Select method as we've returned // "false" for NeedMoreChans above. @@ -697,31 +628,24 @@ func TestAgentImmediateAttach(t *testing.T) { } defer agent.Stop() - var wg sync.WaitGroup - const numChans = 5 // The very first thing the agent should do is query the NeedMoreChans // method on the passed heuristic. So we'll provide it with a response // that will kick off the main loop. - wg.Add(1) - go func() { - select { - - // We'll send over a response indicating that it should - // establish more channels, and give it a budget of 5 BTC to do - // so. 
- case heuristic.moreChansResps <- moreChansResp{true, numChans, 5 * btcutil.SatoshiPerBitcoin}: - wg.Done() - return - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() + select { - // We'll wait here for the agent to query the heuristic. If ti doesn't - // do so within 10 seconds, then the test will fail out. - wg.Wait() + // We'll send over a response indicating that it should + // establish more channels, and give it a budget of 5 BTC to do + // so. + case heuristic.moreChansResps <- moreChansResp{ + needMore: true, + numMore: numChans, + amt: 5 * btcutil.SatoshiPerBitcoin, + }: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // At this point, the agent should now be querying the heuristic to // requests attachment directives. We'll generate 5 mock directives so @@ -747,24 +671,13 @@ func TestAgentImmediateAttach(t *testing.T) { nodeKeys[nodeID] = struct{}{} } - wg = sync.WaitGroup{} - // With our fake directives created, we'll now send then to the agent // as a return value for the Select function. - wg.Add(1) - go func() { - select { - case heuristic.directiveResps <- directives: - wg.Done() - return - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() - - // We'll wait here for either the agent to query the heuristic to be - // queried, or for the timeout above to tick. - wg.Wait() + select { + case heuristic.directiveResps <- directives: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // Finally, we should receive 5 calls to the OpenChannel method with // the exact same parameters that we specified within the attachment @@ -850,34 +763,22 @@ func TestAgentPrivateChannels(t *testing.T) { defer agent.Stop() const numChans = 5 - var wg sync.WaitGroup // The very first thing the agent should do is query the NeedMoreChans // method on the passed heuristic. So we'll provide it with a response - // that will kick off the main loop. - wg.Add(1) - go func() { - defer wg.Done() - - // We'll send over a response indicating that it should - // establish more channels, and give it a budget of 5 BTC to do - // so. - resp := moreChansResp{ - needMore: true, - numMore: numChans, - amt: 5 * btcutil.SatoshiPerBitcoin, - } - select { - case heuristic.moreChansResps <- resp: - return - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() - - // We'll wait here for the agent to query the heuristic. If it doesn't - // do so within 10 seconds, then the test will fail out. - wg.Wait() + // that will kick off the main loop. We'll send over a response + // indicating that it should establish more channels, and give it a + // budget of 5 BTC to do so. + resp := moreChansResp{ + needMore: true, + numMore: numChans, + amt: 5 * btcutil.SatoshiPerBitcoin, + } + select { + case heuristic.moreChansResps <- resp: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // At this point, the agent should now be querying the heuristic to // requests attachment directives. We'll generate 5 mock directives so @@ -902,21 +803,11 @@ func TestAgentPrivateChannels(t *testing.T) { // With our fake directives created, we'll now send then to the agent // as a return value for the Select function. 
- wg.Add(1) - go func() { - defer wg.Done() - - select { - case heuristic.directiveResps <- directives: - return - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() - - // We'll wait here for either the agent to query the heuristic to be - // queried, or for the timeout above to tick. - wg.Wait() + select { + case heuristic.directiveResps <- directives: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // Finally, we should receive 5 calls to the OpenChannel method, each // specifying that it's for a private channel. @@ -995,29 +886,19 @@ func TestAgentPendingChannelState(t *testing.T) { } defer agent.Stop() - var wg sync.WaitGroup - // Once again, we'll start by telling the agent as part of its first // query, that it needs more channels and has 3 BTC available for - // attachment. - wg.Add(1) - go func() { - select { - - // We'll send over a response indicating that it should - // establish more channels, and give it a budget of 1 BTC to do - // so. - case heuristic.moreChansResps <- moreChansResp{true, 1, btcutil.SatoshiPerBitcoin}: - wg.Done() - return - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() - - // We'll wait for the first query to be consumed. If this doesn't - // happen then the above goroutine will timeout, and fail the test. - wg.Wait() + // attachment. We'll send over a response indicating that it should + // establish more channels, and give it a budget of 1 BTC to do so. + select { + case heuristic.moreChansResps <- moreChansResp{ + needMore: true, + numMore: 1, + amt: btcutil.SatoshiPerBitcoin, + }: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } heuristic.moreChanArgs = make(chan moreChanArg) @@ -1175,16 +1056,11 @@ func TestAgentPendingOpenChannel(t *testing.T) { // We'll send an initial "no" response to advance the agent past its // initial check. - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - select { - case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() + select { + case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // Next, we'll signal that a new channel has been opened, but it is // still pending. @@ -1192,19 +1068,11 @@ func TestAgentPendingOpenChannel(t *testing.T) { // The agent should now query the heuristic in order to determine its // next action as its local state has now been modified. - wg.Add(1) - go func() { - defer wg.Done() - select { - case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() - - // We'll wait here for either the agent to query the heuristic to be - // queried, or for the timeout above to tick. - wg.Wait() + select { + case heuristic.moreChansResps <- moreChansResp{false, 0, 0}: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // There shouldn't be a call to the Select method as we've returned // "false" for NeedMoreChans above. @@ -1272,21 +1140,15 @@ func TestAgentOnNodeUpdates(t *testing.T) { // We'll send an initial "yes" response to advance the agent past its // initial check. This will cause it to try to get directives from an // empty graph. 
- var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - select { - case heuristic.moreChansResps <- moreChansResp{ - needMore: true, - numMore: 2, - amt: walletBalance, - }: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() - wg.Wait() + select { + case heuristic.moreChansResps <- moreChansResp{ + needMore: true, + numMore: 2, + amt: walletBalance, + }: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // Send over an empty list of attachment directives, which should cause // the agent to return to waiting on a new signal. @@ -1303,21 +1165,15 @@ func TestAgentOnNodeUpdates(t *testing.T) { // In response, the agent should wake up and see if it needs more // channels. Since we haven't done anything, we will send the same // response as before since we are still trying to open channels. - var wg2 sync.WaitGroup - wg2.Add(1) - go func() { - defer wg2.Done() - select { - case heuristic.moreChansResps <- moreChansResp{ - needMore: true, - numMore: 2, - amt: walletBalance, - }: - case <-time.After(time.Second * 10): - t.Fatalf("heuristic wasn't queried in time") - } - }() - wg2.Wait() + select { + case heuristic.moreChansResps <- moreChansResp{ + needMore: true, + numMore: 2, + amt: walletBalance, + }: + case <-time.After(time.Second * 10): + t.Fatalf("heuristic wasn't queried in time") + } // Again the agent should pull in the next set of attachment directives. // It's not important that this list is also empty, so long as the node From d5f97f7bdc8d947d3e6c8163edb8ecc9685bd8eb Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 4 Sep 2018 01:55:05 -0700 Subject: [PATCH 7/7] autopilot/multi: replace PubKey -> NodeKey on directive --- autopilot/agent_test.go | 10 +++++----- autopilot/interface.go | 4 ++-- autopilot/prefattach.go | 2 +- autopilot/prefattach_test.go | 10 +++++----- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/autopilot/agent_test.go b/autopilot/agent_test.go index 61f0ded8a9..16099eeb36 100644 --- a/autopilot/agent_test.go +++ b/autopilot/agent_test.go @@ -321,7 +321,7 @@ func TestAgentChannelFailureSignal(t *testing.T) { // request attachment directives, return a fake so the agent will // attempt to open a channel. 
var fakeDirective = AttachmentDirective{ - PeerKey: self, + NodeKey: self, NodeID: NewNodeID(self), ChanAmt: btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -659,7 +659,7 @@ func TestAgentImmediateAttach(t *testing.T) { } nodeID := NewNodeID(pub) directives[i] = AttachmentDirective{ - PeerKey: pub, + NodeKey: pub, NodeID: nodeID, ChanAmt: btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -790,7 +790,7 @@ func TestAgentPrivateChannels(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } directives[i] = AttachmentDirective{ - PeerKey: pub, + NodeKey: pub, NodeID: NewNodeID(pub), ChanAmt: btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -911,7 +911,7 @@ func TestAgentPendingChannelState(t *testing.T) { } nodeID := NewNodeID(nodeKey) nodeDirective := AttachmentDirective{ - PeerKey: nodeKey, + NodeKey: nodeKey, NodeID: nodeID, ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ @@ -1273,7 +1273,7 @@ func TestAgentSkipPendingConns(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } nodeDirective := AttachmentDirective{ - PeerKey: nodeKey, + NodeKey: nodeKey, NodeID: NewNodeID(nodeKey), ChanAmt: 0.5 * btcutil.SatoshiPerBitcoin, Addrs: []net.Addr{ diff --git a/autopilot/interface.go b/autopilot/interface.go index 2b7155fee0..efa9929857 100644 --- a/autopilot/interface.go +++ b/autopilot/interface.go @@ -85,10 +85,10 @@ type ChannelGraph interface { // AttachmentHeuristic. It details to which node a channel should be created // to, and also the parameters which should be used in the channel creation. type AttachmentDirective struct { - // PeerKey is the target node for this attachment directive. It can be + // NodeKey is the target node for this attachment directive. It can be // identified by its public key, and therefore can be used along with // a ChannelOpener implementation to execute the directive. - PeerKey *btcec.PublicKey + NodeKey *btcec.PublicKey // NodeID is the serialized compressed pubkey of the target node. NodeID NodeID diff --git a/autopilot/prefattach.go b/autopilot/prefattach.go index 4bf00c25eb..22362b7a90 100644 --- a/autopilot/prefattach.go +++ b/autopilot/prefattach.go @@ -245,7 +245,7 @@ func (p *ConstrainedPrefAttachment) Select(self *btcec.PublicKey, g ChannelGraph } directives = append(directives, AttachmentDirective{ // TODO(roasbeef): need curve? - PeerKey: &btcec.PublicKey{ + NodeKey: &btcec.PublicKey{ X: pub.X, Y: pub.Y, }, diff --git a/autopilot/prefattach_test.go b/autopilot/prefattach_test.go index b05bdd526e..d34402776d 100644 --- a/autopilot/prefattach_test.go +++ b/autopilot/prefattach_test.go @@ -349,11 +349,11 @@ func TestConstrainedPrefAttachmentSelectTwoVertexes(t *testing.T) { edge2Pub := edge2.Peer.PubKey() switch { - case bytes.Equal(directive.PeerKey.SerializeCompressed(), edge1Pub[:]): - case bytes.Equal(directive.PeerKey.SerializeCompressed(), edge2Pub[:]): + case bytes.Equal(directive.NodeKey.SerializeCompressed(), edge1Pub[:]): + case bytes.Equal(directive.NodeKey.SerializeCompressed(), edge2Pub[:]): default: t1.Fatalf("attached to unknown node: %x", - directive.PeerKey.SerializeCompressed()) + directive.NodeKey.SerializeCompressed()) } // As the number of funds available exceed the @@ -634,8 +634,8 @@ func TestConstrainedPrefAttachmentSelectSkipNodes(t *testing.T) { // We'll simulate a channel update by adding the nodes // we just establish channel with the to set of nodes // to be skipped. 
- skipNodes[NewNodeID(directives[0].PeerKey)] = struct{}{} - skipNodes[NewNodeID(directives[1].PeerKey)] = struct{}{} + skipNodes[NewNodeID(directives[0].NodeKey)] = struct{}{} + skipNodes[NewNodeID(directives[1].NodeKey)] = struct{}{} // If we attempt to make a call to the Select function, // without providing any new information, then we