Skip to content

Commit 43e1b7b

Browse files
holisticodenonsense
authored andcommitted
swarm: GetPeerSubscriptions RPC (ethereum#18972)
1 parent 8cfe1a6 commit 43e1b7b

File tree

3 files changed

+257
-1
lines changed

3 files changed

+257
-1
lines changed

swarm/network/stream/stream.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -929,3 +929,27 @@ func (api *API) SubscribeStream(peerId enode.ID, s Stream, history *Range, prior
929929
func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error {
930930
return api.streamer.Unsubscribe(peerId, s)
931931
}
932+
933+
/*
934+
GetPeerSubscriptions is a API function which allows to query a peer for stream subscriptions it has.
935+
It can be called via RPC.
936+
It returns a map of node IDs with an array of string representations of Stream objects.
937+
*/
938+
func (api *API) GetPeerSubscriptions() map[string][]string {
939+
//create the empty map
940+
pstreams := make(map[string][]string)
941+
//iterate all streamer peers
942+
for id, p := range api.streamer.peers {
943+
var streams []string
944+
//every peer has a map of stream servers
945+
//every stream server represents a subscription
946+
for s := range p.servers {
947+
//append the string representation of the stream
948+
//to the list for this peer
949+
streams = append(streams, s.String())
950+
}
951+
//set the array of stream servers to the map
952+
pstreams[id.String()] = streams
953+
}
954+
return pstreams
955+
}

swarm/network/stream/streamer_test.go

Lines changed: 232 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,23 @@ import (
2121
"context"
2222
"errors"
2323
"fmt"
24+
"os"
2425
"strconv"
26+
"strings"
27+
"sync"
2528
"testing"
2629
"time"
2730

2831
"github.com/ethereum/go-ethereum/common"
2932
"github.com/ethereum/go-ethereum/log"
33+
"github.com/ethereum/go-ethereum/node"
3034
"github.com/ethereum/go-ethereum/p2p/enode"
35+
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
3136
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
3237
"github.com/ethereum/go-ethereum/swarm/network"
38+
"github.com/ethereum/go-ethereum/swarm/network/simulation"
39+
"github.com/ethereum/go-ethereum/swarm/state"
40+
"github.com/ethereum/go-ethereum/swarm/storage"
3341
"golang.org/x/crypto/sha3"
3442
)
3543

@@ -1105,7 +1113,6 @@ func TestRequestPeerSubscriptions(t *testing.T) {
11051113
}
11061114
}
11071115
}
1108-
11091116
// print some output
11101117
for p, subs := range fakeSubscriptions {
11111118
log.Debug(fmt.Sprintf("Peer %s has the following fake subscriptions: ", p))
@@ -1114,3 +1121,227 @@ func TestRequestPeerSubscriptions(t *testing.T) {
11141121
}
11151122
}
11161123
}
1124+
1125+
// TestGetSubscriptions is a unit test for the api.GetPeerSubscriptions() function
1126+
func TestGetSubscriptions(t *testing.T) {
1127+
// create an amount of dummy peers
1128+
testPeerCount := 8
1129+
// every peer will have this amount of dummy servers
1130+
testServerCount := 4
1131+
// the peerMap which will store this data for the registry
1132+
peerMap := make(map[enode.ID]*Peer)
1133+
// create the registry
1134+
r := &Registry{}
1135+
api := NewAPI(r)
1136+
// call once, at this point should be empty
1137+
regs := api.GetPeerSubscriptions()
1138+
if len(regs) != 0 {
1139+
t.Fatal("Expected subscription count to be 0, but it is not")
1140+
}
1141+
1142+
// now create a number of dummy servers for each node
1143+
for i := 0; i < testPeerCount; i++ {
1144+
addr := network.RandomAddr()
1145+
id := addr.ID()
1146+
p := &Peer{}
1147+
p.servers = make(map[Stream]*server)
1148+
for k := 0; k < testServerCount; k++ {
1149+
s := Stream{
1150+
Name: strconv.Itoa(k),
1151+
Key: "",
1152+
Live: false,
1153+
}
1154+
p.servers[s] = &server{}
1155+
}
1156+
peerMap[id] = p
1157+
}
1158+
r.peers = peerMap
1159+
1160+
// call the subscriptions again
1161+
regs = api.GetPeerSubscriptions()
1162+
// count how many (fake) subscriptions there are
1163+
cnt := 0
1164+
for _, reg := range regs {
1165+
for range reg {
1166+
cnt++
1167+
}
1168+
}
1169+
// check expected value
1170+
expectedCount := testPeerCount * testServerCount
1171+
if cnt != expectedCount {
1172+
t.Fatalf("Expected %d subscriptions, but got %d", expectedCount, cnt)
1173+
}
1174+
}
1175+
1176+
/*
1177+
TestGetSubscriptionsRPC sets up a simulation network of `nodeCount` nodes,
1178+
starts the simulation, waits for SyncUpdateDelay in order to kick off
1179+
stream registration, then tests that there are subscriptions.
1180+
*/
1181+
func TestGetSubscriptionsRPC(t *testing.T) {
1182+
// arbitrarily set to 4
1183+
nodeCount := 4
1184+
// run with more nodes if `longrunning` flag is set
1185+
if *longrunning {
1186+
nodeCount = 64
1187+
}
1188+
// set the syncUpdateDelay for sync registrations to start
1189+
syncUpdateDelay := 200 * time.Millisecond
1190+
// holds the msg code for SubscribeMsg
1191+
var subscribeMsgCode uint64
1192+
var ok bool
1193+
var expectedMsgCount = 0
1194+
1195+
// this channel signalizes that the expected amount of subscriptiosn is done
1196+
allSubscriptionsDone := make(chan struct{})
1197+
lock := sync.RWMutex{}
1198+
// after the test, we need to reset the subscriptionFunc to the default
1199+
defer func() { subscriptionFunc = doRequestSubscription }()
1200+
1201+
// we use this subscriptionFunc for this test: just increases count and calls the actual subscription
1202+
subscriptionFunc = func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool {
1203+
lock.Lock()
1204+
expectedMsgCount++
1205+
lock.Unlock()
1206+
doRequestSubscription(r, p, bin, subs)
1207+
return true
1208+
}
1209+
// create a standard sim
1210+
sim := simulation.New(map[string]simulation.ServiceFunc{
1211+
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
1212+
n := ctx.Config.Node()
1213+
addr := network.NewAddr(n)
1214+
store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
1215+
if err != nil {
1216+
return nil, nil, err
1217+
}
1218+
localStore := store.(*storage.LocalStore)
1219+
netStore, err := storage.NewNetStore(localStore, nil)
1220+
if err != nil {
1221+
return nil, nil, err
1222+
}
1223+
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
1224+
delivery := NewDelivery(kad, netStore)
1225+
netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
1226+
// configure so that sync registrations actually happen
1227+
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
1228+
Retrieval: RetrievalEnabled,
1229+
Syncing: SyncingAutoSubscribe, //enable sync registrations
1230+
SyncUpdateDelay: syncUpdateDelay,
1231+
}, nil)
1232+
// get the SubscribeMsg code
1233+
subscribeMsgCode, ok = r.GetSpec().GetCode(SubscribeMsg{})
1234+
if !ok {
1235+
t.Fatal("Message code for SubscribeMsg not found")
1236+
}
1237+
1238+
cleanup = func() {
1239+
os.RemoveAll(datadir)
1240+
netStore.Close()
1241+
r.Close()
1242+
}
1243+
1244+
return r, cleanup, nil
1245+
1246+
},
1247+
})
1248+
defer sim.Close()
1249+
1250+
ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
1251+
defer cancelSimRun()
1252+
1253+
// upload a snapshot
1254+
err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
1255+
if err != nil {
1256+
t.Fatal(err)
1257+
}
1258+
1259+
// setup the filter for SubscribeMsg
1260+
msgs := sim.PeerEvents(
1261+
context.Background(),
1262+
sim.NodeIDs(),
1263+
simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(subscribeMsgCode),
1264+
)
1265+
1266+
// strategy: listen to all SubscribeMsg events; after every event we wait
1267+
// if after `waitDuration` no more messages are being received, we assume the
1268+
// subscription phase has terminated!
1269+
1270+
// the loop in this go routine will either wait for new message events
1271+
// or times out after 1 second, which signals that we are not receiving
1272+
// any new subscriptions any more
1273+
go func() {
1274+
//for long running sims, waiting 1 sec will not be enough
1275+
waitDuration := time.Duration(nodeCount/16) * time.Second
1276+
for {
1277+
select {
1278+
case <-ctx.Done():
1279+
return
1280+
case m := <-msgs: // just reset the loop
1281+
if m.Error != nil {
1282+
log.Error("stream message", "err", m.Error)
1283+
continue
1284+
}
1285+
log.Trace("stream message", "node", m.NodeID, "peer", m.PeerID)
1286+
case <-time.After(waitDuration):
1287+
// one second passed, don't assume more subscriptions
1288+
allSubscriptionsDone <- struct{}{}
1289+
log.Info("All subscriptions received")
1290+
return
1291+
1292+
}
1293+
}
1294+
}()
1295+
1296+
//run the simulation
1297+
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
1298+
log.Info("Simulation running")
1299+
nodes := sim.Net.Nodes
1300+
1301+
//wait until all subscriptions are done
1302+
select {
1303+
case <-allSubscriptionsDone:
1304+
case <-ctx.Done():
1305+
t.Fatal("Context timed out")
1306+
}
1307+
1308+
log.Debug("Expected message count: ", "expectedMsgCount", expectedMsgCount)
1309+
//now iterate again, this time we call each node via RPC to get its subscriptions
1310+
realCount := 0
1311+
for _, node := range nodes {
1312+
//create rpc client
1313+
client, err := node.Client()
1314+
if err != nil {
1315+
t.Fatalf("create node 1 rpc client fail: %v", err)
1316+
}
1317+
1318+
//ask it for subscriptions
1319+
pstreams := make(map[string][]string)
1320+
err = client.Call(&pstreams, "stream_getPeerSubscriptions")
1321+
if err != nil {
1322+
t.Fatal(err)
1323+
}
1324+
//length of the subscriptions can not be smaller than number of peers
1325+
log.Debug("node subscriptions:", "node", node.String())
1326+
for p, ps := range pstreams {
1327+
log.Debug("... with: ", "peer", p)
1328+
for _, s := range ps {
1329+
log.Debug(".......", "stream", s)
1330+
// each node also has subscriptions to RETRIEVE_REQUEST streams,
1331+
// we need to ignore those, we are only counting SYNC streams
1332+
if !strings.HasPrefix(s, "RETRIEVE_REQUEST") {
1333+
realCount++
1334+
}
1335+
}
1336+
}
1337+
}
1338+
// every node is mutually subscribed to each other, so the actual count is half of it
1339+
if realCount/2 != expectedMsgCount {
1340+
return fmt.Errorf("Real subscriptions and expected amount don't match; real: %d, expected: %d", realCount/2, expectedMsgCount)
1341+
}
1342+
return nil
1343+
})
1344+
if result.Error != nil {
1345+
t.Fatal(result.Error)
1346+
}
1347+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"nodes":[{"node":{"config":{"id":"73d6ad4a75069dced660fa4cb98143ee5573df7cb15d9a295acf1655e9683384","private_key":"e567b7d9c554e5102cdc99b6523bace02dbb8951415c8816d82ba2d2e97fa23b","name":"node01","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","private_key":"c7526db70acd02f36d3b201ef3e1d85e38c52bee6931453213dbc5edec4d0976","name":"node02","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","private_key":"61b5728f59bc43080c3b8eb0458fb30d7723e2747355b6dc980f35f3ed431199","name":"node03","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}},{"node":{"config":{"id":"d7768334f79d626adb433f44b703a818555e3331056036ef3f8d1282586bf044","private_key":"075b07c29ceac4ffa2a114afd67b21dfc438126bc169bf7c154be6d81d86ed38","name":"node04","services":["bzz","pss"],"enable_msg_events":false,"port":0},"up":true}}],"conns":[{"one":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","other":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","up":true},{"one":"73d6ad4a75069dced660fa4cb98143ee5573df7cb15d9a295acf1655e9683384","other":"6e8da86abb894ab35044c8c455147225df96cab498da067a118f1fb9a417f9e3","up":true},{"one":"8a1eb78ff13df318e7f8116dffee98cd7d9905650fa53f16766b754a63f387ac","other":"d7768334f79d626adb433f44b703a818555e3331056036ef3f8d1282586bf044","up":true}]}

0 commit comments

Comments
 (0)