Skip to content

Commit

Permalink
add band liveliness logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Tanut Lertwarachai authored and Tanut Lertwarachai committed Dec 20, 2024
1 parent e101efd commit 33297b8
Show file tree
Hide file tree
Showing 12 changed files with 95 additions and 77 deletions.
5 changes: 3 additions & 2 deletions internal/relayertest/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ var CustomCfg = falcon.Config{
LogLevel: "info",
},
BandChain: band.Config{
RpcEndpoints: []string{"http://localhost:26657", "http://localhost:26658"},
Timeout: 3 * time.Second,
RpcEndpoints: []string{"http://localhost:26657", "http://localhost:26658"},
Timeout: 3 * time.Second,
LivelinessCheckingInterval: 15 * time.Minute,
},
TargetChains: chains.ChainProviderConfigs{
"testnet": &evm.EVMChainProviderConfig{
Expand Down
26 changes: 12 additions & 14 deletions internal/relayertest/mocks/band_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/relayertest/testdata/custom_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ penalty_exponential_factor = 1.1
[bandchain]
rpc_endpoints = ['http://localhost:26657', 'http://localhost:26658']
timeout = 3000000000
liveliness_checking_interval = 900000000000

[target_chains]
[target_chains.testnet]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ penalty_exponential_factor = 1.1
[bandchain]
rpc_endpoints = ['http://localhost:26657', 'http://localhost:26658']
timeout = '3s'
liveliness_checking_interval = '15m'

[target_chains]
[target_chains.testnet]
Expand Down
1 change: 1 addition & 0 deletions internal/relayertest/testdata/default_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ penalty_exponential_factor = 1.0
[bandchain]
rpc_endpoints = ['http://localhost:26657']
timeout = 3000000000
liveliness_checking_interval = 900000000000

[target_chains]
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ penalty_exponential_factor = 1.0
[bandchain]
rpc_endpoints = ['http://localhost:26657']
timeout = 3000000000
liveliness_checking_interval = 900000000000

[target_chains]
[target_chains.testnet]
Expand Down
16 changes: 2 additions & 14 deletions relayer/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"os"
"path"

cosmosclient "github.com/cosmos/cosmos-sdk/client"
"github.com/joho/godotenv"
"github.com/pelletier/go-toml/v2"
"github.com/spf13/viper"
Expand Down Expand Up @@ -82,26 +81,15 @@ func (a *App) Init(ctx context.Context) error {

// initialize band client
if a.Config != nil {
if err := a.initBandClient(); err != nil {
return err
}
a.BandClient = band.NewClient(nil, a.Log, &a.Config.BandChain)
a.BandClient.Init(ctx)
}

a.EnvPassphrase = a.loadEnvPassphrase()

return nil
}

// initBandClient establishes connection to rpc endpoints.
func (a *App) initBandClient() error {
c := band.NewClient(cosmosclient.Context{}, nil, a.Log, a.Config.BandChain.RpcEndpoints)
if err := c.Connect(uint(a.Config.BandChain.Timeout)); err != nil {
return err
}
a.BandClient = c
return nil
}

// InitLogger initializes the logger with the given log level.
func (a *App) initLogger(configLogLevel string) error {
logLevel := a.Viper.GetString("log-level")
Expand Down
2 changes: 1 addition & 1 deletion relayer/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s *AppTestSuite) SetupTest() {
s.chainProvider.EXPECT().Init(gomock.Any()).Return(nil).AnyTimes()

cfg := relayer.Config{
BandChain: band.Config{},
BandChain: band.Config{LivelinessCheckingInterval: 15 * time.Minute},
TargetChains: map[string]chains.ChainProviderConfig{
"testnet_evm": s.chainProviderConfig,
},
Expand Down
87 changes: 62 additions & 25 deletions relayer/band/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package band
import (
"context"
"fmt"
"time"

httpclient "github.com/cometbft/cometbft/rpc/client/http"
cosmosclient "github.com/cosmos/cosmos-sdk/client"
Expand All @@ -20,15 +21,16 @@ var _ Client = &client{}

// Client is the interface to interact with the BandChain.
type Client interface {
// Init initializes the BandChain client by connecting to the chain and starting
// periodic liveliness checks.
Init(ctx context.Context)

// GetTunnelPacket returns the packet with the given tunnelID and sequence.
GetTunnelPacket(ctx context.Context, tunnelID uint64, sequence uint64) (*types.Packet, error)

// GetTunnel returns the tunnel with the given tunnelID.
GetTunnel(ctx context.Context, tunnelID uint64) (*types.Tunnel, error)

// Connect will establish connection to rpc endpoints
Connect(timeout uint) error

// GetTunnels returns all tunnel in band chain.
GetTunnels(ctx context.Context) ([]types.Tunnel, error)
}
Expand All @@ -52,25 +54,41 @@ func NewQueryClient(

// client is the BandChain client struct.
type client struct {
Context cosmosclient.Context
QueryClient *QueryClient
Log *zap.Logger
RpcEndpoints []string
Context cosmosclient.Context
QueryClient *QueryClient
Log *zap.Logger
Config *Config
}

// NewClient creates a new BandChain client instance.
func NewClient(ctx cosmosclient.Context, queryClient *QueryClient, log *zap.Logger, rpcEndpoints []string) Client {
func NewClient(queryClient *QueryClient, log *zap.Logger, bandChainCfg *Config) Client {
encodingConfig := MakeEncodingConfig()
ctx := cosmosclient.Context{}.
WithCodec(encodingConfig.Marshaler).
WithInterfaceRegistry(encodingConfig.InterfaceRegistry)

return &client{
Context: ctx,
QueryClient: queryClient,
Log: log,
RpcEndpoints: rpcEndpoints,
Context: ctx,
QueryClient: queryClient,
Log: log,
Config: bandChainCfg,
}
}

// Connect connects to the Band chain using the provided RPC endpoints.
func (c *client) Connect(timeout uint) error {
for _, rpcEndpoint := range c.RpcEndpoints {
// Init initializes the BandChain client by connecting to the chain and starting
// periodic liveliness checks.
func (c *client) Init(ctx context.Context) {
timeout := uint(c.Config.Timeout)
if err := c.connect(timeout); err != nil {
c.Log.Debug("Failed to connect to BandChain", zap.Error(err))
}

go c.startLivelinessCheck(ctx, timeout, c.Config.LivelinessCheckingInterval)
}

// connect connects to the BandChain using the provided RPC endpoints.
func (c *client) connect(timeout uint) error {
for _, rpcEndpoint := range c.Config.RpcEndpoints {
// Create a new HTTP client for the specified node URI
client, err := httpclient.NewWithTimeout(rpcEndpoint, "/websocket", timeout)
if err != nil {
Expand All @@ -84,22 +102,41 @@ func (c *client) Connect(timeout uint) error {
continue // Try the next endpoint if starting the client fails
}

// Create a new client context and configure it with necessary parameters
encodingConfig := MakeEncodingConfig()
ctx := cosmosclient.Context{}.
WithClient(client).
WithCodec(encodingConfig.Marshaler).
WithInterfaceRegistry(encodingConfig.InterfaceRegistry)

c.Context = ctx
c.QueryClient = NewQueryClient(tunneltypes.NewQueryClient(ctx), bandtsstypes.NewQueryClient(ctx))
c.Context.Client = client
c.Context.NodeURI = rpcEndpoint
c.QueryClient = NewQueryClient(tunneltypes.NewQueryClient(c.Context), bandtsstypes.NewQueryClient(c.Context))

c.Log.Info("Connected to Band chain", zap.String("endpoint", rpcEndpoint))

return nil
}
return fmt.Errorf("no available RPC endpoint")
}

return nil
// StartLivelinessCheck starts the liveliness check for the BandChain.
func (c *client) startLivelinessCheck(ctx context.Context, timeout uint, interval time.Duration) {
ticker := time.NewTicker(interval)
for {
select {
case <-ctx.Done():
c.Log.Info("Stopping liveliness check")

ticker.Stop()

return
case <-ticker.C:
if _, err := c.Context.Client.Status(ctx); err != nil {
c.Log.Error(
"BandChain client disconnected",
zap.String("rpcEndpoint", c.Context.NodeURI),
zap.Error(err),
)
if err := c.connect(timeout); err != nil {
c.Log.Error("Liveliness check: unable to reconnect to any endpoints", zap.Error(err))
}
}
}
}
}

// GetTunnel gets tunnel info from band client
Expand Down
22 changes: 5 additions & 17 deletions relayer/band/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"cosmossdk.io/math"
cmbytes "github.com/cometbft/cometbft/libs/bytes"
cosmosclient "github.com/cosmos/cosmos-sdk/client"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/gogoproto/proto"
Expand Down Expand Up @@ -50,6 +49,11 @@ func (s *AppTestSuite) SetupTest() {
s.log = log
s.tunnelQueryClient = mocks.NewMockTunnelQueryClient(ctrl)
s.bandtssQueryClient = mocks.NewMockBandtssQueryClient(ctrl)
s.client = band.NewClient(
band.NewQueryClient(s.tunnelQueryClient, s.bandtssQueryClient),
s.log,
&band.Config{LivelinessCheckingInterval: 15 * time.Minute},
)
s.ctx = context.Background()
}

Expand Down Expand Up @@ -88,14 +92,6 @@ func (s *AppTestSuite) TestGetTunnel() {
s.tunnelQueryClient.EXPECT().Tunnel(s.ctx, &tunneltypes.QueryTunnelRequest{
TunnelId: uint64(1),
}).Return(queryResponse, nil)
encodingConfig := band.MakeEncodingConfig()
s.client = band.NewClient(
cosmosclient.Context{}.
WithCodec(encodingConfig.Marshaler).
WithInterfaceRegistry(encodingConfig.InterfaceRegistry),
band.NewQueryClient(s.tunnelQueryClient, s.bandtssQueryClient),
s.log,
[]string{})

expected := bandclienttypes.NewTunnel(1, 100, "0xe00F1f85abDB2aF6760759547d450da68CE66Bb1", "eth", false)

Expand Down Expand Up @@ -181,14 +177,6 @@ func (s *AppTestSuite) TestGetTunnelPacket() {
)

// actual result
encodingConfig := band.MakeEncodingConfig()
s.client = band.NewClient(
cosmosclient.Context{}.
WithCodec(encodingConfig.Marshaler).
WithInterfaceRegistry(encodingConfig.InterfaceRegistry),
band.NewQueryClient(s.tunnelQueryClient, s.bandtssQueryClient),
s.log,
[]string{})
actual, err := s.client.GetTunnelPacket(s.ctx, uint64(1), uint64(100))
s.Require().NoError(err)
s.Require().Equal(expected, actual)
Expand Down
5 changes: 3 additions & 2 deletions relayer/band/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import "time"

// Config defines the configuration for the BandChain client.
type Config struct {
RpcEndpoints []string `mapstructure:"rpc_endpoints" toml:"rpc_endpoints"`
Timeout time.Duration `mapstructure:"timeout" toml:"timeout"`
RpcEndpoints []string `mapstructure:"rpc_endpoints" toml:"rpc_endpoints"`
Timeout time.Duration `mapstructure:"timeout" toml:"timeout"`
LivelinessCheckingInterval time.Duration `mapstructure:"liveliness_checking_interval" toml:"liveliness_checking_interval"`
}
5 changes: 3 additions & 2 deletions relayer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,9 @@ func ParseConfig(wrappedCfg *ConfigInputWrapper) (*Config, error) {
func DefaultConfig() *Config {
return &Config{
BandChain: band.Config{
RpcEndpoints: []string{"http://localhost:26657"},
Timeout: 3 * time.Second,
RpcEndpoints: []string{"http://localhost:26657"},
Timeout: 3 * time.Second,
LivelinessCheckingInterval: 15 * time.Minute,
},
TargetChains: make(map[string]chains.ChainProviderConfig),
Global: GlobalConfig{
Expand Down

0 comments on commit 33297b8

Please sign in to comment.