Skip to content

Commit 029e577

Browse files
committed
client, wire: Remove context from Cache
According to the package documentation, a context should not be stored in a struct type. Furthermore, the context had no major relevance. Signed-off-by: Matthias Geihs <matthias@perun.network>
1 parent 3a47a0f commit 029e577

File tree

6 files changed

+53
-43
lines changed

6 files changed

+53
-43
lines changed

client/channelconn.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ func newChannelConn(id channel.ID, peers []wire.Address, idx channel.Index, sub
4646
// relay to receive all update responses
4747
relay := wire.NewRelay()
4848
// we cache all responses for the lifetime of the relay
49-
relay.Cache(context.Background(), func(*wire.Envelope) bool { return true })
49+
cacheAll := func(*wire.Envelope) bool { return true }
50+
relay.Cache(&cacheAll)
5051
// Close the relay if anything goes wrong in the following.
5152
// We could have a leaky subscription otherwise.
5253
defer func() {

client/proposal.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,8 @@ func (c *Client) acceptChannelProposal(
265265
// enables caching of incoming version 0 signatures before sending any message
266266
// that might trigger a fast peer to send those. We don't know the channel id
267267
// yet so the cache predicate is coarser than the later subscription.
268-
enableVer0Cache(ctx, c.conn)
268+
pred := enableVer0Cache(c.conn)
269+
defer c.conn.ReleaseCache(pred)
269270

270271
if err := c.conn.pubMsg(ctx, acc, p); err != nil {
271272
c.logPeer(p).Errorf("error sending proposal acceptance: %v", err)
@@ -302,7 +303,8 @@ func (c *Client) proposeTwoPartyChannel(
302303
// enables caching of incoming version 0 signatures before sending any message
303304
// that might trigger a fast peer to send those. We don't know the channel id
304305
// yet so the cache predicate is coarser than the later subscription.
305-
enableVer0Cache(ctx, c.conn)
306+
pred := enableVer0Cache(c.conn)
307+
defer c.conn.ReleaseCache(pred)
306308

307309
proposalID := proposal.ProposalID()
308310
isResponse := func(e *wire.Envelope) bool {
@@ -680,11 +682,13 @@ func (c *Client) fundSubchannel(ctx context.Context, prop *SubChannelProposal, s
680682
}
681683

682684
// enableVer0Cache enables caching of incoming version 0 signatures.
683-
func enableVer0Cache(ctx context.Context, c wire.Cacher) {
684-
c.Cache(ctx, func(m *wire.Envelope) bool {
685+
func enableVer0Cache(c wire.Cacher) *wire.Predicate {
686+
p := func(m *wire.Envelope) bool {
685687
return m.Msg.Type() == wire.ChannelUpdateAcc &&
686688
m.Msg.(*msgChannelUpdateAcc).Version == 0
687-
})
689+
}
690+
c.Cache(&p)
691+
return &p
688692
}
689693

690694
func (c *Client) enableVer1Cache() {

wire/cache.go

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -14,57 +14,53 @@
1414

1515
package wire
1616

17-
import "context"
18-
1917
type (
20-
// Cache is a message cache. The default value is a valid empty cache.
18+
// Cache is a message cache.
2119
Cache struct {
2220
msgs []*Envelope
23-
preds []ctxPredicate
21+
preds map[*Predicate]struct{}
2422
}
2523

2624
// A Predicate defines a message filter.
2725
Predicate = func(*Envelope) bool
2826

29-
ctxPredicate struct {
30-
ctx context.Context
31-
p Predicate
32-
}
33-
3427
// A Cacher has the Cache method to enable caching of messages.
3528
Cacher interface {
3629
// Cache should enable the caching of messages
37-
Cache(context.Context, Predicate)
30+
Cache(*Predicate)
3831
}
3932
)
4033

34+
// MakeCache creates a new cache.
35+
func MakeCache() Cache {
36+
return Cache{
37+
preds: make(map[*func(*Envelope) bool]struct{}),
38+
}
39+
}
40+
4141
// Cache is a message cache. The default value is a valid empty cache.
42-
func (c *Cache) Cache(ctx context.Context, p Predicate) {
43-
c.preds = append(c.preds, ctxPredicate{ctx, p})
42+
func (c *Cache) Cache(p *Predicate) {
43+
c.preds[p] = struct{}{}
44+
}
45+
46+
// Release releases the cache predicate.
47+
func (c *Cache) Release(p *Predicate) {
48+
delete(c.preds, p)
4449
}
4550

4651
// Put puts the message into the cache if it matches any active predicate.
4752
// If it matches several predicates, it is still only added once to the cache.
4853
func (c *Cache) Put(e *Envelope) bool {
4954
// we filter the predicates for non-active and lazily remove them
50-
preds := c.preds[:0]
5155
any := false
52-
for _, p := range c.preds {
53-
select {
54-
case <-p.ctx.Done():
55-
continue // skip done predicate
56-
default:
57-
preds = append(preds, p)
58-
}
59-
60-
any = any || p.p(e)
56+
for p := range c.preds {
57+
any = any || (*p)(e)
6158
}
6259

6360
if any {
6461
c.msgs = append(c.msgs, e)
6562
}
6663

67-
c.preds = preds
6864
return any
6965
}
7066

wire/cache_internal_test.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
package wire
1616

1717
import (
18-
"context"
1918
"testing"
2019

2120
"github.com/stretchr/testify/assert"
@@ -28,7 +27,7 @@ func TestCache(t *testing.T) {
2827
assert, require := assert.New(t), require.New(t)
2928
rng := test.Prng(t)
3029

31-
var c Cache
30+
c := MakeCache()
3231
require.Zero(c.Size())
3332

3433
ping0 := NewRandomEnvelope(rng, NewPingMsg())
@@ -42,8 +41,7 @@ func TestCache(t *testing.T) {
4241
assert.Zero(c.Size())
4342

4443
isPing := func(e *Envelope) bool { return e.Msg.Type() == Ping }
45-
ctx, cancel := context.WithCancel(context.Background())
46-
c.Cache(ctx, isPing)
44+
c.Cache(&isPing)
4745
assert.True(c.Put(ping0), "Put into cache with predicate")
4846
assert.Equal(1, c.Size())
4947
assert.False(c.Put(pong), "Put into cache with non-matching prediacte")
@@ -54,7 +52,7 @@ func TestCache(t *testing.T) {
5452
empty := c.Messages(func(*Envelope) bool { return false })
5553
assert.Len(empty, 0)
5654

57-
cancel()
55+
c.Release(&isPing)
5856
assert.False(c.Put(ping2), "Put into cache with canceled predicate")
5957
assert.Equal(2, c.Size())
6058
assert.Len(c.preds, 0, "internal: Put should have removed canceled predicate")
@@ -67,7 +65,7 @@ func TestCache(t *testing.T) {
6765
require.Len(msgs, 1)
6866
assert.Same(msgs[0], ping0)
6967

70-
c.Cache(context.Background(), isPing)
68+
c.Cache(&isPing)
7169
c.Flush()
7270
assert.Equal(0, c.Size())
7371
assert.False(c.Put(ping0), "flushed cache should not hold any predicates")

wire/relay.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
package wire
1616

1717
import (
18-
"context"
1918
stdsync "sync"
2019

2120
"github.com/pkg/errors"
@@ -41,7 +40,10 @@ type subscription struct {
4140

4241
// NewRelay returns a new Relay which logs unhandled messages.
4342
func NewRelay() *Relay {
44-
return &Relay{defaultMsgHandler: logUnhandledMsg}
43+
return &Relay{
44+
defaultMsgHandler: logUnhandledMsg,
45+
cache: MakeCache(),
46+
}
4547
}
4648

4749
// Close closes the relay.
@@ -64,16 +66,24 @@ func (p *Relay) Close() error {
6466
}
6567

6668
// Cache enables caching of messages that don't match any consumer. They are
67-
// only cached if they match the given predicate, within the given context.
68-
func (p *Relay) Cache(ctx context.Context, predicate Predicate) {
69+
// only cached if they match the given predicate.
70+
func (p *Relay) Cache(predicate *Predicate) {
6971
p.mutex.Lock()
7072
defer p.mutex.Unlock()
7173

7274
if p.IsClosed() {
7375
return
7476
}
7577

76-
p.cache.Cache(ctx, predicate)
78+
p.cache.Cache(predicate)
79+
}
80+
81+
// ReleaseCache disable caching for the given predicate.
82+
func (p *Relay) ReleaseCache(predicate *Predicate) {
83+
p.mutex.Lock()
84+
defer p.mutex.Unlock()
85+
86+
p.cache.Release(predicate)
7787
}
7888

7989
// Subscribe adds a Consumer to the subscriptions.

wire/relay_internal_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func TestProducer_caching(t *testing.T) {
126126
prod.SetDefaultMsgHandler(func(e *Envelope) { unhandlesMsg = append(unhandlesMsg, e) })
127127

128128
ctx := context.Background()
129-
prod.Cache(ctx, isPing)
129+
prod.Cache(&isPing)
130130

131131
rng := test.Prng(t)
132132
ping0 := NewRandomEnvelope(rng, NewPingMsg())
@@ -141,7 +141,7 @@ func TestProducer_caching(t *testing.T) {
141141
assert.Equal(1, prod.cache.Size())
142142
assert.Len(unhandlesMsg, 1)
143143

144-
prod.Cache(ctx, isPong)
144+
prod.Cache(&isPong)
145145
prod.Put(pong2)
146146
assert.Equal(2, prod.cache.Size())
147147
assert.Len(unhandlesMsg, 1)
@@ -161,7 +161,8 @@ func TestProducer_caching(t *testing.T) {
161161
assert.Contains(err.Error(), "cache")
162162
assert.Zero(prod.cache.Size(), "producer.Close should flush the cache")
163163

164-
prod.Cache(ctx, func(*Envelope) bool { return true })
164+
p := func(*Envelope) bool { return true }
165+
prod.Cache(&p)
165166
prod.cache.Put(ping0)
166167
assert.Zero(prod.cache.Size(), "Cache on closed producer should not enable caching")
167168
}

0 commit comments

Comments
 (0)