Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 0 additions & 90 deletions discovery/chan_series.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package discovery

import (
"time"

"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire"
Expand All @@ -23,13 +21,6 @@ type ChannelGraphTimeSeries interface {
// the remote node.
HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID, error)

// UpdatesInHorizon returns all known channel and node updates with an
// update timestamp between the start time and end time. We'll use this
// to catch up a remote node to the set of channel updates that they
// may have missed out on within the target chain.
UpdatesInHorizon(chain chainhash.Hash,
startTime time.Time, endTime time.Time) ([]lnwire.Message, error)

// FilterKnownChanIDs takes a target chain, and a set of channel ID's,
// and returns a filtered set of chan ID's. This filtered set of chan
// ID's represents the ID's that we don't know of which were in the
Expand Down Expand Up @@ -92,87 +83,6 @@ func (c *ChanSeries) HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID
return &shortChanID, nil
}

// UpdatesInHorizon returns all known channel and node updates with an update
// timestamp between the start time and end time. We'll use this to catch up a
// remote node to the set of channel updates that they may have missed out on
// within the target chain.
//
// NOTE: This is part of the ChannelGraphTimeSeries interface.
func (c *ChanSeries) UpdatesInHorizon(chain chainhash.Hash,
startTime time.Time, endTime time.Time) ([]lnwire.Message, error) {

var updates []lnwire.Message

// First, we'll query for all the set of channels that have an update
// that falls within the specified horizon.
chansInHorizon, err := c.graph.ChanUpdatesInHorizon(
startTime, endTime,
)
if err != nil {
return nil, err
}
for _, channel := range chansInHorizon {
// If the channel hasn't been fully advertised yet, or is a
// private channel, then we'll skip it as we can't construct a
// full authentication proof if one is requested.
if channel.Info.AuthProof == nil {
continue
}

chanAnn, edge1, edge2, err := CreateChanAnnouncement(
channel.Info.AuthProof, channel.Info, channel.Policy1,
channel.Policy2,
)
if err != nil {
return nil, err
}

updates = append(updates, chanAnn)
if edge1 != nil {
updates = append(updates, edge1)
}
if edge2 != nil {
updates = append(updates, edge2)
}
}

// Next, we'll send out all the node announcements that have an update
// within the horizon as well. We send these second to ensure that they
// follow any active channels they have.
nodeAnnsInHorizon, err := c.graph.NodeUpdatesInHorizon(
startTime, endTime,
)
if err != nil {
return nil, err
}
for _, nodeAnn := range nodeAnnsInHorizon {
// Ensure we only forward nodes that are publicly advertised to
// prevent leaking information about nodes.
isNodePublic, err := c.graph.IsPublicNode(nodeAnn.PubKeyBytes)
if err != nil {
log.Errorf("Unable to determine if node %x is "+
"advertised: %v", nodeAnn.PubKeyBytes, err)
continue
}

if !isNodePublic {
log.Tracef("Skipping forwarding announcement for "+
"node %x due to being unadvertised",
nodeAnn.PubKeyBytes)
continue
}

nodeUpdate, err := nodeAnn.NodeAnnouncement(true)
if err != nil {
return nil, err
}

updates = append(updates, nodeUpdate)
}

return updates, nil
}

// FilterKnownChanIDs takes a target chain, and a set of channel ID's, and
// returns a filtered set of chan ID's. This filtered set of chan ID's
// represents the ID's that we don't know of which were in the passed superSet.
Expand Down
8 changes: 1 addition & 7 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,13 +470,7 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,

// If we've found the message target, then we'll dispatch the
// message directly to it.
if err := syncer.ApplyGossipFilter(m); err != nil {
log.Warnf("Unable to apply gossip filter for peer=%x: "+
"%v", peer.PubKey(), err)

errChan <- err
return errChan
}
syncer.ApplyGossipFilter(m)

errChan <- nil
return errChan
Expand Down
51 changes: 2 additions & 49 deletions discovery/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,58 +939,11 @@ func (g *GossipSyncer) replyShortChanIDs(query *lnwire.QueryShortChanIDs) error
// ApplyGossipFilter applies a gossiper filter sent by the remote node to the
// state machine. Once applied, we'll ensure that we don't forward any messages
// to the peer that aren't within the time range of the filter.
func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) error {
func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) {
g.Lock()
defer g.Unlock()

g.remoteUpdateHorizon = filter

startTime := time.Unix(int64(g.remoteUpdateHorizon.FirstTimestamp), 0)
endTime := startTime.Add(
time.Duration(g.remoteUpdateHorizon.TimestampRange) * time.Second,
)

g.Unlock()

// Now that the remote peer has applied their filter, we'll query the
// database for all the messages that are beyond this filter.
newUpdatestoSend, err := g.cfg.channelSeries.UpdatesInHorizon(
g.cfg.chainHash, startTime, endTime,
)
if err != nil {
return err
}

log.Infof("GossipSyncer(%x): applying new update horizon: start=%v, "+
"end=%v, backlog_size=%v", g.cfg.peerPub[:], startTime, endTime,
len(newUpdatestoSend))

// If we don't have any to send, then we can return early.
if len(newUpdatestoSend) == 0 {
return nil
}

// We'll conclude by launching a goroutine to send out any updates.
g.wg.Add(1)
go func() {
defer g.wg.Done()

for _, msg := range newUpdatestoSend {
err := g.cfg.sendToPeerSync(msg)
switch {
case err == ErrGossipSyncerExiting:
return

case err == lnpeer.ErrPeerExiting:
return

case err != nil:
log.Errorf("Unable to send message for "+
"peer catch up: %v", err)
}
}
}()

return nil
}

// FilterGossipMsgs takes a set of gossip messages, and only send it to a peer
Expand Down
104 changes: 29 additions & 75 deletions discovery/syncer_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package discovery

import (
"errors"
"math"
"reflect"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -76,15 +78,6 @@ func newMockChannelGraphTimeSeries(
func (m *mockChannelGraphTimeSeries) HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID, error) {
return &m.highestID, nil
}
func (m *mockChannelGraphTimeSeries) UpdatesInHorizon(chain chainhash.Hash,
startTime time.Time, endTime time.Time) ([]lnwire.Message, error) {

m.horizonReq <- horizonQuery{
chain, startTime, endTime,
}

return <-m.horizonResp, nil
}
func (m *mockChannelGraphTimeSeries) FilterKnownChanIDs(chain chainhash.Hash,
superSet []lnwire.ShortChannelID) ([]lnwire.ShortChannelID, error) {

Expand Down Expand Up @@ -344,7 +337,7 @@ func TestGossipSyncerApplyGossipFilter(t *testing.T) {

// First, we'll create a GossipSyncer instance with a canned sendToPeer
// message to allow us to intercept their potential sends.
msgChan, syncer, chanSeries := newTestSyncer(
_, syncer, chanSeries := newTestSyncer(
lnwire.NewShortChanIDFromInt(10), defaultEncoding,
defaultChunkSize,
)
Expand All @@ -355,82 +348,43 @@ func TestGossipSyncerApplyGossipFilter(t *testing.T) {
TimestampRange: uint32(1000),
}

// Before we apply the horizon, we'll dispatch a response to the query
// that the syncer will issue.
// After applying the gossip filter, the chan series should not be
// queried using the updated horizon.
errChan := make(chan error, 1)
var wg sync.WaitGroup
wg.Add(1)
go func() {
select {
case <-time.After(time.Second * 15):
t.Fatalf("no query recvd")

case query := <-chanSeries.horizonReq:
// The syncer should have translated the time range
// into the proper star time.
if remoteHorizon.FirstTimestamp != uint32(query.start.Unix()) {
t.Fatalf("wrong query stamp: expected %v, got %v",
remoteHorizon.FirstTimestamp, query.start)
}
defer wg.Done()

// For this first response, we'll send back an empty
// set of messages. As result, we shouldn't send any
// messages.
chanSeries.horizonResp <- []lnwire.Message{}
select {
// No query received, success.
case <-time.After(3 * time.Second):
errChan <- nil

// Unexpected query received.
case <-chanSeries.horizonReq:
errChan <- errors.New("chan series should not have been " +
"queried")
}
}()

// We'll now attempt to apply the gossip filter for the remote peer.
err := syncer.ApplyGossipFilter(remoteHorizon)
if err != nil {
t.Fatalf("unable to apply filter: %v", err)
}
syncer.ApplyGossipFilter(remoteHorizon)

// There should be no messages in the message queue as we didn't send
// the syncer and messages within the horizon.
select {
case msgs := <-msgChan:
t.Fatalf("expected no msgs, instead got %v", spew.Sdump(msgs))
default:
// Ensure that the syncer's remote horizon was properly updated.
if !reflect.DeepEqual(syncer.remoteUpdateHorizon, remoteHorizon) {
t.Fatalf("expected remote horizon: %v, got: %v",
remoteHorizon, syncer.remoteUpdateHorizon)
}

// If we repeat the process, but give the syncer a set of valid
// messages, then these should be sent to the remote peer.
go func() {
select {
case <-time.After(time.Second * 15):
t.Fatalf("no query recvd")

case query := <-chanSeries.horizonReq:
// The syncer should have translated the time range
// into the proper star time.
if remoteHorizon.FirstTimestamp != uint32(query.start.Unix()) {
t.Fatalf("wrong query stamp: expected %v, got %v",
remoteHorizon.FirstTimestamp, query.start)
}
// Wait for the query check to finish.
wg.Wait()

// For this first response, we'll send back a proper
// set of messages that should be echoed back.
chanSeries.horizonResp <- []lnwire.Message{
&lnwire.ChannelUpdate{
ShortChannelID: lnwire.NewShortChanIDFromInt(25),
Timestamp: unixStamp(5),
},
}
}
}()
err = syncer.ApplyGossipFilter(remoteHorizon)
// Assert that no query was made as a result of applying the gossip
// filter.
err := <-errChan
if err != nil {
t.Fatalf("unable to apply filter: %v", err)
}

// We should get back the exact same message.
select {
case <-time.After(time.Second * 15):
t.Fatalf("no msgs received")

case msgs := <-msgChan:
if len(msgs) != 1 {
t.Fatalf("wrong messages: expected %v, got %v",
1, len(msgs))
}
t.Fatalf(err.Error())
}
}

Expand Down