diff --git a/op-conductor/conductor/config.go b/op-conductor/conductor/config.go
index ca18de7d1a18..98e3ad83440e 100644
--- a/op-conductor/conductor/config.go
+++ b/op-conductor/conductor/config.go
@@ -19,12 +19,19 @@ import (
 )
 
 type Config struct {
-	// ConsensusAddr is the address to listen for consensus connections.
+	// ConsensusAddr is the address, excluding port, to listen on for consensus connections.
+	// E.g. 0.0.0.0 to bind to the external-facing network interface.
 	ConsensusAddr string
 
-	// ConsensusPort is the port to listen for consensus connections.
+	// ConsensusPort is the port to listen on for consensus connections.
+	// If 0, the server binds to a port selected by the system.
 	ConsensusPort int
 
+	// ConsensusAdvertisedAddr is the network address, including port, to advertise to other peers.
+	// This is optional: if empty, the address that the server network transport binds to is used instead.
+	// E.g. local tests may use temporary addresses, rather than preset known addresses.
+	ConsensusAdvertisedAddr string
+
 	// RaftServerID is the unique ID for this server used by raft consensus.
 	RaftServerID string
 
@@ -117,8 +124,11 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*Config, error) {
 	}
 
 	return &Config{
-		ConsensusAddr: ctx.String(flags.ConsensusAddr.Name),
-		ConsensusPort: ctx.Int(flags.ConsensusPort.Name),
+		ConsensusAddr: ctx.String(flags.ConsensusAddr.Name),
+		ConsensusPort: ctx.Int(flags.ConsensusPort.Name),
+		// The consensus server will advertise the address it binds to if this is empty/unspecified.
+		ConsensusAdvertisedAddr: ctx.String(flags.AdvertisedFullAddr.Name),
+
 		RaftBootstrap:  ctx.Bool(flags.RaftBootstrap.Name),
 		RaftServerID:   ctx.String(flags.RaftServerID.Name),
 		RaftStorageDir: ctx.String(flags.RaftStorageDir.Name),
diff --git a/op-conductor/conductor/service.go b/op-conductor/conductor/service.go
index 89656fd727aa..cccba2c76ac9 100644
--- a/op-conductor/conductor/service.go
+++ b/op-conductor/conductor/service.go
@@ -169,10 +169,12 @@ func (c *OpConductor) initConsensus(ctx context.Context) error {
 		return nil
 	}
 
-	serverAddr := fmt.Sprintf("%s:%d", c.cfg.ConsensusAddr, c.cfg.ConsensusPort)
 	raftConsensusConfig := &consensus.RaftConsensusConfig{
-		ServerID:   c.cfg.RaftServerID,
-		ServerAddr: serverAddr,
+		ServerID: c.cfg.RaftServerID,
+		// AdvertisedAddr may be empty: the server will then default to what it binds to.
+		AdvertisedAddr: raft.ServerAddress(c.cfg.ConsensusAdvertisedAddr),
+		ListenAddr:     c.cfg.ConsensusAddr,
+		ListenPort:     c.cfg.ConsensusPort,
 		StorageDir: c.cfg.RaftStorageDir,
 		Bootstrap:  c.cfg.RaftBootstrap,
 		RollupCfg:  &c.cfg.RollupCfg,
@@ -472,6 +474,12 @@ func (oc *OpConductor) Paused() bool {
 	return oc.paused.Load()
 }
 
+// ConsensusEndpoint returns the raft consensus server address to connect to.
+func (oc *OpConductor) ConsensusEndpoint() string {
+	return oc.cons.Addr()
+}
+
+// HTTPEndpoint returns the HTTP RPC endpoint
 func (oc *OpConductor) HTTPEndpoint() string {
 	if oc.rpcServer == nil {
 		return ""
@@ -613,7 +621,8 @@ func (oc *OpConductor) handleHealthUpdate(hcerr error) {
 		oc.queueAction()
 	}
 
-	if oc.healthy.Swap(healthy) != healthy {
+	if old := oc.healthy.Swap(healthy); old != healthy {
+		oc.log.Info("Health state changed", "old", old, "new", healthy)
 		// queue an action if health status changed.
 		oc.queueAction()
 	}
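Note on the `ConsensusPort = 0` / empty `ConsensusAdvertisedAddr` semantics introduced above: binding to port 0 and then reading back the concrete address the OS picked is standard Go networking behavior, and is what the "advertise what we bind to" fallback builds on. A minimal standalone sketch, standard library only (not part of this PR; the endpoint value shown is illustrative):

```go
package main

import (
	"fmt"
	"net"
)

func main() {
	// Bind to an OS-selected port, as ConsensusPort=0 does for the consensus server.
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer l.Close()

	// The listener's address carries the concrete port the system picked.
	// This is the kind of value a server can fall back to advertising
	// when no explicit advertised address is configured.
	fmt.Println("bound to:", l.Addr().String()) // e.g. 127.0.0.1:49321
}
```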
diff --git a/op-conductor/conductor/service_test.go b/op-conductor/conductor/service_test.go
index 49a05e902763..87df417a4683 100644
--- a/op-conductor/conductor/service_test.go
+++ b/op-conductor/conductor/service_test.go
@@ -30,7 +30,7 @@ func mockConfig(t *testing.T) Config {
 	now := uint64(time.Now().Unix())
 	return Config{
 		ConsensusAddr:  "127.0.0.1",
-		ConsensusPort:  50050,
+		ConsensusPort:  0,
 		RaftServerID:   "SequencerA",
 		RaftStorageDir: "/tmp/raft",
 		RaftBootstrap:  false,
diff --git a/op-conductor/consensus/iface.go b/op-conductor/consensus/iface.go
index 69b9506c50b2..e0dcb6efd5a7 100644
--- a/op-conductor/consensus/iface.go
+++ b/op-conductor/consensus/iface.go
@@ -42,6 +42,9 @@ type ServerInfo struct {
 //
 //go:generate mockery --name Consensus --output mocks/ --with-expecter=true
 type Consensus interface {
+	// Addr returns the address of this consensus server.
+	// Internally the server may override what is advertised, or fall back to the address it listens on.
+	Addr() string
 	// AddVoter adds a voting member into the cluster, voter is eligible to become leader.
 	// If version is non-zero, this will only be applied if the current cluster version matches the expected version.
 	AddVoter(id, addr string, version uint64) error
diff --git a/op-conductor/consensus/mocks/Consensus.go b/op-conductor/consensus/mocks/Consensus.go
index ca1397a690e1..902174435146 100644
--- a/op-conductor/consensus/mocks/Consensus.go
+++ b/op-conductor/consensus/mocks/Consensus.go
@@ -1,4 +1,4 @@
-// Code generated by mockery v2.39.1. DO NOT EDIT.
+// Code generated by mockery v2.46.0. DO NOT EDIT.
 
 package mocks
 
@@ -118,6 +118,51 @@ func (_c *Consensus_AddVoter_Call) RunAndReturn(run func(string, string, uint64) error) *Consensus_AddVoter_Call {
 	return _c
 }
 
+// Addr provides a mock function with given fields:
+func (_m *Consensus) Addr() string {
+	ret := _m.Called()
+
+	if len(ret) == 0 {
+		panic("no return value specified for Addr")
+	}
+
+	var r0 string
+	if rf, ok := ret.Get(0).(func() string); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(string)
+	}
+
+	return r0
+}
+
+// Consensus_Addr_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Addr'
+type Consensus_Addr_Call struct {
+	*mock.Call
+}
+
+// Addr is a helper method to define mock.On call
+func (_e *Consensus_Expecter) Addr() *Consensus_Addr_Call {
+	return &Consensus_Addr_Call{Call: _e.mock.On("Addr")}
+}
+
+func (_c *Consensus_Addr_Call) Run(run func()) *Consensus_Addr_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *Consensus_Addr_Call) Return(_a0 string) *Consensus_Addr_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *Consensus_Addr_Call) RunAndReturn(run func() string) *Consensus_Addr_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // ClusterMembership provides a mock function with given fields:
 func (_m *Consensus) ClusterMembership() (*consensus.ClusterMembership, error) {
 	ret := _m.Called()
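For tests that depend on the regenerated mock, a hypothetical usage sketch of the new `Addr` expectation. It assumes mockery's standard generated `NewConsensus(t)` constructor, which is not shown in this diff; the test name and address are illustrative:

```go
package conductor_test

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/ethereum-optimism/optimism/op-conductor/consensus/mocks"
)

// TestConsensusAddrMock shows how the expecter generated above is used:
// EXPECT().Addr() registers the expectation, Return fixes the value.
func TestConsensusAddrMock(t *testing.T) {
	m := mocks.NewConsensus(t) // assumes the standard mockery constructor
	m.EXPECT().Addr().Return("127.0.0.1:50050")

	require.Equal(t, "127.0.0.1:50050", m.Addr())
}
```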
diff --git a/op-conductor/consensus/raft.go b/op-conductor/consensus/raft.go
index f6acc0fb76f1..86b32ea00f8c 100644
--- a/op-conductor/consensus/raft.go
+++ b/op-conductor/consensus/raft.go
@@ -29,12 +29,30 @@ type RaftConsensus struct {
 	serverID raft.ServerID
 	r        *raft.Raft
 
+	transport *raft.NetworkTransport
+	// advertisedAddr is the host & port to contact this server on.
+	// If empty, the address of the transport should be used instead.
+	advertisedAddr string
+
 	unsafeTracker *unsafeHeadTracker
 }
 
 type RaftConsensusConfig struct {
-	ServerID   string
-	ServerAddr string
+	ServerID string
+
+	// AdvertisedAddr is the address to advertise,
+	// i.e. the address external raft peers use to contact us.
+	// If left empty, it defaults to the resulting
+	// local address that we bind the underlying transport to.
+	AdvertisedAddr raft.ServerAddress
+
+	// ListenPort is the port to bind the server to.
+	// This may be 0; an available port will then be selected by the system.
+	ListenPort int
+	// ListenAddr is the address to bind the server to.
+	// E.g. use 0.0.0.0 to bind to an external-facing network interface.
+	ListenAddr string
+
 	StorageDir string
 	Bootstrap  bool
 	RollupCfg  *rollup.Config
@@ -86,18 +104,31 @@ func NewRaftConsensus(log log.Logger, cfg *RaftConsensusConfig) (*RaftConsensus, error) {
 		return nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q): %w`, baseDir, err)
 	}
 
-	addr, err := net.ResolveTCPAddr("tcp", cfg.ServerAddr)
-	if err != nil {
-		return nil, errors.Wrap(err, "failed to resolve tcp address")
+	var advertiseAddr net.Addr
+	if cfg.AdvertisedAddr == "" {
+		log.Warn("No advertised address specified. Advertising local address.")
+	} else {
+		x, err := net.ResolveTCPAddr("tcp", string(cfg.AdvertisedAddr))
+		if err != nil {
+			return nil, fmt.Errorf("failed to resolve advertised TCP address %q: %w", string(cfg.AdvertisedAddr), err)
+		}
+		advertiseAddr = x
+		log.Info("Resolved advertising address", "adAddr", cfg.AdvertisedAddr,
+			"adIP", x.IP, "adPort", x.Port, "adZone", x.Zone)
 	}
 
+	bindAddr := fmt.Sprintf("%s:%d", cfg.ListenAddr, cfg.ListenPort)
+	log.Info("Binding raft server to network transport", "listenAddr", bindAddr)
+
 	maxConnPool := 10
 	timeout := 5 * time.Second
-	bindAddr := fmt.Sprintf("0.0.0.0:%d", addr.Port)
-	transport, err := raft.NewTCPTransportWithLogger(bindAddr, addr, maxConnPool, timeout, rc.Logger)
+
+	// When advertiseAddr == nil, the transport will advertise the local address that it is bound to.
+	transport, err := raft.NewTCPTransportWithLogger(bindAddr, advertiseAddr, maxConnPool, timeout, rc.Logger)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to create raft tcp transport")
 	}
+	log.Info("Raft server network transport is up", "addr", transport.LocalAddr())
 
 	fsm := NewUnsafeHeadTracker(log)
 
@@ -110,11 +141,19 @@ func NewRaftConsensus(log log.Logger, cfg *RaftConsensusConfig) (*RaftConsensus, error) {
 	// If bootstrap = true, start raft in bootstrap mode, this will allow the current node to elect itself as leader when there's no other participants
 	// and allow other nodes to join the cluster.
 	if cfg.Bootstrap {
+		var advertisedAddr raft.ServerAddress
+		if cfg.AdvertisedAddr == "" {
+			advertisedAddr = transport.LocalAddr()
+		} else {
+			advertisedAddr = cfg.AdvertisedAddr
+		}
+		log.Info("Bootstrapping raft consensus cluster with self", "addr", advertisedAddr)
+
 		raftCfg := raft.Configuration{
 			Servers: []raft.Server{
 				{
 					ID:       rc.LocalID,
-					Address:  raft.ServerAddress(cfg.ServerAddr),
+					Address:  advertisedAddr,
 					Suffrage: raft.Voter,
 				},
 			},
@@ -132,9 +171,21 @@ func NewRaftConsensus(log log.Logger, cfg *RaftConsensusConfig) (*RaftConsensus, error) {
 		serverID:      raft.ServerID(cfg.ServerID),
 		unsafeTracker: fsm,
 		rollupCfg:     cfg.RollupCfg,
+		transport:     transport,
+		// may be empty; Addr() then falls back to the transport address.
+		advertisedAddr: string(cfg.AdvertisedAddr),
 	}, nil
 }
 
+// Addr returns the address to contact this raft consensus server.
+// If no explicit address to advertise was configured,
+// the local network address that the raft-consensus server is listening on will be used.
+func (rc *RaftConsensus) Addr() string {
+	if rc.advertisedAddr != "" {
+		return rc.advertisedAddr
+	}
+	return string(rc.transport.LocalAddr())
+}
+
 // AddNonVoter implements Consensus, it tries to add a non-voting member into the cluster.
 func (rc *RaftConsensus) AddNonVoter(id string, addr string, version uint64) error {
 	if err := checkTCPPortOpen(addr); err != nil {
diff --git a/op-conductor/consensus/raft_test.go b/op-conductor/consensus/raft_test.go
index fbd9c7cb3bc8..9c8ca48247ef 100644
--- a/op-conductor/consensus/raft_test.go
+++ b/op-conductor/consensus/raft_test.go
@@ -28,7 +28,9 @@ func TestCommitAndRead(t *testing.T) {
 	}
 	raftConsensusConfig := &RaftConsensusConfig{
 		ServerID: "SequencerA",
-		ServerAddr: "127.0.0.1:0",
+		ListenPort:     0,
+		ListenAddr:     "127.0.0.1", // local test, don't bind to an external interface
+		AdvertisedAddr: "",          // use the local address that the server binds to
 		StorageDir: storageDir,
 		Bootstrap:  true,
 		RollupCfg:  rollupCfg,
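The NewRaftConsensus changes above lean on hashicorp/raft transport semantics: when the advertise address is nil, the transport reports the address it actually bound to, including an OS-selected port. A small standalone sketch of that behavior as we read the library (not part of this PR; pool size, timeout, and the printed address are illustrative):

```go
package main

import (
	"fmt"
	"os"
	"time"

	"github.com/hashicorp/raft"
)

func main() {
	// Bind to an OS-selected port with no advertise address, mirroring the
	// ListenPort=0 / empty AdvertisedAddr configuration above.
	transport, err := raft.NewTCPTransport("127.0.0.1:0", nil, 2, time.Second, os.Stderr)
	if err != nil {
		panic(err)
	}
	defer transport.Close()

	// With a nil advertise address, LocalAddr() reports the concrete bound
	// address (e.g. 127.0.0.1:49321). With a non-nil advertise address, the
	// transport reports that instead. Note that binding to 0.0.0.0 with a nil
	// advertise address is rejected by the library as not advertisable, which
	// is exactly why an explicit advertised address matters for external binds.
	fmt.Println("LocalAddr:", transport.LocalAddr())
}
```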
diff --git a/op-conductor/flags/flags.go b/op-conductor/flags/flags.go
index 249e8a676e07..7c29bfcab2c3 100644
--- a/op-conductor/flags/flags.go
+++ b/op-conductor/flags/flags.go
@@ -19,16 +19,22 @@ const EnvVarPrefix = "OP_CONDUCTOR"
 
 var (
 	ConsensusAddr = &cli.StringFlag{
 		Name:    "consensus.addr",
-		Usage:   "Address to listen for consensus connections",
+		Usage:   "Address (excluding port) to listen on for consensus connections.",
 		EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "CONSENSUS_ADDR"),
 		Value:   "127.0.0.1",
 	}
 	ConsensusPort = &cli.IntFlag{
 		Name:    "consensus.port",
-		Usage:   "Port to listen for consensus connections",
+		Usage:   "Port to listen on for consensus connections. May be 0 to let the system select a port.",
 		EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "CONSENSUS_PORT"),
 		Value:   50050,
 	}
+	AdvertisedFullAddr = &cli.StringFlag{
+		Name:    "consensus.advertised",
+		Usage:   "Full address (host and port) for other peers to contact the consensus server. Optional: if left empty, the local address is advertised.",
+		EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "CONSENSUS_ADVERTISED"),
+		Value:   "",
+	}
 	RaftBootstrap = &cli.BoolFlag{
 		Name:    "raft.bootstrap",
 		Usage:   "If this node should bootstrap a new raft cluster",
@@ -127,6 +133,7 @@ var requiredFlags = []cli.Flag{
 }
 
 var optionalFlags = []cli.Flag{
+	AdvertisedFullAddr,
 	Paused,
 	RPCEnableProxy,
 	RaftBootstrap,
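How the three flags are meant to combine, as a standalone illustration. Only the flag names and defaults come from this diff; the program around them is hypothetical and is not the conductor's actual wiring:

```go
package main

import (
	"fmt"
	"os"

	"github.com/urfave/cli/v2"
)

func main() {
	app := &cli.App{
		Flags: []cli.Flag{
			&cli.StringFlag{Name: "consensus.addr", Value: "127.0.0.1"},
			&cli.IntFlag{Name: "consensus.port", Value: 50050},
			&cli.StringFlag{Name: "consensus.advertised", Value: ""},
		},
		Action: func(ctx *cli.Context) error {
			// The bind address is assembled from addr + port, as initConsensus does.
			bind := fmt.Sprintf("%s:%d", ctx.String("consensus.addr"), ctx.Int("consensus.port"))
			advertised := ctx.String("consensus.advertised")
			if advertised == "" {
				// Empty means: advertise whatever address the transport ends up bound to.
				fmt.Println("bind to", bind, "and advertise the bound address")
			} else {
				fmt.Println("bind to", bind, "and advertise", advertised)
			}
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```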
diff --git a/op-e2e/system/conductor/sequencer_failover_setup.go b/op-e2e/system/conductor/sequencer_failover_setup.go
index ea515ecc1e64..a4e5178e3d3e 100644
--- a/op-e2e/system/conductor/sequencer_failover_setup.go
+++ b/op-e2e/system/conductor/sequencer_failover_setup.go
@@ -2,9 +2,7 @@ package conductor
 
 import (
 	"context"
 	"fmt"
-	"math/rand"
-	"net"
 	"strings"
 	"testing"
 	"time"
@@ -51,28 +50,23 @@ const (
 
 var retryStrategy = &retry.FixedStrategy{Dur: 50 * time.Millisecond}
 
 type conductor struct {
-	service       *con.OpConductor
-	client        conrpc.API
-	consensusPort int
-	rpcPort       int
+	service *con.OpConductor
+	client  conrpc.API
 }
 
 func (c *conductor) ConsensusEndpoint() string {
-	return fmt.Sprintf("%s:%d", localhost, c.consensusPort)
+	return c.service.ConsensusEndpoint()
 }
 
 func (c *conductor) RPCEndpoint() string {
-	return fmt.Sprintf("http://%s:%d", localhost, c.rpcPort)
+	return c.service.HTTPEndpoint()
 }
 
 func setupSequencerFailoverTest(t *testing.T) (*e2esys.System, map[string]*conductor, func()) {
 	op_e2e.InitParallel(t)
 	ctx := context.Background()
 
-	sys, conductors, err := retry.Do2(ctx, maxSetupRetries, retryStrategy, func() (*e2esys.System, map[string]*conductor, error) {
-		return setupHAInfra(t, ctx)
-	})
-	require.NoError(t, err, "Expected to successfully setup sequencers and conductors after retry")
+	sys, conductors := setupHAInfra(t, ctx)
 
 	// form a cluster
 	c1 := conductors[Sequencer1Name]
@@ -143,79 +137,80 @@ func setupSequencerFailoverTest(t *testing.T) (*e2esys.System, map[string]*conductor, func()) {
 	}
 }
 
-func setupHAInfra(t *testing.T, ctx context.Context) (*e2esys.System, map[string]*conductor, error) {
+func setupHAInfra(t *testing.T, ctx context.Context) (*e2esys.System, map[string]*conductor) {
 	startTime := time.Now()
-
-	var sys *e2esys.System
-	var conductors map[string]*conductor
-	var err error
-
-	// clean up if setup fails due to port in use.
 	defer func() {
-		if err != nil {
-			if sys != nil {
-				sys.Close()
-			}
-
-			for _, c := range conductors {
-				if c == nil || c.service == nil {
-					// pass. Sometimes we can get nil in this map
-				} else if serr := c.service.Stop(ctx); serr != nil {
-					t.Log("Failed to stop conductor", "error", serr)
-				}
-			}
-		}
 		t.Logf("setupHAInfra took %s\n", time.Since(startTime))
 	}()
 
-	conductorRpcPorts := map[string]int{
-		Sequencer1Name: findAvailablePort(t),
-		Sequencer2Name: findAvailablePort(t),
-		Sequencer3Name: findAvailablePort(t),
+	conductorsReady := map[string]chan string{
+		Sequencer1Name: make(chan string, 1),
+		Sequencer2Name: make(chan string, 1),
+		Sequencer3Name: make(chan string, 1),
 	}
 
-	// 3 sequencers, 1 verifier, 1 active sequencer.
-	cfg := sequencerFailoverSystemConfig(t, conductorRpcPorts)
-	if sys, err = cfg.Start(t); err != nil {
-		return nil, nil, err
+	// The sequencer op-node & execution engine have to be up first, to get their endpoints running.
+	// The conductor is then started afterwards, using the endpoints of the op-node and execution engine.
+	// The op-node, while starting, will wait for the conductor to be up and running, to get its endpoint.
+	// This way no endpoint has to be reserved or hardcoded, which avoids CI test flakes during setup.
+	conductorEndpointFn := func(ctx context.Context, name string) (endpoint string, err error) {
+		endpointCh, ok := conductorsReady[name]
+		if !ok {
+			return "", fmt.Errorf("conductor %s is not known", name)
+		}
+		select {
+		case <-ctx.Done():
+			return "", fmt.Errorf("failed to set up conductor in time: %w", ctx.Err())
+		case endpoint := <-endpointCh:
+			return endpoint, nil
+		}
 	}
 
-	// 3 conductors that connects to 1 sequencer each.
-	conductors = make(map[string]*conductor)
+	// 3 sequencers, 1 verifier, 1 active sequencer.
+	cfg := sequencerFailoverSystemConfig(t, conductorEndpointFn)
+
+	// sys is configured to close itself on test cleanup.
+	sys, err := cfg.Start(t)
+	require.NoError(t, err, "must start system")
+
+	out := make(map[string]*conductor)
 
+	// 3 conductors that connect to 1 sequencer each.
 	// initialize all conductors in paused mode
 	conductorCfgs := []struct {
 		name      string
-		port      int
 		bootstrap bool
 	}{
-		{Sequencer1Name, conductorRpcPorts[Sequencer1Name], true}, // one in bootstrap mode so that we can form a cluster.
-		{Sequencer2Name, conductorRpcPorts[Sequencer2Name], false},
-		{Sequencer3Name, conductorRpcPorts[Sequencer3Name], false},
+		{Sequencer1Name, true}, // one in bootstrap mode so that we can form a cluster.
+		{Sequencer2Name, false},
+		{Sequencer3Name, false},
 	}
 	for _, cfg := range conductorCfgs {
 		cfg := cfg
 		nodePRC := sys.RollupNodes[cfg.name].UserRPC().RPC()
 		engineRPC := sys.EthInstances[cfg.name].UserRPC().RPC()
-		if conductors[cfg.name], err = setupConductor(t, cfg.name, t.TempDir(), nodePRC, engineRPC, cfg.port, cfg.bootstrap, *sys.RollupConfig); err != nil {
-			return nil, nil, err
-		}
+
+		conduc, err := setupConductor(t, cfg.name, t.TempDir(), nodePRC, engineRPC, cfg.bootstrap, *sys.RollupConfig)
+		require.NoError(t, err, "failed to set up conductor %s", cfg.name)
+		out[cfg.name] = conduc
+		// Signal that the conductor RPC endpoint is ready
+		conductorsReady[cfg.name] <- conduc.RPCEndpoint()
 	}
-	return sys, conductors, nil
+	return sys, out
 }
 
 func setupConductor(
 	t *testing.T,
 	serverID, dir, nodeRPC, engineRPC string,
-	rpcPort int,
 	bootstrap bool,
 	rollupCfg rollup.Config,
 ) (*conductor, error) {
-	consensusPort := findAvailablePort(t)
 	cfg := con.Config{
-		ConsensusAddr: localhost,
-		ConsensusPort: consensusPort,
+		ConsensusAddr:           localhost,
+		ConsensusPort:           0,  // let the system select a port, avoiding conflicts
+		ConsensusAdvertisedAddr: "", // use the local address we bind to
+
 		RaftServerID:   serverID,
 		RaftStorageDir: dir,
 		RaftBootstrap:  bootstrap,
@@ -237,17 +232,18 @@ func setupConductor(
 		RollupCfg:      rollupCfg,
 		RPCEnableProxy: true,
 		LogConfig: oplog.CLIConfig{
-			Level: log.LevelInfo,
+			Level: log.LevelDebug,
 			Color: false,
 		},
 		RPC: oprpc.CLIConfig{
 			ListenAddr: localhost,
-			ListenPort: rpcPort,
+			ListenPort: 0, // let the system select a port
 		},
 	}
 
+	logger := testlog.Logger(t, log.LevelDebug)
 	ctx := context.Background()
-	service, err := con.New(ctx, &cfg, testlog.Logger(t, log.LevelInfo), "0.0.1")
+	service, err := con.New(ctx, &cfg, logger, "0.0.1")
 	if err != nil {
 		return nil, err
 	}
@@ -257,6 +253,8 @@ func setupConductor(
 		return nil, err
 	}
 
+	logger.Info("Started conductor", "nodeRPC", nodeRPC, "engineRPC", engineRPC)
+
 	rawClient, err := rpc.DialContext(ctx, service.HTTPEndpoint())
 	if err != nil {
 		return nil, err
@@ -265,10 +263,8 @@ func setupConductor(
 	client := conrpc.NewAPIClient(rawClient)
 
 	return &conductor{
-		service:       service,
-		client:        client,
-		consensusPort: consensusPort,
-		rpcPort:       rpcPort,
+		service: service,
+		client:  client,
 	}, nil
 }
 
@@ -316,12 +312,18 @@ func setupBatcher(t *testing.T, sys *e2esys.System, conductors map[string]*conductor) {
 	sys.BatchSubmitter = batcher
 }
 
-func sequencerFailoverSystemConfig(t *testing.T, ports map[string]int) e2esys.SystemConfig {
+func sequencerFailoverSystemConfig(t *testing.T, conductorRPCEndpoints func(ctx context.Context, name string) (string, error)) e2esys.SystemConfig {
 	cfg := e2esys.EcotoneSystemConfig(t, new(hexutil.Uint64))
 	delete(cfg.Nodes, "sequencer")
-	cfg.Nodes[Sequencer1Name] = sequencerCfg(ports[Sequencer1Name])
-	cfg.Nodes[Sequencer2Name] = sequencerCfg(ports[Sequencer2Name])
-	cfg.Nodes[Sequencer3Name] = sequencerCfg(ports[Sequencer3Name])
+	cfg.Nodes[Sequencer1Name] = sequencerCfg(func(ctx context.Context) (string, error) {
+		return conductorRPCEndpoints(ctx, Sequencer1Name)
+	})
+	cfg.Nodes[Sequencer2Name] = sequencerCfg(func(ctx context.Context) (string, error) {
+		return conductorRPCEndpoints(ctx, Sequencer2Name)
+	})
+	cfg.Nodes[Sequencer3Name] = sequencerCfg(func(ctx context.Context) (string, error) {
+		return conductorRPCEndpoints(ctx, Sequencer3Name)
+	})
 
 	delete(cfg.Loggers, "sequencer")
 	cfg.Loggers[Sequencer1Name] = testlog.Logger(t, log.LevelInfo).New("role", Sequencer1Name)
@@ -338,7 +340,7 @@ func sequencerFailoverSystemConfig(t *testing.T, ports map[string]int) e2esys.SystemConfig {
 	return cfg
 }
 
-func sequencerCfg(rpcPort int) *rollupNode.Config {
+func sequencerCfg(conductorRPCEndpoint rollupNode.ConductorRPCFunc) *rollupNode.Config {
 	return &rollupNode.Config{
 		Driver: driver.Config{
 			VerifierConfDepth: 0,
@@ -357,8 +359,8 @@ func sequencerCfg(rpcPort int) *rollupNode.Config {
 		ConfigPersistence: &rollupNode.DisabledConfigPersistence{},
 		Sync:              sync.Config{SyncMode: sync.CLSync},
 		ConductorEnabled:  true,
-		ConductorRpc:      fmt.Sprintf("http://%s:%d", localhost, rpcPort),
-		ConductorRpcTimeout: 1 * time.Second,
+		ConductorRpc:        conductorRPCEndpoint,
+		ConductorRpcTimeout: 5 * time.Second,
 	}
 }
 
@@ -453,26 +455,6 @@ func sequencerActive(t *testing.T, ctx context.Context, rollupClient *sources.RollupClient) bool {
 	return active
 }
 
-func findAvailablePort(t *testing.T) int {
-	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
-	defer cancel()
-	for {
-		select {
-		case <-ctx.Done():
-			t.Error("Failed to find available port")
-		default:
-			// private / ephemeral ports are in the range 49152-65535
-			port := rand.Intn(65535-49152) + 49152
-			addr := fmt.Sprintf("127.0.0.1:%d", port)
-			l, err := net.Listen("tcp", addr)
-			if err == nil {
-				l.Close() // Close the listener and return the port if it's available
-				return port
-			}
-		}
-	}
-}
-
 func findLeader(t *testing.T, conductors map[string]*conductor) (string, *conductor) {
 	for id, con := range conductors {
 		if leader(t, context.Background(), con) {
diff --git a/op-e2e/system/conductor/sequencer_failover_test.go b/op-e2e/system/conductor/sequencer_failover_test.go
index 74047fcc6fdd..5722dc3b82e9 100644
--- a/op-e2e/system/conductor/sequencer_failover_test.go
+++ b/op-e2e/system/conductor/sequencer_failover_test.go
@@ -103,7 +103,6 @@ func TestSequencerFailover_ConductorRPC(t *testing.T) {
 		t, VerifierName, t.TempDir(),
 		sys.RollupEndpoint(Sequencer3Name).RPC(),
 		sys.NodeEndpoint(Sequencer3Name).RPC(),
-		findAvailablePort(t),
 		false,
 		*sys.RollupConfig,
 	)
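The setup rework above replaces pre-picked ports with a per-conductor "ready" channel that hands the real endpoint to the op-node once the conductor is up. The handoff pattern in isolation, as a minimal standard-library sketch (endpoint value and timings are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ready := make(chan string, 1) // buffered: the producer never blocks

	// Producer: starts a service on an OS-selected port, then publishes the
	// endpoint it actually bound to (stand-in for the conductor here).
	go func() {
		time.Sleep(50 * time.Millisecond) // stand-in for service startup
		ready <- "http://127.0.0.1:49321" // illustrative endpoint
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Consumer: the op-node side blocks until the endpoint is known or the
	// context expires, mirroring conductorEndpointFn above.
	select {
	case <-ctx.Done():
		fmt.Println("failed to get endpoint in time:", ctx.Err())
	case endpoint := <-ready:
		fmt.Println("conductor endpoint:", endpoint)
	}
}
```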
diff --git a/op-node/node/conductor.go b/op-node/node/conductor.go
index ff5723889b95..cc45f4bfabc4 100644
--- a/op-node/node/conductor.go
+++ b/op-node/node/conductor.go
@@ -13,15 +13,17 @@ import (
 	"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
 	"github.com/ethereum-optimism/optimism/op-service/dial"
 	"github.com/ethereum-optimism/optimism/op-service/eth"
+	"github.com/ethereum-optimism/optimism/op-service/locks"
 	"github.com/ethereum-optimism/optimism/op-service/retry"
 )
 
 // ConductorClient is a client for the op-conductor RPC service.
 type ConductorClient struct {
-	cfg       *Config
-	metrics   *metrics.Metrics
-	log       log.Logger
-	apiClient *conductorRpc.APIClient
+	cfg     *Config
+	metrics *metrics.Metrics
+	log     log.Logger
+
+	apiClient locks.RWValue[*conductorRpc.APIClient]
 
 	// overrideLeader is used to override the leader check for disaster recovery purposes.
 	// During disaster situations where the cluster is unhealthy (no leader, only 1 or less nodes up),
@@ -41,15 +43,23 @@ func NewConductorClient(cfg *Config, log log.Logger, metrics *metrics.Metrics) conductor.SequencerConductor {
 }
 
 // Initialize initializes the conductor client.
-func (c *ConductorClient) initialize() error {
-	if c.apiClient != nil {
+func (c *ConductorClient) initialize(ctx context.Context) error {
+	c.apiClient.Lock()
+	defer c.apiClient.Unlock()
+	if c.apiClient.Value != nil {
 		return nil
 	}
-	conductorRpcClient, err := dial.DialRPCClientWithTimeout(context.Background(), time.Minute*1, c.log, c.cfg.ConductorRpc)
+	endpoint, err := retry.Do[string](ctx, 10, retry.Exponential(), func() (string, error) {
+		return c.cfg.ConductorRpc(ctx)
+	})
+	if err != nil {
+		return fmt.Errorf("no conductor RPC endpoint available: %w", err)
+	}
+	conductorRpcClient, err := dial.DialRPCClientWithTimeout(context.Background(), time.Minute*1, c.log, endpoint)
 	if err != nil {
 		return fmt.Errorf("failed to dial conductor RPC: %w", err)
 	}
-	c.apiClient = conductorRpc.NewAPIClient(conductorRpcClient)
+	c.apiClient.Value = conductorRpc.NewAPIClient(conductorRpcClient)
 	return nil
 }
 
@@ -64,7 +74,7 @@ func (c *ConductorClient) Leader(ctx context.Context) (bool, error) {
 		return true, nil
 	}
 
-	if err := c.initialize(); err != nil {
+	if err := c.initialize(ctx); err != nil {
 		return false, err
 	}
 	ctx, cancel := context.WithTimeout(ctx, c.cfg.ConductorRpcTimeout)
@@ -72,8 +82,11 @@ func (c *ConductorClient) Leader(ctx context.Context) (bool, error) {
 
 	isLeader, err := retry.Do(ctx, 2, retry.Fixed(50*time.Millisecond), func() (bool, error) {
 		record := c.metrics.RecordRPCClientRequest("conductor_leader")
-		result, err := c.apiClient.Leader(ctx)
+		result, err := c.apiClient.Get().Leader(ctx)
 		record(err)
+		if err != nil {
+			c.log.Error("Failed to check conductor for leadership", "err", err)
+		}
 		return result, err
 	})
 	return isLeader, err
@@ -85,7 +98,7 @@ func (c *ConductorClient) CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error {
 		return nil
 	}
 
-	if err := c.initialize(); err != nil {
+	if err := c.initialize(ctx); err != nil {
 		return err
 	}
 	ctx, cancel := context.WithTimeout(ctx, c.cfg.ConductorRpcTimeout)
@@ -93,7 +106,7 @@ func (c *ConductorClient) CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error {
 	err := retry.Do0(ctx, 2, retry.Fixed(50*time.Millisecond), func() error {
 		record := c.metrics.RecordRPCClientRequest("conductor_commitUnsafePayload")
-		err := c.apiClient.CommitUnsafePayload(ctx, payload)
+		err := c.apiClient.Get().CommitUnsafePayload(ctx, payload)
 		record(err)
 		return err
 	})
@@ -107,9 +120,11 @@ func (c *ConductorClient) OverrideLeader(ctx context.Context) error {
 }
 
 func (c *ConductorClient) Close() {
-	if c.apiClient == nil {
+	c.apiClient.Lock()
+	defer c.apiClient.Unlock()
+	if c.apiClient.Value == nil {
 		return
 	}
-	c.apiClient.Close()
-	c.apiClient = nil
+	c.apiClient.Value.Close()
+	c.apiClient.Value = nil
 }
err := retry.Do0(ctx, 2, retry.Fixed(50*time.Millisecond), func() error { record := c.metrics.RecordRPCClientRequest("conductor_commitUnsafePayload") - err := c.apiClient.CommitUnsafePayload(ctx, payload) + err := c.apiClient.Get().CommitUnsafePayload(ctx, payload) record(err) return err }) @@ -107,9 +120,11 @@ func (c *ConductorClient) OverrideLeader(ctx context.Context) error { } func (c *ConductorClient) Close() { - if c.apiClient == nil { + c.apiClient.Lock() + defer c.apiClient.Unlock() + if c.apiClient.Value == nil { return } - c.apiClient.Close() - c.apiClient = nil + c.apiClient.Value.Close() + c.apiClient.Value = nil } diff --git a/op-node/node/config.go b/op-node/node/config.go index a78b55853aa2..6591a756e715 100644 --- a/op-node/node/config.go +++ b/op-node/node/config.go @@ -69,13 +69,16 @@ type Config struct { // Conductor is used to determine this node is the leader sequencer. ConductorEnabled bool - ConductorRpc string + ConductorRpc ConductorRPCFunc ConductorRpcTimeout time.Duration // AltDA config AltDA altda.CLIConfig } +// ConductorRPCFunc retrieves the endpoint. The RPC may not immediately be available. +type ConductorRPCFunc func(ctx context.Context) (string, error) + type RPCConfig struct { ListenAddr string ListenPort int diff --git a/op-node/node/node.go b/op-node/node/node.go index 298c98aa2b18..2727552b8190 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -445,6 +445,7 @@ func (n *OpNode) initRPCServer(cfg *Config) error { if err := server.Start(); err != nil { return fmt.Errorf("unable to start RPC server: %w", err) } + n.log.Info("Started JSON-RPC server", "addr", server.Addr()) n.server = server return nil } diff --git a/op-node/service.go b/op-node/service.go index b24e2a638335..4d12c7f5446f 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -1,6 +1,7 @@ package opnode import ( + "context" "crypto/rand" "encoding/json" "errors" @@ -80,7 +81,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ctx.IsSet(flags.HeartbeatURLFlag.Name) { log.Warn("Heartbeat functionality is not supported anymore, CLI flags will be removed in following release.") } - + conductorRPCEndpoint := ctx.String(flags.ConductorRpcFlag.Name) cfg := &node.Config{ L1: l1Endpoint, L2: l2Endpoint, @@ -108,8 +109,10 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { Sync: *syncConfig, RollupHalt: haltOption, - ConductorEnabled: ctx.Bool(flags.ConductorEnabledFlag.Name), - ConductorRpc: ctx.String(flags.ConductorRpcFlag.Name), + ConductorEnabled: ctx.Bool(flags.ConductorEnabledFlag.Name), + ConductorRpc: func(context.Context) (string, error) { + return conductorRPCEndpoint, nil + }, ConductorRpcTimeout: ctx.Duration(flags.ConductorRpcTimeoutFlag.Name), AltDA: altda.ReadCLIConfig(ctx),