Skip to content

Commit e16052e

Browse files
Merge pull request #36 from perun-network/generic-events-sub
Generic events sub
2 parents 827f7c2 + a325916 commit e16052e

File tree

4 files changed

+484
-56
lines changed

4 files changed

+484
-56
lines changed

backend/ethereum/channel/funder.go

Lines changed: 42 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
"github.com/pkg/errors"
2828

2929
"perun.network/go-perun/backend/ethereum/bindings/assetholder"
30-
cherrors "perun.network/go-perun/backend/ethereum/channel/errors"
30+
"perun.network/go-perun/backend/ethereum/subscription"
3131
"perun.network/go-perun/backend/ethereum/wallet"
3232
"perun.network/go-perun/channel"
3333
"perun.network/go-perun/client"
@@ -184,7 +184,7 @@ func (f *Funder) sendFundingTx(ctx context.Context, request channel.FundingReq,
184184
// nolint: gocritic
185185
if bal == nil || bal.Sign() <= 0 {
186186
f.log.WithFields(log.Fields{"channel": request.Params.ID(), "idx": request.Idx}).Debug("Skipped zero funding.")
187-
} else if alreadyFunded, err := checkFunded(ctx, bal, contract, fundingID); err != nil {
187+
} else if alreadyFunded, err := f.checkFunded(ctx, bal, contract, fundingID); err != nil {
188188
return nil, errors.WithMessage(err, "checking funded")
189189
} else if alreadyFunded {
190190
f.log.WithFields(log.Fields{"channel": request.Params.ID(), "idx": request.Idx}).Debug("Skipped second funding.")
@@ -210,76 +210,61 @@ func (f *Funder) deposit(ctx context.Context, bal *big.Int, asset Asset, funding
210210
}
211211

212212
// checkFunded returns whether `fundingID` holds at least `amount` funds.
213-
func checkFunded(ctx context.Context, amount *big.Int, asset assetHolder, fundingID [32]byte) (bool, error) {
214-
iter, err := filterFunds(ctx, asset, fundingID)
213+
func (f *Funder) checkFunded(ctx context.Context, amount *big.Int, asset assetHolder, fundingID [32]byte) (bool, error) {
214+
deposited := make(chan *subscription.Event, 10)
215+
subErr := make(chan error, 1)
216+
// Subscribe to events.
217+
sub, err := f.depositedSub(ctx, asset.contract, fundingID)
215218
if err != nil {
216-
return false, errors.WithMessagef(err, "filtering old Funding events for asset %d", asset.assetIndex)
219+
return false, errors.WithMessage(err, "subscribing to deposited event")
217220
}
218-
// nolint:errcheck
219-
defer iter.Close()
221+
defer sub.Close()
222+
// Read from the sub.
223+
go func() {
224+
defer close(deposited)
225+
subErr <- sub.ReadPast(ctx, deposited)
226+
}()
220227

221228
left := new(big.Int).Set(amount)
222-
for iter.Next() {
223-
left.Sub(left, iter.Event.Amount)
229+
for _event := range deposited {
230+
event := _event.Data.(*assetholder.AssetHolderDeposited)
231+
left.Sub(left, event.Amount)
224232
}
225-
return left.Sign() != 1, errors.Wrap(iter.Error(), "iterator")
233+
return left.Sign() != 1, errors.WithMessagef(<-subErr, "filtering old Funding events for asset %d", asset.assetIndex)
226234
}
227235

228-
func filterFunds(ctx context.Context, asset assetHolder, fundingIDs ...[32]byte) (*assetholder.AssetHolderDepositedIterator, error) {
229-
// Filter
230-
filterOpts := bind.FilterOpts{
231-
Start: uint64(1),
232-
End: nil,
233-
Context: ctx}
234-
iter, err := asset.FilterDeposited(&filterOpts, fundingIDs)
235-
if err != nil {
236-
err = cherrors.CheckIsChainNotReachableError(err)
237-
return nil, errors.WithMessage(err, "filtering deposited events")
236+
func (f *Funder) depositedSub(ctx context.Context, contract *bind.BoundContract, fundingIDs ...[32]byte) (*subscription.EventSub, error) {
237+
filter := make([]interface{}, len(fundingIDs))
238+
for i, fundingID := range fundingIDs {
239+
filter[i] = fundingID
238240
}
239-
240-
return iter, nil
241+
event := func() *subscription.Event {
242+
return &subscription.Event{
243+
Name: "Deposited",
244+
Data: new(assetholder.AssetHolderDeposited),
245+
Filter: [][]interface{}{filter},
246+
}
247+
}
248+
sub, err := subscription.NewEventSub(ctx, f, contract, event, startBlockOffset)
249+
return sub, errors.WithMessage(err, "subscribing to deposited event")
241250
}
242251

243252
// waitForFundingConfirmation waits for the confirmation events on the blockchain that
244253
// both we and all peers successfully funded the channel for the specified asset
245254
// according to the funding agreement.
246255
// nolint: funlen
247256
func (f *Funder) waitForFundingConfirmation(ctx context.Context, request channel.FundingReq, asset assetHolder, fundingIDs [][32]byte) error {
248-
deposited := make(chan *assetholder.AssetHolderDeposited)
249-
// Watch new events
250-
watchOpts, err := f.NewWatchOpts(ctx)
251-
if err != nil {
252-
return errors.WithMessage(err, "error creating watchopts")
253-
}
254-
sub, err := asset.WatchDeposited(watchOpts, deposited, fundingIDs)
257+
deposited := make(chan *subscription.Event)
258+
subErr := make(chan error, 1)
259+
// Subscribe to events.
260+
sub, err := f.depositedSub(ctx, asset.contract, fundingIDs...)
255261
if err != nil {
256-
err = cherrors.CheckIsChainNotReachableError(err)
257-
return errors.WithMessagef(err, "WatchDeposit on asset %d failed", asset.assetIndex)
262+
return errors.WithMessage(err, "subscribing to deposited event")
258263
}
259-
defer sub.Unsubscribe()
260-
261-
// we let the filter queries and all subscription errors write into this error
262-
// channel.
263-
errChan := make(chan error, 1)
264+
defer sub.Close()
265+
// Read from the sub.
264266
go func() {
265-
err := <-sub.Err()
266-
if err != nil {
267-
err = cherrors.CheckIsChainNotReachableError(err)
268-
}
269-
errChan <- errors.WithMessagef(err, "subscription for asset %d", asset.assetIndex)
270-
}()
271-
272-
// Query all old funding events
273-
go func() {
274-
iter, err := filterFunds(ctx, asset, fundingIDs...)
275-
if err != nil {
276-
errChan <- errors.WithMessagef(err, "filtering old Deposited events for asset %d", asset.assetIndex)
277-
return
278-
}
279-
defer iter.Close() // nolint: errcheck
280-
for iter.Next() {
281-
deposited <- iter.Event
282-
}
267+
subErr <- sub.ReadAll(ctx, deposited)
283268
}()
284269

285270
// The allocation that all participants agreed on.
@@ -290,7 +275,8 @@ func (f *Funder) waitForFundingConfirmation(ctx context.Context, request channel
290275
// Wait for all non-zero funding requests
291276
for N > 0 {
292277
select {
293-
case event := <-deposited:
278+
case rawEvent := <-deposited:
279+
event := rawEvent.Data.(*assetholder.AssetHolderDeposited)
294280
log := f.log.WithField("fundingID", event.FundingID)
295281

296282
// Calculate the position in the participant array.
@@ -320,7 +306,7 @@ func (f *Funder) waitForFundingConfirmation(ctx context.Context, request channel
320306
return &channel.AssetFundingError{Asset: asset.assetIndex, TimedOutPeers: indices}
321307
}
322308
return nil
323-
case err := <-errChan:
309+
case err := <-subErr:
324310
return err
325311
}
326312
}

backend/ethereum/subscription/doc.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright 2021 - See NOTICE file for copyright holders.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// Package subscription contains generic event subscriptions.
16+
package subscription // import "perun.network/go-perun/backend/ethereum/subscription"
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
// Copyright 2021 - See NOTICE file for copyright holders.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package subscription
16+
17+
import (
18+
"context"
19+
20+
"github.com/pkg/errors"
21+
22+
"github.com/ethereum/go-ethereum"
23+
"github.com/ethereum/go-ethereum/accounts/abi/bind"
24+
"github.com/ethereum/go-ethereum/core/types"
25+
"github.com/ethereum/go-ethereum/event"
26+
cherrors "perun.network/go-perun/backend/ethereum/channel/errors"
27+
)
28+
29+
type (
30+
// EventSub generic event subscription.
31+
// Can be used on any Contract with any Event.
32+
// This EventSub does not prevent duplicates.
33+
EventSub struct {
34+
closed chan struct{}
35+
contract *bind.BoundContract
36+
eFact EventFactory
37+
38+
watchLogs, filterLogs chan types.Log
39+
watchSub, filterSub event.Subscription
40+
}
41+
42+
// Event is a generic on-chain event.
43+
Event struct {
44+
Name string // Name of the event. Must match the ABI definition.
45+
Data interface{} // Instance of the concrete Event type.
46+
Filter Filter // Filters Events by their body.
47+
Log types.Log // Raw original log for additional information.
48+
}
49+
50+
// EventFactory is used to create `Event`s.
51+
// The `Data` and `Name` fields must be set. `Filter` is optional.
52+
EventFactory func() *Event
53+
54+
// Filter can be used to filter events.
55+
// Look at `TestEventSub_Filter` test or the auto generated Filter- and
56+
// Watch-functions in the bindings/ folder for an example.
57+
Filter [][]interface{}
58+
)
59+
60+
// NewEventSub creates a new `EventSub`. Should always be closed with `Close`.
61+
// `pastBlocks` can be used to define how many blocks into the past the sub
62+
// should query.
63+
func NewEventSub(ctx context.Context, chain ethereum.ChainReader, contract *bind.BoundContract, eFact EventFactory, pastBlocks uint64) (*EventSub, error) {
64+
// Get start block number.
65+
startBlock, err := calcStartBlock(ctx, chain, pastBlocks)
66+
if err != nil {
67+
return nil, errors.WithMessage(err, "calculating starting block number")
68+
}
69+
// Watch for future events.
70+
event := eFact()
71+
watchOpts := &bind.WatchOpts{Start: &startBlock}
72+
watchLogs, watchSub, err := contract.WatchLogs(watchOpts, event.Name, event.Filter...)
73+
if err != nil {
74+
err = cherrors.CheckIsChainNotReachableError(err)
75+
return nil, errors.WithMessage(err, "watching logs")
76+
}
77+
// Read past events.
78+
filterOpts := &bind.FilterOpts{Start: startBlock}
79+
filterLogs, filterSub, err := contract.FilterLogs(filterOpts, event.Name, event.Filter...)
80+
if err != nil {
81+
watchSub.Unsubscribe()
82+
err = cherrors.CheckIsChainNotReachableError(err)
83+
return nil, errors.WithMessage(err, "filtering logs")
84+
}
85+
86+
return &EventSub{
87+
closed: make(chan struct{}),
88+
contract: contract,
89+
eFact: eFact,
90+
watchLogs: watchLogs,
91+
filterLogs: filterLogs,
92+
watchSub: watchSub,
93+
filterSub: filterSub,
94+
}, nil
95+
}
96+
97+
func calcStartBlock(ctx context.Context, chain ethereum.ChainReader, pastBlocks uint64) (uint64, error) {
98+
current, err := chain.HeaderByNumber(ctx, nil)
99+
if err != nil {
100+
err = cherrors.CheckIsChainNotReachableError(err)
101+
return 0, errors.WithMessage(err, "retrieving latest block")
102+
}
103+
if current.Number.Uint64() <= pastBlocks {
104+
return 1, nil
105+
}
106+
return current.Number.Uint64() - pastBlocks, nil
107+
}
108+
109+
// ReadAll reads all past and future events into `sink`.
110+
// Can be aborted by cancelling `ctx` or `Close()`.
111+
// It is possible that the same event is read more than once.
112+
func (s *EventSub) ReadAll(ctx context.Context, sink chan<- *Event) error {
113+
// First read into the past.
114+
if err := s.readPast(ctx, sink); err != nil {
115+
return errors.WithMessage(err, "reading logs")
116+
}
117+
// Then wait for new events.
118+
if err := s.readFuture(ctx, sink); err != nil {
119+
return errors.WithMessage(err, "reading logs")
120+
}
121+
return nil
122+
}
123+
124+
// ReadPast reads all past events into `sink`.
125+
// Can be aborted by cancelling `ctx` or `Close()`.
126+
// It is possible that the same event is read more than once.
127+
func (s *EventSub) ReadPast(ctx context.Context, sink chan<- *Event) error {
128+
return errors.WithMessage(s.readPast(ctx, sink), "reading logs")
129+
}
130+
131+
// ReadFuture reads all future events into `sink`.
132+
// Can be aborted by cancelling `ctx` or `Close()`.
133+
// It is possible that the same event is read more than once.
134+
func (s *EventSub) ReadFuture(ctx context.Context, sink chan<- *Event) error {
135+
return errors.WithMessage(s.readFuture(ctx, sink), "reading logs")
136+
}
137+
138+
func (s *EventSub) readPast(ctx context.Context, sink chan<- *Event) error {
139+
var logs []types.Log
140+
// Two read loops are needed if the event sub is closed before all events
141+
// could be read.
142+
read1:
143+
for {
144+
select {
145+
case log := <-s.filterLogs:
146+
logs = append(logs, log)
147+
case err := <-s.filterSub.Err():
148+
if err != nil {
149+
err = cherrors.CheckIsChainNotReachableError(err)
150+
return err
151+
}
152+
break read1
153+
case <-ctx.Done():
154+
return ctx.Err()
155+
case <-s.closed:
156+
return nil
157+
}
158+
}
159+
read2:
160+
for {
161+
select {
162+
case log := <-s.filterLogs:
163+
logs = append(logs, log)
164+
case <-s.closed:
165+
return nil
166+
case <-ctx.Done():
167+
return ctx.Err()
168+
default:
169+
break read2
170+
}
171+
}
172+
173+
for _, log := range logs {
174+
event := s.eFact()
175+
if err := s.contract.UnpackLog(event.Data, event.Name, log); err != nil {
176+
return err
177+
}
178+
event.Log = log
179+
180+
select {
181+
case <-ctx.Done():
182+
return ctx.Err()
183+
case sink <- event:
184+
case <-s.closed:
185+
return nil
186+
}
187+
}
188+
return nil
189+
}
190+
191+
func (s *EventSub) readFuture(ctx context.Context, sink chan<- *Event) error {
192+
for {
193+
select {
194+
case log := <-s.watchLogs:
195+
event := s.eFact()
196+
if err := s.contract.UnpackLog(event.Data, event.Name, log); err != nil {
197+
return err
198+
}
199+
event.Log = log
200+
201+
select {
202+
case <-ctx.Done():
203+
return ctx.Err()
204+
case sink <- event:
205+
case <-s.closed:
206+
return nil
207+
}
208+
case err := <-s.watchSub.Err():
209+
err = cherrors.CheckIsChainNotReachableError(err)
210+
return err
211+
case <-ctx.Done():
212+
return ctx.Err()
213+
case <-s.closed:
214+
return nil
215+
}
216+
}
217+
}
218+
219+
// Close closes the sub and frees associated resources.
220+
// Should be called exactly once and panics otherwise.
221+
// Must not be called if the construction function returned with an error.
222+
func (s *EventSub) Close() {
223+
close(s.closed)
224+
s.watchSub.Unsubscribe()
225+
s.filterSub.Unsubscribe()
226+
}

0 commit comments

Comments
 (0)