Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm/pss: Message handler refactor #18169

Merged
merged 17 commits into from
Nov 26, 2018
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions swarm/network/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,15 @@ func NewKadParams() *KadParams {
// Kademlia is a table of live peers and a db of known peers (node records)
type Kademlia struct {
lock sync.RWMutex
*KadParams // Kademlia configuration parameters
base []byte // immutable baseaddress of the table
addrs *pot.Pot // pots container for known peer addresses
conns *pot.Pot // pots container for live peer connections
depth uint8 // stores the last current depth of saturation
nDepth int // stores the last neighbourhood depth
nDepthC chan int // returned by DepthC function to signal neighbourhood depth change
addrCountC chan int // returned by AddrCountC function to signal peer count change
*KadParams // Kademlia configuration parameters
base []byte // immutable baseaddress of the table
addrs *pot.Pot // pots container for known peer addresses
conns *pot.Pot // pots container for live peer connections
depth uint8 // stores the last current depth of saturation
nDepth int // stores the last neighbourhood depth
nDepthC chan int // returned by DepthC function to signal neighbourhood depth change
addrCountC chan int // returned by AddrCountC function to signal peer count change
Pof func(pot.Val, pot.Val, int) (int, bool) // function for calculating kademlia routing distance between two addresses
}

// NewKademlia creates a Kademlia table for base address addr
Expand All @@ -103,6 +104,7 @@ func NewKademlia(addr []byte, params *KadParams) *Kademlia {
KadParams: params,
addrs: pot.NewPot(nil, 0),
conns: pot.NewPot(nil, 0),
Pof: pof,
}
}

Expand Down Expand Up @@ -289,6 +291,7 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) {
// neighbourhood depth on each change.
// Not receiving from the returned channel will block On function
// when the neighbourhood depth is changed.
// TODO: Why is this exported, and if it should be; why can't we have more subscribers than one?
func (k *Kademlia) NeighbourhoodDepthC() <-chan int {
k.lock.Lock()
defer k.lock.Unlock()
Expand Down Expand Up @@ -429,7 +432,12 @@ func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int, bool) bool
// neighbourhoodDepth returns the proximity order that defines the distance of
// the nearest neighbour set with cardinality >= MinProxBinSize
// if there is altogether less than MinProxBinSize peers it returns 0
// caller must hold the lock
func (k *Kademlia) NeighbourhoodDepth() (depth int) {
k.lock.RLock()
defer k.lock.RUnlock()
return k.neighbourhoodDepth()
}

func (k *Kademlia) neighbourhoodDepth() (depth int) {
if k.conns.Size() < k.MinProxBinSize {
return 0
Expand Down
12 changes: 9 additions & 3 deletions swarm/pss/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ func NewAPI(ps *Pss) *API {
//
// All incoming messages to the node matching this topic will be encapsulated in the APIMsg
// struct and sent to the subscriber
func (pssapi *API) Receive(ctx context.Context, topic Topic) (*rpc.Subscription, error) {
func (pssapi *API) Receive(ctx context.Context, topic Topic, raw bool, prox bool) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return nil, fmt.Errorf("Subscribe not supported")
}

psssub := notifier.CreateSubscription()

handler := func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
hndlr := NewHandler(func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error {
apimsg := &APIMsg{
Msg: hexutil.Bytes(msg),
Asymmetric: asymmetric,
Expand All @@ -69,9 +69,15 @@ func (pssapi *API) Receive(ctx context.Context, topic Topic) (*rpc.Subscription,
log.Warn(fmt.Sprintf("notification on pss sub topic rpc (sub %v) msg %v failed!", psssub.ID, msg))
}
return nil
})
if raw {
hndlr.caps.raw = true
}
if prox {
hndlr.caps.prox = true
}

deregf := pssapi.Register(&topic, handler)
deregf := pssapi.Register(&topic, hndlr)
go func() {
defer deregf()
select {
Expand Down
2 changes: 1 addition & 1 deletion swarm/pss/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (c *Client) RunProtocol(ctx context.Context, proto *p2p.Protocol) error {
topichex := topicobj.String()
msgC := make(chan pss.APIMsg)
c.peerPool[topicobj] = make(map[string]*pssRPCRW)
sub, err := c.rpc.Subscribe(ctx, "pss", msgC, "receive", topichex)
sub, err := c.rpc.Subscribe(ctx, "pss", msgC, "receive", topichex, false, false)
if err != nil {
return fmt.Errorf("pss event subscription failed: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion swarm/pss/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ func (api *HandshakeAPI) Handshake(pubkeyid string, topic Topic, sync bool, flus

// Activate handshake functionality on a topic
func (api *HandshakeAPI) AddHandshake(topic Topic) error {
api.ctrl.deregisterFuncs[topic] = api.ctrl.pss.Register(&topic, api.ctrl.handler)
api.ctrl.deregisterFuncs[topic] = api.ctrl.pss.Register(&topic, NewHandler(api.ctrl.handler))
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions swarm/pss/notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func NewController(ps *pss.Pss) *Controller {
notifiers: make(map[string]*notifier),
subscriptions: make(map[string]*subscription),
}
ctrl.pss.Register(&controlTopic, ctrl.Handler)
ctrl.pss.Register(&controlTopic, pss.NewHandler(ctrl.Handler))
return ctrl
}

Expand Down Expand Up @@ -336,7 +336,7 @@ func (c *Controller) handleNotifyWithKeyMsg(msg *Msg) error {
// \TODO keep track of and add actual address
updaterAddr := pss.PssAddress([]byte{})
c.pss.SetSymmetricKey(symkey, topic, &updaterAddr, true)
c.pss.Register(&topic, c.Handler)
c.pss.Register(&topic, pss.NewHandler(c.Handler))
return c.subscriptions[msg.namestring].handler(msg.namestring, msg.Payload[:len(msg.Payload)-symKeyLength])
}

Expand Down
4 changes: 2 additions & 2 deletions swarm/pss/notify/notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestStart(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
rmsgC := make(chan *pss.APIMsg)
rightSub, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", controlTopic)
rightSub, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", controlTopic, false, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestStart(t *testing.T) {
t.Fatalf("expected payload length %d, have %d", len(updateMsg)+symKeyLength, len(dMsg.Payload))
}

rightSubUpdate, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", rsrcTopic)
rightSubUpdate, err := rightRpc.Subscribe(ctx, "pss", rmsgC, "receive", rsrcTopic, false, false)
if err != nil {
t.Fatal(err)
}
Expand Down
5 changes: 3 additions & 2 deletions swarm/pss/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,15 @@ func testProtocol(t *testing.T) {
lmsgC := make(chan APIMsg)
lctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic)
lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic, false, false)
if err != nil {
t.Fatal(err)
}
defer lsub.Unsubscribe()
rmsgC := make(chan APIMsg)
rctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic)
rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic, false, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -130,6 +130,7 @@ func testProtocol(t *testing.T) {
log.Debug("lnode ok")
case cerr := <-lctx.Done():
t.Fatalf("test message timed out: %v", cerr)
return
}
select {
case <-rmsgC:
Expand Down
Loading