Skip to content

Commit

Permalink
Merge pull request #3746 from onflow/m4ksio/integration-test-observer
Browse files Browse the repository at this point in the history
AddObserver adds a node to a suite
  • Loading branch information
m4ksio authored Jan 10, 2023
2 parents fc142de + dde544f commit d6f8335
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 122 deletions.
162 changes: 77 additions & 85 deletions integration/testnet/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package testnet
import (
"context"
"encoding/hex"
"errors"
"fmt"
"io/fs"
"math/rand"
gonet "net"
"os"
Expand All @@ -24,7 +22,6 @@ import (
"github.com/dapperlabs/testingdock"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/network"
dockerclient "github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
"github.com/onflow/cadence"
Expand Down Expand Up @@ -780,7 +777,7 @@ type ObserverConfig struct {
AccessGRPCSecurePort string // Does not change the access node
}

func (net *FlowNetwork) AddObserver(t *testing.T, ctx context.Context, conf *ObserverConfig) (stop func(), err error) {
func (net *FlowNetwork) AddObserver(t *testing.T, ctx context.Context, conf *ObserverConfig) (err error) {
// Find the public key for the access node
accessPublicKey := ""
for _, stakedConf := range net.BootstrapData.StakedConfs {
Expand Down Expand Up @@ -816,12 +813,14 @@ func (net *FlowNetwork) AddObserver(t *testing.T, ctx context.Context, conf *Obs
}()

// Setup directories
tmpdir, _ := os.MkdirTemp(TmpRoot, "flow-integration-node")
flowDataDir := filepath.Join(tmpdir, DefaultFlowDataDir)
nodeBootstrapDir := filepath.Join(tmpdir, DefaultBootstrapDir)
flowProfilerDir := filepath.Join(flowDataDir, "./profiler")
tmpdir := t.TempDir()

_ = io.CopyDirectory(net.BootstrapDir, nodeBootstrapDir)
flowDataDir := net.makeDir(t, tmpdir, DefaultFlowDataDir)
nodeBootstrapDir := net.makeDir(t, tmpdir, DefaultBootstrapDir)
flowProfilerDir := net.makeDir(t, flowDataDir, "./profiler")

err = io.CopyDirectory(net.BootstrapDir, nodeBootstrapDir)
require.NoError(t, err)

observerUnsecurePort := testingdock.RandomPort(t)
observerSecurePort := testingdock.RandomPort(t)
Expand All @@ -831,71 +830,71 @@ func (net *FlowNetwork) AddObserver(t *testing.T, ctx context.Context, conf *Obs
net.ObserverPorts[ObserverNodeAPISecurePort] = observerSecurePort
net.ObserverPorts[ObserverNodeAPIProxyPort] = observerHttpPort

container, err := net.cli.ContainerCreate(ctx,
&container.Config{
Image: conf.ObserverImage,
Cmd: []string{
fmt.Sprintf("--bootstrap-node-addresses=%s:%s", conf.AccessName, conf.AccessPublicNetworkPort),
fmt.Sprintf("--bootstrap-node-public-keys=%s", accessPublicKey),
fmt.Sprintf("--upstream-node-addresses=%s:%s", conf.AccessName, conf.AccessGRPCSecurePort),
fmt.Sprintf("--upstream-node-public-keys=%s", accessPublicKey),
fmt.Sprintf("--observer-networking-key-path=/bootstrap/private-root-information/%s_key", conf.ObserverName),
"--bind=0.0.0.0:0",
fmt.Sprintf("--rpc-addr=%s:%s", conf.ObserverName, "9000"),
fmt.Sprintf("--secure-rpc-addr=%s:%s", conf.ObserverName, "9001"),
fmt.Sprintf("--http-addr=%s:%s", conf.ObserverName, "8000"),
"--bootstrapdir=/bootstrap",
"--datadir=/data/protocol",
"--secretsdir=/data/secrets",
"--loglevel=DEBUG",
fmt.Sprintf("--profiler-enabled=%t", false),
fmt.Sprintf("--tracer-enabled=%t", false),
"--profiler-dir=/profiler",
"--profiler-interval=2m",
},
ExposedPorts: nat.PortSet{
"9000": struct{}{},
"9001": struct{}{},
"8000": struct{}{},
},
containerConfig := &container.Config{
Image: conf.ObserverImage,
User: currentUser(),
Cmd: []string{
fmt.Sprintf("--bootstrap-node-addresses=%s:%s", conf.AccessName, conf.AccessPublicNetworkPort),
fmt.Sprintf("--bootstrap-node-public-keys=%s", accessPublicKey),
fmt.Sprintf("--upstream-node-addresses=%s:%s", conf.AccessName, conf.AccessGRPCSecurePort),
fmt.Sprintf("--upstream-node-public-keys=%s", accessPublicKey),
fmt.Sprintf("--observer-networking-key-path=/bootstrap/private-root-information/%s_key", conf.ObserverName),
"--bind=0.0.0.0:0",
fmt.Sprintf("--rpc-addr=%s:%s", conf.ObserverName, "9000"),
fmt.Sprintf("--secure-rpc-addr=%s:%s", conf.ObserverName, "9001"),
fmt.Sprintf("--http-addr=%s:%s", conf.ObserverName, "8000"),
"--bootstrapdir=/bootstrap",
"--datadir=/data/protocol",
"--secretsdir=/data/secrets",
"--loglevel=DEBUG",
fmt.Sprintf("--profiler-enabled=%t", false),
fmt.Sprintf("--tracer-enabled=%t", false),
"--profiler-dir=/profiler",
"--profiler-interval=2m",
},

ExposedPorts: nat.PortSet{
"9000": struct{}{},
"9001": struct{}{},
"8000": struct{}{},
},
&container.HostConfig{
AutoRemove: true,
Binds: []string{
fmt.Sprintf("%s:%s:rw", flowDataDir, "/data"),
fmt.Sprintf("%s:%s:rw", flowProfilerDir, "/profiler"),
fmt.Sprintf("%s:%s:ro", nodeBootstrapDir, "/bootstrap"),
},
PortBindings: nat.PortMap{
"9000": []nat.PortBinding{{HostIP: "0.0.0.0", HostPort: observerUnsecurePort}},
"9001": []nat.PortBinding{{HostIP: "0.0.0.0", HostPort: observerSecurePort}},
"8000": []nat.PortBinding{{HostIP: "0.0.0.0", HostPort: observerHttpPort}},
},
}
containerHostConfig := &container.HostConfig{
Binds: []string{
fmt.Sprintf("%s:%s:rw", flowDataDir, "/data"),
fmt.Sprintf("%s:%s:rw", flowProfilerDir, "/profiler"),
fmt.Sprintf("%s:%s:ro", nodeBootstrapDir, "/bootstrap"),
},
&network.NetworkingConfig{
EndpointsConfig: map[string]*network.EndpointSettings{
net.config.Name: {
NetworkID: net.network.ID(),
},
},
PortBindings: nat.PortMap{
"9000": []nat.PortBinding{{HostIP: "0.0.0.0", HostPort: observerUnsecurePort}},
"9001": []nat.PortBinding{{HostIP: "0.0.0.0", HostPort: observerSecurePort}},
"8000": []nat.PortBinding{{HostIP: "0.0.0.0", HostPort: observerHttpPort}},
},
conf.ObserverName,
)
}

if err != nil {
return nil, err
containerOpts := testingdock.ContainerOpts{
ForcePull: false,
Config: containerConfig,
HostConfig: containerHostConfig,
Name: conf.ObserverName,
HealthCheck: testingdock.HealthCheckCustom(healthcheckAccessGRPC(observerUnsecurePort)),
}

err = net.cli.ContainerStart(ctx, container.ID, types.ContainerStartOptions{})
if err != nil {
return nil, err
suiteContainer := net.suite.Container(containerOpts)

nodeContainer := &Container{
Ports: make(map[string]string),
datadir: tmpdir,
net: net,
opts: &containerOpts,
}

containerID := container.ID
return func() {
// shutdown func
_ = net.cli.ContainerStop(ctx, containerID, nil)
}, nil
nodeContainer.Container = suiteContainer
net.Containers[nodeContainer.Name()] = nodeContainer

net.network.After(suiteContainer)

return nil
}

// AddNode creates a node container with the given config and adds it to the
Expand All @@ -922,13 +921,7 @@ func (net *FlowNetwork) AddNode(t *testing.T, bootstrapDir string, nodeConf Cont
HostConfig: &container.HostConfig{},
}

// get a temporary directory in the host. On macOS the default tmp
// directory is NOT accessible to Docker by default, so we use /tmp
// instead.
tmpdir, err := os.MkdirTemp(TmpRoot, "flow-integration-node")
if err != nil {
return fmt.Errorf("could not get tmp dir: %w", err)
}
tmpdir := t.TempDir()

t.Logf("%v adding container %v for %v node", time.Now().UTC(), nodeConf.ContainerName, nodeConf.Role)

Expand All @@ -941,27 +934,19 @@ func (net *FlowNetwork) AddNode(t *testing.T, bootstrapDir string, nodeConf Cont
}

// create a directory for the node database
flowDataDir := filepath.Join(tmpdir, DefaultFlowDataDir)
err = os.Mkdir(flowDataDir, 0700)
require.NoError(t, err)
flowDataDir := net.makeDir(t, tmpdir, DefaultFlowDataDir)

// create the profiler dir for the node
flowProfilerDir := filepath.Join(flowDataDir, "./profiler")
flowProfilerDir := net.makeDir(t, flowDataDir, "./profiler")
t.Logf("create profiler dir: %v", flowProfilerDir)
err = os.MkdirAll(flowProfilerDir, 0755)
if err != nil && !errors.Is(err, fs.ErrExist) {
panic(err)
}

// create a directory for the bootstrap files
// we create a node-specific bootstrap directory to enable testing nodes
// bootstrapping from different root state snapshots and epochs
nodeBootstrapDir := filepath.Join(tmpdir, DefaultBootstrapDir)
err = os.Mkdir(nodeBootstrapDir, 0700)
require.NoError(t, err)
nodeBootstrapDir := net.makeDir(t, tmpdir, DefaultBootstrapDir)

// copy bootstrap files to node-specific bootstrap directory
err = io.CopyDirectory(bootstrapDir, nodeBootstrapDir)
err := io.CopyDirectory(bootstrapDir, nodeBootstrapDir)
require.NoError(t, err)

// Bind the host directory to the container's database directory
Expand Down Expand Up @@ -1183,6 +1168,13 @@ func (net *FlowNetwork) WriteRootSnapshot(snapshot *inmem.Snapshot) {
require.NoError(net.t, err)
}

func (net *FlowNetwork) makeDir(t *testing.T, base string, dir string) string {
flowDataDir := filepath.Join(base, dir)
err := os.Mkdir(flowDataDir, 0700)
require.NoError(t, err)
return flowDataDir
}

func followerNodeInfos(confs []ConsensusFollowerConfig) ([]bootstrap.NodeInfo, error) {
var nodeInfos []bootstrap.NodeInfo

Expand Down
64 changes: 27 additions & 37 deletions integration/tests/access/observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"net"
"testing"
"time"

"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -35,9 +34,7 @@ type ObserverSuite struct {
}

func (suite *ObserverSuite) TearDownTest() {
if suite.teardown != nil {
suite.teardown()
}
suite.net.Remove()
}

func (suite *ObserverSuite) SetupTest() {
Expand Down Expand Up @@ -82,9 +79,8 @@ func (suite *ObserverSuite) SetupTest() {

// start the network
ctx := context.Background()
suite.net.Start(ctx)

stop, err := suite.net.AddObserver(suite.T(), ctx, &testnet.ObserverConfig{
err := suite.net.AddObserver(suite.T(), ctx, &testnet.ObserverConfig{
ObserverName: "observer_1",
ObserverImage: "gcr.io/flow-container-registry/observer:latest",
AccessName: "access_1",
Expand All @@ -93,13 +89,7 @@ func (suite *ObserverSuite) SetupTest() {
})
require.NoError(suite.T(), err)

time.Sleep(time.Second * 3) // needs breathing room for the observer to start listening

// set the teardown function
suite.teardown = func() {
stop()
suite.net.Remove()
}
suite.net.Start(ctx)
}

func (suite *ObserverSuite) TestObserverConnection() {
Expand All @@ -116,6 +106,30 @@ func (suite *ObserverSuite) TestObserverConnection() {
assert.NoError(t, err)
}

func (suite *ObserverSuite) TestObserverCompareRPCs() {
ctx := context.Background()
t := suite.T()

// get an observer and access client
observer, err := suite.getObserverClient()
assert.NoError(t, err)

access, err := suite.getAccessClient()
assert.NoError(t, err)

// verify that both clients return the same errors
for _, rpc := range suite.getRPCs() {
if _, local := suite.local[rpc.name]; local {
continue
}
t.Run(rpc.name, func(t *testing.T) {
accessErr := rpc.call(ctx, access)
observerErr := rpc.call(ctx, observer)
assert.Equal(t, accessErr, observerErr)
})
}
}

func (suite *ObserverSuite) TestObserverWithoutAccess() {
// tests that the observer returns errors when the access node is stopped
ctx := context.Background()
Expand Down Expand Up @@ -161,30 +175,6 @@ func (suite *ObserverSuite) TestObserverWithoutAccess() {

}

func (suite *ObserverSuite) TestObserverCompareRPCs() {
ctx := context.Background()
t := suite.T()

// get an observer and access client
observer, err := suite.getObserverClient()
assert.NoError(t, err)

access, err := suite.getAccessClient()
assert.NoError(t, err)

// verify that both clients return the same errors
for _, rpc := range suite.getRPCs() {
if _, local := suite.local[rpc.name]; local {
continue
}
t.Run(rpc.name, func(t *testing.T) {
accessErr := rpc.call(ctx, access)
observerErr := rpc.call(ctx, observer)
assert.Equal(t, accessErr, observerErr)
})
}
}

func (suite *ObserverSuite) getAccessClient() (accessproto.AccessAPIClient, error) {
return suite.getClient(net.JoinHostPort("localhost", suite.net.AccessPorts[testnet.AccessNodeAPIPort]))
}
Expand Down

0 comments on commit d6f8335

Please sign in to comment.