Skip to content

Commit

Permalink
use workaround
Browse files Browse the repository at this point in the history
  • Loading branch information
lalexgap committed Oct 4, 2022
1 parent 952c19a commit c0fb21f
Showing 1 changed file with 31 additions and 8 deletions.
39 changes: 31 additions & 8 deletions client/engine/chainservice/eth_chainservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"log"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand Down Expand Up @@ -37,6 +38,10 @@ type EthChainService struct {
logger *log.Logger
}

// RESUB_INTERVAL is how often we resubscribe to log events.
// We do this to avoid https://github.com/ethereum/go-ethereum/issues/23845
const RESUB_INTERVAL = 60 * time.Second

// NewEthChainService constructs a chain service that submits transactions to a NitroAdjudicator
// and listens to events from an eventSource
func NewEthChainService(chain ethChain, na *NitroAdjudicator.NitroAdjudicator,
Expand Down Expand Up @@ -111,25 +116,41 @@ func (ecs *EthChainService) SendTransaction(tx protocols.ChainTransaction) error
}

func (ecs *EthChainService) subcribeToEvents() error {

go ecs.listenForLogEvents()
return nil
}

func (ecs *EthChainService) listenForLogEvents() {
// Subsribe to Adjudicator events
query := ethereum.FilterQuery{
Addresses: []common.Address{ecs.naAddress},
}
logs := make(chan ethTypes.Log)
sub, err := ecs.chain.SubscribeFilterLogs(context.Background(), query, logs)
if err != nil {
return err
panic(err)
}
go ecs.listenForLogEvents(sub, logs)
return nil
}

func (ecs *EthChainService) listenForLogEvents(sub ethereum.Subscription, logs chan ethTypes.Log) {
for {
select {
case err := <-sub.Err():
// TODO should we try resubscribing to chain events
ecs.logger.Printf("event subscription error: %v", err)
// If the error is nil then the subscription was closed and we need to re-subscribe.
// This is a workaround for https://github.com/ethereum/go-ethereum/issues/23845
if err == nil {

var sErr error
sub, sErr = ecs.chain.SubscribeFilterLogs(context.Background(), query, logs)
if sErr != nil {
panic(err)
}
ecs.logger.Println("resubscribed to filtered logs")
} else {
panic(err)
}
case <-time.After(RESUB_INTERVAL):
// Due to https://github.com/ethereum/go-ethereum/issues/23845 we can't rely on a long running subscription.
// We unsub here and recreate the subscription in the next iteration of the select.
sub.Unsubscribe()
case chainEvent := <-logs:
switch chainEvent.Topics[0] {
case depositedTopic:
Expand Down Expand Up @@ -168,11 +189,13 @@ func (ecs *EthChainService) listenForLogEvents(sub ethereum.Subscription, logs c

event := ConcludedEvent{commonEvent: commonEvent{channelID: ce.ChannelId, BlockNum: chainEvent.BlockNumber}}
ecs.out <- event

default:
ecs.logger.Printf("Unknown chain event")
}
}
}

}

// EventFeed returns the out chan, and narrows the type so that external consumers may only receive on it.
Expand Down

0 comments on commit c0fb21f

Please sign in to comment.