Skip to content

Commit

Permalink
Merge pull request Consensys#79 from ConsenSys/cros-15-message-signer
Browse files Browse the repository at this point in the history
Cros 15 message signer
  • Loading branch information
wcgcyx authored Jan 6, 2022
2 parents e732d16 + 5bbbc0e commit cebda15
Show file tree
Hide file tree
Showing 34 changed files with 5,419 additions and 61 deletions.
28 changes: 8 additions & 20 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,12 @@ jobs:
ls -al
echo GOROOT: $GOROOT
- run:
name: Build production code
command: make build
name: Build test docker container
command: make docker
working_directory: messaging/relayer
- run:
name: Create and Start blockchains
working_directory: test-blockchains
name: Create and Start blockchains & Relayers
working_directory: messaging/relayer
background: true
command: docker-compose up
- run: sleep 30
Expand All @@ -293,25 +293,13 @@ jobs:
command: make utest
working_directory: messaging/relayer
- run:
name: Stop blockchains
working_directory: test-blockchains
name: Stop blockchains & Relayers
working_directory: messaging/relayer
command: docker-compose stop
when: always
- run:
name: Logs from bc31
working_directory: test-blockchains
command: |
docker container logs test-blockchains_bc31node1_1
when: always
- run:
name: Logs from bc32
working_directory: test-blockchains
command: |
docker container logs test-blockchains_bc32node1_1
when: always
- run:
name: Remove blockchains
working_directory: test-blockchains
name: Remove blockchains & Relayers
working_directory: messaging/relayer
command: docker-compose down
when: always

Expand Down
4 changes: 2 additions & 2 deletions messaging/relayer/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ clean:
rm -rf ./build/*

utest:
go test ./...

go test -v --count=1 ./internal/...
go test -v --count=1 ./itest/
159 changes: 157 additions & 2 deletions messaging/relayer/cmd/dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,39 @@ package main
*/

import (
"context"
"crypto/ecdsa"
"encoding/hex"
"encoding/json"
"math/big"
"strconv"
"time"

"github.com/consensys/gpact/messaging/relayer/internal/adminserver"
"github.com/consensys/gpact/messaging/relayer/internal/config"
"github.com/consensys/gpact/messaging/relayer/internal/contracts/messaging"
"github.com/consensys/gpact/messaging/relayer/internal/logging"
"github.com/consensys/gpact/messaging/relayer/internal/messages"
v1 "github.com/consensys/gpact/messaging/relayer/internal/messages/v1"
"github.com/consensys/gpact/messaging/relayer/internal/mqserver"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/secp256k1"
"github.com/ethereum/go-ethereum/ethclient"
_ "github.com/joho/godotenv/autoload"
)

// TODO, Need a separate package to put all core components.
var s *mqserver.MQServer
var api adminserver.AdminServer
var chainInfos map[byte]chainInfo

type chainInfo struct {
auth *bind.TransactOpts
chain string
esAddr string
}

func main() {
// Load config
Expand All @@ -48,8 +71,52 @@ func main() {
if err != nil {
panic(err)
}
s.Start()
err = s.Start()
if err != nil {
panic(err)
}
logging.Info("Dispatcher started.")

// For now just have one single handler for dispatching chain with type 1.
chainInfos = make(map[byte]chainInfo)
apiHandlers := make(map[byte]func(req []byte) ([]byte, error))
apiHandlers[1] = func(req []byte) ([]byte, error) {
request := Request{}
err = json.Unmarshal(req, &request)
if err != nil {
return nil, err
}
key, err := hex.DecodeString(request.Key)
if err != nil {
return nil, err
}

x, y := secp256k1.S256().ScalarBaseMult(key)
prv := &ecdsa.PrivateKey{}
prv.D = big.NewInt(0).SetBytes(key)
prv.PublicKey = ecdsa.PublicKey{
X: x,
Y: y,
Curve: secp256k1.S256(),
}
auth := bind.NewKeyedTransactor(prv)
auth.Nonce = big.NewInt(int64(request.Nonce))
auth.Value = big.NewInt(0) // in wei
auth.GasLimit = uint64(3000000) // in units
auth.GasPrice = big.NewInt(0)

chainInfos[request.BcID] = chainInfo{
auth: auth,
chain: request.Chain,
esAddr: request.EsAddr,
}
return []byte{0}, nil
}
api = adminserver.NewAdminServerImpl(conf.APIPort, apiHandlers)
err = api.Start()
if err != nil {
panic(err)
}
for {
}
}
Expand All @@ -58,5 +125,93 @@ func main() {
func simpleHandler(req messages.Message) {
// Received request from observer
msg := req.(*v1.Message)
logging.Info("Received message with ID %v and payload %v", msg.ID, msg.Payload)
logging.Info("Process message with ID: %v", msg.ID)

destID, err := strconv.Atoi(msg.Destination.NetworkID)
if err != nil {
logging.Error(err.Error())
return
}

info, ok := chainInfos[byte(destID)]
if !ok {
logging.Error("chain id %v not supported", destID)
return
}

chain, err := ethclient.Dial(info.chain)
if err != nil {
logging.Error(err.Error())
return
}
defer chain.Close()

srcID, err := strconv.Atoi(msg.Source.NetworkID)
if err != nil {
logging.Error(err.Error())
return
}
sfcAddr := common.HexToAddress(msg.Source.ContractAddress)
esAddr := common.HexToAddress(info.esAddr)

// Load verifier
verifier, err := messaging.NewSignedEventStore(esAddr, chain)
if err != nil {
logging.Error(err.Error())
return
}

// Get proof
data, err := hex.DecodeString(msg.Payload)
if err != nil {
logging.Error(err.Error())
return
}
raw := types.Log{}
err = json.Unmarshal(data, &raw)
if err != nil {
logging.Error(err.Error())
return
}
if len(msg.Proofs) == 0 {
logging.Error("Empty proofs received.")
return
}
signature, err := hex.DecodeString(msg.Proofs[0].Proof)
if err != nil {
logging.Error(err.Error())
return
}
tx, err := verifier.RelayEvent(info.auth, big.NewInt(int64(srcID)), sfcAddr, raw.Data, signature)
if err != nil {
logging.Error(err.Error())
return
}
waitForReceipt(chain, tx)
info.auth.Nonce = big.NewInt(0).Add(info.auth.Nonce, big.NewInt(1))
}

// TODO: Create a package to place all admin APIs.
type Request struct {
BcID byte `json:"bc_id"`
Key string `json:"key"`
Nonce int `json:"nonce"`
Chain string `json:"chain"`
EsAddr string `json:"es_addr"`
}

func waitForReceipt(conn *ethclient.Client, tx *types.Transaction) error {
for {
rept, err := conn.TransactionReceipt(context.Background(), tx.Hash())
if err == nil {
if rept.Status != types.ReceiptStatusSuccessful {
logging.Error("Transaction failed... ", rept)
return nil
} else {
}
break
}
time.Sleep(1 * time.Second)
}
return nil
}
110 changes: 104 additions & 6 deletions messaging/relayer/cmd/observer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,25 @@ package main
*/

import (
"time"
"encoding/hex"
"encoding/json"

"github.com/consensys/gpact/messaging/relayer/internal/adminserver"
"github.com/consensys/gpact/messaging/relayer/internal/config"
"github.com/consensys/gpact/messaging/relayer/internal/contracts/functioncall"
"github.com/consensys/gpact/messaging/relayer/internal/logging"
v1 "github.com/consensys/gpact/messaging/relayer/internal/messages/v1"
"github.com/consensys/gpact/messaging/relayer/internal/mqserver"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
_ "github.com/joho/godotenv/autoload"
)

// TODO, Need a separate package to put all core components.
var s *mqserver.MQServer
var s mqserver.MessageQueue
var api adminserver.AdminServer
var activeChains map[string]chan bool

func main() {
// Load config
Expand All @@ -42,11 +52,99 @@ func main() {
if err != nil {
panic(err)
}
s.Start()
err = s.Start()
if err != nil {
panic(err)
}
logging.Info("Observer started.")

// For now just have one single handler for observing chain with type 1.
activeChains = make(map[string]chan bool)
apiHandlers := make(map[byte]func(req []byte) ([]byte, error))
apiHandlers[1] = func(req []byte) ([]byte, error) {
request := Request{}
err = json.Unmarshal(req, &request)
if err != nil {
return nil, err
}
end, ok := activeChains[request.BcID]
if ok {
end <- true
}
end = make(chan bool)
activeChains[request.BcID] = end

go func() {
// TODO: User msgobserver.
chain, err := ethclient.Dial(request.Chain)
if err != nil {
logging.Error(err.Error())
return
}
defer chain.Close()

sfc, err := functioncall.NewSfc(common.HexToAddress(request.SFCAddr), chain)
if err != nil {
logging.Error(err.Error())
return
}
sink := make(chan *functioncall.SfcCrossCall)
event, err := sfc.WatchCrossCall(&bind.WatchOpts{Start: nil, Context: nil}, sink)
if err != nil {
logging.Error(err.Error())
return
}
logging.Info("Start observing contract %v.", request.SFCAddr)
for {
select {
case log := <-event.Err():
logging.Error(log.Error())
return
case call := <-sink:
// Pack & Send to message queue.
logging.Info("Event observed: %v.", call)
data, err := json.Marshal(call.Raw)
if err != nil {
logging.Error(err.Error())
return
}
msg := &v1.Message{
ID: "TBD", //TODO
Timestamp: call.Timestamp.Int64(),
MsgType: v1.MessageType,
Version: v1.Version,
Destination: v1.ApplicationAddress{
NetworkID: call.DestBcId.String(),
ContractAddress: call.DestContract.String(),
},
Source: v1.ApplicationAddress{
NetworkID: request.BcID,
ContractAddress: request.SFCAddr,
},
Proofs: []v1.Proof{},
Payload: hex.EncodeToString(data),
}
s.Request(v1.Version, v1.MessageType, msg)
logging.Info("Event processed.")
case <-end:
logging.Info("Stop observing contract %v.", request.SFCAddr)
}
}
}()
return []byte{0}, nil
}
api = adminserver.NewAdminServerImpl(conf.APIPort, apiHandlers)
err = api.Start()
if err != nil {
panic(err)
}
for {
// Send a random message every 3 seconds.
// go sendRandomMessage()
time.Sleep(3 * time.Second)
}
}

// TODO: Create a package to place all admin APIs.
type Request struct {
BcID string `json:"bc_id"`
Chain string `json:"chain"`
SFCAddr string `json:"sfc_addr"`
}
Loading

0 comments on commit cebda15

Please sign in to comment.