Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ linters:
# These could be enabled in the future:
- ifshort # we often don't use `if err := …` for readability.
- tparallel # We don't always use parallel tests.
- contextcheck # Wants contexts to be always passed down.

# Deprecated:
- maligned
Expand Down
11 changes: 5 additions & 6 deletions backend/ethereum/channel/test/adjudicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (t *SimTimeout) IsElapsed(ctx context.Context) bool {
}
defer t.sb.clockMu.Unlock()

return t.timeLeft() <= 0
return t.timeLeft(ctx) <= 0
}

// Wait advances the clock of the simulated blockchain past the timeout.
Expand All @@ -132,7 +132,7 @@ func (t *SimTimeout) Wait(ctx context.Context) error {
}
defer t.sb.clockMu.Unlock()

if d := t.timeLeft(); d > 0 {
if d := t.timeLeft(ctx); d > 0 {
if err := t.sb.AdjustTime(time.Duration(d) * time.Second); err != nil {
return errors.Wrap(err, "adjusting time")
}
Expand All @@ -141,10 +141,9 @@ func (t *SimTimeout) Wait(ctx context.Context) error {
return nil
}

func (t *SimTimeout) timeLeft() int64 {
// context is ignored by sim blockchain anyways
h, err := t.sb.HeaderByNumber(nil, nil) //nolint:staticcheck
if err != nil { // should never happen with a sim blockchain
func (t *SimTimeout) timeLeft(ctx context.Context) int64 {
h, err := t.sb.HeaderByNumber(ctx, nil)
if err != nil { // should never happen with a sim blockchain
panic(fmt.Sprint("Error getting latest block: ", err))
}
return int64(t.Time) - int64(h.Time)
Expand Down
4 changes: 2 additions & 2 deletions backend/ethereum/channel/test/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (s *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa

// FundAddress funds a given address with `test.MaxBalance` eth from a faucet.
func (s *SimulatedBackend) FundAddress(ctx context.Context, addr common.Address) {
nonce, err := s.PendingNonceAt(context.Background(), s.faucetAddr)
nonce, err := s.PendingNonceAt(ctx, s.faucetAddr)
if err != nil {
panic(err)
}
Expand All @@ -132,7 +132,7 @@ func (s *SimulatedBackend) FundAddress(ctx context.Context, addr common.Address)
panic(err)
}
s.Commit()
if _, err := bind.WaitMined(context.Background(), s, signedTX); err != nil {
if _, err := bind.WaitMined(ctx, s, signedTX); err != nil {
panic(err)
}
}
Expand Down
16 changes: 8 additions & 8 deletions channel/persistence/test/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,12 @@ func isNilSigs(s []wallet.Sig) bool {
}

// Init calls Init on the state machine and then checks the persistence.
func (c *Channel) Init(t require.TestingT, rng *rand.Rand) {
func (c *Channel) Init(ctx context.Context, t require.TestingT, rng *rand.Rand) {
initAlloc := *ctest.NewRandomAllocation(rng, ctest.WithNumParts(len(c.accounts)))
initData := channel.NewMockOp(channel.OpValid)
err := c.StateMachine.Init(nil, initAlloc, initData) //nolint:staticcheck
err := c.StateMachine.Init(ctx, initAlloc, initData)
require.NoError(t, err)
c.AssertPersisted(c.ctx, t)
c.AssertPersisted(ctx, t)
}

// EnableInit calls EnableInit on the state machine and then checks the persistence.
Expand All @@ -167,16 +167,16 @@ func (c *Channel) EnableInit(t require.TestingT) {
}

// SignAll signs the current staged state by all parties.
func (c *Channel) SignAll(t require.TestingT) {
func (c *Channel) SignAll(ctx context.Context, t require.TestingT) {
// trigger local signing
c.Sig(nil) //nolint:errcheck,staticcheck
c.AssertPersisted(c.ctx, t)
c.Sig(ctx) //nolint:errcheck
c.AssertPersisted(ctx, t)
// remote signers
for i := range c.accounts {
sig, err := channel.Sign(c.accounts[i], c.StagingState())
require.NoError(t, err)
c.AddSig(nil, channel.Index(i), sig) //nolint:errcheck,staticcheck
c.AssertPersisted(c.ctx, t)
c.AddSig(ctx, channel.Index(i), sig) //nolint:errcheck
c.AssertPersisted(ctx, t)
}
}

Expand Down
8 changes: 4 additions & 4 deletions channel/persistence/test/persistrestorertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ func GenericPersistRestorerTest(
seed := pkgtest.Seed("", subSeed, numChans, numPeers, chIndex, ch.ID())
rng := rand.New(rand.NewSource(seed)) //nolint:gosec

ch.Init(t, rng)
ch.SignAll(t)
ch.Init(ctx, t, rng)
ch.SignAll(ctx, t)
ch.EnableInit(t)

ch.SetFunded(t)
Expand All @@ -140,7 +140,7 @@ func GenericPersistRestorerTest(
ch.DiscardUpdate(t)
err = ch.Update(t, state1, ch.Idx())
require.NoError(t, err)
ch.SignAll(t)
ch.SignAll(ctx, t)
ch.EnableUpdate(t)

// Final state
Expand All @@ -149,7 +149,7 @@ func GenericPersistRestorerTest(
statef.IsFinal = true
err = ch.Update(t, statef, ch.Idx()^1)
require.NoError(t, err)
ch.SignAll(t)
ch.SignAll(ctx, t)
ch.EnableFinal(t)

ch.SetRegistering(t)
Expand Down
3 changes: 2 additions & 1 deletion client/channelconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ func newChannelConn(id channel.ID, peers []wire.Address, idx channel.Index, sub
// relay to receive all update responses
relay := wire.NewRelay()
// we cache all responses for the lifetime of the relay
relay.Cache(context.Background(), func(*wire.Envelope) bool { return true })
cacheAll := func(*wire.Envelope) bool { return true }
relay.Cache(&cacheAll)
// Close the relay if anything goes wrong in the following.
// We could have a leaky subscription otherwise.
defer func() {
Expand Down
28 changes: 18 additions & 10 deletions client/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,10 @@ func (c *Client) ProposeChannel(ctx context.Context, prop ChannelProposal) (*Cha
}

// 2. send proposal, wait for response, create channel object
c.enableVer1Cache() // cache version 1 updates until channel is opened
defer c.releaseVer1Cache() // replay cached version 1 updates
// cache version 1 updates until channel is opened
c.enableVer1Cache()
// replay cached version 1 updates
defer c.releaseVer1Cache() //nolint:contextcheck
ch, err := c.proposeTwoPartyChannel(ctx, prop)
if err != nil {
return nil, errors.WithMessage(err, "channel proposal")
Expand Down Expand Up @@ -236,8 +238,10 @@ func (c *Client) handleChannelProposalAcc(
return ch, errors.WithMessage(err, "validating channel proposal acceptance")
}

c.enableVer1Cache() // cache version 1 updates
defer c.releaseVer1Cache() // replay cached version 1 updates
// cache version 1 updates
c.enableVer1Cache()
// replay cached version 1 updates
defer c.releaseVer1Cache() //nolint:contextcheck

if ch, err = c.acceptChannelProposal(ctx, prop, p, acc); err != nil {
return ch, errors.WithMessage(err, "accept channel proposal")
Expand All @@ -261,7 +265,8 @@ func (c *Client) acceptChannelProposal(
// enables caching of incoming version 0 signatures before sending any message
// that might trigger a fast peer to send those. We don't know the channel id
// yet so the cache predicate is coarser than the later subscription.
enableVer0Cache(ctx, c.conn)
pred := enableVer0Cache(c.conn)
defer c.conn.ReleaseCache(pred)

if err := c.conn.pubMsg(ctx, acc, p); err != nil {
c.logPeer(p).Errorf("error sending proposal acceptance: %v", err)
Expand Down Expand Up @@ -298,7 +303,8 @@ func (c *Client) proposeTwoPartyChannel(
// enables caching of incoming version 0 signatures before sending any message
// that might trigger a fast peer to send those. We don't know the channel id
// yet so the cache predicate is coarser than the later subscription.
enableVer0Cache(ctx, c.conn)
pred := enableVer0Cache(c.conn)
defer c.conn.ReleaseCache(pred)

proposalID := proposal.ProposalID()
isResponse := func(e *wire.Envelope) bool {
Expand Down Expand Up @@ -676,11 +682,13 @@ func (c *Client) fundSubchannel(ctx context.Context, prop *SubChannelProposal, s
}

// enableVer0Cache enables caching of incoming version 0 signatures.
func enableVer0Cache(ctx context.Context, c wire.Cacher) {
c.Cache(ctx, func(m *wire.Envelope) bool {
func enableVer0Cache(c wire.Cacher) *wire.Predicate {
p := func(m *wire.Envelope) bool {
return m.Msg.Type() == wire.ChannelUpdateAcc &&
m.Msg.(*msgChannelUpdateAcc).Version == 0
})
}
c.Cache(&p)
return &p
}

func (c *Client) enableVer1Cache() {
Expand All @@ -700,7 +708,7 @@ func (c *Client) releaseVer1Cache() {

c.version1Cache.enabled--
for _, u := range c.version1Cache.cache {
go c.handleChannelUpdate(u.uh, u.p, u.m)
go c.handleChannelUpdate(u.uh, u.p, u.m) //nolint:contextcheck
}
c.version1Cache.cache = nil
}
Expand Down
6 changes: 3 additions & 3 deletions client/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (c *Client) handleChannelUpdate(uh UpdateHandler, p wire.Address, m Channel
return
}
pidx := ch.Idx() ^ 1
ch.handleUpdateReq(pidx, m, uh)
ch.handleUpdateReq(pidx, m, uh) //nolint:contextcheck
}

func (c *Client) cacheVersion1Update(uh UpdateHandler, p wire.Address, m ChannelUpdateProposal) bool {
Expand Down Expand Up @@ -288,12 +288,12 @@ func (c *Channel) handleUpdateReq(
client := c.client

if prop, ok := req.(*virtualChannelFundingProposal); ok {
client.handleVirtualChannelFundingProposal(c, prop, responder)
client.handleVirtualChannelFundingProposal(c, prop, responder) //nolint:contextcheck
return
}

if prop, ok := req.(*virtualChannelSettlementProposal); ok {
client.handleVirtualChannelSettlementProposal(c, prop, responder)
client.handleVirtualChannelSettlementProposal(c, prop, responder) //nolint:contextcheck
return
}

Expand Down
8 changes: 4 additions & 4 deletions client/virtual_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,18 @@ func (c *Client) handleVirtualChannelFundingProposal(
) {
err := c.validateVirtualChannelFundingProposal(ch, prop)
if err != nil {
c.rejectProposal(responder, err.Error())
c.rejectProposal(responder, err.Error()) //nolint:contextcheck
}

ctx, cancel := context.WithTimeout(c.Ctx(), virtualFundingTimeout)
defer cancel()

err = c.fundingWatcher.Await(ctx, prop)
if err != nil {
c.rejectProposal(responder, err.Error())
c.rejectProposal(responder, err.Error()) //nolint:contextcheck
}

c.acceptProposal(responder)
c.acceptProposal(responder) //nolint:contextcheck
}

func (c *Channel) watchVirtual() error {
Expand Down Expand Up @@ -348,7 +348,7 @@ func (c *Client) matchFundingProposal(ctx context.Context, a, b interface{}) boo
}

go func() {
err := virtual.watchVirtual()
err := virtual.watchVirtual() //nolint:contextcheck // The context will be derived from the channel context.
c.log.Debugf("channel %v: watcher stopped: %v", virtual.ID(), err)
}()
return true
Expand Down
4 changes: 2 additions & 2 deletions client/virtual_channel_settlement.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (c *Client) handleVirtualChannelSettlementProposal(
) {
err := c.validateVirtualChannelSettlementProposal(parent, prop)
if err != nil {
c.rejectProposal(responder, err.Error())
c.rejectProposal(responder, err.Error()) //nolint:contextcheck
}

ctx, cancel := context.WithTimeout(c.Ctx(), virtualSettlementTimeout)
Expand All @@ -90,7 +90,7 @@ func (c *Client) handleVirtualChannelSettlementProposal(
resp: responder,
})
if err != nil {
c.rejectProposal(responder, err.Error())
c.rejectProposal(responder, err.Error()) //nolint:contextcheck
}
}

Expand Down
10 changes: 4 additions & 6 deletions watcher/local/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (w *Watcher) startWatching(
Sigs: signedState.Sigs,
}
ch.Go(func() { ch.handleStatesFromClient(initialTx) })
ch.Go(func() { ch.handleEventsFromChain(w.rs, w.registry) })
ch.Go(func() { ch.handleEventsFromChain(w.rs, w.registry) }) //nolint:contextcheck

return statesPubSub, eventsToClientPubSub, nil
}
Expand Down Expand Up @@ -329,7 +329,7 @@ func (ch *ch) handleEventsFromChain(registerer channel.Registerer, chRegistry *r
}

log.Debugf("Registering latest version (%d)", latestTx.Version)
err := registerDispute(chRegistry, registerer, parent)
err := registerDispute(chRegistry, registerer, parent) //nolint:contextcheck
if err != nil {
log.Error("Error registering dispute")
return
Expand All @@ -355,15 +355,13 @@ func (ch *ch) handleEventsFromChain(registerer channel.Registerer, chRegistry *r
}

// registerDispute collects the latest transaction for the parent channel and
// each of its children. It then registers dispute for the channel tree.
// each of its children. It then registers a dispute for the channel tree.
//
// This function assumes the callers has locked the parent channel.
func registerDispute(r *registry, registerer channel.Registerer, parentCh *ch) error {
parentTx, subStates := retreiveLatestSubStates(r, parentCh)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := registerer.Register(ctx, makeAdjudicatorReq(parentCh.params, parentTx), subStates)
err := registerer.Register(context.TODO(), makeAdjudicatorReq(parentCh.params, parentTx), subStates)
if err != nil {
return err
}
Expand Down
42 changes: 19 additions & 23 deletions wire/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,57 +14,53 @@

package wire

import "context"

type (
// Cache is a message cache. The default value is a valid empty cache.
// Cache is a message cache.
Cache struct {
msgs []*Envelope
preds []ctxPredicate
preds map[*Predicate]struct{}
}

// A Predicate defines a message filter.
Predicate = func(*Envelope) bool

ctxPredicate struct {
ctx context.Context
p Predicate
}

// A Cacher has the Cache method to enable caching of messages.
Cacher interface {
// Cache should enable the caching of messages
Cache(context.Context, Predicate)
Cache(*Predicate)
}
)

// MakeCache creates a new cache.
func MakeCache() Cache {
return Cache{
preds: make(map[*func(*Envelope) bool]struct{}),
}
}

// Cache is a message cache. The default value is a valid empty cache.
func (c *Cache) Cache(ctx context.Context, p Predicate) {
c.preds = append(c.preds, ctxPredicate{ctx, p})
func (c *Cache) Cache(p *Predicate) {
c.preds[p] = struct{}{}
}

// Release releases the cache predicate.
func (c *Cache) Release(p *Predicate) {
delete(c.preds, p)
}

// Put puts the message into the cache if it matches any active predicate.
// If it matches several predicates, it is still only added once to the cache.
func (c *Cache) Put(e *Envelope) bool {
// we filter the predicates for non-active and lazily remove them
preds := c.preds[:0]
any := false
for _, p := range c.preds {
select {
case <-p.ctx.Done():
continue // skip done predicate
default:
preds = append(preds, p)
}

any = any || p.p(e)
for p := range c.preds {
any = any || (*p)(e)
}

if any {
c.msgs = append(c.msgs, e)
}

c.preds = preds
return any
}

Expand Down
Loading