Skip to content

Commit

Permalink
Temp fix to the observer to survive network loss (Consensys#90)
Browse files Browse the repository at this point in the history
  • Loading branch information
wcgcyx authored Feb 3, 2022
1 parent d1814f2 commit 90ea172
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 30 deletions.
10 changes: 5 additions & 5 deletions services/relayer/deploy/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ resource "aws_instance" "relayer" {
sudo apt-get install -y docker-ce=5:20.10.12~3-0~ubuntu-focal docker-ce-cli=5:20.10.12~3-0~ubuntu-focal containerd.io
wget https://github.com/grafana/loki/releases/download/v2.3.0/promtail-linux-amd64.zip
wget https://golang.org/dl/go1.17.1.linux-amd64.tar.gz
wget https://raw.githubusercontent.com/ConsenSys/gpact/cros-15-message-signer/messaging/relayer/deploy/promtail-cloud-config.yaml
unzip ./promtail-linux-amd64.zip
sudo tar -C /usr/local -xzf go1.17.1.linux-amd64.tar.gz
mkdir /home/ubuntu/go
Expand All @@ -42,17 +41,18 @@ resource "aws_instance" "relayer" {
export PATH=$PATH:/usr/local/go/bin:$GOPATH/bin
git clone https://github.com/ConsenSys/gpact.git
cd ./gpact
git checkout cros-15-message-signer
cd ./messaging/relayer
cd ./services/relayer
make build
cp ./deploy/promtail-cloud-config.yaml ../../../
export IP="${aws_instance.relayer-monitor.public_ip}"
echo " host: node1" >> ../../../promtail-cloud-config.yaml
echo "clients:" >> ../../../promtail-cloud-config.yaml
echo " - url: http://$IP:3100/loki/api/v1/push" >> ../../../promtail-cloud-config.yaml
docker run -d -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management
sleep 30
cd ./build
LOG_SERVICE_NAME=observer LOG_LEVEL=debug LOG_TARGET=STDOUT LOG_DIR=.observer/log LOG_FILE=observer.log LOG_MAX_BACKUPS=3 LOG_MAX_AGE=28 LOG_MAX_SIZE=500 LOG_COMPRESS=true LOG_TIME_FORMAT=RFC3339 OUTBOUND_MQ_ADDR=amqp://guest:guest@localhost:5672/ OUTBOUND_CH_NAME=channel1 API_PORT=9425 OBSERVER_DS_PATH=.relayer-observer/ nohup ./observer >> /home/ubuntu/all.log 2>&1 &
LOG_SERVICE_NAME=observer1 LOG_LEVEL=debug LOG_TARGET=STDOUT LOG_DIR=.observer/log1 LOG_FILE=observer1.log LOG_MAX_BACKUPS=3 LOG_MAX_AGE=28 LOG_MAX_SIZE=500 LOG_COMPRESS=true LOG_TIME_FORMAT=RFC3339 OUTBOUND_MQ_ADDR=amqp://guest:guest@localhost:5672/ OUTBOUND_CH_NAME=channel1 API_PORT=9424 OBSERVER_DS_PATH=.relayer-observer1/ nohup ./observer >> /home/ubuntu/all.log 2>&1 &
LOG_SERVICE_NAME=observer2 LOG_LEVEL=debug LOG_TARGET=STDOUT LOG_DIR=.observer/log2 LOG_FILE=observer2.log LOG_MAX_BACKUPS=3 LOG_MAX_AGE=28 LOG_MAX_SIZE=500 LOG_COMPRESS=true LOG_TIME_FORMAT=RFC3339 OUTBOUND_MQ_ADDR=amqp://guest:guest@localhost:5672/ OUTBOUND_CH_NAME=channel1 API_PORT=9425 OBSERVER_DS_PATH=.relayer-observer2/ nohup ./observer >> /home/ubuntu/all.log 2>&1 &
LOG_SERVICE_NAME=relayer LOG_LEVEL=debug LOG_TARGET=STDOUT LOG_DIR=.relayer/log LOG_FILE=relayer.log LOG_MAX_BACKUPS=3 LOG_MAX_AGE=28 LOG_MAX_SIZE=500 LOG_COMPRESS=true LOG_TIME_FORMAT=RFC3339 INBOUND_MQ_ADDR=amqp://guest:guest@localhost:5672/ INBOUND_CH_NAME=channel1 OUTBOUND_MQ_ADDR=amqp://guest:guest@localhost:5672/ OUTBOUND_CH_NAME=channel2 API_PORT=9426 SIGNER_DS_PATH=.relayer-signer/ nohup ./relayer >> /home/ubuntu/all.log 2>&1 &
LOG_SERVICE_NAME=dispatcher LOG_LEVEL=debug LOG_TARGET=STDOUT LOG_DIR=.dispatcher/log LOG_FILE=dispatcher.log LOG_MAX_BACKUPS=3 LOG_MAX_AGE=28 LOG_MAX_SIZE=500 LOG_COMPRESS=true LOG_TIME_FORMAT=RFC3339 INBOUND_MQ_ADDR=amqp://guest:guest@localhost:5672/ INBOUND_CH_NAME=channel2 API_PORT=9427 TRANSACTOR_DS_PATH=.relayer-transactor/ VERIFIER_DS_PATH=.relayer-verifier/ nohup ./dispatcher >> /home/ubuntu/all.log 2>&1 &
cd ../../../../
Expand Down Expand Up @@ -133,7 +133,7 @@ resource "aws_security_group" "security_relayer" {
}

ingress {
from_port = 9425
from_port = 9424
to_port = 9427
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
Expand Down
2 changes: 1 addition & 1 deletion services/relayer/internal/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func Init(conf *viper.Viper) zerolog.Logger {
writer := getLogTarget(conf)
service := getLogServiceName(conf)
logger := zerolog.New(writer).With().Timestamp().Str("service", service).Logger()
log.Logger = logger
log.Logger = logger.With().Caller().Logger()
return logger
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ package observer
import (
"context"
"fmt"
"log"

"github.com/consensys/gpact/messaging/relayer/internal/contracts/functioncall"
"github.com/consensys/gpact/messaging/relayer/internal/logging"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"log"
)

// EventWatcher listens to blockchain events
Expand All @@ -51,23 +52,23 @@ type SFCCrossCallRealtimeEventWatcher struct {
// Watch subscribes and starts listening to 'CrossCall' events from a given Simple Function Call contract.
// Events received are passed to an event handler for processing.
// The method fails if subscribing to the event with the underlying network is not successful.
func (l *SFCCrossCallRealtimeEventWatcher) Watch() {
func (l *SFCCrossCallRealtimeEventWatcher) Watch() error {
opts := bind.WatchOpts{Start: &l.Start, Context: l.Context}
chanEvents := make(chan *functioncall.SfcCrossCall)
sub, err := l.SfcContract.WatchCrossCall(&opts, chanEvents)
if err != nil {
log.Fatalf("failed to subscribe to crosschaincall event %v", err)
}
l.start(sub, chanEvents)
return l.start(sub, chanEvents)
}

func (l *SFCCrossCallRealtimeEventWatcher) start(sub event.Subscription, chanEvents <-chan *functioncall.SfcCrossCall) {
func (l *SFCCrossCallRealtimeEventWatcher) start(sub event.Subscription, chanEvents <-chan *functioncall.SfcCrossCall) error {
logging.Info("Start watching %v...", l.SfcContract)
for {
select {
case err := <-sub.Err():
// TODO: communicate this to the calling context
logging.Error("error in log subscription %v", err)
return fmt.Errorf("error in log subscription %v", err)
case ev := <-chanEvents:
if ev.Raw.Removed {
l.RemovedEventHandler.Handle(ev)
Expand All @@ -76,7 +77,7 @@ func (l *SFCCrossCallRealtimeEventWatcher) start(sub event.Subscription, chanEve
}
case <-l.end:
logging.Info("Stop watching %v.", l.SfcContract)
return
return nil
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,6 @@ func NewSFCBridgeObserver(source string, sourceAddr string, contract *functionca
return &SFCBridgeObserver{EventWatcher: eventWatcher, EventHandler: eventHandler, SourceNetwork: source}, nil
}

func (o *SFCBridgeObserver) Start() {
o.EventWatcher.Watch()
func (o *SFCBridgeObserver) Start() error {
return o.EventWatcher.Watch()
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"math/big"
"strings"
"time"

"github.com/consensys/gpact/messaging/relayer/internal/contracts/functioncall"
"github.com/consensys/gpact/messaging/relayer/internal/logging"
Expand Down Expand Up @@ -145,25 +146,33 @@ func (o *ObserverImplV1) StopObserve() error {

// routine is the observe routine.
func (o *ObserverImplV1) routine(chainID *big.Int, chainAP string, addr common.Address, end chan bool) {
chain, err := ethclient.Dial(chainAP)
if err != nil {
logging.Error(err.Error())
return
}
defer chain.Close()
for {
chain, err := ethclient.Dial(chainAP)
if err != nil {
logging.Error(err.Error())
return
}
defer chain.Close()

sfc, err := functioncall.NewSfc(addr, chain)
if err != nil {
logging.Error(err.Error())
return
}
sfc, err := functioncall.NewSfc(addr, chain)
if err != nil {
logging.Error(err.Error())
return
}

observer, err := NewSFCBridgeObserver(chainID.String(), addr.String(), sfc, o.mq, end)
if err != nil {
logging.Error(err.Error())
return
observer, err := NewSFCBridgeObserver(chainID.String(), addr.String(), sfc, o.mq, end)
if err != nil {
logging.Error(err.Error())
return
}
if observer.Start() == nil {
break
} else {
logging.Warn("Error in observing event. Retry in 3 seconds...")
chain.Close()
time.Sleep(3 * time.Second)
}
}
observer.Start()
}

// dsKey gets the datastore key from given chainID and contract address.
Expand Down

0 comments on commit 90ea172

Please sign in to comment.