Skip to content

Commit

Permalink
Use observer eth pkg
Browse files Browse the repository at this point in the history
  • Loading branch information
wcgcyx committed Jan 6, 2022
1 parent 5bbbc0e commit 6af4c72
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 57 deletions.
46 changes: 4 additions & 42 deletions messaging/relayer/cmd/observer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@ package main
*/

import (
"encoding/hex"
"encoding/json"

"github.com/consensys/gpact/messaging/relayer/internal/adminserver"
"github.com/consensys/gpact/messaging/relayer/internal/config"
"github.com/consensys/gpact/messaging/relayer/internal/contracts/functioncall"
"github.com/consensys/gpact/messaging/relayer/internal/logging"
v1 "github.com/consensys/gpact/messaging/relayer/internal/messages/v1"
"github.com/consensys/gpact/messaging/relayer/internal/mqserver"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/consensys/gpact/messaging/relayer/internal/msgobserver/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
_ "github.com/joho/godotenv/autoload"
Expand Down Expand Up @@ -75,7 +73,6 @@ func main() {
activeChains[request.BcID] = end

go func() {
// TODO: User msgobserver.
chain, err := ethclient.Dial(request.Chain)
if err != nil {
logging.Error(err.Error())
Expand All @@ -88,48 +85,13 @@ func main() {
logging.Error(err.Error())
return
}
sink := make(chan *functioncall.SfcCrossCall)
event, err := sfc.WatchCrossCall(&bind.WatchOpts{Start: nil, Context: nil}, sink)

observer, err := eth.NewSFCBridgeObserver(request.BcID, request.SFCAddr, sfc, s, end)
if err != nil {
logging.Error(err.Error())
return
}
logging.Info("Start observing contract %v.", request.SFCAddr)
for {
select {
case log := <-event.Err():
logging.Error(log.Error())
return
case call := <-sink:
// Pack & Send to message queue.
logging.Info("Event observed: %v.", call)
data, err := json.Marshal(call.Raw)
if err != nil {
logging.Error(err.Error())
return
}
msg := &v1.Message{
ID: "TBD", //TODO
Timestamp: call.Timestamp.Int64(),
MsgType: v1.MessageType,
Version: v1.Version,
Destination: v1.ApplicationAddress{
NetworkID: call.DestBcId.String(),
ContractAddress: call.DestContract.String(),
},
Source: v1.ApplicationAddress{
NetworkID: request.BcID,
ContractAddress: request.SFCAddr,
},
Proofs: []v1.Proof{},
Payload: hex.EncodeToString(data),
}
s.Request(v1.Version, v1.MessageType, msg)
logging.Info("Event processed.")
case <-end:
logging.Info("Stop observing contract %v.", request.SFCAddr)
}
}
observer.Start()
}()
return []byte{0}, nil
}
Expand Down
18 changes: 13 additions & 5 deletions messaging/relayer/internal/msgobserver/eth/event_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package eth

import (
"encoding/hex"
"encoding/json"
"errors"
"fmt"

Expand All @@ -31,7 +32,8 @@ type EventTransformer interface {

// SFCEventTransformer converts events from a simple-function-call bridge contract to relayer messages
type SFCEventTransformer struct {
Source string
Source string
SourceAddr string
}

// ToMessage converts a 'CrossCall' event emited from a simple-function-call bridge contract to relayer message
Expand All @@ -44,17 +46,23 @@ func (t *SFCEventTransformer) ToMessage(event interface{}) (*v1.Message, error)
return nil, err
}

source := v1.ApplicationAddress{NetworkID: t.Source, ContractAddress: sfcEvent.DestContract.String()}
source := v1.ApplicationAddress{NetworkID: t.Source, ContractAddress: t.SourceAddr}
destination := v1.ApplicationAddress{NetworkID: sfcEvent.DestBcId.String(), ContractAddress: sfcEvent.DestContract.String()}

data, err := json.Marshal(sfcEvent.Raw)
if err != nil {
return nil, err
}

message := v1.Message{
ID: hex.EncodeToString(randomBytes(16)), // TODO: replace with a proper message id scheme
Timestamp: sfcEvent.Timestamp.Int64(),
MsgType: v1.MessageType,
Version: v1.Version,
Destination: destination,
Source: source,
Payload: toBase64String(sfcEvent.DestFunctionCall),
Proofs: []v1.Proof{},
Payload: hex.EncodeToString(data),
}

return &message, nil
Expand All @@ -72,6 +80,6 @@ func (t *SFCEventTransformer) validate(event *functioncall.SfcCrossCall) error {
return nil
}

func NewSFCEventTransformer(sourceNetwork string) *SFCEventTransformer {
return &SFCEventTransformer{sourceNetwork}
func NewSFCEventTransformer(sourceNetwork string, sourceAddr string) *SFCEventTransformer {
return &SFCEventTransformer{sourceNetwork, sourceAddr}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package eth
*/

import (
"encoding/hex"
"encoding/json"
"math/big"
"testing"

Expand All @@ -30,16 +32,19 @@ var fixValidEvent = functioncall.SfcCrossCall{
Timestamp: big.NewInt(1639527190),
DestFunctionCall: randomBytes(10),
}
var transformer = NewSFCEventTransformer("network-001")
var transformer = NewSFCEventTransformer("network-001", "0x8e215d06ea7ec1fdb4fc5fd21768f4b34ee92ef4")

func TestSFCTransformer(t *testing.T) {
message, err := transformer.ToMessage(&fixValidEvent)
assert.Nil(t, err)

data, err := json.Marshal(fixValidEvent.Raw)
assert.Nil(t, err)

assert.Equal(t, fixValidEvent.DestBcId.String(), message.Destination.NetworkID)
assert.Equal(t, fixValidEvent.DestContract.String(), message.Destination.ContractAddress)
assert.Equal(t, fixValidEvent.Timestamp, big.NewInt(message.Timestamp))
assert.Equal(t, toBase64String(fixValidEvent.DestFunctionCall), message.Payload)
assert.Equal(t, hex.EncodeToString(data), message.Payload)
}
func TestSFCTransformerFailsOnInvalidEventType(t *testing.T) {
assert.Panics(t, func() { transformer.ToMessage("invalid event") })
Expand Down
13 changes: 10 additions & 3 deletions messaging/relayer/internal/msgobserver/eth/event_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"log"

"github.com/consensys/gpact/messaging/relayer/internal/contracts/functioncall"
"github.com/consensys/gpact/messaging/relayer/internal/logging"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/event"
)
Expand All @@ -39,6 +40,8 @@ type EventWatcherConfig struct {
type SFCCrossCallWatcher struct {
EventWatcherConfig
SfcContract *functioncall.Sfc

end chan bool
}

// Watch subscribes and starts listening to 'CrossCall' events from a given simple-function-call contract.
Expand All @@ -55,17 +58,21 @@ func (l *SFCCrossCallWatcher) Watch() {
}

func (l *SFCCrossCallWatcher) start(sub event.Subscription, chanEvents <-chan *functioncall.SfcCrossCall) {
logging.Info("Start watching %v...", l.SfcContract)
for {
select {
case err := <-sub.Err():
// TODO: communicate this to the calling context
log.Fatalf("error in log subscription %v", err)
logging.Error("error in log subscription %v", err)
case log := <-chanEvents:
l.Handler.Handle(log)
case <-l.end:
logging.Info("Stop watching %v.", l.SfcContract)
return
}
}
}

func NewSFCCrossCallWatcher(context context.Context, handler EventHandler, contract *functioncall.Sfc) *SFCCrossCallWatcher {
return &SFCCrossCallWatcher{EventWatcherConfig: EventWatcherConfig{Context: context, Handler: handler}, SfcContract: contract}
func NewSFCCrossCallWatcher(context context.Context, handler EventHandler, contract *functioncall.Sfc, end chan bool) *SFCCrossCallWatcher {
return &SFCCrossCallWatcher{EventWatcherConfig: EventWatcherConfig{Context: context, Handler: handler}, SfcContract: contract, end: end}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestSFCCrossCallWatcher(t *testing.T) {
handler := new(MockEventHandler)
handler.On("Handle", mock.AnythingOfType("*functioncall.SfcCrossCall")).Once().Return(nil)

watcher := NewSFCCrossCallWatcher(auth.Context, handler, contract)
watcher := NewSFCCrossCallWatcher(auth.Context, handler, contract, make(chan bool))
go watcher.Watch()

_, err := contract.SfcTransactor.CrossBlockchainCall(auth, big.NewInt(100), auth.From, []byte("payload"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ type SFCBridgeObserver struct {
SourceNetwork string
}

func NewSFCBridgeObserver(source string, contract *functioncall.Sfc, mq mqserver.MessageQueue) (*SFCBridgeObserver, error) {
eventTransformer := NewSFCEventTransformer(source)
func NewSFCBridgeObserver(source string, sourceAddr string, contract *functioncall.Sfc, mq mqserver.MessageQueue, end chan bool) (*SFCBridgeObserver, error) {
eventTransformer := NewSFCEventTransformer(source, sourceAddr)
messageHandler := NewMessageEnqueueHandler(mq)
eventHandler := NewSimpleEventHandler(eventTransformer, messageHandler)
eventWatcher := NewSFCCrossCallWatcher(context.Background(), eventHandler, contract)
eventWatcher := NewSFCCrossCallWatcher(context.Background(), eventHandler, contract, end)

return &SFCBridgeObserver{EventWatcher: eventWatcher, EventHandler: eventHandler, SourceNetwork: source}, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ func TestSFCBridgeObserver(t *testing.T) {
fixDesAddress := common.HexToAddress("0x8e215d06ea7ec1fdb4fc5fd21768f4b34ee92ef4")
fixDestID := big.NewInt(2)
fixSourceID := "1"
fixSourceAddress := "0x8e215d06ea7ec1fdb4fc5fd21768f4b34ee92ef4"

simBackend, auth := simulatedBackend(t)
contract := deployContract(t, simBackend, auth)
mockMQ := new(MockMQ)

observer, err := NewSFCBridgeObserver(fixSourceID, contract, mockMQ)
observer, err := NewSFCBridgeObserver(fixSourceID, fixSourceAddress, contract, mockMQ, make(chan bool))
assert.Nil(t, err)
go observer.Start()

Expand Down

0 comments on commit 6af4c72

Please sign in to comment.