-
Notifications
You must be signed in to change notification settings - Fork 20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Periodically resubscribe to chain event logs #922
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ import ( | |
"fmt" | ||
"io" | ||
"log" | ||
"time" | ||
|
||
"github.com/ethereum/go-ethereum" | ||
"github.com/ethereum/go-ethereum/accounts/abi/bind" | ||
|
@@ -37,6 +38,10 @@ type EthChainService struct { | |
logger *log.Logger | ||
} | ||
|
||
// RESUB_INTERVAL is how often we resubscribe to log events. | ||
// We do this to avoid https://github.com/ethereum/go-ethereum/issues/23845 | ||
const RESUB_INTERVAL = 60 * time.Second | ||
|
||
// NewEthChainService constructs a chain service that submits transactions to a NitroAdjudicator | ||
// and listens to events from an eventSource | ||
func NewEthChainService(chain ethChain, na *NitroAdjudicator.NitroAdjudicator, | ||
|
@@ -111,25 +116,41 @@ func (ecs *EthChainService) SendTransaction(tx protocols.ChainTransaction) error | |
} | ||
|
||
func (ecs *EthChainService) subcribeToEvents() error { | ||
|
||
go ecs.listenForLogEvents() | ||
return nil | ||
} | ||
|
||
func (ecs *EthChainService) listenForLogEvents() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am probably missing something, but why is the below logic moved to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Based on our test run behaviour I'm pretty sure that our issue is being caused by the geth issue as:
Agreed 😔.
That's a good point! I haven't run into that problem when running testground runs. I think we may be ok but I'll investigate. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
For a test with a long runtime, does the test open/close more on-chain channels than a short running test? Or do longer running tests open/use/close just more virtual channels? We use a websocket for connecting to an Ethereum JSON rpc endpoint. I wonder if there is some timeout issues with that websocket... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Nope, the long running test would create the same amount of ledger channels, so we should have the same amount of on chain channels (every participant creates a ledger channel with every hub).
This comment was marked as off-topic.
Sorry, something went wrong. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did a little spelunking through
5 minutes seems roughly in line with the failures we've seen, so that might explain it. Edit: I've posted my hunch to the issue There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does it makes sense to change our resubscription period to something like 4 minutes? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yup, now that we know it's a 5 minute timeout it makes sense to use a larger value. |
||
// Subsribe to Adjudicator events | ||
query := ethereum.FilterQuery{ | ||
Addresses: []common.Address{ecs.naAddress}, | ||
} | ||
logs := make(chan ethTypes.Log) | ||
sub, err := ecs.chain.SubscribeFilterLogs(context.Background(), query, logs) | ||
if err != nil { | ||
return err | ||
panic(err) | ||
} | ||
go ecs.listenForLogEvents(sub, logs) | ||
return nil | ||
} | ||
|
||
func (ecs *EthChainService) listenForLogEvents(sub ethereum.Subscription, logs chan ethTypes.Log) { | ||
for { | ||
select { | ||
case err := <-sub.Err(): | ||
// TODO should we try resubscribing to chain events | ||
ecs.logger.Printf("event subscription error: %v", err) | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
// If the error is nil then the subscription was closed and we need to re-subscribe. | ||
// This is a workaround for https://github.com/ethereum/go-ethereum/issues/23845 | ||
var sErr error | ||
sub, sErr = ecs.chain.SubscribeFilterLogs(context.Background(), query, logs) | ||
if sErr != nil { | ||
panic(err) | ||
} | ||
ecs.logger.Println("resubscribed to filtered logs") | ||
|
||
case <-time.After(RESUB_INTERVAL): | ||
// Due to https://github.com/ethereum/go-ethereum/issues/23845 we can't rely on a long running subscription. | ||
// We unsub here and recreate the subscription in the next iteration of the select. | ||
sub.Unsubscribe() | ||
case chainEvent := <-logs: | ||
switch chainEvent.Topics[0] { | ||
case depositedTopic: | ||
|
@@ -168,11 +189,13 @@ func (ecs *EthChainService) listenForLogEvents(sub ethereum.Subscription, logs c | |
|
||
event := ConcludedEvent{commonEvent: commonEvent{channelID: ce.ChannelId, BlockNum: chainEvent.BlockNumber}} | ||
ecs.out <- event | ||
|
||
default: | ||
ecs.logger.Printf("Unknown chain event") | ||
} | ||
} | ||
} | ||
|
||
} | ||
|
||
// EventFeed returns the out chan, and narrows the type so that external consumers may only receive on it. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.