@@ -21,15 +21,23 @@ import (
2121 "context"
2222 "errors"
2323 "fmt"
24+ "os"
2425 "strconv"
26+ "strings"
27+ "sync"
2528 "testing"
2629 "time"
2730
2831 "github.com/ethereum/go-ethereum/common"
2932 "github.com/ethereum/go-ethereum/log"
33+ "github.com/ethereum/go-ethereum/node"
3034 "github.com/ethereum/go-ethereum/p2p/enode"
35+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
3136 p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
3237 "github.com/ethereum/go-ethereum/swarm/network"
38+ "github.com/ethereum/go-ethereum/swarm/network/simulation"
39+ "github.com/ethereum/go-ethereum/swarm/state"
40+ "github.com/ethereum/go-ethereum/swarm/storage"
3341 "golang.org/x/crypto/sha3"
3442)
3543
@@ -1105,7 +1113,6 @@ func TestRequestPeerSubscriptions(t *testing.T) {
11051113 }
11061114 }
11071115 }
1108-
11091116 // print some output
11101117 for p , subs := range fakeSubscriptions {
11111118 log .Debug (fmt .Sprintf ("Peer %s has the following fake subscriptions: " , p ))
@@ -1114,3 +1121,227 @@ func TestRequestPeerSubscriptions(t *testing.T) {
11141121 }
11151122 }
11161123}
1124+
1125+ // TestGetSubscriptions is a unit test for the api.GetPeerSubscriptions() function
1126+ func TestGetSubscriptions (t * testing.T ) {
1127+ // create an amount of dummy peers
1128+ testPeerCount := 8
1129+ // every peer will have this amount of dummy servers
1130+ testServerCount := 4
1131+ // the peerMap which will store this data for the registry
1132+ peerMap := make (map [enode.ID ]* Peer )
1133+ // create the registry
1134+ r := & Registry {}
1135+ api := NewAPI (r )
1136+ // call once, at this point should be empty
1137+ regs := api .GetPeerSubscriptions ()
1138+ if len (regs ) != 0 {
1139+ t .Fatal ("Expected subscription count to be 0, but it is not" )
1140+ }
1141+
1142+ // now create a number of dummy servers for each node
1143+ for i := 0 ; i < testPeerCount ; i ++ {
1144+ addr := network .RandomAddr ()
1145+ id := addr .ID ()
1146+ p := & Peer {}
1147+ p .servers = make (map [Stream ]* server )
1148+ for k := 0 ; k < testServerCount ; k ++ {
1149+ s := Stream {
1150+ Name : strconv .Itoa (k ),
1151+ Key : "" ,
1152+ Live : false ,
1153+ }
1154+ p .servers [s ] = & server {}
1155+ }
1156+ peerMap [id ] = p
1157+ }
1158+ r .peers = peerMap
1159+
1160+ // call the subscriptions again
1161+ regs = api .GetPeerSubscriptions ()
1162+ // count how many (fake) subscriptions there are
1163+ cnt := 0
1164+ for _ , reg := range regs {
1165+ for range reg {
1166+ cnt ++
1167+ }
1168+ }
1169+ // check expected value
1170+ expectedCount := testPeerCount * testServerCount
1171+ if cnt != expectedCount {
1172+ t .Fatalf ("Expected %d subscriptions, but got %d" , expectedCount , cnt )
1173+ }
1174+ }
1175+
1176+ /*
1177+ TestGetSubscriptionsRPC sets up a simulation network of `nodeCount` nodes,
1178+ starts the simulation, waits for SyncUpdateDelay in order to kick off
1179+ stream registration, then tests that there are subscriptions.
1180+ */
1181+ func TestGetSubscriptionsRPC (t * testing.T ) {
1182+ // arbitrarily set to 4
1183+ nodeCount := 4
1184+ // run with more nodes if `longrunning` flag is set
1185+ if * longrunning {
1186+ nodeCount = 64
1187+ }
1188+ // set the syncUpdateDelay for sync registrations to start
1189+ syncUpdateDelay := 200 * time .Millisecond
1190+ // holds the msg code for SubscribeMsg
1191+ var subscribeMsgCode uint64
1192+ var ok bool
1193+ var expectedMsgCount = 0
1194+
1195+ // this channel signalizes that the expected amount of subscriptiosn is done
1196+ allSubscriptionsDone := make (chan struct {})
1197+ lock := sync.RWMutex {}
1198+ // after the test, we need to reset the subscriptionFunc to the default
1199+ defer func () { subscriptionFunc = doRequestSubscription }()
1200+
1201+ // we use this subscriptionFunc for this test: just increases count and calls the actual subscription
1202+ subscriptionFunc = func (r * Registry , p * network.Peer , bin uint8 , subs map [enode.ID ]map [Stream ]struct {}) bool {
1203+ lock .Lock ()
1204+ expectedMsgCount ++
1205+ lock .Unlock ()
1206+ doRequestSubscription (r , p , bin , subs )
1207+ return true
1208+ }
1209+ // create a standard sim
1210+ sim := simulation .New (map [string ]simulation.ServiceFunc {
1211+ "streamer" : func (ctx * adapters.ServiceContext , bucket * sync.Map ) (s node.Service , cleanup func (), err error ) {
1212+ n := ctx .Config .Node ()
1213+ addr := network .NewAddr (n )
1214+ store , datadir , err := createTestLocalStorageForID (n .ID (), addr )
1215+ if err != nil {
1216+ return nil , nil , err
1217+ }
1218+ localStore := store .(* storage.LocalStore )
1219+ netStore , err := storage .NewNetStore (localStore , nil )
1220+ if err != nil {
1221+ return nil , nil , err
1222+ }
1223+ kad := network .NewKademlia (addr .Over (), network .NewKadParams ())
1224+ delivery := NewDelivery (kad , netStore )
1225+ netStore .NewNetFetcherFunc = network .NewFetcherFactory (dummyRequestFromPeers , true ).New
1226+ // configure so that sync registrations actually happen
1227+ r := NewRegistry (addr .ID (), delivery , netStore , state .NewInmemoryStore (), & RegistryOptions {
1228+ Retrieval : RetrievalEnabled ,
1229+ Syncing : SyncingAutoSubscribe , //enable sync registrations
1230+ SyncUpdateDelay : syncUpdateDelay ,
1231+ }, nil )
1232+ // get the SubscribeMsg code
1233+ subscribeMsgCode , ok = r .GetSpec ().GetCode (SubscribeMsg {})
1234+ if ! ok {
1235+ t .Fatal ("Message code for SubscribeMsg not found" )
1236+ }
1237+
1238+ cleanup = func () {
1239+ os .RemoveAll (datadir )
1240+ netStore .Close ()
1241+ r .Close ()
1242+ }
1243+
1244+ return r , cleanup , nil
1245+
1246+ },
1247+ })
1248+ defer sim .Close ()
1249+
1250+ ctx , cancelSimRun := context .WithTimeout (context .Background (), 1 * time .Minute )
1251+ defer cancelSimRun ()
1252+
1253+ // upload a snapshot
1254+ err := sim .UploadSnapshot (fmt .Sprintf ("testing/snapshot_%d.json" , nodeCount ))
1255+ if err != nil {
1256+ t .Fatal (err )
1257+ }
1258+
1259+ // setup the filter for SubscribeMsg
1260+ msgs := sim .PeerEvents (
1261+ context .Background (),
1262+ sim .NodeIDs (),
1263+ simulation .NewPeerEventsFilter ().ReceivedMessages ().Protocol ("stream" ).MsgCode (subscribeMsgCode ),
1264+ )
1265+
1266+ // strategy: listen to all SubscribeMsg events; after every event we wait
1267+ // if after `waitDuration` no more messages are being received, we assume the
1268+ // subscription phase has terminated!
1269+
1270+ // the loop in this go routine will either wait for new message events
1271+ // or times out after 1 second, which signals that we are not receiving
1272+ // any new subscriptions any more
1273+ go func () {
1274+ //for long running sims, waiting 1 sec will not be enough
1275+ waitDuration := time .Duration (nodeCount / 16 ) * time .Second
1276+ for {
1277+ select {
1278+ case <- ctx .Done ():
1279+ return
1280+ case m := <- msgs : // just reset the loop
1281+ if m .Error != nil {
1282+ log .Error ("stream message" , "err" , m .Error )
1283+ continue
1284+ }
1285+ log .Trace ("stream message" , "node" , m .NodeID , "peer" , m .PeerID )
1286+ case <- time .After (waitDuration ):
1287+ // one second passed, don't assume more subscriptions
1288+ allSubscriptionsDone <- struct {}{}
1289+ log .Info ("All subscriptions received" )
1290+ return
1291+
1292+ }
1293+ }
1294+ }()
1295+
1296+ //run the simulation
1297+ result := sim .Run (ctx , func (ctx context.Context , sim * simulation.Simulation ) error {
1298+ log .Info ("Simulation running" )
1299+ nodes := sim .Net .Nodes
1300+
1301+ //wait until all subscriptions are done
1302+ select {
1303+ case <- allSubscriptionsDone :
1304+ case <- ctx .Done ():
1305+ t .Fatal ("Context timed out" )
1306+ }
1307+
1308+ log .Debug ("Expected message count: " , "expectedMsgCount" , expectedMsgCount )
1309+ //now iterate again, this time we call each node via RPC to get its subscriptions
1310+ realCount := 0
1311+ for _ , node := range nodes {
1312+ //create rpc client
1313+ client , err := node .Client ()
1314+ if err != nil {
1315+ t .Fatalf ("create node 1 rpc client fail: %v" , err )
1316+ }
1317+
1318+ //ask it for subscriptions
1319+ pstreams := make (map [string ][]string )
1320+ err = client .Call (& pstreams , "stream_getPeerSubscriptions" )
1321+ if err != nil {
1322+ t .Fatal (err )
1323+ }
1324+ //length of the subscriptions can not be smaller than number of peers
1325+ log .Debug ("node subscriptions:" , "node" , node .String ())
1326+ for p , ps := range pstreams {
1327+ log .Debug ("... with: " , "peer" , p )
1328+ for _ , s := range ps {
1329+ log .Debug ("......." , "stream" , s )
1330+ // each node also has subscriptions to RETRIEVE_REQUEST streams,
1331+ // we need to ignore those, we are only counting SYNC streams
1332+ if ! strings .HasPrefix (s , "RETRIEVE_REQUEST" ) {
1333+ realCount ++
1334+ }
1335+ }
1336+ }
1337+ }
1338+ // every node is mutually subscribed to each other, so the actual count is half of it
1339+ if realCount / 2 != expectedMsgCount {
1340+ return fmt .Errorf ("Real subscriptions and expected amount don't match; real: %d, expected: %d" , realCount / 2 , expectedMsgCount )
1341+ }
1342+ return nil
1343+ })
1344+ if result .Error != nil {
1345+ t .Fatal (result .Error )
1346+ }
1347+ }
0 commit comments