From 33297b8c889d662cc9ff1547919e245a2027db2b Mon Sep 17 00:00:00 2001 From: Tanut Lertwarachai Date: Fri, 20 Dec 2024 12:27:50 +0700 Subject: [PATCH] add band liveliness logic --- internal/relayertest/constants.go | 5 +- internal/relayertest/mocks/band_client.go | 26 +++--- .../relayertest/testdata/custom_config.toml | 1 + .../testdata/custom_config_with_time_str.toml | 1 + .../relayertest/testdata/default_config.toml | 1 + .../testdata/default_with_chain_config.toml | 1 + relayer/app.go | 16 +--- relayer/app_test.go | 2 +- relayer/band/client.go | 87 +++++++++++++------ relayer/band/client_test.go | 22 ++--- relayer/band/config.go | 5 +- relayer/config.go | 5 +- 12 files changed, 95 insertions(+), 77 deletions(-) diff --git a/internal/relayertest/constants.go b/internal/relayertest/constants.go index dc1420d..afd0707 100644 --- a/internal/relayertest/constants.go +++ b/internal/relayertest/constants.go @@ -28,8 +28,9 @@ var CustomCfg = falcon.Config{ LogLevel: "info", }, BandChain: band.Config{ - RpcEndpoints: []string{"http://localhost:26657", "http://localhost:26658"}, - Timeout: 3 * time.Second, + RpcEndpoints: []string{"http://localhost:26657", "http://localhost:26658"}, + Timeout: 3 * time.Second, + LivelinessCheckingInterval: 15 * time.Minute, }, TargetChains: chains.ChainProviderConfigs{ "testnet": &evm.EVMChainProviderConfig{ diff --git a/internal/relayertest/mocks/band_client.go b/internal/relayertest/mocks/band_client.go index 7459471..8a7d21d 100644 --- a/internal/relayertest/mocks/band_client.go +++ b/internal/relayertest/mocks/band_client.go @@ -41,20 +41,6 @@ func (m *MockClient) EXPECT() *MockClientMockRecorder { return m.recorder } -// Connect mocks base method. -func (m *MockClient) Connect(timeout uint) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Connect", timeout) - ret0, _ := ret[0].(error) - return ret0 -} - -// Connect indicates an expected call of Connect. -func (mr *MockClientMockRecorder) Connect(timeout any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Connect", reflect.TypeOf((*MockClient)(nil).Connect), timeout) -} - // GetTunnel mocks base method. func (m *MockClient) GetTunnel(ctx context.Context, tunnelID uint64) (*types.Tunnel, error) { m.ctrl.T.Helper() @@ -99,3 +85,15 @@ func (mr *MockClientMockRecorder) GetTunnels(ctx any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTunnels", reflect.TypeOf((*MockClient)(nil).GetTunnels), ctx) } + +// Init mocks base method. +func (m *MockClient) Init(ctx context.Context) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Init", ctx) +} + +// Init indicates an expected call of Init. +func (mr *MockClientMockRecorder) Init(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockClient)(nil).Init), ctx) +} diff --git a/internal/relayertest/testdata/custom_config.toml b/internal/relayertest/testdata/custom_config.toml index 87e4f0e..db7349e 100644 --- a/internal/relayertest/testdata/custom_config.toml +++ b/internal/relayertest/testdata/custom_config.toml @@ -8,6 +8,7 @@ penalty_exponential_factor = 1.1 [bandchain] rpc_endpoints = ['http://localhost:26657', 'http://localhost:26658'] timeout = 3000000000 +liveliness_checking_interval = 900000000000 [target_chains] [target_chains.testnet] diff --git a/internal/relayertest/testdata/custom_config_with_time_str.toml b/internal/relayertest/testdata/custom_config_with_time_str.toml index efb3cff..8f5dbf3 100644 --- a/internal/relayertest/testdata/custom_config_with_time_str.toml +++ b/internal/relayertest/testdata/custom_config_with_time_str.toml @@ -8,6 +8,7 @@ penalty_exponential_factor = 1.1 [bandchain] rpc_endpoints = ['http://localhost:26657', 'http://localhost:26658'] timeout = '3s' +liveliness_checking_interval = '15m' [target_chains] [target_chains.testnet] diff --git a/internal/relayertest/testdata/default_config.toml b/internal/relayertest/testdata/default_config.toml index 14ec07f..a3354ea 100644 --- a/internal/relayertest/testdata/default_config.toml +++ b/internal/relayertest/testdata/default_config.toml @@ -8,5 +8,6 @@ penalty_exponential_factor = 1.0 [bandchain] rpc_endpoints = ['http://localhost:26657'] timeout = 3000000000 +liveliness_checking_interval = 900000000000 [target_chains] diff --git a/internal/relayertest/testdata/default_with_chain_config.toml b/internal/relayertest/testdata/default_with_chain_config.toml index 7de744a..d8b815c 100644 --- a/internal/relayertest/testdata/default_with_chain_config.toml +++ b/internal/relayertest/testdata/default_with_chain_config.toml @@ -8,6 +8,7 @@ penalty_exponential_factor = 1.0 [bandchain] rpc_endpoints = ['http://localhost:26657'] timeout = 3000000000 +liveliness_checking_interval = 900000000000 [target_chains] [target_chains.testnet] diff --git a/relayer/app.go b/relayer/app.go index 5f78b5e..5099363 100644 --- a/relayer/app.go +++ b/relayer/app.go @@ -9,7 +9,6 @@ import ( "os" "path" - cosmosclient "github.com/cosmos/cosmos-sdk/client" "github.com/joho/godotenv" "github.com/pelletier/go-toml/v2" "github.com/spf13/viper" @@ -82,9 +81,8 @@ func (a *App) Init(ctx context.Context) error { // initialize band client if a.Config != nil { - if err := a.initBandClient(); err != nil { - return err - } + a.BandClient = band.NewClient(nil, a.Log, &a.Config.BandChain) + a.BandClient.Init(ctx) } a.EnvPassphrase = a.loadEnvPassphrase() @@ -92,16 +90,6 @@ func (a *App) Init(ctx context.Context) error { return nil } -// initBandClient establishes connection to rpc endpoints. -func (a *App) initBandClient() error { - c := band.NewClient(cosmosclient.Context{}, nil, a.Log, a.Config.BandChain.RpcEndpoints) - if err := c.Connect(uint(a.Config.BandChain.Timeout)); err != nil { - return err - } - a.BandClient = c - return nil -} - // InitLogger initializes the logger with the given log level. func (a *App) initLogger(configLogLevel string) error { logLevel := a.Viper.GetString("log-level") diff --git a/relayer/app_test.go b/relayer/app_test.go index ec21785..6faae1b 100644 --- a/relayer/app_test.go +++ b/relayer/app_test.go @@ -54,7 +54,7 @@ func (s *AppTestSuite) SetupTest() { s.chainProvider.EXPECT().Init(gomock.Any()).Return(nil).AnyTimes() cfg := relayer.Config{ - BandChain: band.Config{}, + BandChain: band.Config{LivelinessCheckingInterval: 15 * time.Minute}, TargetChains: map[string]chains.ChainProviderConfig{ "testnet_evm": s.chainProviderConfig, }, diff --git a/relayer/band/client.go b/relayer/band/client.go index bfe5400..fefb399 100644 --- a/relayer/band/client.go +++ b/relayer/band/client.go @@ -3,6 +3,7 @@ package band import ( "context" "fmt" + "time" httpclient "github.com/cometbft/cometbft/rpc/client/http" cosmosclient "github.com/cosmos/cosmos-sdk/client" @@ -20,15 +21,16 @@ var _ Client = &client{} // Client is the interface to interact with the BandChain. type Client interface { + // Init initializes the BandChain client by connecting to the chain and starting + // periodic liveliness checks. + Init(ctx context.Context) + // GetTunnelPacket returns the packet with the given tunnelID and sequence. GetTunnelPacket(ctx context.Context, tunnelID uint64, sequence uint64) (*types.Packet, error) // GetTunnel returns the tunnel with the given tunnelID. GetTunnel(ctx context.Context, tunnelID uint64) (*types.Tunnel, error) - // Connect will establish connection to rpc endpoints - Connect(timeout uint) error - // GetTunnels returns all tunnel in band chain. GetTunnels(ctx context.Context) ([]types.Tunnel, error) } @@ -52,25 +54,41 @@ func NewQueryClient( // client is the BandChain client struct. type client struct { - Context cosmosclient.Context - QueryClient *QueryClient - Log *zap.Logger - RpcEndpoints []string + Context cosmosclient.Context + QueryClient *QueryClient + Log *zap.Logger + Config *Config } // NewClient creates a new BandChain client instance. -func NewClient(ctx cosmosclient.Context, queryClient *QueryClient, log *zap.Logger, rpcEndpoints []string) Client { +func NewClient(queryClient *QueryClient, log *zap.Logger, bandChainCfg *Config) Client { + encodingConfig := MakeEncodingConfig() + ctx := cosmosclient.Context{}. + WithCodec(encodingConfig.Marshaler). + WithInterfaceRegistry(encodingConfig.InterfaceRegistry) + return &client{ - Context: ctx, - QueryClient: queryClient, - Log: log, - RpcEndpoints: rpcEndpoints, + Context: ctx, + QueryClient: queryClient, + Log: log, + Config: bandChainCfg, } } -// Connect connects to the Band chain using the provided RPC endpoints. -func (c *client) Connect(timeout uint) error { - for _, rpcEndpoint := range c.RpcEndpoints { +// Init initializes the BandChain client by connecting to the chain and starting +// periodic liveliness checks. +func (c *client) Init(ctx context.Context) { + timeout := uint(c.Config.Timeout) + if err := c.connect(timeout); err != nil { + c.Log.Debug("Failed to connect to BandChain", zap.Error(err)) + } + + go c.startLivelinessCheck(ctx, timeout, c.Config.LivelinessCheckingInterval) +} + +// connect connects to the BandChain using the provided RPC endpoints. +func (c *client) connect(timeout uint) error { + for _, rpcEndpoint := range c.Config.RpcEndpoints { // Create a new HTTP client for the specified node URI client, err := httpclient.NewWithTimeout(rpcEndpoint, "/websocket", timeout) if err != nil { @@ -84,22 +102,41 @@ func (c *client) Connect(timeout uint) error { continue // Try the next endpoint if starting the client fails } - // Create a new client context and configure it with necessary parameters - encodingConfig := MakeEncodingConfig() - ctx := cosmosclient.Context{}. - WithClient(client). - WithCodec(encodingConfig.Marshaler). - WithInterfaceRegistry(encodingConfig.InterfaceRegistry) - - c.Context = ctx - c.QueryClient = NewQueryClient(tunneltypes.NewQueryClient(ctx), bandtsstypes.NewQueryClient(ctx)) + c.Context.Client = client + c.Context.NodeURI = rpcEndpoint + c.QueryClient = NewQueryClient(tunneltypes.NewQueryClient(c.Context), bandtsstypes.NewQueryClient(c.Context)) c.Log.Info("Connected to Band chain", zap.String("endpoint", rpcEndpoint)) return nil } + return fmt.Errorf("no available RPC endpoint") +} - return nil +// StartLivelinessCheck starts the liveliness check for the BandChain. +func (c *client) startLivelinessCheck(ctx context.Context, timeout uint, interval time.Duration) { + ticker := time.NewTicker(interval) + for { + select { + case <-ctx.Done(): + c.Log.Info("Stopping liveliness check") + + ticker.Stop() + + return + case <-ticker.C: + if _, err := c.Context.Client.Status(ctx); err != nil { + c.Log.Error( + "BandChain client disconnected", + zap.String("rpcEndpoint", c.Context.NodeURI), + zap.Error(err), + ) + if err := c.connect(timeout); err != nil { + c.Log.Error("Liveliness check: unable to reconnect to any endpoints", zap.Error(err)) + } + } + } + } } // GetTunnel gets tunnel info from band client diff --git a/relayer/band/client_test.go b/relayer/band/client_test.go index 52a1462..1d9fd1f 100644 --- a/relayer/band/client_test.go +++ b/relayer/band/client_test.go @@ -7,7 +7,6 @@ import ( "cosmossdk.io/math" cmbytes "github.com/cometbft/cometbft/libs/bytes" - cosmosclient "github.com/cosmos/cosmos-sdk/client" codectypes "github.com/cosmos/cosmos-sdk/codec/types" "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/gogoproto/proto" @@ -50,6 +49,11 @@ func (s *AppTestSuite) SetupTest() { s.log = log s.tunnelQueryClient = mocks.NewMockTunnelQueryClient(ctrl) s.bandtssQueryClient = mocks.NewMockBandtssQueryClient(ctrl) + s.client = band.NewClient( + band.NewQueryClient(s.tunnelQueryClient, s.bandtssQueryClient), + s.log, + &band.Config{LivelinessCheckingInterval: 15 * time.Minute}, + ) s.ctx = context.Background() } @@ -88,14 +92,6 @@ func (s *AppTestSuite) TestGetTunnel() { s.tunnelQueryClient.EXPECT().Tunnel(s.ctx, &tunneltypes.QueryTunnelRequest{ TunnelId: uint64(1), }).Return(queryResponse, nil) - encodingConfig := band.MakeEncodingConfig() - s.client = band.NewClient( - cosmosclient.Context{}. - WithCodec(encodingConfig.Marshaler). - WithInterfaceRegistry(encodingConfig.InterfaceRegistry), - band.NewQueryClient(s.tunnelQueryClient, s.bandtssQueryClient), - s.log, - []string{}) expected := bandclienttypes.NewTunnel(1, 100, "0xe00F1f85abDB2aF6760759547d450da68CE66Bb1", "eth", false) @@ -181,14 +177,6 @@ func (s *AppTestSuite) TestGetTunnelPacket() { ) // actual result - encodingConfig := band.MakeEncodingConfig() - s.client = band.NewClient( - cosmosclient.Context{}. - WithCodec(encodingConfig.Marshaler). - WithInterfaceRegistry(encodingConfig.InterfaceRegistry), - band.NewQueryClient(s.tunnelQueryClient, s.bandtssQueryClient), - s.log, - []string{}) actual, err := s.client.GetTunnelPacket(s.ctx, uint64(1), uint64(100)) s.Require().NoError(err) s.Require().Equal(expected, actual) diff --git a/relayer/band/config.go b/relayer/band/config.go index 536c34a..5de9a6f 100644 --- a/relayer/band/config.go +++ b/relayer/band/config.go @@ -4,6 +4,7 @@ import "time" // Config defines the configuration for the BandChain client. type Config struct { - RpcEndpoints []string `mapstructure:"rpc_endpoints" toml:"rpc_endpoints"` - Timeout time.Duration `mapstructure:"timeout" toml:"timeout"` + RpcEndpoints []string `mapstructure:"rpc_endpoints" toml:"rpc_endpoints"` + Timeout time.Duration `mapstructure:"timeout" toml:"timeout"` + LivelinessCheckingInterval time.Duration `mapstructure:"liveliness_checking_interval" toml:"liveliness_checking_interval"` } diff --git a/relayer/config.go b/relayer/config.go index 484028c..cb14295 100644 --- a/relayer/config.go +++ b/relayer/config.go @@ -128,8 +128,9 @@ func ParseConfig(wrappedCfg *ConfigInputWrapper) (*Config, error) { func DefaultConfig() *Config { return &Config{ BandChain: band.Config{ - RpcEndpoints: []string{"http://localhost:26657"}, - Timeout: 3 * time.Second, + RpcEndpoints: []string{"http://localhost:26657"}, + Timeout: 3 * time.Second, + LivelinessCheckingInterval: 15 * time.Minute, }, TargetChains: make(map[string]chains.ChainProviderConfig), Global: GlobalConfig{