Skip to content

Commit

Permalink
Refactor Relayer Observer (#125)
Browse files Browse the repository at this point in the history
* Refactor relayer

* Minor refactoring

* Maintain run status of observers

* Minor refactoring
  • Loading branch information
ermyas authored May 24, 2022
1 parent 50e19cd commit 3b7c33b
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 176 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ jobs:
- run: sleep 30
- run:
name: Build test code and execute unit tests
command: make utest
command: make utest itest
working_directory: services/relayer
- run:
name: Stop blockchains & Relayers
Expand Down
4 changes: 3 additions & 1 deletion services/relayer/Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: build
.PHONY: build itest

build:
go build -o ./build/relayer cmd/relayer/*
Expand All @@ -14,4 +14,6 @@ clean:

utest:
go test -v --count=1 ./internal/...

itest:
go test -v --count=1 ./itest/
2 changes: 1 addition & 1 deletion services/relayer/cmd/observer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func main() {
instance.MQ = mq
defer mq.Stop()
// Start the observer
observer := observer.NewObserverImplV1(conf.ObserverDSPath, mq)
observer := observer.NewMultiSourceObserver(conf.ObserverDSPath, mq)
err = observer.Start()
if err != nil {
panic(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"github.com/ethereum/go-ethereum/common"
"math/big"
"time"

v1 "github.com/consensys/gpact/services/relayer/pkg/messages/v1"
Expand All @@ -44,8 +45,8 @@ type EventTransformer interface {

// SFCEventTransformer converts events from a simple-function-call bridge contract to relayer messages
type SFCEventTransformer struct {
Source string
SourceAddr common.Address
ChainId *big.Int
ContractAddress common.Address
}

// ToMessage converts a 'CrossCall' event emited from a simple-function-call bridge contract to relayer message
Expand All @@ -58,7 +59,7 @@ func (t *SFCEventTransformer) ToMessage(event interface{}) (*v1.Message, error)
return nil, err
}

source := v1.ApplicationAddress{NetworkID: t.Source, ContractAddress: fmt.Sprintf("%#x", t.SourceAddr)}
source := v1.ApplicationAddress{NetworkID: t.ChainId.String(), ContractAddress: fmt.Sprintf("%#x", t.ContractAddress)}
destination := v1.ApplicationAddress{NetworkID: sfcEvent.DestBcId.String(), ContractAddress: sfcEvent.DestContract.String()}

data, err := json.Marshal(sfcEvent.Raw)
Expand All @@ -68,7 +69,7 @@ func (t *SFCEventTransformer) ToMessage(event interface{}) (*v1.Message, error)
data = append(sfcFuncSig[:], data...)

message := v1.Message{
ID: t.getIDForEvent(sfcEvent.Raw),
ID: getIDForEvent(t.ChainId, t.ContractAddress, sfcEvent.Raw.BlockNumber, sfcEvent.Raw.TxIndex, sfcEvent.Raw.Index),
Timestamp: sfcEvent.Timestamp.Int64(),
MsgType: v1.MessageType,
Version: v1.Version,
Expand All @@ -93,18 +94,13 @@ func (t *SFCEventTransformer) validate(event *functioncall.SfcCrossCall) error {
return nil
}

// getIDForEvent generates a deterministic ID for an event of the format {network_id}/{contract_address}/{block_number}/{tx_index}/{log_index}
func (t *SFCEventTransformer) getIDForEvent(event types.Log) string {
return fmt.Sprintf(MessageIDPattern, t.Source, t.SourceAddr, event.BlockNumber, event.TxIndex, event.Index)
}

func NewSFCEventTransformer(sourceNetwork string, sourceAddr common.Address) *SFCEventTransformer {
return &SFCEventTransformer{sourceNetwork, sourceAddr}
func NewSFCEventTransformer(chainId *big.Int, sourceAddr common.Address) *SFCEventTransformer {
return &SFCEventTransformer{chainId, sourceAddr}
}

type GPACTEventTransformer struct {
Source string
SourceAddr common.Address
ChainId *big.Int
ContractAddress common.Address
}

func (t *GPACTEventTransformer) ToMessage(event interface{}) (*v1.Message, error) {
Expand All @@ -130,7 +126,7 @@ func (t *GPACTEventTransformer) ToMessage(event interface{}) (*v1.Message, error
}
}

source := v1.ApplicationAddress{NetworkID: t.Source, ContractAddress: fmt.Sprintf("%#x", t.SourceAddr)}
source := v1.ApplicationAddress{NetworkID: t.ChainId.String(), ContractAddress: fmt.Sprintf("%#x", t.ContractAddress)}
destination := v1.ApplicationAddress{NetworkID: "0", ContractAddress: ""}

data, err := json.Marshal(raw)
Expand All @@ -140,7 +136,7 @@ func (t *GPACTEventTransformer) ToMessage(event interface{}) (*v1.Message, error
data = append(funcSig, data...)

message := v1.Message{
ID: t.getIDForEvent(raw),
ID: getIDForEvent(t.ChainId, t.ContractAddress, raw.BlockNumber, raw.TxIndex, raw.Index),
Timestamp: time.Now().Unix(), //Event does not contain timestamp
MsgType: v1.MessageType,
Version: v1.Version,
Expand All @@ -154,10 +150,10 @@ func (t *GPACTEventTransformer) ToMessage(event interface{}) (*v1.Message, error
}

// getIDForEvent generates a deterministic ID for an event of the format {network_id}/{contract_address}/{block_number}/{tx_index}/{log_index}
func (t *GPACTEventTransformer) getIDForEvent(event types.Log) string {
return fmt.Sprintf(MessageIDPattern, t.Source, t.SourceAddr, event.BlockNumber, event.TxIndex, event.Index)
func getIDForEvent(chainId *big.Int, contractAddr common.Address, blockNum uint64, txIndex uint, evIndex uint) string {
return fmt.Sprintf(MessageIDPattern, chainId, contractAddr, blockNum, txIndex, evIndex)
}

func NewGPACTEventTransformer(sourceNetwork string, sourceAddr common.Address) *GPACTEventTransformer {
return &GPACTEventTransformer{sourceNetwork, sourceAddr}
func NewGPACTEventTransformer(chainId *big.Int, contractAddress common.Address) *GPACTEventTransformer {
return &GPACTEventTransformer{chainId, contractAddress}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var fixValidEvent = functioncall.SfcCrossCall{
Raw: fixLog,
}

var transformer = NewSFCEventTransformer("network-001",
var transformer = NewSFCEventTransformer(big.NewInt(1),
common.HexToAddress("0x8e215d06ea7ec1fdb4fc5fd21768f4b34ee92ef4"))

func TestSFCTransformer(t *testing.T) {
Expand All @@ -58,7 +58,7 @@ func TestSFCTransformer(t *testing.T) {
assert.Equal(t, fixValidEvent.Timestamp, big.NewInt(message.Timestamp))
assert.Equal(t, hex.EncodeToString(append(sfcFuncSig[:], data...)), message.Payload)

expectedID := fmt.Sprintf(MessageIDPattern, transformer.Source, transformer.SourceAddr,
expectedID := fmt.Sprintf(MessageIDPattern, transformer.ChainId, transformer.ContractAddress,
fixLog.BlockNumber, fixLog.TxIndex, fixLog.Index)
assert.Equal(t, expectedID, message.ID)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,26 +136,28 @@ func (l *GPACTFinalisedEventWatcher) fetchAndProcessEvents(filterOpts *bind.Filt
if err != nil {
return err
}
wg.Add(1)
go l.processEvents(&GpactStartEventIterator{finalisedStartEvs}, &wg)

finalisedSegmentEvs, err := l.Contract.FilterSegment(filterOpts)
if err != nil {
return err
}
wg.Add(1)
go l.processEvents(&GpactSegmentEventIterator{finalisedSegmentEvs}, &wg)

finalisedRootEvs, err := l.Contract.FilterRoot(filterOpts)
if err != nil {
return err
}
wg.Add(1)
go l.processEvents(&GpactRootEventIterator{finalisedRootEvs}, &wg)

wg.Wait()
return nil
}

func (l *GPACTFinalisedEventWatcher) processEvents(events EventIterator, wg *sync.WaitGroup) {
wg.Add(1)
for events.Next() {
ev := events.GetEvent()
l.EventHandler.Handle(ev)
Expand Down
120 changes: 77 additions & 43 deletions services/relayer/internal/msgobserver/eth/observer/message_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,76 @@ package observer

import (
"context"
"fmt"
"github.com/consensys/gpact/services/relayer/internal/logging"
"github.com/ethereum/go-ethereum/common"
"math/big"

"github.com/consensys/gpact/services/relayer/internal/contracts/functioncall"
"github.com/consensys/gpact/services/relayer/internal/mqserver"
)

// SFCBridgeObserver listens to incoming events from an SFC contract, transforms them into relayer messages
// and then enqueues them onto a message queue them for further processing by other Relayer components
type SFCBridgeObserver struct {
// Observer is an interface for the observer.
type Observer interface {
// Start starts the observer's routine.
Start() error

// Stop safely stops the observer.
Stop()

// StartObserve starts a new observe.
StartObserve(chainID *big.Int, chainAP string, contractType string, contractAddr common.Address) error

// StopObserve stops observe.
StopObserve() error

// IsRunning returns true if the observer is running
IsRunning() bool
}

// SingleSourceObserver is an Observer that listens to events from a given source contract,
// transforms them into Relayer messages and then enqueues them onto a message queue for further
// processing by the Relayer core component.
type SingleSourceObserver struct {
SourceId string
SourceNetwork *big.Int
EventWatcher EventWatcher
EventHandler EventHandler
SourceNetwork string
running bool
}

// IsRunning returns true if the observer is running
func (o *SingleSourceObserver) IsRunning() bool {
return o.running
}

func NewSFCBridgeRealtimeObserver(source string, sourceAddr common.Address, contract *functioncall.Sfc,
mq mqserver.MessageQueue) (*SFCBridgeObserver, error) {
eventTransformer := NewSFCEventTransformer(source, sourceAddr)
// Start starts the observer's monitoring of the assigned source.
func (o *SingleSourceObserver) Start() error {
if o.IsRunning() {
logging.Info("Observer already running. Start request ignored")
return nil
}
o.running = true
return o.EventWatcher.Watch()
}

// Stop stops the observer monitoring the assigned source.
func (o *SingleSourceObserver) Stop() {
if !o.IsRunning() {
logging.Info("Observer not running. Stop request ignored")
return
}
o.running = false
if o.EventWatcher != nil {
o.EventWatcher.StopWatcher()
}
}

// NewSFCRealtimeObserver creates an instance of SingleSourceObserver that monitors a simple-function-call
// bridge contract events. The observer processes events as they are emitted (realtime), without awaiting finalisation.
func NewSFCRealtimeObserver(chainId *big.Int, sourceAddr common.Address, contract *functioncall.Sfc,
mq mqserver.MessageQueue) (*SingleSourceObserver, error) {
eventTransformer := NewSFCEventTransformer(chainId, sourceAddr)
messageHandler := NewMessageEnqueueHandler(mq, DefaultRetryOptions)
eventHandler := NewSimpleEventHandler(eventTransformer, messageHandler)
removedEvHandler := NewLogEventHandler("removed event")
Expand All @@ -45,14 +98,20 @@ func NewSFCBridgeRealtimeObserver(source string, sourceAddr common.Address, cont
return nil, err
}

return &SFCBridgeObserver{EventWatcher: eventWatcher, EventHandler: eventHandler, SourceNetwork: source}, nil
sourceId := fmt.Sprintf("%s:%s:%s", chainId, sourceAddr.String(), "sfc")
return &SingleSourceObserver{SourceId: sourceId, EventWatcher: eventWatcher, EventHandler: eventHandler,
SourceNetwork: chainId},
nil
}

func NewSFCBridgeFinalisedObserver(source string, sourceAddr common.Address, contract *functioncall.Sfc, mq mqserver.MessageQueue,
// NewSFCFinalisedObserver creates an instance of SingleSourceObserver that monitors a simple-function-call
// bridge contract events. The observer processes events only once they receive a configured number of confirmations.
func NewSFCFinalisedObserver(chainId *big.Int, sourceAddr common.Address, contract *functioncall.Sfc,
mq mqserver.MessageQueue,
confirmationsForFinality uint64, watcherProgressOpts WatcherProgressDsOpts, client BlockHeadProducer) (
*SFCBridgeObserver,
*SingleSourceObserver,
error) {
eventTransformer := NewSFCEventTransformer(source, sourceAddr)
eventTransformer := NewSFCEventTransformer(chainId, sourceAddr)
messageHandler := NewMessageEnqueueHandler(mq, DefaultRetryOptions)
eventHandler := NewSimpleEventHandler(eventTransformer, messageHandler)

Expand All @@ -63,29 +122,14 @@ func NewSFCBridgeFinalisedObserver(source string, sourceAddr common.Address, con
return nil, err
}

return &SFCBridgeObserver{EventWatcher: eventWatcher, EventHandler: eventHandler, SourceNetwork: source}, nil
}

func (o *SFCBridgeObserver) Start() error {
return o.EventWatcher.Watch()
}

func (o *SFCBridgeObserver) Stop() {
if o.EventWatcher != nil {
o.EventWatcher.StopWatcher()
}
}

// GPACTBridgeObserver is a simple gpact bridge observer.
type GPACTBridgeObserver struct {
EventWatcher EventWatcher
EventHandler EventHandler
SourceNetwork string
return &SingleSourceObserver{EventWatcher: eventWatcher, EventHandler: eventHandler, SourceNetwork: chainId}, nil
}

func NewGPACTBridgeRealtimeObserver(source string, sourceAddr common.Address, contract *functioncall.Gpact,
mq mqserver.MessageQueue) (*GPACTBridgeObserver, error) {
eventTransformer := NewGPACTEventTransformer(source, sourceAddr)
// NewGPACTRealtimeObserver creates an instance of SingleSourceObserver that monitors a GPACT bridge contract.
// The observer processes events as they are emitted (realtime), without awaiting finalisation.
func NewGPACTRealtimeObserver(chainId *big.Int, sourceAddr common.Address, contract *functioncall.Gpact,
mq mqserver.MessageQueue) (*SingleSourceObserver, error) {
eventTransformer := NewGPACTEventTransformer(chainId, sourceAddr)
messageHandler := NewMessageEnqueueHandler(mq, DefaultRetryOptions)
eventHandler := NewSimpleEventHandler(eventTransformer, messageHandler)
removedEvHandler := NewLogEventHandler("removed event")
Expand All @@ -96,15 +140,5 @@ func NewGPACTBridgeRealtimeObserver(source string, sourceAddr common.Address, co
return nil, err
}

return &GPACTBridgeObserver{EventWatcher: eventWatcher, EventHandler: eventHandler, SourceNetwork: source}, nil
}

func (o *GPACTBridgeObserver) Start() error {
return o.EventWatcher.Watch()
}

func (o *GPACTBridgeObserver) Stop() {
if o.EventWatcher != nil {
o.EventWatcher.StopWatcher()
}
return &SingleSourceObserver{EventWatcher: eventWatcher, EventHandler: eventHandler, SourceNetwork: chainId}, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ import (
func TestSFCBridgeObserver(t *testing.T) {
fixDesAddress := common.HexToAddress("0x8e215d06ea7ec1fdb4fc5fd21768f4b34ee92ef4")
fixDestID := big.NewInt(2)
fixSourceID := "1"
fixSourceID := big.NewInt(1)
fixSourceAddress := common.HexToAddress("0x8e215d06ea7ec1fdb4fc5fd21768f4b34ee92ef4")

simBackend, auth := simulatedBackend(t)
contract := deploySFCContract(t, simBackend, auth)
mockMQ := new(MockMQ)

observer, err := NewSFCBridgeRealtimeObserver(fixSourceID, fixSourceAddress, contract, mockMQ)
observer, err := NewSFCRealtimeObserver(fixSourceID, fixSourceAddress, contract, mockMQ)
assert.Nil(t, err)
go observer.Start()

Expand All @@ -57,5 +57,5 @@ func TestSFCBridgeObserver(t *testing.T) {

assert.Equal(t, fixDesAddress.String(), sentMsg.Destination.ContractAddress)
assert.Equal(t, fixDestID.String(), sentMsg.Destination.NetworkID)
assert.Equal(t, fixSourceID, sentMsg.Source.NetworkID)
assert.Equal(t, fixSourceID.String(), sentMsg.Source.NetworkID)
}
37 changes: 0 additions & 37 deletions services/relayer/internal/msgobserver/eth/observer/observer.go

This file was deleted.

Loading

0 comments on commit 3b7c33b

Please sign in to comment.