
add exponential retry mechanism for rpc requests using utility function #593

Open · wants to merge 8 commits into base: main
rename maxElapsedTime consts as timeout
najeal committed Dec 27, 2024
commit e91355d4500824f4e88c4cbc91ff5ee3618157d3
4 changes: 2 additions & 2 deletions relayer/application_relayer.go
@@ -28,7 +28,7 @@ import (
)

const (
- retryMaxElapsedTime = 10 * time.Second
+ retryTimeout = 10 * time.Second
Contributor

We were making a max of 5 attempts with 10s in between each. Can we make the timeout 60s to match more closely?

Contributor Author

I think we were waiting time.Duration(signatureRequestRetryWaitPeriodMs/maxRelayerQueryAttempts) * time.Millisecond
-> (10_000 / 5) milliseconds -> 2_000 milliseconds each round
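
For concreteness, here is a minimal sketch of that arithmetic; the constant names come from the expression quoted above, and their values are assumed from this discussion rather than read from the codebase:

```go
package main

import (
	"fmt"
	"time"
)

// Assumed values, taken from the discussion above.
const (
	signatureRequestRetryWaitPeriodMs = 10_000
	maxRelayerQueryAttempts           = 5
)

func main() {
	// Previous behavior: a fixed wait between query attempts.
	waitPerAttempt := time.Duration(signatureRequestRetryWaitPeriodMs/maxRelayerQueryAttempts) * time.Millisecond
	fmt.Println(waitPerAttempt)                           // prints 2s: the wait per attempt
	fmt.Println(waitPerAttempt * maxRelayerQueryAttempts) // prints 10s: total across 5 attempts, close to the new 10s timeout
}
```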

)

// Errors
@@ -280,7 +280,7 @@ func (r *ApplicationRelayer) createSignedMessage(
r.signingSubnetID.String(),
)
}
- err = utils.WithRetriesTimeout(r.logger, operation, retryMaxElapsedTime)
+ err = utils.WithRetriesTimeout(r.logger, operation, retryTimeout)
if err != nil {
r.logger.Error(
"Failed to get aggregate signature from node endpoint.",
8 changes: 4 additions & 4 deletions relayer/listener.go
@@ -21,10 +21,10 @@ import (
)

const (
- retryMaxSubscribeElapsedTime = 10 * time.Second
+ retrySubscribeTimeout = 10 * time.Second
// TODO attempt to resubscribe in perpetuity once we are able to process missed blocks and
// refresh the chain config on reconnect.
- retryMaxResubscribeElapsedTime = 10 * time.Second
+ retryResubscribeTimeout = 10 * time.Second
)

// Listener handles all messages sent from a given source chain
@@ -138,7 +138,7 @@ func newListener(

// Open the subscription. We must do this before processing any missed messages, otherwise we may
// miss an incoming message in between fetching the latest block and subscribing.
- err = lstnr.Subscriber.Subscribe(retryMaxSubscribeElapsedTime)
+ err = lstnr.Subscriber.Subscribe(retrySubscribeTimeout)
if err != nil {
logger.Error(
"Failed to subscribe to node",
@@ -229,7 +229,7 @@ func (lstnr *Listener) processLogs(ctx context.Context) error {
// Sets the listener health status to false while attempting to reconnect.
func (lstnr *Listener) reconnectToSubscriber() error {
// Attempt to reconnect the subscription
- err := lstnr.Subscriber.Subscribe(retryMaxResubscribeElapsedTime)
+ err := lstnr.Subscriber.Subscribe(retryResubscribeTimeout)
if err != nil {
return fmt.Errorf("failed to resubscribe to node: %w", err)
}
4 changes: 2 additions & 2 deletions signature-aggregator/aggregator/aggregator.go
@@ -38,7 +38,7 @@ type blsSignatureBuf [bls.SignatureLen]byte
const (
// Maximum amount of time to spend waiting (in addition to network round trip time per attempt)
// during relayer signature query routine
- signatureRequestMaxElapsedTime = 20 * time.Second
+ signatureRequestTimeout = 20 * time.Second
Contributor

Same here, we were waiting 20s between checks, which we did 10 times, so let's make this ~200s.

Contributor Author

It is the same calculation here.

)

var (
@@ -327,7 +327,7 @@ func (s *SignatureAggregator) CreateSignedMessage(
return errNotEnoughSignatures
}

- err = utils.WithRetriesTimeout(s.logger, operation, signatureRequestMaxElapsedTime)
+ err = utils.WithRetriesTimeout(s.logger, operation, signatureRequestTimeout)
if err != nil {
s.logger.Warn(
"Failed to collect a threshold of signatures",
6 changes: 3 additions & 3 deletions utils/backoff.go
@@ -8,14 +8,14 @@ import (
)

// WithRetriesTimeout uses an exponential backoff to run the operation until it
- // succeeds or max elapsed time has been reached.
+ // succeeds or timeout limit has been reached.
func WithRetriesTimeout(
logger logging.Logger,
operation backoff.Operation,
- maxElapsedTime time.Duration,
+ timeout time.Duration,
) error {
expBackOff := backoff.NewExponentialBackOff(
- backoff.WithMaxElapsedTime(maxElapsedTime),
+ backoff.WithMaxElapsedTime(timeout),
)
notify := func(err error, duration time.Duration) {
logger.Warn("operation failed, retrying...")
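
A minimal usage sketch of the helper above, mirroring the call sites elsewhere in this diff; the module import path and the no-op logger are assumptions for illustration, not part of this change:

```go
package main

import (
	"errors"
	"time"

	"github.com/ava-labs/avalanchego/utils/logging"

	"github.com/ava-labs/awm-relayer/utils" // assumed module path
)

func main() {
	logger := logging.NoLog{} // stand-in for the component's real logger

	attempts := 0
	operation := func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient RPC failure") // fail the first two attempts
		}
		return nil
	}

	// Retries the operation with exponential backoff until it succeeds or the
	// 10s budget elapses, logging a warning before each retry.
	if err := utils.WithRetriesTimeout(logger, operation, 10*time.Second); err != nil {
		logger.Error("operation did not succeed within the retry timeout")
	}
}
```

As in the call sites above, the helper returns the operation's last error once the timeout is exhausted, so callers log it and decide how to propagate it.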
12 changes: 6 additions & 6 deletions vms/evm/subscriber.go
@@ -22,7 +22,7 @@ const (
// Max buffer size for ethereum subscription channels
maxClientSubscriptionBuffer = 20000
MaxBlocksPerRequest = 200
- resubscribeMaxElapsedTime = 5 * time.Second
+ resubscribeTimeout = 5 * time.Second
)

// subscriber implements Subscriber
@@ -145,28 +145,28 @@ func (s *subscriber) getHeaderByNumberRetryable(headerNumber *big.Int) (*types.H
}

// Loops forever iff maxResubscribeAttempts == 0
- func (s *subscriber) Subscribe(retryMaxElapsedTime time.Duration) error {
+ func (s *subscriber) Subscribe(retryTimeout time.Duration) error {
// Unsubscribe before resubscribing
// s.sub should only be nil on the first call to Subscribe
if s.sub != nil {
s.sub.Unsubscribe()
}

- err := s.subscribe(retryMaxElapsedTime)
+ err := s.subscribe(retryTimeout)
if err != nil {
return errors.New("failed to subscribe to node")
}
return nil
}

- // subscribe until it succeeds or reached maxSubscribeAttempts.
- func (s *subscriber) subscribe(retryMaxElapsedTime time.Duration) error {
+ // subscribe until it succeeds or reached timeout.
+ func (s *subscriber) subscribe(retryTimeout time.Duration) error {
var sub interfaces.Subscription
operation := func() (err error) {
sub, err = s.wsClient.SubscribeNewHead(context.Background(), s.headers)
return err
}
- err := utils.WithRetriesTimeout(s.logger, operation, retryMaxElapsedTime)
+ err := utils.WithRetriesTimeout(s.logger, operation, retryTimeout)
if err != nil {
s.logger.Error(
"Failed to subscribe to node",
2 changes: 1 addition & 1 deletion vms/subscriber.go
@@ -26,7 +26,7 @@ type Subscriber interface {
// Subscribe registers a subscription. After Subscribe is called,
// log events that match [filter] are written to the channel returned
// by Logs
- Subscribe(retryMaxElapsedTime time.Duration) error
+ Subscribe(retryTimeout time.Duration) error

// Headers returns the channel that the subscription writes block headers to
Headers() <-chan *types.Header