Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
simulation: retry if we hit a collision on tcp/udp ports
Browse files Browse the repository at this point in the history
  • Loading branch information
nonsense committed Jul 29, 2019
1 parent 3be5cf3 commit 3a38a61
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 3 deletions.
2 changes: 1 addition & 1 deletion simulation/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (n *DockerNode) rpcClient() (*rpc.Client, error) {
var client *rpc.Client
var err error
wsAddr := fmt.Sprintf("ws://%s:%d", n.ipAddr, dockerWebsocketPort)
for start := time.Now(); time.Since(start) < 30*time.Second; time.Sleep(50 * time.Millisecond) {
for start := time.Now(); time.Since(start) < 30*time.Second; time.Sleep(200 * time.Millisecond) {
client, err = rpc.Dial(wsAddr)
if err == nil {
break
Expand Down
37 changes: 36 additions & 1 deletion simulation/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ func (n *ExecNode) Start() error {
return fmt.Errorf("error starting node %s: %s", n.config.ID, err)
}

// Wait channel from cmd.Wait() to know if the cmd exited before successful rpc.Dial call
waitCh := make(chan error)
go func(cmd *exec.Cmd) {
waitCh <- cmd.Wait()
}(n.cmd)

// Wait for the node to start
var client *rpc.Client
var err error
Expand All @@ -143,11 +149,40 @@ func (n *ExecNode) Start() error {
n.Stop()
}
}()
for start := time.Now(); time.Since(start) < 10*time.Second; time.Sleep(50 * time.Millisecond) {

for start := time.Now(); time.Since(start) < 30*time.Second; time.Sleep(200 * time.Millisecond) {
client, err = rpc.Dial(n.ipcPath())
if err == nil {
break
}
// rpc.Dial is failing, so let's check if command exited due to TCP/UDP pair fail
select {
case <-waitCh:
// Command exited

// Restart command, as the process got killed due to tcp/udp pair of ports being taken
n.cmd = &exec.Cmd{
Path: n.adapter.config.ExecutablePath,
Args: args,
Dir: dir,
Env: n.config.Env,
Stdout: n.config.Stdout,
Stderr: n.config.Stderr,
}

if err := n.cmd.Start(); err != nil {
n.cmd = nil
return fmt.Errorf("error starting node %s: %s", n.config.ID, err)
}

// Wait channel from cmd.Wait() to know if the cmd exited before successful rpc.Dial call
go func(cmd *exec.Cmd) {
waitCh <- cmd.Wait()
}(n.cmd)

default:
// Wait hasn't returned, so Command is still running...
}
}
if client == nil {
return fmt.Errorf("could not establish rpc connection. node %s: %v", n.config.ID, err)
Expand Down
2 changes: 1 addition & 1 deletion simulation/simulation.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ func (s *Simulation) WaitForHealthyNetwork() error {
return err
}
if !healthy.Healthy() {
return fmt.Errorf("node %s is not healthy", nodes[i].Info().ID)
return fmt.Errorf("node %s is not healthy: known <%v> ; connected <%v>", nodes[i].Info().ID, healthy.CountKnowNN, healthy.CountConnectNN)
}
return nil
})
Expand Down
104 changes: 104 additions & 0 deletions storage/pushsync/sim/pushsync_sim_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package sim

import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"os"
"testing"

"github.com/ethereum/go-ethereum/log"
"github.com/ethersphere/swarm/simulation"
colorable "github.com/mattn/go-colorable"
)

var (
nodes = flag.Int("nodes", 100, "number of nodes to create")
loglevel = flag.Int("loglevel", 3, "verbosity of logs")
rawlog = flag.Bool("rawlog", false, "remove terminal formatting from logs")
)

func init() {
flag.Parse()
log.PrintOrigins(true)
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(!*rawlog))))
}

func TestPushsyncSimSmoke(t *testing.T) {
nodeCount := *nodes

// Test exec adapter
t.Run("exec", func(t *testing.T) {
execPath := "/Users/nonsense/code/bin/swarm"

if _, err := os.Stat(execPath); err != nil {
if os.IsNotExist(err) {
t.Skip("swarm binary not found. build it before running the test")
}
}

tmpdir, err := ioutil.TempDir("", "test-sim-exec")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpdir)
adapter, err := simulation.NewExecAdapter(simulation.ExecAdapterConfig{
ExecutablePath: execPath,
BaseDataDirectory: tmpdir,
})
if err != nil {
t.Fatalf("could not create exec adapter: %v", err)
}
startSimulation(t, adapter, nodeCount)
})
}

func startSimulation(t *testing.T, adapter simulation.Adapter, count int) {
sim := simulation.NewSimulation(adapter)

defer sim.StopAll()

// Common args used by all nodes
commonArgs := []string{
"--bzznetworkid", "599",
"--maxpeers", "600",
}

// Start a cluster with 'count' nodes and a bootnode
nodes, err := sim.CreateClusterWithBootnode("test", count, commonArgs)
if err != nil {
t.Fatal(err)
}

// Wait for all nodes to be considered healthy
err = sim.WaitForHealthyNetwork()
if err != nil {
t.Errorf("Failed to get healthy network: %v", err)
}

// Check hive output on the first node
client, err := sim.RPCClient(nodes[0].Info().ID)
if err != nil {
t.Errorf("Failed to get rpc client: %v", err)
}

var hive string
err = client.Call(&hive, "bzz_hive")
if err != nil {
t.Errorf("could not get hive info: %v", err)
}

snap, err := sim.Snapshot()
if err != nil {
t.Error(err)
}

b, err := json.Marshal(snap)
if err != nil {
t.Error(err)
}
fmt.Println(string(b))

fmt.Println(hive)
}

0 comments on commit 3a38a61

Please sign in to comment.