Skip to content

Commit

Permalink
Change the default watcher used by the observer (Consensys#97)
Browse files Browse the repository at this point in the history
* Refactor common retry options

* Change default watcher to finalised event watcher
  • Loading branch information
ermyas authored Feb 9, 2022
1 parent 958ac0f commit 1a521a2
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ipfs/go-datastore"
badgerds "github.com/ipfs/go-ds-badger"
"log"
"time"
)

// EventWatcher listens to blockchain events
Expand Down Expand Up @@ -111,29 +109,11 @@ type BlockHeadProducer interface {
SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error)
}

// RetryOpts encapsulates configuration for retry operations
type RetryOpts struct {
retry uint // number of attempt to make if saving progress to ds fails
retryDelay time.Duration // delay between each retry attempt
}

// WatcherProgressDsOpts encapsulates configuration details for persisting the progress of a watcher
type WatcherProgressDsOpts struct {
ds *badgerds.Datastore // datastore for persisting the progress of a watcher
dsProgKey datastore.Key // specific key used in a KV datastore, for storing the latest progress
RetryOpts // configuration for how retries will be performed if persisting progress fails
}

var DefaultWatcherProgressDsOpts = WatcherProgressDsOpts{
RetryOpts: RetryOpts{
retry: 3,
retryDelay: 500 * time.Millisecond,
},
}

var DefaultEventHandleRetryOpts = RetryOpts{
retry: 4,
retryDelay: 500 * time.Millisecond,
ds datastore.Datastore // datastore for persisting the progress of a watcher
dsProgKey datastore.Key // specific key used in a KV datastore, for storing the latest progress
FailureRetryOpts // configuration for how retries will be performed if persisting progress fails
}

// SFCCrossCallFinalisedEventWatcher listens to events from a 'Simple Function Call' bridge and processes them only once they are
Expand All @@ -143,7 +123,7 @@ type SFCCrossCallFinalisedEventWatcher struct {
EventWatcherOpts
WatcherProgressOpts WatcherProgressDsOpts
// EventHandleRetryOpts specifies how retries will be attempted if fetching or processing events fails
EventHandleRetryOpts RetryOpts
EventHandleRetryOpts FailureRetryOpts
SfcContract *functioncall.Sfc
confirmationsForFinality uint64 // the number of block confirmations required before an event is considered 'final'.
client BlockHeadProducer
Expand Down Expand Up @@ -238,8 +218,8 @@ func (l *SFCCrossCallFinalisedEventWatcher) handleEventsWithRetry(startBlock uin
l.handleEvents(finalisedEvs)
return nil
},
retry.Attempts(l.EventHandleRetryOpts.retry),
retry.Delay(l.EventHandleRetryOpts.retryDelay),
retry.Attempts(l.EventHandleRetryOpts.RetryAttempts),
retry.Delay(l.EventHandleRetryOpts.RetryDelay),
retry.OnRetry(func(attempt uint, err error) {
logging.Error("Error processing finalised events in blocks %d-%d. Retry attempt: %d, error: %v", startBlock,
lastBlock, attempt+1, err)
Expand All @@ -253,8 +233,8 @@ func (l *SFCCrossCallFinalisedEventWatcher) saveProgressToDsWithRetry(lastFinali
func() error {
return l.WatcherProgressOpts.ds.Put(context.Background(), l.WatcherProgressOpts.dsProgKey, uintToBytes(lastFinalisedBlock))
},
retry.Attempts(l.WatcherProgressOpts.retry),
retry.Delay(l.WatcherProgressOpts.retryDelay),
retry.Attempts(l.WatcherProgressOpts.RetryAttempts),
retry.Delay(l.WatcherProgressOpts.RetryDelay),
retry.OnRetry(func(attempt uint, err error) {
logging.Error("Error persisting watcher progress to db. Last finalised block: %d, Retry Attempt: %d, Error: %v",
lastFinalisedBlock, attempt+1, err)
Expand Down Expand Up @@ -289,7 +269,7 @@ func (l *SFCCrossCallFinalisedEventWatcher) setNextBlockToProcess() {
// NewSFCCrossCallFinalisedEventWatcher creates an `SFCCrossCall` event watcher that processes events only once they receive sufficient confirmations.
// Note: 1 block confirmation means the instant the transaction generating the event is mined.
func NewSFCCrossCallFinalisedEventWatcher(watcherOpts EventWatcherOpts, watchProgressDbOpts WatcherProgressDsOpts,
handlerRetryOpts RetryOpts, confirmsForFinality uint64,
handlerRetryOpts FailureRetryOpts, confirmsForFinality uint64,
contract *functioncall.Sfc, client BlockHeadProducer) (*SFCCrossCallFinalisedEventWatcher, error) {
if confirmsForFinality < 1 {
return nil, fmt.Errorf("block confirmationsForFinality cannot be less than 1. supplied value: %d", confirmsForFinality)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ func (m *MockEventHandler) Handle(event interface{}) {
m.Called(event)
}

var fixWatcherProgressDsOpts = WatcherProgressDsOpts{
FailureRetryOpts: FailureRetryOpts{
RetryAttempts: 3,
RetryDelay: 500 * time.Millisecond,
},
}
var fixEventHandleRetryOpts = DefaultRetryOptions

func TestSFCCrossCallRealtimeEventWatcher(t *testing.T) {
simBackend, auth := simulatedBackend(t)
contract := deployContract(t, simBackend, auth)
Expand Down Expand Up @@ -97,13 +105,13 @@ func TestSFCCrossCallRealtimeEventWatcher_RemovedEvent(t *testing.T) {
func TestSFCCrossCallFinalisedEventWatcher_FailsIfConfirmationTooLow(t *testing.T) {
handler := new(MockEventHandler)
opts := EventWatcherOpts{EventHandler: handler}
_, err := NewSFCCrossCallFinalisedEventWatcher(opts, DefaultWatcherProgressDsOpts, DefaultEventHandleRetryOpts, 0, nil, nil)
_, err := NewSFCCrossCallFinalisedEventWatcher(opts, fixWatcherProgressDsOpts, fixEventHandleRetryOpts, 0, nil, nil)
assert.NotNil(t, err)
}

func TestSFCCrossCallFinalisedEventWatcher_FailsIfEventHandlerNil(t *testing.T) {
opts := EventWatcherOpts{EventHandler: nil}
_, err := NewSFCCrossCallFinalisedEventWatcher(opts, DefaultWatcherProgressDsOpts, DefaultEventHandleRetryOpts, 2, nil, nil)
_, err := NewSFCCrossCallFinalisedEventWatcher(opts, fixWatcherProgressDsOpts, fixEventHandleRetryOpts, 2, nil, nil)
assert.NotNil(t, err)
}

Expand Down Expand Up @@ -132,7 +140,7 @@ func TestSFCCrossCallFinalisedEventWatcher(t *testing.T) {
}

progOpts.dsProgKey = datastore.NewKey(k)
watcher, e := NewSFCCrossCallFinalisedEventWatcher(opts, progOpts, DefaultEventHandleRetryOpts, v.confirmations, contract, simBackend)
watcher, e := NewSFCCrossCallFinalisedEventWatcher(opts, progOpts, fixEventHandleRetryOpts, v.confirmations, contract, simBackend)
assert.Nil(t, e)
go watcher.Watch()

Expand Down Expand Up @@ -184,7 +192,7 @@ func TestSFCCrossCallFinalisedEventWatcher_MultipleBlocksFinalised(t *testing.T)
EventHandler: handler,
}
progOpts.dsProgKey = datastore.NewKey(k)
watcher, e := NewSFCCrossCallFinalisedEventWatcher(opts, progOpts, DefaultEventHandleRetryOpts, v.confirmations, contract, simBackend)
watcher, e := NewSFCCrossCallFinalisedEventWatcher(opts, progOpts, fixEventHandleRetryOpts, v.confirmations, contract, simBackend)
assert.Nil(t, e)
go watcher.Watch()

Expand Down Expand Up @@ -218,7 +226,7 @@ func TestSFCCrossCallFinalisedEventWatcher_ProgressPersisted(t *testing.T) {
defer dsClose()
progOpts := createProgressOpts(ds)

watcher, e := NewSFCCrossCallFinalisedEventWatcher(opts, progOpts, DefaultEventHandleRetryOpts, fixConfirms, contract, simBackend)
watcher, e := NewSFCCrossCallFinalisedEventWatcher(opts, progOpts, fixEventHandleRetryOpts, fixConfirms, contract, simBackend)
assert.Nil(t, e)

go watcher.Watch()
Expand All @@ -235,7 +243,7 @@ func TestSFCCrossCallFinalisedEventWatcher_ProgressPersisted(t *testing.T) {
assert.Equal(t, fixLastFinalised, progress)

// Test that the saved progress is used when restarting a new watcher
newWatcher, e := NewSFCCrossCallFinalisedEventWatcher(opts, progOpts, DefaultEventHandleRetryOpts, fixConfirms, contract, simBackend)
newWatcher, e := NewSFCCrossCallFinalisedEventWatcher(opts, progOpts, fixEventHandleRetryOpts, fixConfirms, contract, simBackend)
go newWatcher.Watch()
time.Sleep(1 * time.Second)
assert.Equal(t, progress+1, newWatcher.GetNextBlockToProcess())
Expand Down Expand Up @@ -263,7 +271,7 @@ func TestSFCCrossCallFinalisedEventWatcher_Reorg(t *testing.T) {
defer dsClose()
progOpts := createProgressOpts(ds)

watcher, e := NewSFCCrossCallFinalisedEventWatcher(opts, progOpts, DefaultEventHandleRetryOpts, 2, contract, simBackend)
watcher, e := NewSFCCrossCallFinalisedEventWatcher(opts, progOpts, fixEventHandleRetryOpts, 2, contract, simBackend)
assert.Nil(t, e)
go watcher.Watch()
defer watcher.StopWatcher()
Expand Down Expand Up @@ -304,7 +312,7 @@ func commitAndSleep(backend *backends.SimulatedBackend) {
}

func createProgressOpts(ds *badgerds.Datastore) WatcherProgressDsOpts {
progOpts := DefaultWatcherProgressDsOpts
progOpts := fixWatcherProgressDsOpts
progOpts.ds = ds
progOpts.dsProgKey = datastore.NewKey("reorg_test")
return progOpts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,13 @@ import (
"github.com/consensys/gpact/messaging/relayer/internal/logging"
v1 "github.com/consensys/gpact/messaging/relayer/internal/messages/v1"
"github.com/consensys/gpact/messaging/relayer/internal/mqserver"
"time"
)

// MessageHandler processes relayer messages
type MessageHandler interface {
Handle(m *v1.Message)
}

type FailureRetryOpts struct {
RetryAttempts uint
RetryDelay time.Duration
}

var DefaultRetryOptions = FailureRetryOpts{
RetryAttempts: 5,
RetryDelay: 500 * time.Millisecond,
}

// MessageEnqueueHandler enqueues relayer messages onto a configured message queue server
type MessageEnqueueHandler struct {
MQ mqserver.MessageQueue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,42 @@ import (
// SFCBridgeObserver listens to incoming events from an SFC contract, transforms them into relayer messages
// and then enqueues them onto a message queue them for further processing by other Relayer components
type SFCBridgeObserver struct {
EventWatcher *SFCCrossCallRealtimeEventWatcher
EventHandler *SimpleEventHandler
EventWatcher EventWatcher
EventHandler EventHandler
SourceNetwork string
}

func NewSFCBridgeObserver(source string, sourceAddr string, contract *functioncall.Sfc, mq mqserver.MessageQueue) (*SFCBridgeObserver, error) {
func NewSFCBridgeRealtimeObserver(source string, sourceAddr string, contract *functioncall.Sfc, mq mqserver.MessageQueue) (*SFCBridgeObserver, error) {
eventTransformer := NewSFCEventTransformer(source, sourceAddr)
messageHandler := NewMessageEnqueueHandler(mq, DefaultRetryOptions)
eventHandler := NewSimpleEventHandler(eventTransformer, messageHandler)
removedEvHandler := NewLogEventHandler("removed event")
eventWatcher, err := NewSFCCrossCallRealtimeEventWatcher(EventWatcherOpts{Context: context.Background(), EventHandler: eventHandler},
removedEvHandler, contract)

// TODO: expose the start option of watcher opts
watcherOpts := EventWatcherOpts{Context: context.Background(), EventHandler: eventHandler}
eventWatcher, err := NewSFCCrossCallRealtimeEventWatcher(watcherOpts, removedEvHandler, contract)
if err != nil {
return nil, err
}

return &SFCBridgeObserver{EventWatcher: eventWatcher, EventHandler: eventHandler, SourceNetwork: source}, nil
}

func NewSFCBridgeFinalisedObserver(source string, sourceAddr string, contract *functioncall.Sfc, mq mqserver.MessageQueue,
confirmationsForFinality uint64, watcherProgressOpts WatcherProgressDsOpts, client BlockHeadProducer) (
*SFCBridgeObserver,
error) {
eventTransformer := NewSFCEventTransformer(source, sourceAddr)
messageHandler := NewMessageEnqueueHandler(mq, DefaultRetryOptions)
eventHandler := NewSimpleEventHandler(eventTransformer, messageHandler)

// TODO: expose the start option of watcher opts
watcherOpts := EventWatcherOpts{Context: context.Background(), EventHandler: eventHandler}
eventWatcher, err := NewSFCCrossCallFinalisedEventWatcher(watcherOpts, watcherProgressOpts, DefaultRetryOptions, confirmationsForFinality, contract, client)
if err != nil {
return nil, err
}

return &SFCBridgeObserver{EventWatcher: eventWatcher, EventHandler: eventHandler, SourceNetwork: source}, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestSFCBridgeObserver(t *testing.T) {
contract := deployContract(t, simBackend, auth)
mockMQ := new(MockMQ)

observer, err := NewSFCBridgeObserver(fixSourceID, fixSourceAddress, contract, mockMQ)
observer, err := NewSFCBridgeRealtimeObserver(fixSourceID, fixSourceAddress, contract, mockMQ)
assert.Nil(t, err)
go observer.Start()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ func (o *ObserverImplV1) routine(chainID *big.Int, chainAP string, addr common.A
return
}

observer, err := NewSFCBridgeObserver(chainID.String(), addr.String(), sfc, o.mq)
observer, err := o.createFinalisedEventObserver(chainID.String(), addr.String(), sfc, o.mq, chain)

if err != nil {
logging.Error(err.Error())
return
Expand All @@ -171,6 +172,13 @@ func (o *ObserverImplV1) routine(chainID *big.Int, chainAP string, addr common.A
}
}

func (o *ObserverImplV1) createFinalisedEventObserver(source string, sourceAddr string, contract *functioncall.Sfc, mq mqserver.MessageQueue,
client *ethclient.Client) (*SFCBridgeObserver, error) {
dsProgKey := datastore.NewKey(fmt.Sprintf("/%s/%s/last_finalised_block", source, sourceAddr))
watcherProgOpts := WatcherProgressDsOpts{o.ds, dsProgKey, DefaultRetryOptions}
return NewSFCBridgeFinalisedObserver(source, sourceAddr, contract, mq, 4, watcherProgOpts, client)
}

// dsKey gets the datastore key from given chainID and contract address.
func dsKey(chainID *big.Int, contractAddr common.Address) datastore.Key {
return datastore.NewKey(fmt.Sprintf("%v-%v", chainID.String(), contractAddr.String()))
Expand Down
13 changes: 13 additions & 0 deletions services/relayer/internal/msgobserver/eth/observer/retry_opts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package observer

import "time"

type FailureRetryOpts struct {
RetryAttempts uint
RetryDelay time.Duration
}

var DefaultRetryOptions = FailureRetryOpts{
RetryAttempts: 5,
RetryDelay: 300 * time.Millisecond,
}

0 comments on commit 1a521a2

Please sign in to comment.