From 3b7c33b15ee8f5e6aa59b4a1a85f760f2372a9b4 Mon Sep 17 00:00:00 2001 From: ermyas Date: Tue, 24 May 2022 12:54:04 +1000 Subject: [PATCH] Refactor Relayer Observer (#125) * Refactor relayer * Minor refactoring * Maintain run status of observers * Minor refactoring --- .circleci/config.yml | 2 +- services/relayer/Makefile | 4 +- services/relayer/cmd/observer/main.go | 2 +- .../eth/observer/event_transformer.go | 34 ++--- .../eth/observer/event_transformer_test.go | 4 +- .../eth/observer/event_watcher_gpact.go | 4 +- .../eth/observer/message_observer.go | 120 ++++++++++------ .../eth/observer/message_observer_test.go | 6 +- .../msgobserver/eth/observer/observer.go | 37 ----- .../eth/observer/observer_impl_v1.go | 133 +++++++++--------- .../msgobserver/eth/observer/utils.go | 5 - 11 files changed, 175 insertions(+), 176 deletions(-) delete mode 100644 services/relayer/internal/msgobserver/eth/observer/observer.go diff --git a/.circleci/config.yml b/.circleci/config.yml index 88b7b84b..0c178964 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -462,7 +462,7 @@ jobs: - run: sleep 30 - run: name: Build test code and execute unit tests - command: make utest + command: make utest itest working_directory: services/relayer - run: name: Stop blockchains & Relayers diff --git a/services/relayer/Makefile b/services/relayer/Makefile index 12d678ab..e460621e 100644 --- a/services/relayer/Makefile +++ b/services/relayer/Makefile @@ -1,4 +1,4 @@ -.PHONY: build +.PHONY: build itest build: go build -o ./build/relayer cmd/relayer/* @@ -14,4 +14,6 @@ clean: utest: go test -v --count=1 ./internal/... + +itest: go test -v --count=1 ./itest/ diff --git a/services/relayer/cmd/observer/main.go b/services/relayer/cmd/observer/main.go index ade311f4..2bef6bc5 100644 --- a/services/relayer/cmd/observer/main.go +++ b/services/relayer/cmd/observer/main.go @@ -54,7 +54,7 @@ func main() { instance.MQ = mq defer mq.Stop() // Start the observer - observer := observer.NewObserverImplV1(conf.ObserverDSPath, mq) + observer := observer.NewMultiSourceObserver(conf.ObserverDSPath, mq) err = observer.Start() if err != nil { panic(err) diff --git a/services/relayer/internal/msgobserver/eth/observer/event_transformer.go b/services/relayer/internal/msgobserver/eth/observer/event_transformer.go index 8a287e4b..26eeb34d 100644 --- a/services/relayer/internal/msgobserver/eth/observer/event_transformer.go +++ b/services/relayer/internal/msgobserver/eth/observer/event_transformer.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "github.com/ethereum/go-ethereum/common" + "math/big" "time" v1 "github.com/consensys/gpact/services/relayer/pkg/messages/v1" @@ -44,8 +45,8 @@ type EventTransformer interface { // SFCEventTransformer converts events from a simple-function-call bridge contract to relayer messages type SFCEventTransformer struct { - Source string - SourceAddr common.Address + ChainId *big.Int + ContractAddress common.Address } // ToMessage converts a 'CrossCall' event emited from a simple-function-call bridge contract to relayer message @@ -58,7 +59,7 @@ func (t *SFCEventTransformer) ToMessage(event interface{}) (*v1.Message, error) return nil, err } - source := v1.ApplicationAddress{NetworkID: t.Source, ContractAddress: fmt.Sprintf("%#x", t.SourceAddr)} + source := v1.ApplicationAddress{NetworkID: t.ChainId.String(), ContractAddress: fmt.Sprintf("%#x", t.ContractAddress)} destination := v1.ApplicationAddress{NetworkID: sfcEvent.DestBcId.String(), ContractAddress: sfcEvent.DestContract.String()} data, err := json.Marshal(sfcEvent.Raw) @@ -68,7 +69,7 @@ func (t *SFCEventTransformer) ToMessage(event interface{}) (*v1.Message, error) data = append(sfcFuncSig[:], data...) message := v1.Message{ - ID: t.getIDForEvent(sfcEvent.Raw), + ID: getIDForEvent(t.ChainId, t.ContractAddress, sfcEvent.Raw.BlockNumber, sfcEvent.Raw.TxIndex, sfcEvent.Raw.Index), Timestamp: sfcEvent.Timestamp.Int64(), MsgType: v1.MessageType, Version: v1.Version, @@ -93,18 +94,13 @@ func (t *SFCEventTransformer) validate(event *functioncall.SfcCrossCall) error { return nil } -// getIDForEvent generates a deterministic ID for an event of the format {network_id}/{contract_address}/{block_number}/{tx_index}/{log_index} -func (t *SFCEventTransformer) getIDForEvent(event types.Log) string { - return fmt.Sprintf(MessageIDPattern, t.Source, t.SourceAddr, event.BlockNumber, event.TxIndex, event.Index) -} - -func NewSFCEventTransformer(sourceNetwork string, sourceAddr common.Address) *SFCEventTransformer { - return &SFCEventTransformer{sourceNetwork, sourceAddr} +func NewSFCEventTransformer(chainId *big.Int, sourceAddr common.Address) *SFCEventTransformer { + return &SFCEventTransformer{chainId, sourceAddr} } type GPACTEventTransformer struct { - Source string - SourceAddr common.Address + ChainId *big.Int + ContractAddress common.Address } func (t *GPACTEventTransformer) ToMessage(event interface{}) (*v1.Message, error) { @@ -130,7 +126,7 @@ func (t *GPACTEventTransformer) ToMessage(event interface{}) (*v1.Message, error } } - source := v1.ApplicationAddress{NetworkID: t.Source, ContractAddress: fmt.Sprintf("%#x", t.SourceAddr)} + source := v1.ApplicationAddress{NetworkID: t.ChainId.String(), ContractAddress: fmt.Sprintf("%#x", t.ContractAddress)} destination := v1.ApplicationAddress{NetworkID: "0", ContractAddress: ""} data, err := json.Marshal(raw) @@ -140,7 +136,7 @@ func (t *GPACTEventTransformer) ToMessage(event interface{}) (*v1.Message, error data = append(funcSig, data...) message := v1.Message{ - ID: t.getIDForEvent(raw), + ID: getIDForEvent(t.ChainId, t.ContractAddress, raw.BlockNumber, raw.TxIndex, raw.Index), Timestamp: time.Now().Unix(), //Event does not contain timestamp MsgType: v1.MessageType, Version: v1.Version, @@ -154,10 +150,10 @@ func (t *GPACTEventTransformer) ToMessage(event interface{}) (*v1.Message, error } // getIDForEvent generates a deterministic ID for an event of the format {network_id}/{contract_address}/{block_number}/{tx_index}/{log_index} -func (t *GPACTEventTransformer) getIDForEvent(event types.Log) string { - return fmt.Sprintf(MessageIDPattern, t.Source, t.SourceAddr, event.BlockNumber, event.TxIndex, event.Index) +func getIDForEvent(chainId *big.Int, contractAddr common.Address, blockNum uint64, txIndex uint, evIndex uint) string { + return fmt.Sprintf(MessageIDPattern, chainId, contractAddr, blockNum, txIndex, evIndex) } -func NewGPACTEventTransformer(sourceNetwork string, sourceAddr common.Address) *GPACTEventTransformer { - return &GPACTEventTransformer{sourceNetwork, sourceAddr} +func NewGPACTEventTransformer(chainId *big.Int, contractAddress common.Address) *GPACTEventTransformer { + return &GPACTEventTransformer{chainId, contractAddress} } diff --git a/services/relayer/internal/msgobserver/eth/observer/event_transformer_test.go b/services/relayer/internal/msgobserver/eth/observer/event_transformer_test.go index c471b843..9bbcfe73 100644 --- a/services/relayer/internal/msgobserver/eth/observer/event_transformer_test.go +++ b/services/relayer/internal/msgobserver/eth/observer/event_transformer_test.go @@ -43,7 +43,7 @@ var fixValidEvent = functioncall.SfcCrossCall{ Raw: fixLog, } -var transformer = NewSFCEventTransformer("network-001", +var transformer = NewSFCEventTransformer(big.NewInt(1), common.HexToAddress("0x8e215d06ea7ec1fdb4fc5fd21768f4b34ee92ef4")) func TestSFCTransformer(t *testing.T) { @@ -58,7 +58,7 @@ func TestSFCTransformer(t *testing.T) { assert.Equal(t, fixValidEvent.Timestamp, big.NewInt(message.Timestamp)) assert.Equal(t, hex.EncodeToString(append(sfcFuncSig[:], data...)), message.Payload) - expectedID := fmt.Sprintf(MessageIDPattern, transformer.Source, transformer.SourceAddr, + expectedID := fmt.Sprintf(MessageIDPattern, transformer.ChainId, transformer.ContractAddress, fixLog.BlockNumber, fixLog.TxIndex, fixLog.Index) assert.Equal(t, expectedID, message.ID) } diff --git a/services/relayer/internal/msgobserver/eth/observer/event_watcher_gpact.go b/services/relayer/internal/msgobserver/eth/observer/event_watcher_gpact.go index da2c0e3c..b66af6a8 100644 --- a/services/relayer/internal/msgobserver/eth/observer/event_watcher_gpact.go +++ b/services/relayer/internal/msgobserver/eth/observer/event_watcher_gpact.go @@ -136,18 +136,21 @@ func (l *GPACTFinalisedEventWatcher) fetchAndProcessEvents(filterOpts *bind.Filt if err != nil { return err } + wg.Add(1) go l.processEvents(&GpactStartEventIterator{finalisedStartEvs}, &wg) finalisedSegmentEvs, err := l.Contract.FilterSegment(filterOpts) if err != nil { return err } + wg.Add(1) go l.processEvents(&GpactSegmentEventIterator{finalisedSegmentEvs}, &wg) finalisedRootEvs, err := l.Contract.FilterRoot(filterOpts) if err != nil { return err } + wg.Add(1) go l.processEvents(&GpactRootEventIterator{finalisedRootEvs}, &wg) wg.Wait() @@ -155,7 +158,6 @@ func (l *GPACTFinalisedEventWatcher) fetchAndProcessEvents(filterOpts *bind.Filt } func (l *GPACTFinalisedEventWatcher) processEvents(events EventIterator, wg *sync.WaitGroup) { - wg.Add(1) for events.Next() { ev := events.GetEvent() l.EventHandler.Handle(ev) diff --git a/services/relayer/internal/msgobserver/eth/observer/message_observer.go b/services/relayer/internal/msgobserver/eth/observer/message_observer.go index 826b3fd8..358fa99f 100644 --- a/services/relayer/internal/msgobserver/eth/observer/message_observer.go +++ b/services/relayer/internal/msgobserver/eth/observer/message_observer.go @@ -17,23 +17,76 @@ package observer import ( "context" + "fmt" + "github.com/consensys/gpact/services/relayer/internal/logging" "github.com/ethereum/go-ethereum/common" + "math/big" "github.com/consensys/gpact/services/relayer/internal/contracts/functioncall" "github.com/consensys/gpact/services/relayer/internal/mqserver" ) -// SFCBridgeObserver listens to incoming events from an SFC contract, transforms them into relayer messages -// and then enqueues them onto a message queue them for further processing by other Relayer components -type SFCBridgeObserver struct { +// Observer is an interface for the observer. +type Observer interface { + // Start starts the observer's routine. + Start() error + + // Stop safely stops the observer. + Stop() + + // StartObserve starts a new observe. + StartObserve(chainID *big.Int, chainAP string, contractType string, contractAddr common.Address) error + + // StopObserve stops observe. + StopObserve() error + + // IsRunning returns true if the observer is running + IsRunning() bool +} + +// SingleSourceObserver is an Observer that listens to events from a given source contract, +// transforms them into Relayer messages and then enqueues them onto a message queue for further +// processing by the Relayer core component. +type SingleSourceObserver struct { + SourceId string + SourceNetwork *big.Int EventWatcher EventWatcher EventHandler EventHandler - SourceNetwork string + running bool +} + +// IsRunning returns true if the observer is running +func (o *SingleSourceObserver) IsRunning() bool { + return o.running } -func NewSFCBridgeRealtimeObserver(source string, sourceAddr common.Address, contract *functioncall.Sfc, - mq mqserver.MessageQueue) (*SFCBridgeObserver, error) { - eventTransformer := NewSFCEventTransformer(source, sourceAddr) +// Start starts the observer's monitoring of the assigned source. +func (o *SingleSourceObserver) Start() error { + if o.IsRunning() { + logging.Info("Observer already running. Start request ignored") + return nil + } + o.running = true + return o.EventWatcher.Watch() +} + +// Stop stops the observer monitoring the assigned source. +func (o *SingleSourceObserver) Stop() { + if !o.IsRunning() { + logging.Info("Observer not running. Stop request ignored") + return + } + o.running = false + if o.EventWatcher != nil { + o.EventWatcher.StopWatcher() + } +} + +// NewSFCRealtimeObserver creates an instance of SingleSourceObserver that monitors a simple-function-call +// bridge contract events. The observer processes events as they are emitted (realtime), without awaiting finalisation. +func NewSFCRealtimeObserver(chainId *big.Int, sourceAddr common.Address, contract *functioncall.Sfc, + mq mqserver.MessageQueue) (*SingleSourceObserver, error) { + eventTransformer := NewSFCEventTransformer(chainId, sourceAddr) messageHandler := NewMessageEnqueueHandler(mq, DefaultRetryOptions) eventHandler := NewSimpleEventHandler(eventTransformer, messageHandler) removedEvHandler := NewLogEventHandler("removed event") @@ -45,14 +98,20 @@ func NewSFCBridgeRealtimeObserver(source string, sourceAddr common.Address, cont return nil, err } - return &SFCBridgeObserver{EventWatcher: eventWatcher, EventHandler: eventHandler, SourceNetwork: source}, nil + sourceId := fmt.Sprintf("%s:%s:%s", chainId, sourceAddr.String(), "sfc") + return &SingleSourceObserver{SourceId: sourceId, EventWatcher: eventWatcher, EventHandler: eventHandler, + SourceNetwork: chainId}, + nil } -func NewSFCBridgeFinalisedObserver(source string, sourceAddr common.Address, contract *functioncall.Sfc, mq mqserver.MessageQueue, +// NewSFCFinalisedObserver creates an instance of SingleSourceObserver that monitors a simple-function-call +// bridge contract events. The observer processes events only once they receive a configured number of confirmations. +func NewSFCFinalisedObserver(chainId *big.Int, sourceAddr common.Address, contract *functioncall.Sfc, + mq mqserver.MessageQueue, confirmationsForFinality uint64, watcherProgressOpts WatcherProgressDsOpts, client BlockHeadProducer) ( - *SFCBridgeObserver, + *SingleSourceObserver, error) { - eventTransformer := NewSFCEventTransformer(source, sourceAddr) + eventTransformer := NewSFCEventTransformer(chainId, sourceAddr) messageHandler := NewMessageEnqueueHandler(mq, DefaultRetryOptions) eventHandler := NewSimpleEventHandler(eventTransformer, messageHandler) @@ -63,29 +122,14 @@ func NewSFCBridgeFinalisedObserver(source string, sourceAddr common.Address, con return nil, err } - return &SFCBridgeObserver{EventWatcher: eventWatcher, EventHandler: eventHandler, SourceNetwork: source}, nil -} - -func (o *SFCBridgeObserver) Start() error { - return o.EventWatcher.Watch() -} - -func (o *SFCBridgeObserver) Stop() { - if o.EventWatcher != nil { - o.EventWatcher.StopWatcher() - } -} - -// GPACTBridgeObserver is a simple gpact bridge observer. -type GPACTBridgeObserver struct { - EventWatcher EventWatcher - EventHandler EventHandler - SourceNetwork string + return &SingleSourceObserver{EventWatcher: eventWatcher, EventHandler: eventHandler, SourceNetwork: chainId}, nil } -func NewGPACTBridgeRealtimeObserver(source string, sourceAddr common.Address, contract *functioncall.Gpact, - mq mqserver.MessageQueue) (*GPACTBridgeObserver, error) { - eventTransformer := NewGPACTEventTransformer(source, sourceAddr) +// NewGPACTRealtimeObserver creates an instance of SingleSourceObserver that monitors a GPACT bridge contract. +// The observer processes events as they are emitted (realtime), without awaiting finalisation. +func NewGPACTRealtimeObserver(chainId *big.Int, sourceAddr common.Address, contract *functioncall.Gpact, + mq mqserver.MessageQueue) (*SingleSourceObserver, error) { + eventTransformer := NewGPACTEventTransformer(chainId, sourceAddr) messageHandler := NewMessageEnqueueHandler(mq, DefaultRetryOptions) eventHandler := NewSimpleEventHandler(eventTransformer, messageHandler) removedEvHandler := NewLogEventHandler("removed event") @@ -96,15 +140,5 @@ func NewGPACTBridgeRealtimeObserver(source string, sourceAddr common.Address, co return nil, err } - return &GPACTBridgeObserver{EventWatcher: eventWatcher, EventHandler: eventHandler, SourceNetwork: source}, nil -} - -func (o *GPACTBridgeObserver) Start() error { - return o.EventWatcher.Watch() -} - -func (o *GPACTBridgeObserver) Stop() { - if o.EventWatcher != nil { - o.EventWatcher.StopWatcher() - } + return &SingleSourceObserver{EventWatcher: eventWatcher, EventHandler: eventHandler, SourceNetwork: chainId}, nil } diff --git a/services/relayer/internal/msgobserver/eth/observer/message_observer_test.go b/services/relayer/internal/msgobserver/eth/observer/message_observer_test.go index e3940afe..236e2fb3 100644 --- a/services/relayer/internal/msgobserver/eth/observer/message_observer_test.go +++ b/services/relayer/internal/msgobserver/eth/observer/message_observer_test.go @@ -30,14 +30,14 @@ import ( func TestSFCBridgeObserver(t *testing.T) { fixDesAddress := common.HexToAddress("0x8e215d06ea7ec1fdb4fc5fd21768f4b34ee92ef4") fixDestID := big.NewInt(2) - fixSourceID := "1" + fixSourceID := big.NewInt(1) fixSourceAddress := common.HexToAddress("0x8e215d06ea7ec1fdb4fc5fd21768f4b34ee92ef4") simBackend, auth := simulatedBackend(t) contract := deploySFCContract(t, simBackend, auth) mockMQ := new(MockMQ) - observer, err := NewSFCBridgeRealtimeObserver(fixSourceID, fixSourceAddress, contract, mockMQ) + observer, err := NewSFCRealtimeObserver(fixSourceID, fixSourceAddress, contract, mockMQ) assert.Nil(t, err) go observer.Start() @@ -57,5 +57,5 @@ func TestSFCBridgeObserver(t *testing.T) { assert.Equal(t, fixDesAddress.String(), sentMsg.Destination.ContractAddress) assert.Equal(t, fixDestID.String(), sentMsg.Destination.NetworkID) - assert.Equal(t, fixSourceID, sentMsg.Source.NetworkID) + assert.Equal(t, fixSourceID.String(), sentMsg.Source.NetworkID) } diff --git a/services/relayer/internal/msgobserver/eth/observer/observer.go b/services/relayer/internal/msgobserver/eth/observer/observer.go deleted file mode 100644 index 4e714ce3..00000000 --- a/services/relayer/internal/msgobserver/eth/observer/observer.go +++ /dev/null @@ -1,37 +0,0 @@ -package observer - -/* - * Copyright 2022 ConsenSys Software Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ - -import ( - "math/big" - - "github.com/ethereum/go-ethereum/common" -) - -// Observer is an interface for the observer. -type Observer interface { - // Start starts the observer's routine. - Start() error - - // Stop safely stops the observer. - Stop() - - // StartObserve starts a new observe. - StartObserve(chainID *big.Int, chainAP string, contractType string, contractAddr common.Address) error - - // StopObserve stops observe. - StopObserve() error -} diff --git a/services/relayer/internal/msgobserver/eth/observer/observer_impl_v1.go b/services/relayer/internal/msgobserver/eth/observer/observer_impl_v1.go index 0076c45b..b17ae466 100644 --- a/services/relayer/internal/msgobserver/eth/observer/observer_impl_v1.go +++ b/services/relayer/internal/msgobserver/eth/observer/observer_impl_v1.go @@ -36,37 +36,46 @@ const ( activeKey = "active" ) -// observation stores the information of an active observation. -type observation struct { +// Observation stores the information of an active Observation. +type Observation struct { ChainID string `json:"chain_id"` ContractType string `json:"contract_type"` ContractAddr string `json:"contract_addr"` AP string `json:"ap"` } -// ObserverImplV1 implements observer. -type ObserverImplV1 struct { - path string - mq *mqserver.MQServer - - ds datastore.Datastore - sfcObserver *SFCBridgeObserver - gpactObserver *GPACTBridgeObserver +// MultiSourceObserver is an observer that can observer multiple different event sources. +// It creates and manages distinct SingleSourceObserver instances for each event source. +// The contract persists the list of event sources it tracks. In the event of a restart, +// the observer resumes Observation of the persisted sources. +type MultiSourceObserver struct { + dsPath string + mq *mqserver.MQServer + + ds datastore.Datastore + //TODO: update to support multiple observers + sfcObserver *SingleSourceObserver + gpactObserver *SingleSourceObserver + running bool } -// NewObserverImplV1 creates a new observer. -func NewObserverImplV1(path string, mq *mqserver.MQServer) Observer { - return &ObserverImplV1{path: path, mq: mq} +// NewMultiSourceObserver creates a new MultiSourceObserver instance +func NewMultiSourceObserver(dsPath string, mq *mqserver.MQServer) *MultiSourceObserver { + return &MultiSourceObserver{dsPath: dsPath, mq: mq} } // Start starts the observer's routine. -func (o *ObserverImplV1) Start() error { +func (o *MultiSourceObserver) Start() error { + if o.IsRunning() { + logging.Info("Multi-observer already running. Start request ignored") + return nil + } var err error if o.ds == nil { dsopts := badgerds.DefaultOptions dsopts.SyncWrites = false dsopts.Truncate = true - o.ds, err = badgerds.NewDatastore(o.path, &dsopts) + o.ds, err = badgerds.NewDatastore(o.dsPath, &dsopts) if err != nil { return err } @@ -84,30 +93,26 @@ func (o *ObserverImplV1) Start() error { if err != nil { return err } - val := observation{} - err = json.Unmarshal(data, &val) + obs := Observation{} + err = json.Unmarshal(data, &obs) if err != nil { return err } - chainID, ok := big.NewInt(0).SetString(val.ChainID, 10) - if !ok { - err = fmt.Errorf("error in setting chain id") + err = o.startObservation(obs) + if err != nil { return err } - if val.ContractType == "SFC" || val.ContractType == "sfc" { - go o.routineSFC(chainID, val.AP, common.HexToAddress(val.ContractAddr)) - } else if val.ContractType == "GPACT" || val.ContractType == "gpact" { - go o.routineGPACT(chainID, val.AP, common.HexToAddress(val.ContractAddr)) - } else { - return fmt.Errorf("contract type %v is not supported", val.ContractType) - } } } return nil } // Stop safely stops the observer. -func (o *ObserverImplV1) Stop() { +func (o *MultiSourceObserver) Stop() { + if !o.IsRunning() { + logging.Info("Multi-observer is not running. Stop request ignored") + return + } if o.sfcObserver != nil { o.sfcObserver.Stop() o.sfcObserver = nil @@ -116,20 +121,25 @@ func (o *ObserverImplV1) Stop() { o.gpactObserver.Stop() o.gpactObserver = nil } + o.running = false +} + +func (o *MultiSourceObserver) IsRunning() bool { + return o.running } // StartObserve starts a new observe. -func (o *ObserverImplV1) StartObserve(chainID *big.Int, chainAP string, contractType string, contractAddr common.Address) error { +func (o *MultiSourceObserver) StartObserve(chainID *big.Int, chainAP string, contractType string, contractAddr common.Address) error { // First, close any existing observe. o.Stop() - val := observation{ + obs := Observation{ ChainID: chainID.String(), ContractType: contractType, ContractAddr: contractAddr.String(), AP: chainAP, } - data, err := json.Marshal(val) + data, err := json.Marshal(obs) if err != nil { return err } @@ -137,19 +147,30 @@ func (o *ObserverImplV1) StartObserve(chainID *big.Int, chainAP string, contract if err != nil { return err } - if contractType == "SFC" { - go o.routineSFC(chainID, chainAP, contractAddr) - } else if contractType == "GPACT" { - go o.routineGPACT(chainID, chainAP, contractAddr) + + return o.startObservation(obs) +} + +func (o *MultiSourceObserver) startObservation(obs Observation) error { + chainID, ok := big.NewInt(0).SetString(obs.ChainID, 10) + if !ok { + return fmt.Errorf("error in setting chain id") + } + if strings.EqualFold(obs.ContractType, "SFC") { + go o.routineSFC(chainID, obs.AP, common.HexToAddress(obs.ContractAddr)) + } else if strings.EqualFold(obs.ContractType, "GPACT") { + go o.routineGPACT(chainID, obs.AP, common.HexToAddress(obs.ContractAddr)) } else { - return fmt.Errorf("contract type %v is not supported", contractType) + return fmt.Errorf("contract type %v is not supported", obs.ContractType) + } + if !o.IsRunning() { + o.running = true } - return nil } // StopObserve stops observe. -func (o *ObserverImplV1) StopObserve() error { +func (o *MultiSourceObserver) StopObserve() error { // Close any existing observe. o.Stop() @@ -160,8 +181,8 @@ func (o *ObserverImplV1) StopObserve() error { return nil } -// routineSFC is the observe SFC routine. -func (o *ObserverImplV1) routineSFC(chainID *big.Int, chainAP string, addr common.Address) { +// routineSFC starts an Observation for an SFC source event +func (o *MultiSourceObserver) routineSFC(chainID *big.Int, chainAP string, contractAddr common.Address) { for { chain, err := ethclient.Dial(chainAP) if err != nil { @@ -170,13 +191,13 @@ func (o *ObserverImplV1) routineSFC(chainID *big.Int, chainAP string, addr commo } defer chain.Close() - sfc, err := functioncall.NewSfc(addr, chain) + sfc, err := functioncall.NewSfc(contractAddr, chain) if err != nil { logging.Error(err.Error()) return } - observer, err := o.createFinalisedEventObserver(chainID.String(), addr, sfc, o.mq, chain) + observer, err := o.createFinalisedEventObserver(chainID, contractAddr, sfc, o.mq, chain) if err != nil { logging.Error(err.Error()) @@ -193,8 +214,8 @@ func (o *ObserverImplV1) routineSFC(chainID *big.Int, chainAP string, addr commo } } -// routineGPACT is the observe GPACT routine. -func (o *ObserverImplV1) routineGPACT(chainID *big.Int, chainAP string, addr common.Address) { +// routineGPACT starts an Observation for a new GPACT source event +func (o *MultiSourceObserver) routineGPACT(chainID *big.Int, chainAP string, addr common.Address) { for { chain, err := ethclient.Dial(chainAP) if err != nil { @@ -209,7 +230,7 @@ func (o *ObserverImplV1) routineGPACT(chainID *big.Int, chainAP string, addr com return } - observer, err := NewGPACTBridgeRealtimeObserver(chainID.String(), addr, gpact, o.mq) + observer, err := NewGPACTRealtimeObserver(chainID, addr, gpact, o.mq) if err != nil { logging.Error("Error creating observer for Chain: %v, Contract: %v, Error: %v", chainID.String(), addr.String(), err.Error()) return @@ -225,23 +246,9 @@ func (o *ObserverImplV1) routineGPACT(chainID *big.Int, chainAP string, addr com } } -func (o *ObserverImplV1) createFinalisedEventObserver(source string, sourceAddr common.Address, contract *functioncall.Sfc, - mq mqserver.MessageQueue, - client *ethclient.Client) (*SFCBridgeObserver, error) { - dsProgKey := datastore.NewKey(fmt.Sprintf("/%s/%s/last_finalised_block", source, sourceAddr)) +func (o *MultiSourceObserver) createFinalisedEventObserver(chainId *big.Int, contractAddr common.Address, + contract *functioncall.Sfc, mq mqserver.MessageQueue, client *ethclient.Client) (*SingleSourceObserver, error) { + dsProgKey := datastore.NewKey(fmt.Sprintf("/%s/%s/last_finalised_block", chainId, contractAddr)) watcherProgOpts := WatcherProgressDsOpts{o.ds, dsProgKey, DefaultRetryOptions} - return NewSFCBridgeFinalisedObserver(source, sourceAddr, contract, mq, 4, watcherProgOpts, client) -} - -// dsKey gets the datastore key from given chainID and contract address. -func dsKey(chainID *big.Int, contractAddr common.Address) datastore.Key { - return datastore.NewKey(fmt.Sprintf("%v-%v", chainID.String(), contractAddr.String())) -} - -// splitKey splits key to chainID and contract address. -func splitKey(key string) (*big.Int, common.Address) { - res := strings.Split(key, "-") - chainID, _ := big.NewInt(0).SetString(res[0], 10) - addr := common.HexToAddress(res[1]) - return chainID, addr + return NewSFCFinalisedObserver(chainId, contractAddr, contract, mq, 4, watcherProgOpts, client) } diff --git a/services/relayer/internal/msgobserver/eth/observer/utils.go b/services/relayer/internal/msgobserver/eth/observer/utils.go index eab98633..1641dfe7 100644 --- a/services/relayer/internal/msgobserver/eth/observer/utils.go +++ b/services/relayer/internal/msgobserver/eth/observer/utils.go @@ -16,7 +16,6 @@ package observer */ import ( - "encoding/base64" "encoding/binary" "fmt" @@ -29,10 +28,6 @@ func randomBytes(n int) []byte { return res } -func toBase64String(data []byte) string { - return base64.StdEncoding.EncodeToString(data) -} - func uintToBytes(n uint64) []byte { b := make([]byte, binary.MaxVarintLen64) binary.PutUvarint(b, n)