diff --git a/messaging/relayer/cmd/observer/main.go b/messaging/relayer/cmd/observer/main.go index 3266834a..a5b760e9 100644 --- a/messaging/relayer/cmd/observer/main.go +++ b/messaging/relayer/cmd/observer/main.go @@ -16,16 +16,14 @@ package main */ import ( - "encoding/hex" "encoding/json" "github.com/consensys/gpact/messaging/relayer/internal/adminserver" "github.com/consensys/gpact/messaging/relayer/internal/config" "github.com/consensys/gpact/messaging/relayer/internal/contracts/functioncall" "github.com/consensys/gpact/messaging/relayer/internal/logging" - v1 "github.com/consensys/gpact/messaging/relayer/internal/messages/v1" "github.com/consensys/gpact/messaging/relayer/internal/mqserver" - "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/consensys/gpact/messaging/relayer/internal/msgobserver/eth" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" _ "github.com/joho/godotenv/autoload" @@ -75,7 +73,6 @@ func main() { activeChains[request.BcID] = end go func() { - // TODO: User msgobserver. chain, err := ethclient.Dial(request.Chain) if err != nil { logging.Error(err.Error()) @@ -88,48 +85,13 @@ func main() { logging.Error(err.Error()) return } - sink := make(chan *functioncall.SfcCrossCall) - event, err := sfc.WatchCrossCall(&bind.WatchOpts{Start: nil, Context: nil}, sink) + + observer, err := eth.NewSFCBridgeObserver(request.BcID, request.SFCAddr, sfc, s, end) if err != nil { logging.Error(err.Error()) return } - logging.Info("Start observing contract %v.", request.SFCAddr) - for { - select { - case log := <-event.Err(): - logging.Error(log.Error()) - return - case call := <-sink: - // Pack & Send to message queue. - logging.Info("Event observed: %v.", call) - data, err := json.Marshal(call.Raw) - if err != nil { - logging.Error(err.Error()) - return - } - msg := &v1.Message{ - ID: "TBD", //TODO - Timestamp: call.Timestamp.Int64(), - MsgType: v1.MessageType, - Version: v1.Version, - Destination: v1.ApplicationAddress{ - NetworkID: call.DestBcId.String(), - ContractAddress: call.DestContract.String(), - }, - Source: v1.ApplicationAddress{ - NetworkID: request.BcID, - ContractAddress: request.SFCAddr, - }, - Proofs: []v1.Proof{}, - Payload: hex.EncodeToString(data), - } - s.Request(v1.Version, v1.MessageType, msg) - logging.Info("Event processed.") - case <-end: - logging.Info("Stop observing contract %v.", request.SFCAddr) - } - } + observer.Start() }() return []byte{0}, nil } diff --git a/messaging/relayer/internal/msgobserver/eth/event_transformer.go b/messaging/relayer/internal/msgobserver/eth/event_transformer.go index 9a22bd55..f2304176 100644 --- a/messaging/relayer/internal/msgobserver/eth/event_transformer.go +++ b/messaging/relayer/internal/msgobserver/eth/event_transformer.go @@ -17,6 +17,7 @@ package eth import ( "encoding/hex" + "encoding/json" "errors" "fmt" @@ -31,7 +32,8 @@ type EventTransformer interface { // SFCEventTransformer converts events from a simple-function-call bridge contract to relayer messages type SFCEventTransformer struct { - Source string + Source string + SourceAddr string } // ToMessage converts a 'CrossCall' event emited from a simple-function-call bridge contract to relayer message @@ -44,9 +46,14 @@ func (t *SFCEventTransformer) ToMessage(event interface{}) (*v1.Message, error) return nil, err } - source := v1.ApplicationAddress{NetworkID: t.Source, ContractAddress: sfcEvent.DestContract.String()} + source := v1.ApplicationAddress{NetworkID: t.Source, ContractAddress: t.SourceAddr} destination := v1.ApplicationAddress{NetworkID: sfcEvent.DestBcId.String(), ContractAddress: sfcEvent.DestContract.String()} + data, err := json.Marshal(sfcEvent.Raw) + if err != nil { + return nil, err + } + message := v1.Message{ ID: hex.EncodeToString(randomBytes(16)), // TODO: replace with a proper message id scheme Timestamp: sfcEvent.Timestamp.Int64(), @@ -54,7 +61,8 @@ func (t *SFCEventTransformer) ToMessage(event interface{}) (*v1.Message, error) Version: v1.Version, Destination: destination, Source: source, - Payload: toBase64String(sfcEvent.DestFunctionCall), + Proofs: []v1.Proof{}, + Payload: hex.EncodeToString(data), } return &message, nil @@ -72,6 +80,6 @@ func (t *SFCEventTransformer) validate(event *functioncall.SfcCrossCall) error { return nil } -func NewSFCEventTransformer(sourceNetwork string) *SFCEventTransformer { - return &SFCEventTransformer{sourceNetwork} +func NewSFCEventTransformer(sourceNetwork string, sourceAddr string) *SFCEventTransformer { + return &SFCEventTransformer{sourceNetwork, sourceAddr} } diff --git a/messaging/relayer/internal/msgobserver/eth/event_transformer_test.go b/messaging/relayer/internal/msgobserver/eth/event_transformer_test.go index 548388a3..4ea87fd6 100644 --- a/messaging/relayer/internal/msgobserver/eth/event_transformer_test.go +++ b/messaging/relayer/internal/msgobserver/eth/event_transformer_test.go @@ -16,6 +16,8 @@ package eth */ import ( + "encoding/hex" + "encoding/json" "math/big" "testing" @@ -30,16 +32,19 @@ var fixValidEvent = functioncall.SfcCrossCall{ Timestamp: big.NewInt(1639527190), DestFunctionCall: randomBytes(10), } -var transformer = NewSFCEventTransformer("network-001") +var transformer = NewSFCEventTransformer("network-001", "0x8e215d06ea7ec1fdb4fc5fd21768f4b34ee92ef4") func TestSFCTransformer(t *testing.T) { message, err := transformer.ToMessage(&fixValidEvent) assert.Nil(t, err) + data, err := json.Marshal(fixValidEvent.Raw) + assert.Nil(t, err) + assert.Equal(t, fixValidEvent.DestBcId.String(), message.Destination.NetworkID) assert.Equal(t, fixValidEvent.DestContract.String(), message.Destination.ContractAddress) assert.Equal(t, fixValidEvent.Timestamp, big.NewInt(message.Timestamp)) - assert.Equal(t, toBase64String(fixValidEvent.DestFunctionCall), message.Payload) + assert.Equal(t, hex.EncodeToString(data), message.Payload) } func TestSFCTransformerFailsOnInvalidEventType(t *testing.T) { assert.Panics(t, func() { transformer.ToMessage("invalid event") }) diff --git a/messaging/relayer/internal/msgobserver/eth/event_watcher.go b/messaging/relayer/internal/msgobserver/eth/event_watcher.go index 3afe8047..176faa11 100644 --- a/messaging/relayer/internal/msgobserver/eth/event_watcher.go +++ b/messaging/relayer/internal/msgobserver/eth/event_watcher.go @@ -20,6 +20,7 @@ import ( "log" "github.com/consensys/gpact/messaging/relayer/internal/contracts/functioncall" + "github.com/consensys/gpact/messaging/relayer/internal/logging" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/event" ) @@ -39,6 +40,8 @@ type EventWatcherConfig struct { type SFCCrossCallWatcher struct { EventWatcherConfig SfcContract *functioncall.Sfc + + end chan bool } // Watch subscribes and starts listening to 'CrossCall' events from a given simple-function-call contract. @@ -55,17 +58,21 @@ func (l *SFCCrossCallWatcher) Watch() { } func (l *SFCCrossCallWatcher) start(sub event.Subscription, chanEvents <-chan *functioncall.SfcCrossCall) { + logging.Info("Start watching %v...", l.SfcContract) for { select { case err := <-sub.Err(): // TODO: communicate this to the calling context - log.Fatalf("error in log subscription %v", err) + logging.Error("error in log subscription %v", err) case log := <-chanEvents: l.Handler.Handle(log) + case <-l.end: + logging.Info("Stop watching %v.", l.SfcContract) + return } } } -func NewSFCCrossCallWatcher(context context.Context, handler EventHandler, contract *functioncall.Sfc) *SFCCrossCallWatcher { - return &SFCCrossCallWatcher{EventWatcherConfig: EventWatcherConfig{Context: context, Handler: handler}, SfcContract: contract} +func NewSFCCrossCallWatcher(context context.Context, handler EventHandler, contract *functioncall.Sfc, end chan bool) *SFCCrossCallWatcher { + return &SFCCrossCallWatcher{EventWatcherConfig: EventWatcherConfig{Context: context, Handler: handler}, SfcContract: contract, end: end} } diff --git a/messaging/relayer/internal/msgobserver/eth/event_watcher_test.go b/messaging/relayer/internal/msgobserver/eth/event_watcher_test.go index b7b6086e..113a5dd6 100644 --- a/messaging/relayer/internal/msgobserver/eth/event_watcher_test.go +++ b/messaging/relayer/internal/msgobserver/eth/event_watcher_test.go @@ -39,7 +39,7 @@ func TestSFCCrossCallWatcher(t *testing.T) { handler := new(MockEventHandler) handler.On("Handle", mock.AnythingOfType("*functioncall.SfcCrossCall")).Once().Return(nil) - watcher := NewSFCCrossCallWatcher(auth.Context, handler, contract) + watcher := NewSFCCrossCallWatcher(auth.Context, handler, contract, make(chan bool)) go watcher.Watch() _, err := contract.SfcTransactor.CrossBlockchainCall(auth, big.NewInt(100), auth.From, []byte("payload")) diff --git a/messaging/relayer/internal/msgobserver/eth/message_observer.go b/messaging/relayer/internal/msgobserver/eth/message_observer.go index f6b42e9b..dd2f7131 100644 --- a/messaging/relayer/internal/msgobserver/eth/message_observer.go +++ b/messaging/relayer/internal/msgobserver/eth/message_observer.go @@ -30,11 +30,11 @@ type SFCBridgeObserver struct { SourceNetwork string } -func NewSFCBridgeObserver(source string, contract *functioncall.Sfc, mq mqserver.MessageQueue) (*SFCBridgeObserver, error) { - eventTransformer := NewSFCEventTransformer(source) +func NewSFCBridgeObserver(source string, sourceAddr string, contract *functioncall.Sfc, mq mqserver.MessageQueue, end chan bool) (*SFCBridgeObserver, error) { + eventTransformer := NewSFCEventTransformer(source, sourceAddr) messageHandler := NewMessageEnqueueHandler(mq) eventHandler := NewSimpleEventHandler(eventTransformer, messageHandler) - eventWatcher := NewSFCCrossCallWatcher(context.Background(), eventHandler, contract) + eventWatcher := NewSFCCrossCallWatcher(context.Background(), eventHandler, contract, end) return &SFCBridgeObserver{EventWatcher: eventWatcher, EventHandler: eventHandler, SourceNetwork: source}, nil } diff --git a/messaging/relayer/internal/msgobserver/eth/message_observer_test.go b/messaging/relayer/internal/msgobserver/eth/message_observer_test.go index b130c0a5..ac15fb2e 100644 --- a/messaging/relayer/internal/msgobserver/eth/message_observer_test.go +++ b/messaging/relayer/internal/msgobserver/eth/message_observer_test.go @@ -30,12 +30,13 @@ func TestSFCBridgeObserver(t *testing.T) { fixDesAddress := common.HexToAddress("0x8e215d06ea7ec1fdb4fc5fd21768f4b34ee92ef4") fixDestID := big.NewInt(2) fixSourceID := "1" + fixSourceAddress := "0x8e215d06ea7ec1fdb4fc5fd21768f4b34ee92ef4" simBackend, auth := simulatedBackend(t) contract := deployContract(t, simBackend, auth) mockMQ := new(MockMQ) - observer, err := NewSFCBridgeObserver(fixSourceID, contract, mockMQ) + observer, err := NewSFCBridgeObserver(fixSourceID, fixSourceAddress, contract, mockMQ, make(chan bool)) assert.Nil(t, err) go observer.Start()